from __future__ import annotations
import abc
import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional, Set, Awaitable
import aiowamp
from .maybe_awaitable import MaybeAwaitable, call_async_fn_background
from .message import message_as_type
from .msg import Goodbye as GoodbyeMsg
from .uri import CLOSE_NORMAL, GOODBYE_AND_OUT
# Public names exported by this module.
__all__ = [
    "MessageHandler",
    "SessionABC",
    "Session",
]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Type of a message handler: a callable which receives a single message and
# whose return value may optionally be awaitable.
MessageHandler = Callable[["aiowamp.MessageABC"], MaybeAwaitable[Any]]
class SessionABC(abc.ABC):
    """Abstract session type.

    A Session is a transient conversation between two Peers attached to a Realm
    and running over a Transport.
    """
    __slots__ = ()

    def __str__(self) -> str:
        return f"{type(self).__qualname__} {self.session_id}"

    @property
    @abc.abstractmethod
    def session_id(self) -> int:
        """Session ID."""
        ...

    @property
    @abc.abstractmethod
    def realm(self) -> str:
        """Name of the realm the session is attached to."""
        ...

    @property
    @abc.abstractmethod
    def details(self) -> aiowamp.WAMPDict:
        """Details of the session."""
        ...

    @property
    @abc.abstractmethod
    def goodbye(self) -> Optional[aiowamp.msg.Goodbye]:
        """Goodbye message sent by the remote."""
        ...

    @property
    @abc.abstractmethod
    def roles(self) -> Set[str]:
        """Set of role names; this is what `has_role` checks against."""
        ...

    @abc.abstractmethod
    async def close(self, details: Optional[aiowamp.WAMPDict] = None, *,
                    reason: Optional[aiowamp.URI] = None) -> None:
        """Close the session.

        Args:
            details: Optional details to accompany the close.
            reason: Optional URI describing why the session is closed.
        """
        ...

    @abc.abstractmethod
    async def send(self, msg: aiowamp.MessageABC) -> None:
        """Send a message using the underlying transport."""
        ...

    @abc.abstractmethod
    async def wait_until_done(self) -> Optional[aiowamp.msg.Goodbye]:
        """Wait until the session is done.

        Returns:
            The goodbye message, if there is one.
        """
        ...

    @abc.abstractmethod
    def add_message_handler(self, handler: aiowamp.MessageHandler) -> None:
        """Register a message handler.

        Args:
            handler: Handler to add.
        """
        ...

    @abc.abstractmethod
    def remove_message_handler(self, handler: Optional[aiowamp.MessageHandler] = None) -> None:
        """Remove a message handler.

        Args:
            handler: Handler to remove. The meaning of `None` is defined by
                the implementation.
        """
        ...

    @abc.abstractmethod
    def get_features(self, role: str) -> Set[str]:
        """Get the set of features for the given role.

        Args:
            role: Name of the role.
        """
        ...

    def has_role(self, role: str) -> bool:
        """Check whether `role` is one of the session's roles."""
        return role in self.roles

    def has_feature(self, role: str, feature: str) -> bool:
        """Check whether `feature` is in the feature set of `role`."""
        return feature in self.get_features(role)
class Session(SessionABC):
    """Session running on top of a transport.

    Incoming messages are read from the transport by a background receive
    loop and passed to the registered message handlers. The loop exits once
    a goodbye message has been received.
    """
    __slots__ = ("transport",
                 "control_transport",
                 "__session_id", "__realm", "__details",
                 "__roles",
                 "__goodbye_fut",
                 "__msg_handlers", "__receive_task")

    transport: aiowamp.TransportABC
    """Transport used by the session."""

    control_transport: bool
    """Whether the session controls the underlying transport.

    If this is `True`, the session will close the transport when it
    is done (see `wait_until_done`).
    """

    __session_id: int
    __realm: str
    __details: aiowamp.WAMPDict
    __roles: Dict[str, Set[str]]
    __goodbye_fut: Optional[asyncio.Future]
    __msg_handlers: List["aiowamp.MessageHandler"]
    __receive_task: Optional[asyncio.Task]

    def __init__(self, transport: aiowamp.TransportABC, session_id: int, realm: str, details: aiowamp.WAMPDict, *,
                 control_transport: bool = True) -> None:
        """Create a new session.

        Args:
            transport: Transport to run the session over.
            session_id: ID of the session.
            realm: Name of the realm the session is attached to.
            details: Session details. The role map is derived from them.
            control_transport: Whether the session closes the transport
                once it is done.

        Raises:
            RuntimeError: If the transport is not open.
        """
        if not transport.open:
            raise RuntimeError(f"transport {transport} must be open")

        self.transport = transport
        self.control_transport = control_transport

        self.__session_id = session_id
        self.__realm = realm
        self.__details = details
        self.__roles = get_role_map(details)

        self.__goodbye_fut = None
        self.__msg_handlers = []
        self.__receive_task = None

    def __repr__(self) -> str:
        return f"{type(self).__qualname__}({self.transport}, " \
               f"session_id={self.session_id!r}, realm={self.realm!r}, " \
               f"details={self.details!r})"

    @property
    def session_id(self) -> int:
        """Session ID."""
        return self.__session_id

    @property
    def realm(self) -> str:
        """Name of the realm the session is attached to."""
        return self.__realm

    @property
    def details(self) -> aiowamp.WAMPDict:
        """Details passed to the constructor."""
        return self.__details

    @property
    def goodbye(self) -> Optional[aiowamp.msg.Goodbye]:
        """Goodbye message sent by the remote.

        `None` if no goodbye message has been received (yet).
        """
        fut = self.__goodbye_fut
        # Check the state explicitly instead of catching whatever `result()`
        # raises: a missing future used to surface as a swallowed
        # AttributeError and a cancelled one raises CancelledError, which is
        # a BaseException on Python 3.8+ and would escape `except Exception`.
        if fut is None or not fut.done() or fut.cancelled():
            return None
        if fut.exception() is not None:
            return None

        return fut.result()

    @property
    def roles(self) -> Set[str]:
        """Names of the roles found in the session details."""
        return set(self.__roles)

    def __receive_loop_running(self) -> bool:
        """Check whether the receive task exists and hasn't finished."""
        return bool(self.__receive_task and not self.__receive_task.done())

    def start(self) -> None:
        """Start the receive loop as a background task.

        Raises:
            RuntimeError: If the receive loop is already running.
        """
        if self.__receive_loop_running():
            raise RuntimeError("receive loop already running.")

        self.__receive_task = asyncio.create_task(self.__receive_loop())

    def __get_goodbye_fut(self) -> asyncio.Future:
        """Get the goodbye future, creating it if it doesn't exist yet."""
        if self.__goodbye_fut is None:
            loop = asyncio.get_running_loop()
            self.__goodbye_fut = loop.create_future()

        return self.__goodbye_fut

    async def __handle_goodbye(self, goodbye: aiowamp.msg.Goodbye) -> None:
        """Handle a goodbye message received from the remote.

        If `close` hasn't been called (no goodbye future exists), the goodbye
        was initiated by the remote and is answered with a confirmation.
        In either case the received message is stored in the goodbye future.
        """
        # remote initiated goodbye
        if self.__goodbye_fut is None:
            if goodbye.reason == GOODBYE_AND_OUT:
                log.warning("received %s confirmation before closing.", goodbye)

            await self.send(GoodbyeMsg(
                {},
                GOODBYE_AND_OUT,
            ))

        self.__get_goodbye_fut().set_result(goodbye)

    async def __receive_loop(self) -> None:
        """Receive messages from the transport until a goodbye arrives.

        Every message other than a goodbye is passed to all message handlers.
        """
        log.debug("%s: starting receive loop", self)

        while True:
            msg = await self.transport.recv()

            goodbye = message_as_type(msg, GoodbyeMsg)
            if goodbye:
                await self.__handle_goodbye(goodbye)
                break

            # goodbye messages are not sent to handlers
            for handler in self.__msg_handlers:
                call_async_fn_background(handler, "message handler raised an exception",
                                         msg)

        log.debug("%s: exiting receive loop", self)

    async def send(self, msg: aiowamp.MessageABC) -> None:
        """Send a message using the underlying transport."""
        await self.transport.send(msg)

    async def close(self, details: Optional[aiowamp.WAMPDict] = None, *,
                    reason: Optional[aiowamp.URI] = None) -> None:
        """Send a goodbye message and wait until the session is done.

        Exceptions raised while waiting are logged, not propagated.

        Args:
            details: Details to send with the goodbye message.
                Defaults to an empty dict.
            reason: Close reason. Defaults to `CLOSE_NORMAL`.
        """
        # Creating the future marks the goodbye as initiated by us, so the
        # remote's goodbye is not answered with another confirmation.
        self.__get_goodbye_fut()

        await self.send(GoodbyeMsg(
            details or {},
            reason or CLOSE_NORMAL,
        ))

        try:
            await self.wait_until_done()
        except Exception:
            log.exception("receive loop raised exception")

    async def wait_until_done(self) -> Optional[aiowamp.msg.Goodbye]:
        """Wait until the receive loop has finished.

        Starts the receive loop if it was never started. If
        `control_transport` is set, the transport is closed afterwards, even
        if the receive loop raised.

        Returns:
            The goodbye message sent by the remote, if any.
        """
        # Only start the loop if it was never started. A finished loop must
        # not be restarted here, otherwise a second call would read from a
        # transport whose session has already ended (and which may be closed).
        if self.__receive_task is None:
            self.start()

        try:
            await self.__receive_task
        finally:
            if self.control_transport:
                await self.transport.close()

        return self.goodbye

    def add_message_handler(self, handler: aiowamp.MessageHandler) -> None:
        """Add a message handler and make sure the receive loop is running.

        Args:
            handler: Handler to call for each received message.

        Raises:
            ValueError: If the handler has already been added.
        """
        if handler in self.__msg_handlers:
            raise ValueError(f"{handler} already exists")

        self.__msg_handlers.append(handler)

        if not self.__receive_loop_running():
            self.start()

    def remove_message_handler(self, handler: Optional[aiowamp.MessageHandler] = None) -> None:
        """Remove a message handler.

        Args:
            handler: Handler to remove. If `None`, all handlers are removed.

        Raises:
            ValueError: If the given handler isn't registered.
        """
        if handler is None:
            self.__msg_handlers.clear()
        else:
            try:
                self.__msg_handlers.remove(handler)
            except ValueError:
                raise ValueError(f"handler {handler} doesn't exist") from None

    def has_role(self, role: str) -> bool:
        """Check whether `role` is in the role map."""
        # faster than `role in set(self.__roles)`
        return role in self.__roles

    def get_features(self, role: str) -> Set[str]:
        """Get the features of the given role.

        Returns:
            The feature set of the role, or an empty set if the role is
            unknown.
        """
        try:
            return self.__roles[role]
        except KeyError:
            return set()
def get_role_map(details: aiowamp.WAMPDict) -> Dict[str, Set[str]]:
    """Build a mapping from role name to its set of supported features.

    Reads `details["roles"]`, where each role maps to a dict which may contain
    a `"features"` dict of feature name to a truthy/falsy "supported" flag.
    Malformed parts are tolerated: a missing or non-dict `"roles"` value gives
    an empty map, and a role whose entry or `"features"` value is missing or
    not a dict gets an empty feature set.

    Args:
        details: Details dict to extract the roles from.

    Returns:
        Dict mapping each role name to the set of features whose flag is
        truthy.
    """
    roles = details.get("roles")
    if not isinstance(roles, dict):
        return {}

    role_map: Dict[str, Set[str]] = {}
    for role, role_dict in roles.items():
        feature_set: Set[str] = set()
        # The role is always recorded, even if it has no usable features.
        role_map[role] = feature_set

        # A non-dict role entry (e.g. None) can't carry features; treat it
        # like a role without a "features" key instead of raising TypeError.
        if not isinstance(role_dict, dict):
            continue

        features = role_dict.get("features")
        if not isinstance(features, dict):
            continue

        feature_set.update(feature for feature, supported in features.items()
                           if supported)

    return role_map