# -*- coding: utf-8 -*-
# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
#
# Permission to use, copy, modify, and/or distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import unicode_literals
from builtins import super
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional
from jsonschema.exceptions import SchemaError, ValidationError
from logbook import Logger
from .events import (Event, InviteAliasEvent, InviteMemberEvent,
InviteNameEvent, PowerLevels, PowerLevelsEvent,
RoomAliasEvent, RoomCreateEvent, RoomEncryptionEvent,
RoomGuestAccessEvent, RoomHistoryVisibilityEvent,
RoomJoinRulesEvent, RoomMemberEvent, RoomNameEvent,
RoomTopicEvent, RoomAvatarEvent, TypingNoticeEvent)
from .log import logger_group
from .responses import RoomSummary
logger = Logger("nio.rooms")
logger_group.add_logger(logger)
__all__ = [
"MatrixRoom",
"MatrixInvitedRoom",
"MatrixUser",
]
[docs]class MatrixRoom(object):
"""Represents a Matrix room."""
def __init__(self, room_id, own_user_id, encrypted=False):
# type: (str, str, bool) -> None
"""Initialize a MatrixRoom object."""
# yapf: disable
self.room_id = room_id # type: str
self.own_user_id = own_user_id
self.creator = "" # type: str
self.federate = True # type: bool
self.room_version = "1" # type: str
self.guest_access = "forbidden" # type: str
self.join_rule = "invite" # type: str
self.history_visibility = "shared" # type: str
self.canonical_alias = None # type: Optional[str]
self.topic = None # type: Optional[str]
self.name = None # type: Optional[str]
self.users = dict() # type: Dict[str, MatrixUser]
self.names = defaultdict(list) # type: DefaultDict[str, List[str]]
self.encrypted = encrypted # type: bool
self.power_levels = PowerLevels() # type: PowerLevels
self.typing_users = [] # type: List[str]
self.summary = None # type: Optional[RoomSummary]
self.room_avatar_url = None # type: Optional[str]
# yapf: enable
@property
def display_name(self):
"""Calculate display name for a room.
Prefer returning the room name if it exists, falling back to
a group-style name if not.
Follows:
https://matrix.org/docs/spec/client_server/r0.3.0.html#id268
"""
if self.is_named:
return self.named_room_name()
return self.group_name()
[docs] def named_room_name(self):
"""Return the name of the room, if it's a named room.
Otherwise, return None.
"""
if self.name:
return self.name
elif self.canonical_alias:
return self.canonical_alias
else:
return None
[docs] def group_name(self):
"""Return the group-style name of the room.
In other words, a display name based on the names of room members. This
is used for ad-hoc groups of people (usually direct chats).
"""
# Sort user display names, excluding our own user and using the
# mxid as the sorting key.
user_names = [
self.user_name(u)
for u in sorted(self.users.keys())
if u != self.own_user_id
]
num_users = len(user_names)
if num_users == 1:
return user_names[0]
elif num_users == 2:
return " and ".join(user_names)
elif num_users >= 3:
return "{first_user} and {num} others".format(
first_user=user_names[0], num=num_users - 1
)
else:
return "Empty room?"
[docs] def user_name(self, user_id):
"""Get disambiguated display name for a user.
Returns display name of a user if display name is unique or returns
a display name in form "<display name> (<matrix id>)" if there is
more than one user with same display name.
"""
if user_id not in self.users:
return None
user = self.users[user_id]
if len(self.names[user.name]) > 1:
return user.disambiguated_name
return user.name
[docs] def user_name_clashes(self, name):
"""Get a list of users that have same display name."""
return self.names[name]
[docs] def avatar_url(self, user_id):
# type: (str) -> Optional[str]
"""Get avatar url for a user.
Returns a matrix content URI, or None if the user has no avatar.
"""
if user_id not in self.users:
return None
return self.users[user_id].avatar_url
@property
def gen_avatar_url(self):
"""
Get the calculated room's avatar url.
Either the room's avatar if one is set, or the avatar of the
first user that's not ourselves if the room is an unnamed group or
has exactly two users.
"""
if self.room_avatar_url:
return self.room_avatar_url
if self.is_group or len(self.users) == 2:
user = next(
(u for u in self.users if u != self.own_user_id),
None
)
return self.avatar_url(user)
return None
@property
def machine_name(self):
"""Calculate an unambiguous, unique machine name for a room.
Either use the more human-friendly canonical alias, if it exists, or
the internal room ID if not.
"""
if self.canonical_alias:
return self.canonical_alias
else:
return self.room_id
@property
def is_named(self):
"""Determine whether a room is name.
A named room is a room with either the name or a canonical alias set.
"""
return self.canonical_alias or self.name
@property
def is_group(self):
"""Determine whether a room is an ad-hoc group (often a direct chat).
A group is an unnamed room with no canonical alias.
"""
return not self.is_named
[docs] def add_member(self, user_id, display_name, avatar_url):
# type (str, str, str) -> bool
if user_id in self.users:
return False
level = self.power_levels.users.get(
user_id,
self.power_levels.defaults.users_default
)
user = MatrixUser(user_id, display_name, avatar_url, level)
self.users[user_id] = user
name = display_name if display_name else user_id
self.names[name].append(user_id)
return True
[docs] def remove_member(self, user_id):
# type (str) -> bool
if user_id in self.users:
user = self.users[user_id]
self.names[user.name].remove(user.user_id)
del self.users[user_id]
return True
return False
[docs] def handle_membership(self, event):
# type: (RoomMemberEvent) -> bool
"""Handle a membership event for the room.
Args:
event (RoomMemberEvent): The event that should be handled that
updates the room state.
Returns True if the member list of the room has changed False
otherwise.
"""
if event.content["membership"] == "join":
if event.state_key not in self.users:
display_name = event.content.get("displayname", None)
avatar_url = event.content.get("avatar_url", None)
return self.add_member(
event.state_key, display_name, avatar_url
)
# Handle profile changes
user = self.users[event.sender]
if "displayname" in event.content:
self.names[user.name].remove(user.user_id)
user.display_name = event.content["displayname"]
self.names[user.name].append(user.user_id)
if "avatar_url" in event.content:
user.avatar_url = event.content["avatar_url"]
return False
elif event.content["membership"] in ["leave", "ban"]:
return self.remove_member(event.state_key)
elif event.content["membership"] == "invite":
pass
return False
[docs] def handle_ephemeral_event(self, event):
if isinstance(event, TypingNoticeEvent):
self.typing_users = event.users
[docs] def handle_event(self, event):
# type: (Event) -> None
logger.info(
"Room {} handling event of type {}".format(
self.room_id, type(event).__name__
)
)
if isinstance(event, RoomCreateEvent):
self.creator = event.creator
self.federate = event.federate
self.room_version = event.room_version
elif isinstance(event, RoomGuestAccessEvent):
self.guest_access = event.guest_access
elif isinstance(event, RoomHistoryVisibilityEvent):
self.history_visibility = event.history_visibility
elif isinstance(event, RoomJoinRulesEvent):
self.join_rule = event.join_rule
elif isinstance(event, RoomNameEvent):
self.name = event.name
elif isinstance(event, RoomAliasEvent):
self.canonical_alias = event.canonical_alias
elif isinstance(event, RoomTopicEvent):
self.topic = event.topic
elif isinstance(event, RoomAvatarEvent):
self.room_avatar_url = event.avatar_url
elif isinstance(event, RoomEncryptionEvent):
self.encrypted = True
elif isinstance(event, PowerLevelsEvent):
self.power_levels.update(event.power_levels)
# Update the power levels of the joined users
for user_id, level in self.power_levels.users.items():
if user_id in self.users:
logger.info(
"Changing power level for user {} from {} to "
"{}".format(
user_id, self.users[user_id].power_level, level
)
)
self.users[user_id].power_level = level
[docs] def update_summary(self, summary):
if not self.summary:
self.summary = summary
return
if summary.joined_member_count:
self.summary.joined_member_count = summary.joined_member_count
if summary.invited_member_count:
self.summary.invited_member_count = summary.joined_member_count
if summary.heroes:
self.summary.heroes = summary.heroes
@property
def members_synced(self):
# type: () -> bool
"""Check if the room member state is fully synced.
Room members can be missing from the room if syncs are done using lazy
member loading, the room summary will contain the full member count but
other member info will be missing.
A `joined_members` request should be done for this room to populate the
member list. This is crucial for encrypted rooms before sending any
messages.
"""
if self.summary:
if self.summary.joined_member_count is not None:
return self.summary.joined_member_count == len(self.users)
return True
@property
def member_count(self):
if self.summary:
return self.summary.joined_member_count or len(self.users)
return len(self.users)
[docs]class MatrixInvitedRoom(MatrixRoom):
def __init__(self, room_id, own_user_id):
# type: (str, str) -> None
self.inviter = None # type: Optional[str]
super().__init__(room_id, own_user_id)
[docs] def handle_membership(self, event):
# type: (RoomMemberEvent) -> bool
"""Handle a membership event for the invited room.
Args:
event (RoomMemberEvent): The event that should be handled that
updates the room state.
Returns True if the member list of the room has changed False
otherwise.
"""
if (event.content["membership"] == "invite"
and event.state_key == self.own_user_id):
self.inviter = event.sender
return super().handle_membership(event)
[docs] def handle_event(self, event):
# type: (Event) -> None
logger.info(
"Room {} handling event of type {}".format(
self.room_id, type(event).__name__
)
)
if isinstance(event, InviteMemberEvent):
self.handle_membership(event)
elif isinstance(event, InviteNameEvent):
self.name = event.name
elif isinstance(event, InviteAliasEvent):
self.canonical_alias = event.canonical_alias
[docs]class MatrixUser(object):
def __init__(
self, user_id, display_name=None, avatar_url=None, power_level=0
):
# yapf: disable
self.user_id = user_id # type: str
self.display_name = display_name # type: str
self.avatar_url = avatar_url # type: str
self.power_level = power_level # type: int
# yapf: enable
@property
def name(self):
if self.display_name:
return self.display_name
return self.user_id
@property
def disambiguated_name(self):
# as per https://matrix.org/docs/spec/client_server/r0.4.0.html#id346
if self.display_name:
return "{name} ({user_id})".format(name=self.display_name,
user_id=self.user_id)
return self.user_id