# -*- coding: utf-8 -*-
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from builtins import super
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Set, Tuple
import attr
import olm
from ..exceptions import EncryptionError
from ..messages import ToDeviceMessage
if False:
from ..responses import RoomKeyRequestResponse
class OlmAccount(olm.Account):
def __init__(self):
# type: () -> None
self.shared = False
super().__init__()
def __new__(cls, *args):
return super().__new__(cls)
@classmethod
def from_pickle(
cls,
pickle, # type: bytes
passphrase='', # type: str
shared=False # type: bool
):
# type: (...) -> OlmAccount
account = super().from_pickle(pickle, passphrase)
account.shared = shared
return account
class _SessionExpirationMixin(object):
@property
def expired(self):
return False
class Session(olm.Session, _SessionExpirationMixin):
def __init__(self):
super().__init__()
self.creation_time = datetime.now()
self.use_time = datetime.now()
def __new__(cls, *args):
return super().__new__(cls, *args)
@classmethod
def from_pickle(cls, pickle, creation_time, passphrase="", use_time=None):
# type: (str, datetime, str, Optional[datetime]) -> Session
session = super().from_pickle(pickle, passphrase)
session.creation_time = creation_time
session.use_time = use_time or creation_time
return session
def decrypt(self, ciphertext):
self.use_time = datetime.now()
return super().decrypt(ciphertext)
def encrypt(self, plaintext):
self.use_time = datetime.now()
return super().encrypt(plaintext)
class InboundSession(olm.InboundSession, _SessionExpirationMixin):
def __new__(cls, *args):
return super().__new__(cls, *args)
def __init__(self, account, message, identity_key=None):
super().__init__(account, message, identity_key)
self.creation_time = datetime.now()
self.use_time = datetime.now()
def decrypt(self, ciphertext):
self.use_time = datetime.now()
return super().decrypt(ciphertext)
def encrypt(self, plaintext):
self.use_time = datetime.now()
return super().encrypt(plaintext)
class OutboundSession(olm.OutboundSession, _SessionExpirationMixin):
def __new__(cls, *args):
return super().__new__(cls, *args)
def __init__(self, account, identity_key, one_time_key):
super().__init__(account, identity_key, one_time_key)
self.creation_time = datetime.now()
self.use_time = datetime.now()
def decrypt(self, ciphertext):
self.use_time = datetime.now()
return super().decrypt(ciphertext)
def encrypt(self, plaintext):
self.use_time = datetime.now()
return super().encrypt(plaintext)
class InboundGroupSession(olm.InboundGroupSession):
def __init__(
self,
session_key, # type: str
signing_key, # type: str
sender_key, # type: str
room_id, # type: str
forwarding_chains=None # type: Optional[List[str]]
):
# type: (...) -> None
self.ed25519 = signing_key
self.sender_key = sender_key
self.room_id = room_id
self.forwarding_chain = forwarding_chains or [] # type: List[str]
super().__init__(session_key)
def __new__(cls, *args):
return super().__new__(cls)
@classmethod
def from_pickle(
cls,
pickle, # type: bytes
signing_key, # type: str
sender_key, # type: str
room_id, # type: str
passphrase='', # type: str
forwarding_chain=None # type: List[str]
):
# type: (...) -> InboundGroupSession
session = super().from_pickle(pickle, passphrase)
session.ed25519 = signing_key
session.sender_key = sender_key
session.room_id = room_id
session.forwarding_chain = forwarding_chain or []
return session
@classmethod
def import_session(
cls,
session_key, # type: str
signing_key, # type: str
sender_key, # type: str
room_id, # type: str
forwarding_chain=None # type: Optional[List[str]]
):
session = super().import_session(session_key)
session.ed25519 = signing_key
session.sender_key = sender_key
session.room_id = room_id
session.forwarding_chain = forwarding_chain or []
return session
class OutboundGroupSession(olm.OutboundGroupSession):
"""Outbound group session aware of the users it is shared with.
Also remembers the time it was created and the number of messages it has
encrypted, in order to know if it needs to be rotated.
Attributes:
creation_time (datetime.datetime): Creation time of the session.
message_count (int): Number of messages encrypted using the session.
"""
def __init__(self):
self.max_age = timedelta(days=7)
self.max_messages = 100
self.creation_time = datetime.now()
self.message_count = 0
self.users_shared_with = set() # type: Set[Tuple[str, str]]
self.users_ignored = set() # type: Set[Tuple[str, str]]
self.shared = False
super().__init__()
def __new__(cls, **kwargs):
return super().__new__(cls)
def mark_as_shared(self):
self.shared = True
@property
def expired(self):
return self.should_rotate()
def should_rotate(self):
"""Wether the session should be rotated.
Returns:
True if it should, False if not.
"""
if (self.message_count >= self.max_messages
or datetime.now() - self.creation_time >= self.max_age):
return True
return False
def encrypt(self, plaintext):
if not self.shared:
raise EncryptionError("Error, session is not shared")
if self.expired:
raise EncryptionError("Error, session is has expired")
self.message_count += 1
return super().encrypt(plaintext)
# TODO document the values better.
[docs]class TrustState(Enum):
"""The device trust state.
An Enum holding differing values that a device trust state can be in.
"""
unset = 0
verified = 1
blacklisted = 2
ignored = 3
[docs]@attr.s
class OlmDevice(object):
"""Class holding info about users Olm devices.
OlmDevices represent user devices with which we can communicate in an
encrypted manner. To do so an OlmDevice needs to have its trust state set.
The trust state can be set to one of "verified", "ignored", or
"blacklisted".
Note that the trust state should never be moddified directly on an
OlmDevice, all the attributes here are read only.
The trust state can be changed by pasing the OlmDevice to a nio Client or a
MatrixStore class.
Attributes:
user_id (str): The id of the user that the device belongs to.
device_id (str): The device id that combined with the user id uniquely
identifies the device.
keys (Dict): A dictionary containing the type and the public part
of this devices encryption keys.
display_name (str): The human readable name of this device.
deleted (bool): A boolean signaling if this device has been deleted by
its owner.
trust_state (TrustState): The trust state of this device.
"""
user_id = attr.ib(type=str)
device_id = attr.ib(type=str)
keys = attr.ib(type=Dict[str, str])
display_name = attr.ib(type=str, default="")
deleted = attr.ib(type=bool, default=False)
trust_state = attr.ib(type=TrustState, default=TrustState.unset)
@property
def id(self):
"""The device id.
Same as the device_id attribute.
"""
return self.device_id
@property
def ed25519(self):
"""The ed25519 fingerprint key of the device."""
return self.keys["ed25519"]
@ed25519.setter
def ed25519(self, new_value):
self.keys["ed25519"] = new_value
@property
def curve25519(self):
"""The curve25519 key of the device."""
return self.keys["curve25519"]
@curve25519.setter
def curve25519(self, new_value):
self.keys["curve25519"] = new_value
[docs] def as_dict(self):
"""Convert the OlmDevice into a dictionary."""
device = attr.asdict(self)
device["trust_state"] = self.trust_state.name
return device
@property
def verified(self):
# type: () -> bool
"""Is the device verified."""
return self.trust_state == TrustState.verified
@property
def ignored(self):
# type: () -> bool
"""Is the device ignored."""
return self.trust_state == TrustState.ignored
@property
def blacklisted(self):
# type: () -> bool
"""Is the device blacklisted."""
return self.trust_state == TrustState.blacklisted
@attr.s
class OutgoingKeyRequest(object):
"""Key request that we sent out."""
request_id = attr.ib(type=str)
session_id = attr.ib(type=str)
room_id = attr.ib(type=str)
algorithm = attr.ib(type=str)
@classmethod
def from_response(cls, response):
# type: (RoomKeyRequestResponse) -> OutgoingKeyRequest
"""Create a key request object from a RoomKeyRequestResponse."""
return cls(
response.request_id,
response.session_id,
response.room_id,
response.algorithm
)
@classmethod
def from_database(cls, row):
"""Create a key request object from a database row."""
return cls.from_response(row)
def as_cancellation(self, user_id, requesting_device_id):
"""Turn the key request into a cancellation to-device message."""
content = {
"action": "request_cancellation",
"request_id": self.request_id,
"requesting_device_id": requesting_device_id,
}
return ToDeviceMessage(
"m.room_key_request",
user_id,
"*",
content
)