Skip to content

Switch websocket client to reactive Flux/Mono #248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from

Conversation

tcheeric
Copy link
Owner

Summary

  • switch StandardWebSocketClient to return Flux instead of blocking
  • update WebSocketClientHandler and NostrSpringWebSocketClient for reactive API
  • adjust NostrIF and examples for new async behaviour
  • add unit test for non-blocking StandardWebSocketClient
  • update docs for reactive usage

Testing

  • mvn -q verify (fails: Could not find a valid Docker environment for integration tests)

https://chatgpt.com/codex/tasks/task_b_688be0122f6483318c314e7eeb8bf301

@tcheeric tcheeric requested a review from Copilot July 31, 2025 22:45
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR modernizes the WebSocket client implementation by switching from blocking operations to reactive programming using Project Reactor's Flux/Mono. The change transforms synchronous message handling into asynchronous streams.

  • Replaces blocking WebSocket client behavior with reactive Flux streams
  • Updates all APIs to return Flux or Mono instead of List
  • Adds block() calls throughout examples and tests to maintain compatibility

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
StandardWebSocketClient.java Core transformation from blocking to reactive using Sinks.Many for message emission
WebSocketClientIF.java Interface updated to return Flux instead of List
NostrIF.java Main API interface updated for reactive return types
EventNostr.java Event handling converted to Mono with proper error handling
NostrApiExamples.java All examples updated with block() calls to maintain synchronous behavior
Test files Integration tests updated to use blockFirst() or collectList().block()
README.md Documentation updated to reflect reactive API usage
Comments suppressed due to low confidence (1)

nostr-java-client/src/test/java/nostr/client/springwebsocket/StandardWebSocketClientTest.java:15

  • The test uses reflection to bypass normal object construction, which doesn't test the actual initialization behavior. Consider adding a constructor or factory method for testing, or test the real initialization path.
    private static StandardWebSocketClient createClient() throws Exception {

Comment on lines +11 to +19
import sun.misc.Unsafe;

class StandardWebSocketClientTest {

private static StandardWebSocketClient createClient() throws Exception {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
StandardWebSocketClient client = (StandardWebSocketClient) unsafe.allocateInstance(StandardWebSocketClient.class);
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using sun.misc.Unsafe is strongly discouraged as it's an internal API that may be removed in future Java versions. Consider using reflection with constructor.setAccessible(true) or dependency injection for testing instead.

Suggested change
import sun.misc.Unsafe;
class StandardWebSocketClientTest {
private static StandardWebSocketClient createClient() throws Exception {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
StandardWebSocketClient client = (StandardWebSocketClient) unsafe.allocateInstance(StandardWebSocketClient.class);
class StandardWebSocketClientTest {
private static StandardWebSocketClient createClient() throws Exception {
var constructor = StandardWebSocketClient.class.getDeclaredConstructor();
constructor.setAccessible(true);
StandardWebSocketClient client = constructor.newInstance();

Copilot uses AI. Check for mistakes.

public <U extends BaseMessage> U signAndSend(Map<String, String> relays) {
return (U) sign().send(relays);
public <U extends BaseMessage> Mono<U> signAndSend(Map<String, String> relays) {
return (Mono<U>) sign().send(relays);
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsafe cast without type checking. The sign().send(relays) returns Mono but casting it as (Mono) could cause ClassCastException. Remove the cast since the return type is already correct.

Suggested change
return (Mono<U>) sign().send(relays);
return sign().send(relays);

Copilot uses AI. Check for mistakes.

events = new ArrayList<>();
completed.setRelease(false);
return eventList;
return sink.asFlux();
Copy link
Preview

Copilot AI Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method returns the same Flux instance for every call to send(), which means all callers will receive all messages from this sink, not just responses to their specific message. Consider creating a new Flux that only contains responses to this specific message.

Copilot uses AI. Check for mistakes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant