-
Notifications
You must be signed in to change notification settings - Fork 156
TODO: add read/write lock #748
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Please take a look at this @seetadev, If you tell me some pointers on how to setup test files, that would be really helpful |
@Jineshbansal : Welcome to py-libp2p :) Thank you for submitting this PR. Appreciate your efforts and initiative. CCing @lla-dane, @acul71, @kaneki003 for their feedback and review on your PR. They will also help you in setting up the test suite, add key scenarios, newsfragment and arrive at a good conclusion on the issue addressed via the PR. Also, CCing @guha-rahul and @sumanjeet0012. Would like them to review your PR with the team and share feedback too. |
Also @Jineshbansal: Take a look at this file: newsfragment-readme, you can add a newsfragment file as per instructions here. |
Review: Locking Mechanism in
|
Example tests with trio import pytest
import trio
from unittest.mock import AsyncMock, MagicMock
from libp2p.stream_muxer.exceptions import StreamClosedError
from libp2p.stream_muxer.mplex.constants import MessageType
from libp2p.stream_muxer.mplex.mplex_stream import MplexStream
@pytest.fixture
def mock_muxer():
muxer = MagicMock()
muxer.send_message = AsyncMock()
return muxer
@pytest.mark.trio
async def test_write_calls_send_message(mock_muxer):
stream = MplexStream(mock_muxer, 1)
await stream.write(b"hello")
mock_muxer.send_message.assert_awaited_once_with(1, MessageType.DATA, b"hello")
@pytest.mark.trio
async def test_write_raises_if_closed(mock_muxer):
stream = MplexStream(mock_muxer, 1)
await stream.close()
with pytest.raises(StreamClosedError):
await stream.write(b"data")
@pytest.mark.trio
async def test_close_sends_close_message_once(mock_muxer):
stream = MplexStream(mock_muxer, 1)
await stream.close()
await stream.close() # second call no-op
mock_muxer.send_message.assert_awaited_once_with(1, MessageType.CLOSE)
@pytest.mark.trio
async def test_reset_sends_reset_message_and_closes(mock_muxer):
stream = MplexStream(mock_muxer, 1)
await stream.reset()
assert stream.is_closed()
mock_muxer.send_message.assert_awaited_once_with(1, MessageType.RESET)
@pytest.mark.trio
async def test_is_closed_reflects_state(mock_muxer):
stream = MplexStream(mock_muxer, 1)
assert not stream.is_closed()
await stream.close()
assert stream.is_closed()
@pytest.mark.trio
async def test_async_context_manager_calls_close(mock_muxer):
async with MplexStream(mock_muxer, 1) as stream:
await stream.write(b"test")
mock_muxer.send_message.assert_any_await(1, MessageType.CLOSE)
# --- New tests for Trio locking behavior ---
@pytest.mark.trio
async def test_concurrent_write_lock(mock_muxer):
stream = MplexStream(mock_muxer, 1)
async def writer(data):
await stream.write(data)
async with trio.open_nursery() as nursery:
nursery.start_soon(writer, b"hello")
nursery.start_soon(writer, b"world")
assert mock_muxer.send_message.await_count == 2
@pytest.mark.trio
async def test_concurrent_close_reset_lock(mock_muxer):
stream = MplexStream(mock_muxer, 1)
async def do_close():
await stream.close()
async def do_reset():
await stream.reset()
async with trio.open_nursery() as nursery:
nursery.start_soon(do_close)
nursery.start_soon(do_reset)
# Only one control message should be sent (CLOSE or RESET)
assert mock_muxer.send_message.await_count == 1
@pytest.mark.trio
async def test_write_after_close_raises(mock_muxer):
stream = MplexStream(mock_muxer, 1)
await stream.close()
with pytest.raises(StreamClosedError):
await stream.write(b"data") |
27c99d8
to
e65e38a
Compare
Review of
|
Area | Suggestion |
---|---|
Lock API | Add context manager support (read_lock , write_lock ) |
Cancellation Safety | Handle Cancelled exceptions to maintain state |
Tests | Replace sleep() with Event for reliability |
New Tests | Cover actual read() /write() calls with lock instrumentation |
Edge Cases | Add tests for reset, close, EOF behavior |
✅ Conclusion
The current implementation is structurally sound and aligns with Trio concurrency principles. A few refinements — especially regarding context managers, error/cancellation handling, and expanded test cases — will significantly improve robustness, maintainability, and test reliability.
@Jineshbansal Well done, if you can refine: better. |
fa7601f
to
9a950f6
Compare
9a950f6
to
9cd3805
Compare
Purpose
Implements the pending TODO for read/write synchronization in MplexStream.
Logic Behind the Lock
Steps Taken
Tests
To validate the above logic, a unit test has been added in libp2p/stream_muxer/mplex/test_read_write_lock.py.