Skip to content

Cannot applied correctly TcpConnectorInterceptor on sending with server connection #8609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
kazuki43zoo opened this issue May 3, 2023 · 16 comments · Fixed by #8610
Closed

Comments

@kazuki43zoo
Copy link
Contributor

kazuki43zoo commented May 3, 2023

In what version(s) of Spring Integration are you seeing this issue?

5.5.17 (Probably 6.0.x line too)

Describe the bug

If define multiple TcpConnectionInterceptor, cannot applied correctly TcpConnectorInterceptor on sending a message with server connection. As actual result, it has been applied only the first interceptor(interceptor that wrapped the real TcpConnection). Probably c1aa9b7 's changed are affect this behavior.

To Reproduce

For example, When following beans are defined, characterAppendingInterceptor1 has been applied but characterAppendingInterceptor2 has not been applied on sending a message. As a side note, When receive a message, all interceptors are applied.

  @Bean
  @Order(1)
  public TcpConnectionInterceptorFactory characterAppendingInterceptor1() {
    return new CharacterAppendingInterceptor.Factory("1");
  }

  @Bean
  @Order(2) // This interceptor cannot applied on sending
  public TcpConnectionInterceptorFactory characterAppendingInterceptor2() {
    return new CharacterAppendingInterceptor.Factory("2");
  }

  @Bean
  public TcpConnectionInterceptorFactoryChain tcpConnectionInterceptorFactoryChain(
      List<TcpConnectionInterceptorFactory> tcpConnectionInterceptorFactories) {
    TcpConnectionInterceptorFactoryChain chain = new TcpConnectionInterceptorFactoryChain();
    chain.setInterceptor(tcpConnectionInterceptorFactories.toArray(new TcpConnectionInterceptorFactory[0]));
    return chain;
  }

  @Bean
  public AbstractServerConnectionFactory serverConnectionFactory(
      TcpConnectionInterceptorFactoryChain chain) {
    return Tcp.netServer(1234)
        .deserializer(TcpCodecs.crlf())
        .interceptorFactoryChain(chain)
        .get();
  }

Expected behavior

We expected all interceptors are applied on sending a message. As workaround, I add the following TcpConnectorInterceptor, it work fine as expected behavior.

// The patch interceptor for binding a connection that wrapped all interceptor to TcpSender(in this case TcpSendingMessageHandler)
public class PatchInterceptor extends TcpConnectionInterceptorSupport {

  private PatchInterceptor(ApplicationEventPublisher publisher) {
    super(publisher);
  }

  @Override public void addNewConnection(TcpConnection connection) {
    super.addNewConnection(connection);
    // Call addNewConnection of TcpSender with wrapped connection(interceptor)
    Optional.ofNullable(getSender()).ifPresent(x -> x.addNewConnection(getTheConnection()));
  }

  static class Factory extends ApplicationObjectSupport implements TcpConnectionInterceptorFactory {

    @Override public TcpConnectionInterceptorSupport getInterceptor() {
      return new PatchInterceptor(obtainApplicationContext());
    }

  }

}

Sample

Just wait a few time!! I will add the repro project in My GitHub repository at after.

How to reproduce:
Please run the SpiGh8609ApplicationTests#connectAndReceive.

FYI:

  • SpiGh8609ApplicationWithPatchTests : Tests for applying workaround patch interceptor
  • SpiGh8609ApplicationWithoutInterceptorTests : Tests for no interceptors
@kazuki43zoo kazuki43zoo added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels May 3, 2023
@kazuki43zoo
Copy link
Contributor Author

I've added the repro project.

@artembilan
Copy link
Member

I'm not sure what does difference your patch do?
That original method is like this in the TcpConnectionInterceptorSupport:

public void addNewConnection(TcpConnection connection) {
		if (this.interceptedSenders != null) {
			this.interceptedSenders.forEach(sender -> sender.addNewConnection(connection));
		}
	}

Where the mentioned getSender() i this:

	public TcpSender getSender() {
		return this.interceptedSenders != null && this.interceptedSenders.size() > 0
				? this.interceptedSenders.get(0)
				: null;
	}

So, we still deal with those this.interceptedSenders.
What do I miss, please?

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter in: TCP/UDP and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels May 3, 2023
@kazuki43zoo
Copy link
Contributor Author

kazuki43zoo commented May 3, 2023

Processing flow without patch (Spring Integration default behavior)

AbstractServerConnectionFactory#initializeConnection
  →  CharacterAppendingInterceptor2#registerSenders(TcpSendingMessageHandler)
        →  CharacterAppendingInterceptor1#registerSenders(CharacterAppendingInterceptor1)
             →  CharacterAppendingInterceptor1#registerSender(CharacterAppendingInterceptor1)
                  →  CharacterAppendingInterceptor1#addNewConnection(CharacterAppendingInterceptor1)
                       →  CharacterAppendingInterceptor2#addNewConnection(CharacterAppendingInterceptor1) 
                            →  TcpSendingMessageHandler#addNewConnection(CharacterAppendingInterceptor1) ※ Passed the connection that not applied some interceptors

Processing flow with applying patch interceptor

AbstractServerConnectionFactory#initializeConnection
  →  PatchInterceptor#registerSenders(TcpSendingMessageHandler)
       →  CharacterAppendingInterceptor2#registerSenders(PatchInterceptor)
            →  CharacterAppendingInterceptor1#registerSenders(CharacterAppendingInterceptor2)
                 →  CharacterAppendingInterceptor1#registerSender(CharacterAppendingInterceptor1)
                      →  CharacterAppendingInterceptor1#addNewConnection(CharacterAppendingInterceptor1)
                           →  CharacterAppendingInterceptor2#addNewConnection(CharacterAppendingInterceptor1)
                                →  PatchInterceptor#addNewConnection(CharacterAppendingInterceptor1)
                                     →  TcpSendingMessageHandler#addNewConnection(CharacterAppendingInterceptor1)
                                     →  (★★★Patch★★) TcpSendingMessageHandler#addNewConnection(CharacterAppendingInterceptor2) ※getTheConnection() return CharacterAppendingInterceptor2 instance and getSender() return TcpSendingMessageHandler instance

@kazuki43zoo
Copy link
Contributor Author

kazuki43zoo commented May 3, 2023

At lease, I think should be passed a last interceptor instance(in sample: CharacterAppendingInterceptor2) to the original TcpSender(in sample: TcpSendingMessageHandler).

@kazuki43zoo
Copy link
Contributor Author

kazuki43zoo commented May 3, 2023

Probably patch code of PatchInterceptor can rewrite as follows:

  @Override public void addNewConnection(TcpConnection connection) {
    super.addNewConnection(connection);
    // Call addNewConnection of TcpSender with wrapped connection(interceptor)
    Optional.ofNullable(getSender()).ifPresent(x -> x.addNewConnection(getTheConnection()));
  }

  @Override public void addNewConnection(TcpConnection connection) {
    // Call addNewConnection of TcpSender with wrapped connection(interceptor)
    getSenders().forEach(x -> x.addNewConnection(getTheConnection()));
  }

Processing flow with applying new patch interceptor

AbstractServerConnectionFactory#initializeConnection
  →  PatchInterceptor#registerSenders(TcpSendingMessageHandler)
       →  CharacterAppendingInterceptor2#registerSenders(PatchInterceptor)
            →  CharacterAppendingInterceptor1#registerSenders(CharacterAppendingInterceptor2)
                 →  CharacterAppendingInterceptor1#registerSender(CharacterAppendingInterceptor1)
                      →  CharacterAppendingInterceptor1#addNewConnection(CharacterAppendingInterceptor1)
                           →  CharacterAppendingInterceptor2#addNewConnection(CharacterAppendingInterceptor1)
                                →  PatchInterceptor#addNewConnection(CharacterAppendingInterceptor1)
                                     →  (★★★Patch★★) TcpSendingMessageHandler#addNewConnection(CharacterAppendingInterceptor2) ※getTheConnection() return CharacterAppendingInterceptor2 instance and getSender() return TcpSendingMessageHandler instance

@kazuki43zoo
Copy link
Contributor Author

kazuki43zoo commented May 3, 2023

@artembilan

I'm not sure what does difference your patch do?

Call the addNewConnection of TcpSender with wrapped connection(interceptor) using getTheConnection() instead of the passed connection via method argument.

@artembilan
Copy link
Member

Right. I see now. Thanks.

@garyrussell , any thoughts?

@garyrussell
Copy link
Contributor

Clearly looks like a bug.

@kazuki43zoo are you planning to submit a PR? If so, thanks.

@artembilan
Copy link
Member

OK. Just debugged TcpSenderTests.senderCalledForDeadConnectionClientNet().
With the current code the target TcpSender got this:

connection = {TcpSenderTests$1InterceptorFactory$1@3491} ":null"
 instance = 1
 this$1 = {TcpSenderTests$1InterceptorFactory@4219} 
 theConnection = {TcpNetConnection@4220} "TcpNetConnection:127.0.0.1:62884:62885:22ab6ceb-354c-4235-a155-2c127916f96c"

Which points only to one interceptor.

With the fix for that getTheConnection() call it is like this:

connection = {TcpSenderTests$1InterceptorFactory$1@3491} ":null"
 instance = 2
 this$1 = {TcpSenderTests$1InterceptorFactory@4218} 
 theConnection = {TcpSenderTests$1InterceptorFactory$1@3494} ":null"
  instance = 1
  this$1 = {TcpSenderTests$1InterceptorFactory@4228} 
  theConnection = {TcpNetConnection@4229} "TcpNetConnection:127.0.0.1:62910:62911:c16f4521-e69e-4d88-9dc7-8cb024791b7e"

Confirming that connection is wrapped twice. Just because we have 3 interceptors in our test.

With the fix we have a failing other test though:

TcpSendingMessageHandlerTests > testInterceptedConnection FAILED
    java.lang.AssertionError at TcpSendingMessageHandlerTests.java:1230

Perhaps the one just reflecting the current wrong behavior.

@garyrussell
Copy link
Contributor

I wonder why InterceptedSharedConnectionTests doesn't have this problem (it has 2 interceptors on both the client and server sides).

@artembilan
Copy link
Member

The result of failing test is this:

Expecting actual:
  TcpNetConnection:127.0.0.1:49271:49270:0c4dda4f-5a7e-4729-b04e-f390b18641c5
to be an instance of:
  org.springframework.integration.ip.tcp.connection.HelloWorldInterceptor
but was instance of:
  org.springframework.integration.ip.tcp.connection.TcpNetConnection

Which, I think, indicates that target sender must get the fully wrapped connection for all the interceptors.
So, if we have 3 of them, like in the previous test, then it has to start from a stack pointing to the third interceptor.

@artembilan
Copy link
Member

Doesn't look like that InterceptedSharedConnectionTests checks for its interceptors interaction.
I've just commented out the second interceptor and tests still pass 🤷

@garyrussell
Copy link
Contributor

I see - if the send is not called (because it's skipped), the negotiation error won't happen.

@artembilan
Copy link
Member

When I fix it like this:

	public void addNewConnection(TcpConnection connection) {
		if (this.interceptedSenders != null) {
			this.interceptedSenders.forEach(sender -> sender.addNewConnection(this));
		}
	}

All good:

connection = {TcpSenderTests$1InterceptorFactory$1@3487} ":null"
 instance = 3
 this$1 = {TcpSenderTests$1InterceptorFactory@4219} 
 theConnection = {TcpSenderTests$1InterceptorFactory$1@3492} ":null"
  instance = 2
  this$1 = {TcpSenderTests$1InterceptorFactory@4229} 
  theConnection = {TcpSenderTests$1InterceptorFactory$1@3495} ":null"
   instance = 1
   this$1 = {TcpSenderTests$1InterceptorFactory@4238} 
   theConnection = {TcpNetConnection@4239} "TcpNetConnection:127.0.0.1:53346:53349:342527f7-93b6-4662-a913-68b576fc3dbd"

@kazuki43zoo
Copy link
Contributor Author

@artembilan @garyrussell
I tried to fix this issue by other approche(passed the last wrapper to addNewConnection) as follow:
kazuki43zoo/spi-gh-8609@e52ce27
WDYT?

kazuki43zoo added a commit to kazuki43zoo/spring-integration that referenced this issue May 4, 2023
…ectorInterceptor

Fixes spring-projects#8609

* add getLastWrapper() in TcpConnectionSupport
* change to passed the instance that geted via getLastWrapper() instead of self instance to TcpSender#addNewConnection
@kazuki43zoo
Copy link
Contributor Author

@artembilan @garyrussell
I've submitted the PR.

@artembilan artembilan added this to the 6.1.0 milestone May 4, 2023
@artembilan artembilan added backport 5.5.x and removed status: waiting-for-reporter Needs a feedback from the reporter labels May 4, 2023
kazuki43zoo added a commit to kazuki43zoo/spring-integration that referenced this issue May 4, 2023
…ectorInterceptor

Fixes spring-projects#8609

* Changed to passed the self instance to addNewConnection instead of argument's connection
artembilan pushed a commit that referenced this issue May 5, 2023
Fixes #8609

* Changed to passed the self instance to `addNewConnection()` instead of argument's connection
in a `TcpConnectionInterceptorSupport` to compose a chain of interceptors from top to down.
This way the target `Sender` get the last interceptor in a chain as its connection.

**Cherry-pick to `6.0.x` & `5.5.x`**
artembilan pushed a commit that referenced this issue May 5, 2023
Fixes #8609

* Changed to passed the self instance to `addNewConnection()` instead of argument's connection
in a `TcpConnectionInterceptorSupport` to compose a chain of interceptors from top to down.
This way the target `Sender` get the last interceptor in a chain as its connection.

**Cherry-pick to `6.0.x` & `5.5.x`**
artembilan pushed a commit that referenced this issue May 5, 2023
Fixes #8609

* Changed to passed the self instance to `addNewConnection()` instead of argument's connection
in a `TcpConnectionInterceptorSupport` to compose a chain of interceptors from top to down.
This way the target `Sender` get the last interceptor in a chain as its connection.

**Cherry-pick to `6.0.x` & `5.5.x`**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants