Source code for aiowamp.client.invocation

"""Provides invocation classes."""

from __future__ import annotations

import abc
import contextlib
import datetime
import logging
import time
from typing import Generic, Optional, Tuple, TypeVar, Union

import aiowamp
from aiowamp import InvocationError, URI, set_invocation_error
from aiowamp.args_mixin import ArgsMixin
from aiowamp.msg import Error as ErrorMsg, Invocation as InvocationMsg, Yield as YieldMsg
from aiowamp.uri import INVALID_ARGUMENT
from .enum import CANCEL_KILL_NO_WAIT

# Public names exported by this module.
__all__ = ["InvocationABC", "Invocation",
           "InvocationResult", "InvocationProgress"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Type variable for the client type an invocation is bound to;
# constrained to subclasses of `aiowamp.ClientABC`.
ClientT = TypeVar("ClientT", bound="aiowamp.ClientABC")


class InvocationABC(ArgsMixin, abc.ABC, Generic[ClientT]):
    """Abstract invocation context handed to a procedure.

    Concrete implementations provide access to the call's arguments and
    details and the means to answer it (progress, result, or error).
    """
    __slots__ = ()

    def __str__(self) -> str:
        class_name = type(self).__qualname__
        return f"{class_name} {self.request_id}"

    @property
    @abc.abstractmethod
    def client(self) -> ClientT:
        """Client which received the invocation."""
        ...

    @property
    @abc.abstractmethod
    def request_id(self) -> int:
        """Request ID of the invocation."""
        ...

    @property
    @abc.abstractmethod
    def registered_procedure(self) -> aiowamp.URI:
        """URI under which the procedure was registered.

        Use `.procedure` to get the URI that was actually called.
        """
        ...

    @property
    def procedure(self) -> aiowamp.URI:
        """Concrete procedure URI that triggered the invocation.

        Taken from the "procedure" entry of the details if present,
        otherwise this falls back to `registered_procedure`. The two only
        differ when the procedure was registered with a pattern-based
        matching policy.
        """
        try:
            called = self.details["procedure"]
        except KeyError:
            return self.registered_procedure

        return URI(called)

    @property
    @abc.abstractmethod
    def args(self) -> Tuple[aiowamp.WAMPType, ...]:
        """Positional arguments of the call."""
        ...

    @property
    @abc.abstractmethod
    def kwargs(self) -> aiowamp.WAMPDict:
        """Keyword arguments of the call."""
        ...

    @property
    @abc.abstractmethod
    def details(self) -> aiowamp.WAMPDict:
        """Additional details of the call."""
        ...

    @property
    def may_send_progress(self) -> bool:
        """Whether the caller is willing to receive progressive results.

        Reads the "receive_progress" entry of the details; a missing entry
        counts as `False`.
        """
        wants_progress = self.details.get("receive_progress")
        return bool(wants_progress)

    @property
    def caller_id(self) -> Optional[int]:
        """WAMP id of the caller.

        The "disclose_caller" option can be specified when registering a
        procedure to force disclosure.

        Returns:
            The "caller" entry of the details, or `None` if it is absent.
        """
        details = self.details
        return details.get("caller")

    @property
    def trust_level(self) -> Optional[int]:
        """Trust level assigned by the router.

        Trust level 0 is the lowest trust; higher integers represent
        (application-defined) higher levels of trust.

        Returns:
            The "trustlevel" entry of the details, or `None` if it is
            absent.
        """
        details = self.details
        return details.get("trustlevel")

    @property
    @abc.abstractmethod
    def timeout(self) -> float:
        """Timeout of the call in seconds.

        `0` when no timeout was provided, i.e. the call doesn't time out.
        """
        ...

    @property
    @abc.abstractmethod
    def timeout_at(self) -> Optional[datetime.datetime]:
        """Point in time at which the invocation will be cancelled.

        `None` when the call doesn't time out.

        Sending a message (result, progress, error) after the call has been
        cancelled raises an exception. Check `.done` to find out whether
        the invocation can still send messages.
        """
        ...

    @property
    @abc.abstractmethod
    def done(self) -> bool:
        """Whether the invocation has completed."""
        ...

    @property
    @abc.abstractmethod
    def interrupt(self) -> Optional[aiowamp.Interrupt]:
        """Interrupt message sent by the caller.

        `None` as long as the invocation hasn't been interrupted.
        """
        ...

    @abc.abstractmethod
    async def send_progress(self, *args: aiowamp.WAMPType,
                            kwargs: aiowamp.WAMPDict = None,
                            options: aiowamp.WAMPDict = None) -> None:
        """Send a progressive result.

        Check `.may_send_progress` first to see whether the caller is
        willing to receive progress results.

        Args:
            *args: Arguments for the result.
            kwargs: Keyword arguments for the result.
            options: Additional options to send.
        """
        ...

    @abc.abstractmethod
    async def send_result(self, *args: aiowamp.WAMPType,
                          kwargs: aiowamp.WAMPDict = None,
                          options: aiowamp.WAMPDict = None) -> None:
        """Send the final result, completing the invocation.

        Args:
            *args: Arguments for the result.
            kwargs: Keyword arguments for the result.
            options: Additional options to send.
        """
        ...

    @abc.abstractmethod
    async def send_error(self, error: str, *args: aiowamp.WAMPType,
                         kwargs: aiowamp.WAMPDict = None,
                         details: aiowamp.WAMPDict = None) -> None:
        """Send an error, completing the invocation.

        Args:
            error: URI of the error.
            *args: Arguments for the error.
            kwargs: Keyword arguments for the error.
            details: Additional details to send.
        """
        ...

    @abc.abstractmethod
    def _cancel(self) -> None:
        """Cancel the invocation."""
        ...

    @abc.abstractmethod
    async def _receive_interrupt(self, interrupt: aiowamp.Interrupt) -> None:
        """Handle an interrupt coming from the caller.

        Args:
            interrupt: Interrupt error that was received.
        """
        ...
class Invocation(InvocationABC[ClientT], Generic[ClientT]):
    """Invocation context which answers through a WAMP session.

    The instance keeps track of whether it has completed (a final result or
    error was sent, or it was cancelled) and of an interrupt received from
    the caller. Once completed, further attempts to send a message raise a
    `RuntimeError`.
    """
    __slots__ = ("session",
                 "__client",
                 "__timeout", "__timeout_at",
                 "__done", "__interrupt",
                 "__procedure", "__request_id",
                 "__args", "__kwargs", "__details")

    session: aiowamp.SessionABC
    """Session used to send messages."""

    __client: ClientT
    __timeout: float
    __timeout_at: Optional[datetime.datetime]
    __done: bool
    __interrupt: Optional[aiowamp.Interrupt]

    __procedure: aiowamp.URI
    __request_id: int
    __args: Tuple[aiowamp.WAMPType, ...]
    __kwargs: aiowamp.WAMPDict
    __details: aiowamp.WAMPDict

    def __init__(self, session: aiowamp.SessionABC,
                 client: ClientT,
                 msg: aiowamp.msg.Invocation, *,
                 procedure: aiowamp.URI) -> None:
        """Create a new invocation instance.

        Normally you should not create these yourself, they don't actively
        listen for incoming messages. Instead, they rely on the
        `aiowamp.ClientABC` to receive and pass them.

        Args:
            session: WAMP Session to send messages in.
            client: Client that received the invocation.
            msg: Invocation message that spawned the invocation.
            procedure: Registered procedure URI.
        """
        self.session = session
        self.__client = client

        self.__done = False
        self.__interrupt = None

        self.__procedure = procedure
        self.__request_id = msg.request_id
        self.__args = tuple(msg.args) if msg.args else ()
        self.__kwargs = msg.kwargs or {}
        self.__details = msg.details

        # the "timeout" detail is divided by 1e3 to obtain seconds.
        try:
            self.__timeout = self.__details["timeout"] / 1e3
        except KeyError:
            # the default is 0 which means no timeout
            self.__timeout = 0

        if self.__timeout:
            ts = time.time() + self.__timeout
            # Naive datetime expressed in UTC. Equivalent to the deprecated
            # `datetime.utcfromtimestamp(ts)`.
            aware = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
            self.__timeout_at = aware.replace(tzinfo=None)
        else:
            self.__timeout_at = None

    def __getitem__(self, key: Union[int, str]) -> aiowamp.WAMPType:
        """Get a positional (int key) or keyword (str key) argument.

        Raises:
            LookupError: If the argument doesn't exist. The exception is
                tagged with an `InvocationError` (`INVALID_ARGUMENT`) via
                `set_invocation_error` before it is re-raised.
        """
        try:
            return super().__getitem__(key)
        except LookupError as e:
            if isinstance(key, int):
                msg = f"expected {key + 1} arguments, got {len(self)}"
            else:
                msg = f"missing keyword argument {key!r}"

            set_invocation_error(e, InvocationError(
                INVALID_ARGUMENT,
                msg,
                kwargs={"key": key},
            ))

            raise e from None

    # Read-only accessors implementing the abstract properties of
    # `InvocationABC`.

    @property
    def client(self) -> ClientT:
        return self.__client

    @property
    def request_id(self) -> int:
        return self.__request_id

    @property
    def registered_procedure(self) -> aiowamp.URI:
        return self.__procedure

    @property
    def args(self) -> Tuple[aiowamp.WAMPType, ...]:
        return self.__args

    @property
    def kwargs(self) -> aiowamp.WAMPDict:
        return self.__kwargs

    @property
    def details(self) -> aiowamp.WAMPDict:
        return self.__details

    @property
    def timeout(self) -> float:
        return self.__timeout

    @property
    def timeout_at(self) -> Optional[datetime.datetime]:
        return self.__timeout_at

    @property
    def done(self) -> bool:
        return self.__done

    @property
    def interrupt(self) -> Optional[aiowamp.Interrupt]:
        return self.__interrupt

    def __assert_not_done(self) -> None:
        """Raise `RuntimeError` if the invocation has already completed."""
        if self.__done:
            raise RuntimeError(f"{self}: already completed")

    def __mark_done(self) -> None:
        """Mark the invocation as completed.

        Raises:
            RuntimeError: If the invocation has already completed.
        """
        self.__assert_not_done()
        self.__done = True

    async def send_progress(self, *args: aiowamp.WAMPType,
                            kwargs: aiowamp.WAMPDict = None,
                            options: aiowamp.WAMPDict = None) -> None:
        """Send a progress result.

        Args:
            *args: Arguments for the result.
            kwargs: Keyword arguments for the result.
            options: Additional options to send. The passed dict is not
                modified.

        Raises:
            RuntimeError: If the invocation is already completed or the
                caller is unwilling to receive progress.
            aiowamp.Interrupt: The stored interrupt, if the invocation has
                been interrupted.
        """
        self.__assert_not_done()

        if not self.may_send_progress:
            raise RuntimeError(f"{self}: caller is unwilling to receive progress")

        # must not send progress when interrupted.
        # Explicit `None` check so the decision doesn't depend on the
        # truthiness of the interrupt object.
        if self.__interrupt is not None:
            raise self.__interrupt

        # copy so the caller's dict isn't mutated.
        options = dict(options) if options else {}
        options["progress"] = True

        await self.session.send(YieldMsg(
            self.__request_id,
            options,
            list(args) or None,
            kwargs,
        ))

    async def send_result(self, *args: aiowamp.WAMPType,
                          kwargs: aiowamp.WAMPDict = None,
                          options: aiowamp.WAMPDict = None) -> None:
        """Send the final result and complete the invocation.

        Args:
            *args: Arguments for the result.
            kwargs: Keyword arguments for the result.
            options: Additional options to send. A "progress" entry is
                dropped from the sent options; the passed dict is not
                modified.

        Raises:
            RuntimeError: If the invocation is already completed.
        """
        self.__mark_done()

        # copy so the caller's dict isn't mutated.
        options = dict(options) if options else {}
        # make sure we're not accidentally sending a result with progress=True
        options.pop("progress", None)

        await self.session.send(YieldMsg(
            self.__request_id,
            options,
            list(args) or None,
            kwargs,
        ))

    async def send_error(self, error: str, *args: aiowamp.WAMPType,
                         kwargs: aiowamp.WAMPDict = None,
                         details: aiowamp.WAMPDict = None) -> None:
        """Send an error and complete the invocation.

        Args:
            error: URI of the error.
            *args: Arguments for the error.
            kwargs: Keyword arguments for the error.
            details: Additional details to send.

        Raises:
            RuntimeError: If the invocation is already completed.
        """
        self.__mark_done()

        await self.session.send(ErrorMsg(
            InvocationMsg.message_type,
            self.__request_id,
            details or {},
            error,
            list(args) or None,
            kwargs,
        ))

    def _cancel(self) -> None:
        """Mark the invocation as done without sending anything."""
        self.__done = True

    async def _receive_interrupt(self, interrupt: aiowamp.Interrupt) -> None:
        """Store the interrupt received from the caller.

        Args:
            interrupt: Interrupt error that was received.
        """
        self.__interrupt = interrupt

        # if the cancel mode is killnowait the caller won't accept a response.
        if interrupt.cancel_mode == CANCEL_KILL_NO_WAIT:
            self._cancel()
class InvocationResult(ArgsMixin):
    """Container for a procedure's result.

    Return or yield an instance of this from an `aiowamp.InvocationHandler`
    when the result should carry keyword arguments.
    """

    __slots__ = ("args", "kwargs", "details")

    args: Tuple[aiowamp.WAMPType, ...]
    """Arguments."""

    kwargs: aiowamp.WAMPDict
    """Keyword arguments."""

    details: aiowamp.WAMPDict
    """Details."""

    def __init__(self, *args: aiowamp.WAMPType, **kwargs: aiowamp.WAMPType) -> None:
        """Store the given arguments and start with empty details.

        Args:
            *args: Positional arguments of the result.
            **kwargs: Keyword arguments of the result.
        """
        self.details = {}
        self.kwargs = kwargs
        self.args = args
class InvocationProgress(InvocationResult):
    """Helper class for procedures.

    Instances of this class can be yielded by procedures to indicate that
    the value is intended to be sent as a progress result.

    Usually, because there's no way to tell whether an async generator has
    yielded for the last time, aiowamp waits for the next yield before sending
    a progress result (i.e. it always lags behind one message).
    When yielding an instance of this class however, aiowamp will send it
    immediately.

    It is possible to abuse this by using an instance of this class for
    the final yield. This is not supported by the WAMP protocol and currently
    results in aiowamp sending an empty final result.
    """
    __slots__ = ()