Skip to content

Upgrade Synapse to 1.19.1 #6442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions raiden/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class Capabilities(Enum):
WEBRTC = "webRTC"


class ServerListType(Enum):
ACTIVE_SERVERS = "active_servers"
ALL_SERVERS = "all_servers"


# Set at 64 since parity's default is 64 and Geth's default is 128
# TODO: Make this configurable. Since in parity this is also a configurable value
STATE_PRUNING_AFTER_BLOCKS = 64
Expand Down
56 changes: 44 additions & 12 deletions raiden/network/pathfinding.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class PFSInfo:
version: str
confirmed_block_number: BlockNumber
matrix_server: str
matrix_room_id: Optional[str]


@dataclass
Expand Down Expand Up @@ -164,6 +165,7 @@ def get_pfs_info(url: str) -> PFSInfo:
version=infos["version"],
confirmed_block_number=infos["network_info"]["confirmed_block"]["number"],
matrix_server=matrix_server_info.netloc,
matrix_room_id=infos.get("matrix_room_id"),
)
except RequestException as e:
msg = "Selected Pathfinding Service did not respond"
Expand Down Expand Up @@ -248,7 +250,6 @@ def configure_pfs_or_exit(
node_network_id: ChainID,
token_network_registry_address: TokenNetworkRegistryAddress,
pathfinding_max_fee: TokenAmount,
matrix_servers: List[str],
) -> PFSInfo:
"""
Take in the given pfs_address argument, the service registry and find out a
Expand Down Expand Up @@ -315,17 +316,6 @@ def configure_pfs_or_exit(
f"Raiden will shut down. Please choose a different Pathfinding Service."
)

server_in_federation = any(
pathfinding_service_info.matrix_server in matrix_server for matrix_server in matrix_servers
)
# Only check if PFS is right federation when matrix server is not given explicitely
if len(matrix_servers) > 0 and not server_in_federation:
raise RaidenError(
f"The Pathfinding Service {pfs_url} is not connected to the same matrix federation. "
f"Please check your settings for PFS and matrix server, if manually chosen. "
f"Otherwise, check your environment-type."
)

click.secho(
f"You have chosen the Pathfinding Service at {pfs_url}.\n"
f"Operator: {pathfinding_service_info.operator}, "
Expand All @@ -341,6 +331,48 @@ def configure_pfs_or_exit(
return pathfinding_service_info


def check_pfs_transport_configuration(
pfs_info: PFSInfo,
pfs_was_autoselected: bool,
transport_pfs_broadcast_room_id: str,
matrix_server_url: str,
matrix_server_was_autoselected: bool,
) -> None:
if pfs_info.matrix_room_id is None:
# Special case until all PFSs are upgraded to >= 0.12.0
log.warning(
"Can't check PFS transport configuration",
pfs_version=pfs_info.version,
min_required_version="0.12.0",
pfs_url=pfs_info.url,
)
return
if pfs_info.matrix_room_id != transport_pfs_broadcast_room_id:
msg = (
f"The Pathfinding Service at {pfs_info.url} is not connected to the "
f"same Matrix transport federation as this Raiden node."
)
if matrix_server_was_autoselected and pfs_was_autoselected:
msg += (
f"\nBoth the Matrix transport server at {matrix_server_url} and the PFS were "
f"automatically selected. This points to a server-side misconfiguration. "
f"Please report this issue at "
f"https://github.com/raiden-network/raiden/issues/new?template=bug_report.md"
)
else:
msg += (
"\nPlease verify that the Matrix transport server and PFS service you selected "
"are intended to be used together or use the automatic server selection."
)
msg += (
f"\n\n"
f"- Transport server PFS broadcast room-id: {transport_pfs_broadcast_room_id}\n"
f"- PFS server broadcast room-id: {pfs_info.matrix_room_id}"
)

raise RaidenError(msg)


def check_pfs_for_production(
service_registry: Optional[ServiceRegistry], pfs_info: PFSInfo
) -> None:
Expand Down
54 changes: 36 additions & 18 deletions raiden/network/transport/matrix/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from functools import wraps
from itertools import repeat
from typing import Any, Callable, Container, Dict, Iterable, Iterator, List, Optional, Tuple
from urllib.parse import quote, urlparse
from urllib.parse import quote
from uuid import UUID, uuid4

import gevent
Expand Down Expand Up @@ -54,6 +54,7 @@ def __init__(self, client: "GMatrixClient", room_id: str) -> None:
super().__init__(client, room_id)
self._members: Dict[str, User] = {}
self.aliases: List[str]
self.canonical_alias: str

def get_joined_members(self, force_resync: bool = False) -> List[User]:
""" Return a list of members of this room. """
Expand Down Expand Up @@ -81,38 +82,39 @@ def _rmmembers(self, user_id: str) -> None:
self._members.pop(user_id, None)

def __repr__(self) -> str:
return f"<Room id={self.room_id!r} aliases={self.aliases!r}>"
return f"<Room id={self.room_id!r} canonical_alias={self.canonical_alias!r}>"

def update_local_aliases(self) -> bool:
""" Fetch server local aliases for the room.
def update_local_alias(self) -> bool:
""" Fetch the server local canonical alias for the room.

This is an optimization over the general `update_aliases()` method which fetches the
entire room state (which can be large in Raiden) and then discards all non-alias events.

Unfortunately due to a limitation in the Matrix API it's not possible to query for all
aliases of a room. Only aliases for a specific server can be fetched, see:
https://github.com/matrix-org/synapse/issues/6908
With MSC2432[1] implemented only ``m.room.canonical_alias`` events exist.
They represent the server local canonical_alias.

Since in Raiden we always have server local aliases set, this method is sufficient for our
use case.
Since in Raiden broadcast rooms always have a server local alias set, this method is
sufficient for our use case.

[1] https://github.com/matrix-org/matrix-doc/pull/2432

Returns:
boolean: True if the aliases changed, False if not
boolean: True if the canonical_alias changed, False if not
"""
server_name = urlparse(self.client.api.base_url).netloc
changed = False

try:
response = self.client.api.get_room_state_type(
self.room_id, "m.room.aliases", server_name
self.room_id, "m.room.canonical_alias", ""
)
except MatrixRequestError:
return False

if "aliases" in response:
if self.aliases != response["aliases"]:
self.aliases = response["aliases"]
changed = True
server_sent_alias = response.get("alias")
if server_sent_alias is not None and self.canonical_alias != server_sent_alias:
self.canonical_alias = server_sent_alias
changed = True

return changed


Expand Down Expand Up @@ -235,6 +237,22 @@ def answer(self, room_id: RoomID, answer: Dict[str, str]) -> None:
room_id=room_id, text_content=json.dumps(answer), msgtype="m.call.answer"
)

def get_aliases(self, room_id: str) -> Dict[str, Any]:
"""
Perform GET /rooms/{room_id}/aliases.

Requires Synapse >= 1.11.0 which implements the (as of yet) unstable MSC2432 room alias
semantics change.
"""
return self._send(
"GET",
f"/rooms/{room_id}/aliases",
api_path="/_matrix/client/unstable/org.matrix.msc2432",
)

def __repr__(self) -> str:
return f"<GMatrixHttpApi base_url={self.base_url}>"


class GMatrixClient(MatrixClient):
""" Gevent-compliant MatrixClient subclass """
Expand Down Expand Up @@ -535,8 +553,8 @@ def _mkroom(self, room_id: str) -> Room:
if room_id not in self.rooms:
self.rooms[room_id] = Room(self, room_id)
room = self.rooms[room_id]
if not room.aliases:
room.update_local_aliases()
if not room.canonical_alias:
room.update_local_alias()
return room

def get_user_presence(self, user_id: str) -> Optional[str]:
Expand Down
44 changes: 23 additions & 21 deletions raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,15 +384,15 @@ def _http_retry_delay() -> Iterable[float]:
None, self._handle_web_rtc_messages, self._handle_sdp_callback
)

self._server_url = self._client.api.base_url
self._server_name = urlparse(self._server_url).netloc
self.server_url = self._client.api.base_url
self._server_name = urlparse(self.server_url).netloc

self.greenlets: List[gevent.Greenlet] = list()

self._address_to_retrier: Dict[Address, _RetryQueue] = dict()
self._displayname_cache = DisplayNameCache()

self._broadcast_rooms: Dict[str, Room] = dict()
self.broadcast_rooms: Dict[str, Room] = dict()
self._broadcast_queue: JoinableQueue[Tuple[str, Message]] = JoinableQueue()

self._started = False
Expand Down Expand Up @@ -734,11 +734,11 @@ def _broadcast(room_name: str, serialized_message: str) -> None:
f"Known public rooms: {self._config.broadcast_rooms}."
)
room_name = make_room_alias(self.chain_id, room_name)
if room_name not in self._broadcast_rooms:
if room_name not in self.broadcast_rooms:
room = join_broadcast_room(self._client, f"#{room_name}:{self._server_name}")
self._broadcast_rooms[room_name] = room
self.broadcast_rooms[room_name] = room

existing_room = self._broadcast_rooms.get(room_name)
existing_room = self.broadcast_rooms.get(room_name)
assert existing_room, f"Unknown broadcast room: {room_name!r}"

self.log.debug(
Expand Down Expand Up @@ -803,7 +803,7 @@ def _initialize_first_sync(self) -> None:
# Call sync to fetch the inventory rooms and new invites, the sync
# limit prevents fetching the messages.
filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values(), limit=0
not_rooms=self.broadcast_rooms.values(), limit=0
)
prev_sync_filter_id = self._client.set_sync_filter_id(filter_id)
# Need to reset this here, otherwise we might run into problems after a restart
Expand Down Expand Up @@ -834,7 +834,10 @@ def _initialize_first_sync(self) -> None:
self._set_room_id_for_address(partner_address[0], room.room_id)

self.log.debug(
"Found room", room=room, aliases=room.aliases, members=room.get_joined_members()
"Found room",
room=room,
canonical_alias=room.canonical_alias,
members=room.get_joined_members(),
)

def _leave_unexpected_rooms(
Expand All @@ -853,7 +856,7 @@ def to_string_representation(partner: Optional[Address]) -> str:
self.log.warning(
"Leaving Room",
reason=reason,
room_aliases=room.aliases,
canonical_alias=room.canonical_alias,
room_id=room.room_id,
partners=[to_string_representation(partner) for partner in partners],
)
Expand All @@ -880,14 +883,14 @@ def _join_broadcast_room(transport: MatrixTransport, room_name: str) -> None:
transport.log.debug(
"Joining broadcast room", broadcast_room_alias=broadcast_room_alias
)
transport._broadcast_rooms[room_name] = join_broadcast_room(
transport.broadcast_rooms[room_name] = join_broadcast_room(
client=transport._client, broadcast_room_alias=broadcast_room_alias
)

for suffix in self._config.broadcast_rooms:
alias_prefix = make_room_alias(self.chain_id, suffix)

if alias_prefix not in self._broadcast_rooms:
if alias_prefix not in self.broadcast_rooms:
pool.apply_async(_join_broadcast_room, args=(self, alias_prefix))

pool.join(raise_error=True)
Expand All @@ -905,10 +908,10 @@ def _initialize_sync(self) -> None:
"sync thread, since that is necessary to properly generate the "
"filters."
)
assert self._broadcast_rooms, msg
assert self.broadcast_rooms, msg

broadcast_filter_id = self._client.create_sync_filter(
not_rooms=self._broadcast_rooms.values()
not_rooms=self.broadcast_rooms.values()
)
self._client.set_sync_filter_id(broadcast_filter_id)

Expand Down Expand Up @@ -1070,7 +1073,7 @@ def _handle_invite(self, room_id: RoomID, state: dict) -> None:
self.log.debug(
"Joined from invite",
room_id=room_id,
aliases=room.aliases,
canonical_alias=room.canonical_alias,
inviting_address=to_checksum_address(peer_address),
)

Expand All @@ -1081,8 +1084,8 @@ def _handle_invite(self, room_id: RoomID, state: dict) -> None:
def _handle_member_join(self, room: Room) -> None:
if self._is_broadcast_room(room):
raise AssertionError(
f"Broadcast room events should be filtered in syncs: {room.aliases}."
f"Joined Broadcast Rooms: {list(self._broadcast_rooms.keys())}"
f"Broadcast room events should be filtered in syncs: {room.canonical_alias}."
f"Joined Broadcast Rooms: {list(self.broadcast_rooms.keys())}"
f"Should be joined to: {self._config.broadcast_rooms}"
)

Expand Down Expand Up @@ -1142,7 +1145,7 @@ def _handle_text(self, room: Room, message: MatrixMessage) -> List[Message]:
if self._is_broadcast_room(room):
# This must not happen. Nodes must not listen on broadcast rooms.
raise RuntimeError(
f"Received message in broadcast room {room.aliases[0]}. Sending user: {user}"
f"Received message in broadcast room {room.canonical_alias}. Sending user: {user}"
)

if not self._address_mgr.is_address_known(peer_address):
Expand Down Expand Up @@ -1532,10 +1535,9 @@ def create_web_rtc_channel(self, partner_address: Address) -> None:
del self._web_rtc_manager.address_to_rtc_partners[partner_address]

def _is_broadcast_room(self, room: Room) -> bool:
return any(
suffix in room_alias
for suffix in self._config.broadcast_rooms
for room_alias in room.aliases
has_alias = room.canonical_alias is not None
return has_alias and any(
suffix in room.canonical_alias for suffix in self._config.broadcast_rooms
)

def _user_presence_changed(self, user: User, _presence: UserPresence) -> None:
Expand Down
2 changes: 1 addition & 1 deletion raiden/network/transport/matrix/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ def first_login(client: GMatrixClient, signer: Signer, username: str, cap_str: s

# Disabling sync because login is done before the transport is fully
# initialized, i.e. the inventory rooms don't have the callbacks installed.
client.login(username, password, sync=False)
client.login(username, password, sync=False, device_id="raiden")

# Because this is the first login, the display name has to be set, this
# prevents the impersonation mentioned above. subsequent calls will reuse
Expand Down
2 changes: 1 addition & 1 deletion raiden/network/transport/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from raiden.utils.typing import Iterator
from typing import Iterator


def timeout_exponential_backoff(retries: int, timeout: float, maximum: float) -> Iterator[float]:
Expand Down
4 changes: 2 additions & 2 deletions raiden/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@
DEFAULT_MATRIX_KNOWN_SERVERS = {
Environment.PRODUCTION: (
"https://raw.githubusercontent.com/raiden-network/raiden-service-bundle"
"/master/known_servers/known_servers.alderaan.yaml"
"/master/known_servers/known_servers-production-v1.2.0.json"
),
Environment.DEVELOPMENT: (
"https://raw.githubusercontent.com/raiden-network/raiden-service-bundle"
"/master/known_servers/known_servers.test.yaml"
"/master/known_servers/known_servers-development-v1.2.0.json"
),
}

Expand Down
Loading