Source code for aiowamp.errors

from __future__ import annotations

import dataclasses
import logging
from typing import Callable, Dict, Optional, Type

import aiowamp
from .args_mixin import ArgsMixin
from .uri import RUNTIME_ERROR, URI
from .uri_map import URIMap

# Names exported by a star-import of this module.
__all__ = ["BaseError",
           "TransportError",
           "AbortError", "AuthError",
           "InvalidMessage", "UnexpectedMessageError",
           "ErrorResponse",
           "ClientClosed",
           "Interrupt",
           "register_error_response", "error_to_exception",
           "InvocationError", "exception_to_invocation_error",
           "set_invocation_error"]

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class BaseError(Exception):
    """Root of the exception hierarchy for WAMP related errors.

    Callers will usually deal with one of the subclasses rather than with
    this class itself.
    """

    __slots__ = ()
class TransportError(BaseError):
    """Error that occurred at the transport level."""

    __slots__ = ()
class AbortError(BaseError):
    """Error describing an aborted join.

    The reason and details are copied from the abort message handed to the
    constructor.
    """

    __slots__ = ("reason", "details")

    reason: str
    details: aiowamp.WAMPDict

    def __init__(self, msg: aiowamp.msg.Abort) -> None:
        """Create the error from an abort message.

        Args:
            msg: Abort message providing ``reason`` and ``details``.
        """
        self.reason = msg.reason
        self.details = msg.details

    def __str__(self) -> str:
        """Return the reason followed by the details."""
        return f"{self.reason} (details = {self.details})"
class AuthError(BaseError):
    # NOTE(review): presumably raised for authentication failures; the raise
    # sites are not part of this module - confirm before documenting further.
    __slots__ = ()
class InvalidMessage(BaseError):
    """Error signalling an invalid message."""

    __slots__ = ()
@dataclasses.dataclass()
class UnexpectedMessageError(InvalidMessage):
    """Error for a received message whose type is not the expected one."""

    # NOTE(review): with the dataclass defaults (eq=True, frozen=False) the
    # generated class gets ``__hash__ = None``, so instances are unhashable.
    # Confirm this is intended for an exception type.
    __slots__ = ("received", "expected")

    received: aiowamp.MessageABC
    """Message that was received."""

    expected: Type["aiowamp.MessageABC"]
    """Message type that was expected."""

    def __str__(self) -> str:
        """Describe the received message and the expected message type."""
        return (
            f"received message {self.received!r} "
            f"but expected message of type {self.expected.__qualname__}"
        )
class ErrorResponse(BaseError, ArgsMixin):
    """Exception wrapping a WAMP error message.

    The URI, arguments, keyword arguments and details are taken from the
    wrapped message.
    """

    __slots__ = ("message", "uri", "args", "kwargs", "details")

    message: aiowamp.msg.Error
    """Error message."""

    uri: aiowamp.URI
    details: aiowamp.WAMPDict

    def __init__(self, message: aiowamp.msg.Error):
        """Create the exception from an error message.

        Args:
            message: Error message to wrap.
        """
        self.message = message
        self.uri = message.error

        # Falsy args / kwargs are normalised to empty containers.
        if message.args:
            self.args = tuple(message.args)
        else:
            self.args = ()
        self.kwargs = message.kwargs or {}

        self.details = message.details

    def __repr__(self) -> str:
        cls_name = type(self).__qualname__
        return f"{cls_name}({self.message!r})"

    def __str__(self) -> str:
        """Return the URI followed by the arguments and keyword arguments.

        Arguments and keyword arguments are only included when present.
        """
        pieces = [f"{self.uri}"]

        positional = ", ".join(repr(arg) for arg in self.args)
        if positional:
            pieces.append(f" {positional}")

        keyword = ", ".join(f"{k}={v!r}" for k, v in self.kwargs.items())
        if keyword:
            pieces.append(f" ({keyword})")

        return "".join(pieces)
ErrorFactory = Callable[["aiowamp.msg.Error"], Exception]
"""Callable creating an exception from a WAMP error message."""

# Error factories keyed by the URI they were registered for.
ERR_RESP_MAP: URIMap[ErrorFactory] = URIMap()

# Exception type -> URI table read by `get_exception_uri`.
# NOTE(review): nothing visible in this module adds entries to this mapping;
# confirm where (or whether) it is populated.
EXC_URI_MAP: Dict[Type[Exception], "aiowamp.URI"] = {}


def register_error_response(uri: str, *, match_policy: aiowamp.MatchPolicy = None):
    """Create a decorator which registers an error factory for a URI.

    Args:
        uri: URI to register the factory for.
        match_policy: If given, the URI is built with this match policy.
            Otherwise the URI is obtained through `URI.cast`.

    Returns:
        Decorator which stores the decorated callable in `ERR_RESP_MAP` and
        returns it unchanged. The decorator raises `TypeError` if the
        decorated object isn't callable.
    """
    if match_policy is None:
        uri = URI.cast(uri)
    else:
        uri = URI(uri, match_policy=match_policy)

    def decorator(cls: ErrorFactory):
        if callable(cls):
            ERR_RESP_MAP[uri] = cls
            return cls

        raise TypeError("error factory must be callable")

    return decorator


def get_exception_factory(uri: str) -> ErrorFactory:
    """Look up the error factory stored in `ERR_RESP_MAP` for the URI.

    A failed lookup propagates whatever error the map raises.
    """
    factory = ERR_RESP_MAP[uri]
    return factory


def error_to_exception(message: aiowamp.msg.Error) -> Exception:
    """Turn a WAMP error message into an exception.

    The factory registered for the message's error URI is used. When a
    `LookupError` is raised, whether by the lookup or by the factory itself,
    the message is wrapped in an `ErrorResponse` instead.

    Args:
        message: Error message to convert.

    Returns:
        Exception created for the message.
    """
    try:
        factory = get_exception_factory(message.error)
        return factory(message)
    except LookupError:
        return ErrorResponse(message)


def get_exception_uri(exc: Type[Exception]) -> aiowamp.URI:
    """Look up the URI stored in `EXC_URI_MAP` for the exception type.

    Raises:
        KeyError: If the exact type has no entry.
    """
    registered_uri = EXC_URI_MAP[exc]
    return registered_uri
class InvocationError(BaseError):
    """WAMP error carrying a URI plus optional arguments and details.

    Empty arguments, keyword arguments and details are stored as `None`.
    """

    __slots__ = ("uri", "args", "kwargs", "details")

    uri: aiowamp.URI
    args: Optional["aiowamp.WAMPList"]
    kwargs: Optional["aiowamp.WAMPDict"]
    details: Optional["aiowamp.WAMPDict"]

    def __init__(self, uri: str, *args: aiowamp.WAMPType,
                 kwargs: Optional["aiowamp.WAMPDict"] = None,
                 details: Optional["aiowamp.WAMPDict"] = None) -> None:
        """Create a new invocation error.

        Args:
            uri: Error URI; it is passed to `URI`.
            *args: Positional arguments of the error.
            kwargs: Keyword arguments of the error.
            details: Details of the error.
        """
        self.uri = URI(uri)
        # Falsy values are collapsed to None.
        self.args = list(args) or None
        self.kwargs = kwargs or None
        self.details = details or None

    def _init(self, other: InvocationError) -> None:
        """Overwrite this error's state with the state of another error.

        The containers are shared with ``other``, not copied.
        """
        self.uri = other.uri
        self.args = other.args
        self.kwargs = other.kwargs
        self.details = other.details

    def __repr__(self) -> str:
        if self.args:
            args_str = ", " + ", ".join(map(repr, self.args))
        else:
            args_str = ""

        if self.kwargs:
            kwargs_str = f", kwargs={self.kwargs!r}"
        else:
            kwargs_str = ""

        if self.details:
            details_str = f", details={self.details!r}"
        else:
            details_str = ""

        return f"{type(self).__qualname__}({self.uri!r}{args_str}{kwargs_str}{details_str})"

    def __str__(self) -> str:
        """Return the URI, followed by the arguments if there are any."""
        if self.args:
            args_str = ", ".join(map(str, self.args))
            return f"{self.uri} {args_str}"

        # Format the URI instead of returning the URI object itself so that
        # both branches yield a plain `str`.
        return f"{self.uri}"
# Attribute name under which an invocation error is attached to an exception.
ATTACHED_ERR_KEY = "__invocation_error__"
def set_invocation_error(exc: Exception, err: aiowamp.InvocationError) -> None:
    """Attach an invocation error to an exception.

    This allows raising an ordinary python `Exception` without losing the
    additional WAMP information. `exception_to_invocation_error` retrieves
    the attached error again.

    An exception which already is an invocation error is overwritten with
    the state of the new error instead.

    Args:
        exc: Exception to attach the invocation error to.
        err: Invocation error to attach.
    """
    if not isinstance(exc, InvocationError):
        setattr(exc, ATTACHED_ERR_KEY, err)
        return

    log.info("overwriting %s with %s", exc, err)
    exc._init(err)
def exception_to_invocation_error(exc: Exception) -> InvocationError:
    """Convert an exception to an invocation error.

    The first applicable rule wins:

    1. An `InvocationError` is returned as is.
    2. An error attached under `ATTACHED_ERR_KEY` is returned.
    3. A new `InvocationError` is created from the exception's ``args``,
       using the URI registered for the exact exception type, or
       `RUNTIME_ERROR` if there is none.

    Args:
        exc: Exception to convert.

    Returns:
        Invocation error representing the exception.
    """
    if isinstance(exc, InvocationError):
        return exc

    try:
        return getattr(exc, ATTACHED_ERR_KEY)
    except AttributeError:
        pass

    try:
        uri = get_exception_uri(type(exc))
    except LookupError:
        # Lazy %-style arguments: the message is only rendered when the
        # record is actually emitted.
        log.info("no uri registered for exception %s. Using %r",
                 type(exc).__qualname__, RUNTIME_ERROR)
        uri = RUNTIME_ERROR

    return InvocationError(uri, *exc.args)
class ClientClosed(BaseError):
    # NOTE(review): presumably raised when a closed client is used; the raise
    # sites are not part of this module - confirm before documenting further.
    __slots__ = ()
class Interrupt(BaseError):
    """Error carrying the options of an interrupt."""

    __slots__ = ("options",)

    options: aiowamp.WAMPDict
    """Options sent with the interrupt."""

    def __init__(self, options: aiowamp.WAMPDict) -> None:
        """Create the error.

        Args:
            options: Options sent with the interrupt.
        """
        self.options = options

    def __repr__(self) -> str:
        cls_name = type(self).__qualname__
        return f"{cls_name}(options={self.options!r})"

    @property
    def cancel_mode(self) -> aiowamp.CancelMode:
        """Cancel mode sent with the interrupt.

        This is the value stored under the ``"mode"`` key of the options.
        """
        mode = self.options["mode"]
        return mode