
TODO: add read/write lock #748


Open · wants to merge 11 commits into main

Conversation

@Jineshbansal (Author) commented Jul 7, 2025

Purpose

Implements the pending TODO for read/write synchronization in MplexStream.

Logic Behind the Lock

  1. Writing is blocked if any read is in progress
  2. Reading is blocked if a write is in progress
  3. Multiple reads are allowed concurrently
  4. Only one write is allowed at a time

Steps Taken

  1. Introduced a custom ReadWriteLock class to handle concurrent access
  2. Instantiated rw_lock in MplexStream.init
  3. Wrapped read() and write() with appropriate rw_lock methods, without changing their internal logic

Tests

To validate the above logic, a unit test has been added in libp2p/stream_muxer/mplex/test_read_write_lock.py.

@Jineshbansal (Author)

Please take a look at this, @seetadev. If you could give me some pointers on how to set up the test files, that would be really helpful.

@seetadev (Contributor) commented Jul 7, 2025

@Jineshbansal : Welcome to py-libp2p :) Thank you for submitting this PR. Appreciate your efforts and initiative.

CCing @lla-dane, @acul71, and @kaneki003 for their feedback and review on your PR. They will also help you set up the test suite, add key scenarios and a newsfragment, and bring the issue addressed by this PR to a good conclusion.

Also, CCing @guha-rahul and @sumanjeet0012. Would like them to review your PR with the team and share feedback too.

@lla-dane (Contributor) commented Jul 8, 2025

Also, @Jineshbansal: take a look at this file: newsfragment-readme. You can add a newsfragment file per the instructions there.

@acul71 (Contributor) commented Jul 8, 2025

@Jineshbansal

Review: Locking Mechanism in MplexStream and Trio Refactor

📄 Original Implementation

In the current implementation (mplex_stream.py), the MplexStream class uses threading.Lock to manage concurrent access to the internal _closed state and the send_message calls.

Lock Usage

import threading

self._lock = threading.Lock()

def write(self, data: bytes) -> None:
    with self._lock:
        if self._closed:
            raise StreamClosedError("Stream is closed")
        self._muxer.send_message(self._id, MessageType.DATA, data)

This works for synchronous, threaded programs, but py-libp2p uses Trio for its concurrency model, which is built on coroutines and structured concurrency, not threads.

⚠️ Problems with threading.Lock in Trio

  • Incompatible: Blocking locks like threading.Lock are unsafe in async Trio code. They can block the entire event loop.
  • Not async-aware: If used in async functions, threading.Lock can lead to deadlocks or reduced performance.
  • Non-cooperative: Prevents Trio features such as cancellation and timeouts from working while the lock is blocking the thread.

✅ Suggested Fix: Use trio.Lock

Trio-Compatible Refactor

Replace threading.Lock with trio.Lock and update all methods that use the lock to be async def:

import trio

self._lock = trio.Lock()

async def write(self, data: bytes) -> None:
    async with self._lock:
        if self._closed:
            raise StreamClosedError("Stream is closed")
        await self._muxer.send_message(self._id, MessageType.DATA, data)

Context Management

Also update context management methods:

async def __aenter__(self) -> "MplexStream":
    return self

async def __aexit__(self, exc_type, exc, tb) -> None:
    await self.close()

🔁 Full Method Changes

  • write() → async def write()
  • close() → async def close()
  • reset() → async def reset()
  • __enter__() → __aenter__()
  • __exit__() → __aexit__()

🧩 Muxer Assumption

This refactor assumes that self._muxer.send_message(...) is also an async def function. If it's still sync, wrap it with:

await trio.to_thread.run_sync(lambda: self._muxer.send_message(...))

✅ Benefits of Trio Refactor

  • Compatible with the rest of py-libp2p (Trio-based)
  • Prevents blocking the event loop
  • Supports structured concurrency and cancellation
  • Cleaner and more testable async code

🛠️ Next Steps

  • Update Mplex.send_message() to be async def if it isn't already.
  • Update all call sites to use await stream.write(...), await stream.close(), etc.
  • Ensure tests run in an async context using pytest-trio or similar.

⚠️ Race Condition Concerns During reset() and close()

Both reset() and close() mutate the _closed state and send control messages. Without proper locking, race conditions can occur:

  • Concurrent calls to reset() and close() might cause inconsistent stream states or duplicated messages.
  • Without the lock, two coroutines could both check _closed as False and proceed to send conflicting messages (CLOSE and RESET) simultaneously.
  • This may lead to protocol errors or unexpected behavior in peers.

How Trio Lock Helps

Using a trio.Lock and the pattern:

async with self._lock:
    if self._closed:
        return
    self._closed = True
    await self._muxer.send_message(...)

ensures:

  • Mutual exclusion: Only one coroutine can mutate _closed at a time.
  • Consistent state: Once _closed is set, subsequent calls no-op.
  • Safe message sending: Avoids sending contradictory or duplicated control frames.

Summary

Proper locking around reset() and close() is critical to:

  • Maintain stream state consistency.
  • Prevent subtle race conditions in async concurrent contexts.
  • Guarantee protocol compliance and stability.

@acul71 (Contributor) commented Jul 8, 2025

Example tests with trio

import pytest
import trio
from unittest.mock import AsyncMock, MagicMock

from libp2p.stream_muxer.exceptions import StreamClosedError
from libp2p.stream_muxer.mplex.constants import MessageType
from libp2p.stream_muxer.mplex.mplex_stream import MplexStream


@pytest.fixture
def mock_muxer():
    muxer = MagicMock()
    muxer.send_message = AsyncMock()
    return muxer


@pytest.mark.trio
async def test_write_calls_send_message(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.write(b"hello")
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.DATA, b"hello")


@pytest.mark.trio
async def test_write_raises_if_closed(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    with pytest.raises(StreamClosedError):
        await stream.write(b"data")


@pytest.mark.trio
async def test_close_sends_close_message_once(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    await stream.close()  # second call no-op
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.CLOSE)


@pytest.mark.trio
async def test_reset_sends_reset_message_and_closes(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.reset()
    assert stream.is_closed()
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.RESET)


@pytest.mark.trio
async def test_is_closed_reflects_state(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    assert not stream.is_closed()
    await stream.close()
    assert stream.is_closed()


@pytest.mark.trio
async def test_async_context_manager_calls_close(mock_muxer):
    async with MplexStream(mock_muxer, 1) as stream:
        await stream.write(b"test")
    mock_muxer.send_message.assert_any_await(1, MessageType.CLOSE)


# --- New tests for Trio locking behavior ---


@pytest.mark.trio
async def test_concurrent_write_lock(mock_muxer):
    stream = MplexStream(mock_muxer, 1)

    async def writer(data):
        await stream.write(data)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(writer, b"hello")
        nursery.start_soon(writer, b"world")

    assert mock_muxer.send_message.await_count == 2


@pytest.mark.trio
async def test_concurrent_close_reset_lock(mock_muxer):
    stream = MplexStream(mock_muxer, 1)

    async def do_close():
        await stream.close()

    async def do_reset():
        await stream.reset()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(do_close)
        nursery.start_soon(do_reset)

    # Only one control message should be sent (CLOSE or RESET)
    assert mock_muxer.send_message.await_count == 1


@pytest.mark.trio
async def test_write_after_close_raises(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    with pytest.raises(StreamClosedError):
        await stream.write(b"data")

@Jineshbansal force-pushed the add-read-write-lock branch from 27c99d8 to e65e38a on July 8, 2025 at 13:42
@Jineshbansal requested a review from lla-dane on July 8, 2025 at 14:36
@Jineshbansal (Author) commented Jul 8, 2025

@lla-dane @acul71 I have made all the recommended changes; please review.

@acul71 (Contributor) commented Jul 8, 2025

Review of MplexStream ReadWriteLock Implementation and Tests

Overview

This document contains a focused review of the ReadWriteLock integration within the MplexStream class of py-libp2p, including both the implementation and its unit tests.

The goal is to assess correctness, concurrency safety, and test robustness, and to offer actionable improvements with code suggestions.


✅ Strengths

Implementation

  • The ReadWriteLock is well-encapsulated and uses Trio's condition variables effectively.
  • Read/write access to the stream is protected with lock acquisition (acquire_read()/acquire_write()).
  • The MplexStream.read() and write() methods correctly guard access with rw_lock, enforcing mutual exclusion.

Tests

  • The test suite effectively verifies:
    • Mutual exclusion between readers and writers.
    • Concurrent access for multiple readers.
    • Only one writer can hold the lock at a time.
    • Correct interleaving behavior.

🔧 Implementation Suggestions

1. Use Async Context Manager for Lock Usage

Currently:

await self.rw_lock.acquire_read()
try:
    ...
finally:
    await self.rw_lock.release_read()

This is error-prone and verbose. Suggest adding context manager support:

🔄 Proposed change to ReadWriteLock:

from contextlib import asynccontextmanager

class ReadWriteLock:
    ...

    @asynccontextmanager
    async def read_lock(self):
        await self.acquire_read()
        try:
            yield
        finally:
            await self.release_read()

    @asynccontextmanager
    async def write_lock(self):
        await self.acquire_write()
        try:
            yield
        finally:
            self.release_write()

✅ Then usage becomes:

async with self.rw_lock.read_lock():
    ...

This improves readability and ensures safer usage throughout MplexStream.


2. Handle Cancellation During Lock Acquisition

Currently, if a task is cancelled while acquiring the lock, the internal counters and waiters (_writer_waiting, _readers, _writer, etc.) may be left in an inconsistent state. This can lead to deadlocks or incorrect behavior (e.g., a lock that is never released).

🛡 Suggested Improvement

Wrap lock acquisition logic in a try/except trio.Cancelled block. Ensure the task cleans up any registration it did before cancellation (e.g., removing itself from waiters).

✍️ Example for acquire_read

Before (prone to inconsistent state on cancellation):

async def acquire_read(self) -> None:
    async with self._lock:
        while self._writer or self._writer_waiting:
            await self._no_writer.wait()
        self._readers += 1

After (safe from cancellation during wait):

async def acquire_read(self) -> None:
    try:
        async with self._lock:
            while self._writer or self._writer_waiting:
                await self._no_writer.wait()
            self._readers += 1
    except trio.Cancelled:
        # Optional: decrement wait count or log cancellation
        raise

✍️ Example for acquire_write with wait queue tracking

Before:

async def acquire_write(self) -> None:
    async with self._lock:
        self._writer_waiting = True
        while self._writer or self._readers > 0:
            await self._no_writer.wait()
        self._writer = True
        self._writer_waiting = False

After (safe from cancellation, preserves state):

async def acquire_write(self) -> None:
    self._writer_waiting = True
    try:
        async with self._lock:
            while self._writer or self._readers > 0:
                await self._no_writer.wait()
            self._writer = True
    except trio.Cancelled:
        # Clean up the waiting flag if cancelled. The cleanup must be
        # shielded: inside a cancelled scope, re-acquiring the lock is a
        # checkpoint and would immediately raise Cancelled again.
        with trio.CancelScope(shield=True):
            async with self._lock:
                self._writer_waiting = False
        raise
    else:
        self._writer_waiting = False

✅ Why This Matters

Trio delivers cancellation at any checkpoint (that is, at any await), and that delivery is guaranteed, which means any side effects performed before an await must be carefully undone if the task is cancelled during that await. Otherwise, shared state (_writer_waiting, _readers) can remain corrupt, leading to:

  • 💥 Deadlocks
  • 🐛 Incorrect concurrent behavior
  • 🧪 Flaky test failures

3. Naming Clarity (Optional)

  • _no_writer could be renamed to something like _writer_condition or _no_writer_or_pending_write to better reflect semantics.

4. Consider Adding Logging (Optional)

For debugging lock behavior in complex concurrency cases, optional debug logs using Python’s logging module can help trace:

  • When a read/write is requested, granted, or released
  • When tasks block and resume

🔬 Test Suggestions

1. Replace Time-Based Coordination with Events

Current test synchronization is sleep-based:

await trio.sleep(0.05)

This may cause flakiness. Replace with trio.Event:

start_event = trio.Event()

async def reader():
    await start_event.wait()   # block until the test says "go"
    ...

# in the test body
async with trio.open_nursery() as nursery:
    nursery.start_soon(reader)
    start_event.set()          # wakes every task waiting on the event

2. Add Tests for Stream-Level Lock Integration

Tests only verify lock behavior, not stream read/write correctness under concurrency.

➕ Suggested new test:

from unittest.mock import AsyncMock, MagicMock

@pytest.mark.trio
async def test_stream_write_protected_by_rwlock(stream_with_lock):
    stream, _ = stream_with_lock
    stream.rw_lock.acquire_write = AsyncMock()
    stream.rw_lock.release_write = MagicMock()

    await stream.write(b"test")

    stream.rw_lock.acquire_write.assert_awaited_once()
    stream.rw_lock.release_write.assert_called_once()

Likewise, test read() with read-lock instrumentation.


3. Test Reset, EOF, and Close Interactions

Missing coverage:

  • Reading from a closed stream
  • Writing to a reset stream
  • Behavior when EOF is reached

➕ Suggested test:

@pytest.mark.trio
async def test_read_after_stream_closed(stream_with_lock):
    stream, send_chan = stream_with_lock
    stream.event_remote_closed.set()
    data = await stream.read(100)
    assert data == b"", "Expected EOF after remote closed"

✨ Summary of Key Suggestions

| Area                | Suggestion                                                   |
| ------------------- | ------------------------------------------------------------ |
| Lock API            | Add context manager support (read_lock, write_lock)          |
| Cancellation safety | Handle Cancelled exceptions to maintain consistent state     |
| Tests               | Replace sleep() with Event for reliability                   |
| New tests           | Cover actual read()/write() calls with lock instrumentation  |
| Edge cases          | Add tests for reset, close, and EOF behavior                 |

✅ Conclusion

The current implementation is structurally sound and aligns with Trio concurrency principles. A few refinements — especially regarding context managers, error/cancellation handling, and expanded test cases — will significantly improve robustness, maintainability, and test reliability.

@acul71 (Contributor) commented Jul 8, 2025

@Jineshbansal Well done. If you can refine it further, even better.
Tests should be placed here: tests/core/stream_muxer/test_mplex_stream.py

@Jineshbansal force-pushed the add-read-write-lock branch from fa7601f to 9a950f6 on July 13, 2025 at 13:03
@Jineshbansal force-pushed the add-read-write-lock branch from 9a950f6 to 9cd3805 on July 13, 2025 at 13:08
@Jineshbansal (Author) commented Jul 13, 2025

@acul71 @lla-dane @seetadev Please review. I believe I have made all the recommended changes, i.e. extended the tests to also cover cancellation of a read or write task.
