Source code for aiowamp.client.auth

"""Provides the built-in authentication methods."""

from __future__ import annotations

import abc
import base64
import hashlib
import hmac
import logging
from typing import ClassVar, Dict, Iterator, Mapping, Optional, Union

import aiowamp
from aiowamp.msg import Authenticate as AuthenticateMsg

__all__ = ["AuthMethodABC",
           "AuthKeyringABC", "AuthKeyring",
           "CRAuth", "TicketAuth"]

log = logging.getLogger(__name__)


[docs]class AuthMethodABC(abc.ABC): """Abstract auth method.""" __slots__ = () method_name: ClassVar[str] """Name of the auth method."""
[docs] def __str__(self) -> str: return f"{type(self).__qualname__} {self.method_name!r}"
@property @abc.abstractmethod def requires_auth_id(self) -> bool: """Whether the auth method requires auth_id to be set.""" ... @property @abc.abstractmethod def auth_extra(self) -> Optional[aiowamp.WAMPDict]: """Additional auth extras that need to be passed along. `None` indicates that no extras need to be passed. """ ...
[docs] @abc.abstractmethod async def authenticate(self, challenge: aiowamp.msg.Challenge) \ -> Union[aiowamp.msg.Authenticate, aiowamp.msg.Abort]: """Generate an authenticate message for the challenge. Args: challenge: Challenge message to respond to. Returns: Either an authenticate message to send to the router or an abort message to indicate that the attempt should be aborted. Raises: Exception: When something goes wrong during authentication. Should be interpreted the same as an abort message. """ ...
[docs] async def check_welcome(self, welcome: aiowamp.msg.Welcome) -> None: """Check the welcome message sent by the router. Used to perform mutual authentication. Args: welcome: Welcome message sent by the router. Raises: aiowamp.AuthError: """ pass
[docs]class AuthKeyringABC(Mapping[str, "aiowamp.AuthMethodABC"], abc.ABC): """Abstract keyring for auth methods.""" __slots__ = ()
[docs] def __str__(self) -> str: methods = ", ".join(self) return f"{type(self).__qualname__}({methods})"
[docs] @abc.abstractmethod def __getitem__(self, method: str) -> aiowamp.AuthMethodABC: """Get the auth method with the given name. Args: method: Name of the method to get. Returns: Auth method stored in the keyring. Raises: KeyError: If no auth method with the given method exists in the keyring. """ ...
[docs] @abc.abstractmethod def __len__(self) -> int: """Get the amount of auth methods in the keyring.""" ...
[docs] @abc.abstractmethod def __iter__(self) -> Iterator[str]: """Get an iterator for the auth method names in the keyring.""" ...
@property @abc.abstractmethod def auth_id(self) -> Optional[str]: """Auth id to use during authentication. Because most authentication methods require an auth id, this is handled by the keyring. """ ... @property @abc.abstractmethod def auth_extra(self) -> Optional[aiowamp.WAMPDict]: """Auth extras with all the auth extras from the underlying methods. `None` if no auth extras are required. """ ...
[docs]class AuthKeyring(AuthKeyringABC): __slots__ = ("__auth_methods", "__auth_id", "__auth_extra") __auth_methods: Dict[str, "aiowamp.AuthMethodABC"] __auth_id: Optional[str] __auth_extra: Optional[aiowamp.WAMPDict]
[docs] def __init__(self, *methods: "aiowamp.AuthMethodABC", auth_id: str = None) -> None: """Initialise the keyring. Args: *methods: Methods to initialise the keyring with. auth_id: Auth id to use. Defaults to `None`. Raises: ValueError: - The same auth method type was specified multiple times. - No auth id was specified but one of the methods requires it. - Multiple methods specify the same auth extra key with differing values. """ auth_methods = {} auth_extra = {} for method in methods: name = method.method_name if name in auth_methods: raise ValueError(f"received same auth method multiple times: {name}") if auth_id is None and method.requires_auth_id: raise ValueError(f"{method} requires auth_id!") auth_methods[name] = method m_auth_extra = method.auth_extra if not m_auth_extra: continue for key, value in m_auth_extra.items(): try: existing_value = auth_extra[key] except KeyError: pass else: if existing_value != value: raise ValueError(f"{method} provides auth extra {key} = {value!r}, " f"but the key is already set by another method as {existing_value!r}") auth_extra[key] = value self.__auth_methods = auth_methods self.__auth_id = auth_id self.__auth_extra = auth_extra or None
[docs] def __repr__(self) -> str: methods = ", ".join(map(repr, self.__auth_methods.values())) return f"{type(self).__qualname__}({methods})"
[docs] def __getitem__(self, method: str) -> AuthMethodABC: return self.__auth_methods[method]
[docs] def __len__(self) -> int: return len(self.__auth_methods)
[docs] def __iter__(self) -> Iterator[str]: return iter(self.__auth_methods)
@property def auth_id(self) -> Optional[str]: return self.__auth_id @property def auth_extra(self) -> Optional[aiowamp.WAMPDict]: return self.__auth_extra
[docs]class CRAuth(AuthMethodABC): """Auth method for challenge response authentication. WAMP Challenge-Response ("WAMP-CRA") authentication is a simple, secure authentication mechanism using a shared secret. The client and the server share a secret. The secret never travels the wire, hence WAMP-CRA can be used via non-TLS connections. The actual pre-sharing of the secret is outside the scope of the authentication mechanism. """ method_name = "wampcra" __slots__ = ("secret",) secret: str """Secret to use for authentication."""
[docs] def __init__(self, secret: str) -> None: """Initialise the auth method. Args: secret: Secret to use. """ self.secret = secret
[docs] def __repr__(self) -> str: return f"{type(self).__qualname__}({self.secret!r})"
@property def requires_auth_id(self) -> bool: return True @property def auth_extra(self) -> None: return None
[docs] def pbkdf2_hmac(self, salt: str, key_len: int, iterations: int) -> bytes: """Derive the token using the pdkdf2 scheme. Args: salt: Salt key_len: Key length iterations: Amount of iterations Returns: Generated token bytes. """ return hashlib.pbkdf2_hmac("sha256", self.secret, salt, iterations, key_len)
[docs] async def authenticate(self, challenge: aiowamp.msg.Challenge) -> aiowamp.msg.Authenticate: extra = challenge.extra try: challenge_str: str = extra["challenge"] except KeyError: raise KeyError("challenge didn't provide 'challenge' string to sign") from None try: salt: str = extra["salt"] key_len: int = extra["keylen"] iterations: int = extra["iterations"] except KeyError: log.info("%s: using secret directly", self) secret = self.secret else: log.info("%s: deriving secret from salted password", self) secret = self.pbkdf2_hmac(salt, iterations, key_len) digest = hmac.digest(secret, challenge_str, hashlib.sha256) signature = base64.b64encode(digest).encode() return aiowamp.msg.Authenticate(signature, {})
[docs]class TicketAuth(AuthMethodABC): """Auth method for ticket-based authentication. With Ticket-based authentication, the client needs to present the server an authentication "ticket" - some magic value to authenticate itself to the server. This "ticket" could be a long-lived, pre-agreed secret (e.g. a user password) or a short-lived authentication token (like a Kerberos token). WAMP does not care or interpret the ticket presented by the client. Caution: This scheme is extremely simple and flexible, but the resulting security may be limited. E.g., the ticket value will be sent over the wire. If the transport WAMP is running over is not encrypted, a man-in-the-middle can sniff and possibly hijack the ticket. If the ticket value is reused, that might enable replay attacks. """ method_name = "ticket" __slots__ = ("__ticket",) __ticket: str
[docs] def __init__(self, ticket: str) -> None: self.__ticket = ticket
@property def requires_auth_id(self) -> bool: return True @property def auth_extra(self) -> None: return None
[docs] async def authenticate(self, challenge: aiowamp.msg.Challenge) -> aiowamp.msg.Authenticate: return AuthenticateMsg(self.__ticket, {})
class ScramAuth(AuthMethodABC): method_name = "wamp-scram" __slots__ = () @property def requires_auth_id(self) -> bool: return True @property def auth_extra(self) -> aiowamp.WAMPDict: return {"nonce": "", "channel_binding": None}