Source code for aiowamp.transport
from __future__ import annotations
import abc
import dataclasses
import ssl
import urllib.parse as urlparse
from typing import Awaitable, Callable, Dict, Optional
import aiowamp
__all__ = ["TransportABC",
"SerializerABC",
"CommonTransportConfig", "TransportFactory",
"register_transport_factory",
"get_transport_factory", "connect_transport"]
[docs]class TransportABC(abc.ABC):
"""Abstract transport type.
A Transport connects two WAMP Peers and provides a channel over which WAMP
messages for a WAMP Session can flow in both directions.
WAMP can run over any Transport which is message-based, bidirectional,
reliable and ordered.
"""
__slots__ = ()
[docs] def __str__(self) -> str:
return f"{type(self).__qualname__} {id(self):x}"
@property
@abc.abstractmethod
def open(self) -> bool:
"""Whether the transport is open and usable."""
...
[docs] @abc.abstractmethod
async def close(self) -> None:
"""Close the transport."""
...
[docs] @abc.abstractmethod
async def send(self, msg: aiowamp.MessageABC) -> None:
"""Send a message.
Args:
msg: Message to send.
"""
...
[docs] @abc.abstractmethod
async def recv(self) -> aiowamp.MessageABC:
"""Receive a message.
Returns:
Received message.
"""
...
[docs]class SerializerABC(abc.ABC):
"""Serializer for messages.
Normally an `aiowamp.TransportABC` keeps a serializer which it uses.
Deliberately doesn't adhere to Python's dump / load interface because the
objects in question are always `aiowamp.MessageABC` and the serialized
format is always `bytes`.
"""
__slots__ = ()
[docs] def __str__(self) -> str:
return f"{type(self).__qualname__} {id(self):x}"
[docs] @abc.abstractmethod
def serialize(self, msg: aiowamp.MessageABC) -> bytes:
"""Serialise a message.
Args:
msg: Message to serialise.
Returns:
`bytes` representation of the message.
Raises:
Exception: If the message couldn't be serialised.
"""
...
[docs] @abc.abstractmethod
def deserialize(self, data: bytes) -> aiowamp.MessageABC:
"""Deserialise the `bytes` representation of a message.
Args:
data: `bytes` representation of a message.
Returns:
Deserialised message.
Raises:
aiowamp.InvalidMessage: If the deserialised data isn't a message.
Exception: If the message couldn't be deserialised.
"""
...
[docs]@dataclasses.dataclass()
class CommonTransportConfig:
"""Transport configuration used by transport factories."""
url: urlparse.ParseResult
"""URL to connect to."""
serializer: Optional["aiowamp.SerializerABC"] = None
"""Serializer to use.
If `None` a sensible default will be used.
"""
ssl_context: Optional[ssl.SSLContext] = None
"""SSL options.
If `None`, TLS will still be used if the `.url` specifies a secure scheme
(ex: "wss").
"""
TransportFactory = Callable[[CommonTransportConfig], Awaitable[TransportABC]]
"""Callable creating a transport from a config."""
SCHEME_TRANSPORT_MAP: Dict[str, TransportFactory] = {}
[docs]def register_transport_factory(*schemes: str, overwrite: bool = False):
"""Decorator for registering transport factories.
Registered transport factories can be retrieved by
`aiowamp.get_transport_factory`.
Args:
*schemes: Schemes to register to the given transport factory.
overwrite: Whether to overwrite existing registrations for a scheme.
Defaults to `False`.
Raises:
ValueError: If a scheme is already registered to another factory and
overwrite is `False`.
"""
def decorator(fn: TransportFactory):
for scheme in schemes:
if not overwrite and scheme in SCHEME_TRANSPORT_MAP:
raise ValueError(f"cannot register scheme {scheme!r} for {fn!r}. "
f"Already registered by {SCHEME_TRANSPORT_MAP[scheme]!r}")
SCHEME_TRANSPORT_MAP[scheme] = fn
return fn
return decorator
[docs]def get_transport_factory(scheme: str) -> TransportFactory:
"""Get the registered transport factory for the scheme.
Args:
scheme: URL scheme to get factory for.
Returns:
A transport factory callable.
Raises:
KeyError: If no transport is registered for the given scheme.
"""
try:
return SCHEME_TRANSPORT_MAP[scheme]
except KeyError:
raise KeyError(f"no transport registered for scheme {scheme!r}") from None
[docs]async def connect_transport(config: CommonTransportConfig) -> TransportABC:
"""Connect a transport based on the config.
Args:
config: Config to create the transport with.
Returns:
A connected transport.
Raises:
KeyError: If no transport factory could be found for the config's url.
Exception: If the connection fails.
Notes:
Uses `aiowamp.get_transport_factory` to get the transport factory from
which the transport is created.
"""
connect = get_transport_factory(config.url.scheme)
return await connect(config)