-
Notifications
You must be signed in to change notification settings - Fork 25
Switch websocket client to reactive Flux/Mono #248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Switch websocket client to reactive Flux/Mono #248
Conversation
…ient and improve code formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR modernizes the WebSocket client implementation by switching from blocking operations to reactive programming using Project Reactor's Flux/Mono. The change transforms synchronous message handling into asynchronous streams.
- Replaces blocking WebSocket client behavior with reactive Flux streams
- Updates all APIs to return Flux or Mono instead of List
- Adds block() calls throughout examples and tests to maintain compatibility
Reviewed Changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
StandardWebSocketClient.java |
Core transformation from blocking to reactive using Sinks.Many for message emission |
WebSocketClientIF.java |
Interface updated to return Flux instead of List |
NostrIF.java |
Main API interface updated for reactive return types |
EventNostr.java |
Event handling converted to Mono with proper error handling |
NostrApiExamples.java |
All examples updated with block() calls to maintain synchronous behavior |
Test files | Integration tests updated to use blockFirst() or collectList().block() |
README.md |
Documentation updated to reflect reactive API usage |
Comments suppressed due to low confidence (1)
nostr-java-client/src/test/java/nostr/client/springwebsocket/StandardWebSocketClientTest.java:15
- The test uses reflection to bypass normal object construction, which doesn't test the actual initialization behavior. Consider adding a constructor or factory method for testing, or test the real initialization path.
private static StandardWebSocketClient createClient() throws Exception {
import sun.misc.Unsafe; | ||
|
||
class StandardWebSocketClientTest { | ||
|
||
private static StandardWebSocketClient createClient() throws Exception { | ||
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); | ||
theUnsafe.setAccessible(true); | ||
Unsafe unsafe = (Unsafe) theUnsafe.get(null); | ||
StandardWebSocketClient client = (StandardWebSocketClient) unsafe.allocateInstance(StandardWebSocketClient.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using sun.misc.Unsafe is strongly discouraged as it's an internal API that may be removed in future Java versions. Consider using reflection with constructor.setAccessible(true) or dependency injection for testing instead.
import sun.misc.Unsafe; | |
class StandardWebSocketClientTest { | |
private static StandardWebSocketClient createClient() throws Exception { | |
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); | |
theUnsafe.setAccessible(true); | |
Unsafe unsafe = (Unsafe) theUnsafe.get(null); | |
StandardWebSocketClient client = (StandardWebSocketClient) unsafe.allocateInstance(StandardWebSocketClient.class); | |
class StandardWebSocketClientTest { | |
private static StandardWebSocketClient createClient() throws Exception { | |
var constructor = StandardWebSocketClient.class.getDeclaredConstructor(); | |
constructor.setAccessible(true); | |
StandardWebSocketClient client = constructor.newInstance(); |
Copilot uses AI. Check for mistakes.
public <U extends BaseMessage> U signAndSend(Map<String, String> relays) { | ||
return (U) sign().send(relays); | ||
public <U extends BaseMessage> Mono<U> signAndSend(Map<String, String> relays) { | ||
return (Mono<U>) sign().send(relays); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsafe cast without type checking. The sign().send(relays) returns Mono but casting it as (Mono) could cause ClassCastException. Remove the cast since the return type is already correct.
return (Mono<U>) sign().send(relays); | |
return sign().send(relays); |
Copilot uses AI. Check for mistakes.
events = new ArrayList<>(); | ||
completed.setRelease(false); | ||
return eventList; | ||
return sink.asFlux(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method returns the same Flux instance for every call to send(), which means all callers will receive all messages from this sink, not just responses to their specific message. Consider creating a new Flux that only contains responses to this specific message.
Copilot uses AI. Check for mistakes.
Summary
Testing
mvn -q verify
(fails: Could not find a valid Docker environment for integration tests)https://chatgpt.com/codex/tasks/task_b_688be0122f6483318c314e7eeb8bf301