Skip to content

Introduce capabilites #6482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions raiden/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ class Networks(Enum):
SMOKETEST = ChainID(627)


class Capabilities(Enum):
"""Capabilities allow for protocol handshake between nodes.
"""

NO_RECEIVE = "noReceive" # won't proceed with protocol for incoming transfers
NO_MEDIATE = "noMediate" # can't mediate transfers; mediating requires receiving
NO_DELIVERY = "noDelivery" # don't need Delivery messages
WEBRTC = "webRTC"


# Set at 64 since parity's default is 64 and Geth's default is 128
# TODO: Make this configurable. Since in parity this is also a configurable value
STATE_PRUNING_AFTER_BLOCKS = 64
Expand Down
7 changes: 6 additions & 1 deletion raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from raiden.transfer.identifiers import CANONICAL_IDENTIFIER_UNORDERED_QUEUE, QueueIdentifier
from raiden.transfer.state import NetworkState, QueueIdsToQueues
from raiden.transfer.state_change import ActionChangeNodeNetworkState
from raiden.utils.capabilities import capconfig_to_dict
from raiden.utils.formatting import to_checksum_address, to_hex_address
from raiden.utils.logging import redact_secret
from raiden.utils.notifying_queue import NotifyingQueue
Expand All @@ -72,6 +73,7 @@
MessageID,
NamedTuple,
Optional,
PeerCapabilities,
RoomID,
Set,
Tuple,
Expand Down Expand Up @@ -442,10 +444,12 @@ def start( # type: ignore
self._address_mgr.start()

try:
capabilities = capconfig_to_dict(self._config.capabilities_config)
login(
client=self._client,
signer=self._raiden_service.signer,
prev_auth_data=prev_auth_data,
capabilities=capabilities,
)
except ValueError:
# `ValueError` may be raised if `get_user` provides invalid data to
Expand Down Expand Up @@ -1376,7 +1380,7 @@ def _user_presence_changed(self, user: User, _presence: UserPresence) -> None:
)

def _address_reachability_changed(
self, address: Address, reachability: AddressReachability
self, address: Address, reachability: AddressReachability, capabilities: PeerCapabilities
) -> None:
if reachability is AddressReachability.REACHABLE:
node_reachability = NetworkState.REACHABLE
Expand All @@ -1392,6 +1396,7 @@ def _address_reachability_changed(
raise TypeError(f'Unexpected reachability state "{reachability}".')

assert self._raiden_service is not None, "_raiden_service not set"
self._address_mgr._address_to_capabilities[address] = capabilities
state_change = ActionChangeNodeNetworkState(address, node_reachability)
self._raiden_service.handle_and_track_state_changes([state_change])

Expand Down
64 changes: 54 additions & 10 deletions raiden/network/transport/matrix/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@
)
from raiden.network.utils import get_average_http_response_time
from raiden.storage.serialization.serializer import MessageSerializer
from raiden.utils.capabilities import parse_capabilities, serialize_capabilities
from raiden.utils.gevent import spawn_named
from raiden.utils.signer import Signer, recover
from raiden.utils.typing import Address, ChainID, MessageID, Signature
from raiden.utils.typing import Address, ChainID, MessageID, PeerCapabilities, Signature
from raiden_contracts.constants import ID_TO_CHAINNAME

log = structlog.get_logger(__name__)
Expand Down Expand Up @@ -186,7 +187,9 @@ def __init__(
self,
client: GMatrixClient,
displayname_cache: DisplayNameCache,
address_reachability_changed_callback: Callable[[Address, AddressReachability], None],
address_reachability_changed_callback: Callable[
[Address, AddressReachability, PeerCapabilities], None
],
user_presence_changed_callback: Optional[Callable[[User, UserPresence], None]] = None,
_log_context: Optional[Dict[str, Any]] = None,
) -> None:
Expand Down Expand Up @@ -269,6 +272,10 @@ def get_address_reachability_state(self, address: Address) -> ReachabilityState:
""" Return the current reachability state for ``address``. """
return self._address_to_reachabilitystate.get(address, UNKNOWN_REACHABILITY_STATE)

def get_address_capabilities(self, address: Address) -> PeerCapabilities:
""" Return the protocol capabilities for ``address``. """
return self._address_to_capabilities.get(address, PeerCapabilities({}))

def force_user_presence(self, user: User, presence: UserPresence) -> None:
""" Forcibly set the ``user`` presence to ``presence``.

Expand Down Expand Up @@ -331,6 +338,18 @@ def track_address_presence(

self._maybe_address_reachability_changed(address)

def query_capabilities_for_user_id(self, user_id: str) -> PeerCapabilities:
""" This pulls the `avatar_url` for a given user/user_id and parses the capabilities. """
try:
user: User = self._client.get_user(user_id)
except MatrixRequestError:
return PeerCapabilities({})
avatar_url = user.get_avatar_url()
if avatar_url is not None:
return PeerCapabilities(parse_capabilities(avatar_url))
else:
return PeerCapabilities({})

def get_reachability_from_matrix(self, user_ids: Iterable[str]) -> AddressReachability:
""" Get the current reachability without any side effects

Expand All @@ -347,11 +366,14 @@ def get_reachability_from_matrix(self, user_ids: Iterable[str]) -> AddressReacha
def _maybe_address_reachability_changed(self, address: Address) -> None:
# A Raiden node may have multiple Matrix users, this happens when
# Raiden roams from a Matrix server to another. This loop goes over all
# these users and uses the "best" presence. IOW, if there is a single
# these users and uses the "best" presence. IOW, if there is at least one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just had a discussion with @ulope about the feasability of allowing multiple users of the same address online. This should actually be forbidden per definition because it would imply multiple usage of the same keystore. A raiden node could prevent starting if a user for the same address is found online.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment change was only to reflect the current code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should ask us if we should change the code. Why should we handle multiple users being online if this should be prevented anyway?
But on the other hand this could also be implemented in a different PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a follow up issue to discuss this:

  • for a canonical client, we can definitely try to forbid multiple logins
  • for the peer handling side (such as here), we have to deal with the possibility of multiple logins
  • I don't know how roaming user ids behave, when it comes to login/logout. We need to clarify if there are interleaved states, where the old user_id still appears online, while the new user_id already came up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. A user is set to be offline by the server whenever he did not longpoll sync() for 30s. This definitely leaves the possibility open to appear online with 2 user ids for the same address.

# Matrix user that is reachable, then the Raiden node is considered
# reachable.
userids = self._address_to_userids[address].copy()
composite_presence = {self._userid_to_presence.get(uid) for uid in userids}
presence_to_uid = defaultdict(list)
for uid in userids:
presence_to_uid[self._userid_to_presence.get(uid)].append(uid)
composite_presence = set(presence_to_uid.keys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference here to the old code?

Copy link
Contributor Author

@konradkonrad konradkonrad Sep 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We preserve the mapping presence <-> user_id in order to pull the avatar_url for a user_id that has the new_presence.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code was only concerned with ordering presences.


new_presence = UserPresence.UNKNOWN
for presence in UserPresence.__members__.values():
Expand All @@ -364,7 +386,9 @@ def _maybe_address_reachability_changed(self, address: Address) -> None:
prev_reachability_state = self.get_address_reachability_state(address)
if new_address_reachability == prev_reachability_state.reachability:
return

# for capabilities, we get the "first" uid that showed the `new_presence`
present_uid = presence_to_uid[new_presence].pop()
capabilities = self.query_capabilities_for_user_id(present_uid)
now = datetime.now()

self.log.debug(
Expand All @@ -380,7 +404,9 @@ def _maybe_address_reachability_changed(self, address: Address) -> None:
new_address_reachability, now
)

self._address_reachability_changed_callback(address, new_address_reachability)
self._address_reachability_changed_callback(
address, new_address_reachability, capabilities
)

def _presence_listener(self, event: Dict[str, Any], presence_update_id: int) -> None:
"""
Expand Down Expand Up @@ -437,6 +463,7 @@ def _presence_listener(self, event: Dict[str, Any], presence_update_id: int) ->
def _reset_state(self) -> None:
self._address_to_userids: Dict[Address, Set[str]] = defaultdict(set)
self._address_to_reachabilitystate: Dict[Address, ReachabilityState] = dict()
self._address_to_capabilities: Dict[Address, PeerCapabilities] = dict()
self._userid_to_presence: Dict[str, UserPresence] = dict()
self._userid_to_presence_update_id: Dict[str, int] = dict()

Expand Down Expand Up @@ -569,7 +596,7 @@ def join_broadcast_room(client: GMatrixClient, broadcast_room_alias: str) -> Roo
)


def first_login(client: GMatrixClient, signer: Signer, username: str) -> User:
def first_login(client: GMatrixClient, signer: Signer, username: str, cap_str: str) -> User:
"""Login within a server.

There are multiple cases where a previous auth token can become invalid and
Expand Down Expand Up @@ -623,7 +650,7 @@ def first_login(client: GMatrixClient, signer: Signer, username: str) -> User:
client.login(username, password, sync=False)

# Because this is the first login, the display name has to be set, this
# prevents the impersonation metioned above. subsequent calls will reuse
# prevents the impersonation mentioned above. subsequent calls will reuse
# the authentication token and the display name will be properly set.
signature_bytes = signer.sign(client.user_id.encode())
signature_hex = encode_hex(signature_bytes)
Expand All @@ -635,6 +662,12 @@ def first_login(client: GMatrixClient, signer: Signer, username: str) -> User:
if current_display_name != signature_hex:
user.set_display_name(signature_hex)

current_capabilities = user.get_avatar_url() or ""

# Only set the capabilities if necessary.
if current_capabilities != cap_str:
user.set_avatar_url(cap_str)

log.debug(
"Logged in",
node=to_checksum_address(username),
Expand Down Expand Up @@ -679,14 +712,21 @@ def login_with_token(client: GMatrixClient, user_id: str, access_token: str) ->
return client.get_user(client.user_id)


def login(client: GMatrixClient, signer: Signer, prev_auth_data: Optional[str] = None) -> User:
def login(
client: GMatrixClient,
signer: Signer,
prev_auth_data: Optional[str] = None,
capabilities: Dict[str, Any] = None,
) -> User:
""" Login with a matrix server.

Params:
client: GMatrixClient instance configured with desired homeserver.
signer: Signer used to sign the password and displayname.
prev_auth_data: Previously persisted authentication using the format "{user}/{password}".
"""
if capabilities is None:
capabilities = {}
server_url = client.api.base_url
server_name = urlparse(server_url).netloc

Expand All @@ -708,7 +748,11 @@ def login(client: GMatrixClient, signer: Signer, prev_auth_data: Optional[str] =
server_name=server_name,
)

return first_login(client, signer, username)
try:
capstr = serialize_capabilities(capabilities)
except ValueError:
raise Exception("error serializing")
return first_login(client, signer, username, capstr)


@cached(cache=LRUCache(128), key=attrgetter("user_id", "displayname"), lock=Semaphore())
Expand Down
10 changes: 10 additions & 0 deletions raiden/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ def get_proportional_imbalance_fee(self, token_address: TokenAddress) -> Proport
)


@dataclass
class CapabilitiesConfig:
no_receive: bool = False
no_mediate: bool = False
no_delivery: bool = False
web_rtc: bool = False


@dataclass
class MatrixTransportConfig:
retries_before_backoff: int
Expand All @@ -137,6 +145,7 @@ class MatrixTransportConfig:
available_servers: List[str]
sync_timeout: int = DEFAULT_TRANSPORT_MATRIX_SYNC_TIMEOUT
sync_latency: int = DEFAULT_TRANSPORT_MATRIX_SYNC_LATENCY
capabilities_config: CapabilitiesConfig = CapabilitiesConfig()


@dataclass
Expand Down Expand Up @@ -201,6 +210,7 @@ class RaidenConfig:
retry_interval_max=DEFAULT_TRANSPORT_MATRIX_RETRY_INTERVAL_MAX,
server=MATRIX_AUTO_SELECT_SERVER,
sync_timeout=DEFAULT_TRANSPORT_MATRIX_SYNC_TIMEOUT,
capabilities_config=CapabilitiesConfig(),
)

rest_api: RestApiConfig = RestApiConfig()
Expand Down
17 changes: 17 additions & 0 deletions raiden/tests/integration/fixtures/raiden_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from raiden.constants import Environment, RoutingMode
from raiden.raiden_service import RaidenService
from raiden.settings import CapabilitiesConfig
from raiden.tests.utils.network import (
CHAIN,
BlockchainServices,
Expand Down Expand Up @@ -82,6 +83,7 @@ def raiden_chain(
resolver_ports: List[Optional[int]],
enable_rest_api: bool,
port_generator: Iterator[Port],
capabilities: CapabilitiesConfig,
) -> Iterable[List[RaidenService]]:

if len(token_addresses) != 1:
Expand Down Expand Up @@ -122,6 +124,7 @@ def raiden_chain(
resolver_ports=resolver_ports,
enable_rest_api=enable_rest_api,
port_generator=port_generator,
capabilities_config=capabilities,
)

confirmed_block = BlockNumber(raiden_apps[0].confirmation_blocks + 1)
Expand Down Expand Up @@ -187,6 +190,18 @@ def resolvers(resolver_ports):
resolver.terminate()


@pytest.fixture
def adhoc_capability():
return False


@pytest.fixture
def capabilities(adhoc_capability) -> CapabilitiesConfig:
config = CapabilitiesConfig()
config.adhoc_capability = adhoc_capability # type: ignore
return config


@pytest.fixture
def raiden_network(
token_addresses: List[TokenAddress],
Expand Down Expand Up @@ -217,6 +232,7 @@ def raiden_network(
resolver_ports: List[Optional[int]],
enable_rest_api: bool,
port_generator: Iterator[Port],
capabilities: CapabilitiesConfig,
) -> Iterable[List[RaidenService]]:
service_registry_address = None
if blockchain_services.service_registry:
Expand Down Expand Up @@ -249,6 +265,7 @@ def raiden_network(
resolver_ports=resolver_ports,
enable_rest_api=enable_rest_api,
port_generator=port_generator,
capabilities_config=capabilities,
)

confirmed_block = BlockNumber(raiden_apps[0].confirmation_blocks + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@
from raiden.transfer.identifiers import CANONICAL_IDENTIFIER_UNORDERED_QUEUE, QueueIdentifier
from raiden.transfer.state import NetworkState
from raiden.transfer.state_change import ActionChannelClose
from raiden.utils.capabilities import parse_capabilities
from raiden.utils.formatting import to_checksum_address
from raiden.utils.typing import Address, Dict, List, TokenNetworkAddress, cast
from raiden.utils.typing import Address, Dict, List, PeerCapabilities, TokenNetworkAddress, cast
from raiden.waiting import wait_for_network_state

HOP1_BALANCE_PROOF = factories.BalanceProofSignedStateProperties(pkey=factories.HOP1_KEY)
Expand Down Expand Up @@ -1322,3 +1323,36 @@ def test_transport_presence_updates(
app2.transport.immediate_health_check_for(app1.address)
wait_for_network_state(app0, app2.address, NetworkState.REACHABLE, retry_timeout)
wait_for_network_state(app1, app2.address, NetworkState.REACHABLE, retry_timeout)


@raise_on_failure
@pytest.mark.parametrize("matrix_server_count", [1])
@pytest.mark.parametrize("number_of_nodes", [2])
@pytest.mark.parametrize("adhoc_capability", [True])
@pytest.mark.parametrize(
"broadcast_rooms", [[DISCOVERY_DEFAULT_ROOM, PATH_FINDING_BROADCASTING_ROOM]]
)
def test_transport_capabilities(raiden_network: List[RaidenService], retry_timeout):
"""
Test that raiden matrix users have the `avatar_url` set in a format understood
by the capabilities parser.
"""
app0, app1 = raiden_network

app0.transport.immediate_health_check_for(app1.address)
app1.transport.immediate_health_check_for(app0.address)

wait_for_network_state(app0, app1.address, NetworkState.REACHABLE, retry_timeout)
wait_for_network_state(app1, app0.address, NetworkState.REACHABLE, retry_timeout)

app1_user_ids = app0.transport.get_user_ids_for_address(app1.address)
assert len(app1_user_ids) == 1, "app1 should have exactly one user_id"
app1_user = app0.transport._client.get_user(app1_user_ids.pop())
app1_avatar_url = app1_user.get_avatar_url()
assert "adhoc_capability" in app1_avatar_url, "avatar_url not set for app1"
msg = "capabilities could not be parsed"
assert parse_capabilities(app1_avatar_url) == dict(adhoc_capability=True), msg

msg = "capabilities were not collected in transport client"
collected_capabilities = app0.transport._address_mgr.get_address_capabilities(app1.address)
assert collected_capabilities == PeerCapabilities(dict(adhoc_capability=True)), msg
Loading