Source code for aiowamp.client.event

"""Provides subscription event classes."""

from __future__ import annotations

import abc
from typing import Generic, Optional, Tuple, TypeVar

import aiowamp
from aiowamp import URI
from aiowamp.args_mixin import ArgsMixin

__all__ = ["SubscriptionEventABC", "SubscriptionEvent"]

ClientT = TypeVar("ClientT", bound="aiowamp.ClientABC")


[docs]class SubscriptionEventABC(ArgsMixin, abc.ABC, Generic[ClientT]): """Subscription event.""" __slots__ = ()
[docs] def __str__(self) -> str: return f"{type(self).__qualname__} {self.publication_id}"
@property @abc.abstractmethod def client(self) -> ClientT: """Underlying client that received the event.""" ... @property @abc.abstractmethod def publication_id(self) -> int: """ID of the publication.""" ... @property @abc.abstractmethod def subscribed_topic(self) -> aiowamp.URI: """URI of the subscription. This is the uri that was passed to the subscribe method. See `.topic` for the uri that the event was published to. """ ... @property def topic(self) -> aiowamp.URI: """Concrete topic that caused the event. This will be the same as `subscribed_topic` unless the handler subscribed with a pattern-based matching policy. """ try: return URI(self.details["topic"]) except KeyError: return self.subscribed_topic @property @abc.abstractmethod def args(self) -> Tuple[aiowamp.WAMPType, ...]: """Event arguments.""" ... @property @abc.abstractmethod def kwargs(self) -> aiowamp.WAMPDict: """Event keyword arguments.""" ... @property @abc.abstractmethod def details(self) -> aiowamp.WAMPDict: """Additional event details.""" ... @property def publisher_id(self) -> Optional[int]: """Get the publisher's id. Returns: WAMP id of the caller, or `None` if not disclosed. """ return self.details.get("caller") @property def trust_level(self) -> Optional[int]: """Get the router assigned trust level. The trust level 0 means lowest trust, and higher integers represent (application-defined) higher levels of trust. Returns: The trust level, or `None` if not specified. """ return self.details.get("trustlevel")
[docs] @abc.abstractmethod async def unsubscribe(self) -> None: """Unsubscribe from the topic.""" ...
[docs]class SubscriptionEvent(SubscriptionEventABC[ClientT], Generic[ClientT]): __slots__ = ("__client", "__topic", "__publication_id", "__args", "__kwargs", "__details") __client: ClientT __topic: aiowamp.URI __publication_id: int __args: Tuple[aiowamp.WAMPType, ...] __kwargs: aiowamp.WAMPDict __details: aiowamp.WAMPDict
[docs] def __init__(self, client: ClientT, msg: aiowamp.msg.Event, *, topic: aiowamp.URI) -> None: """Create a new SubscriptionEven instance. There shouldn't be a need to create these yourself, unless you're creating your own `aiowamp.ClientABC`. Unlike `aiowamp.Invocation` it doesn't require to be managed though. Args: client: Client used to unsubscribe. msg: Event message. topic: Registered topic URI. """ self.__client = client self.__topic = topic self.__publication_id = msg.publication_id self.__args = tuple(msg.args) if msg.args else () self.__kwargs = msg.kwargs or {} self.__details = msg.details
@property def client(self) -> ClientT: return self.__client @property def publication_id(self) -> int: return self.__publication_id @property def subscribed_topic(self) -> aiowamp.URI: return self.__topic @property def args(self) -> Tuple[aiowamp.WAMPType, ...]: return self.__args @property def kwargs(self) -> aiowamp.WAMPDict: return self.__kwargs @property def details(self) -> aiowamp.WAMPDict: return self.__details
[docs] async def unsubscribe(self) -> None: await self.__client.unsubscribe(self.__topic)