Description
Describe the bug
We have a generic event source deployed which streams off a gRPC port producing events.
generic:
  our-event-source:
    config: |
      {}
    insecure: true
    url: our-event-producer:50051
After upgrading Argo Events past 1.9.3 (that is, to 1.9.4 or later), we found that the event source fails to reconnect to the event source server after the server restarts.
To Reproduce
Steps to reproduce the behaviour (Argo Events 1.9.4 or greater):
- Deploy a simple event source server exposing events on a gRPC port
- Deploy a generic event source as above
- Here everything should work.
- Restart the event source server
- The event source sees the failed connect and logs the error
- It retries 3 times and then stops doing anything, remaining in a hung state.
Expected behavior
The event source should retry forever (or be configurable) until it is able to connect to the source
Additional context
For Argo Events 1.9.4 or above
Logs showing the event source retrying 3 times after the initial failure and then hanging:
{"level":"info","ts":1747394260.585098,"logger":"argo-events.eventsource","caller":"generic/start.go:77","msg":"connected to eventsource server successfully, started event stream...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
{"level":"error","ts":1747394311.7187333,"logger":"argo-events.eventsource","caller":"generic/start.go:81","msg":"failed to receive events from the event stream, reconnecting in 5 seconds...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051","error":"rpc error: code = Unavailable desc = error reading from server: EOF","stacktrace":"github.com/argoproj/argo-events/pkg/eventsources/sources/generic.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/sources/generic/start.go:81\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:534\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:90\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:145\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:461\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:89\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:533"}
{"level":"info","ts":1747394311.7223485,"logger":"argo-events.eventsource","caller":"generic/start.go:71","msg":"dialing eventsource server...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
{"level":"error","ts":1747394311.7637541,"logger":"argo-events.eventsource","caller":"generic/start.go:74","msg":"failed to connect eventsource server, reconnecting in 5 seconds...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051","error":"rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 34.118.224.26:50051: connect: connection refused\"","stacktrace":"github.com/argoproj/argo-events/pkg/eventsources/sources/generic.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/sources/generic/start.go:74\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:534\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:90\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:145\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:461\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:89\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:533"}
{"level":"info","ts":1747394315.5372474,"logger":"argo-events.eventsource","caller":"generic/start.go:71","msg":"dialing eventsource server...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
{"level":"error","ts":1747394315.5818846,"logger":"argo-events.eventsource","caller":"generic/start.go:74","msg":"failed to connect eventsource server, reconnecting in 5 seconds...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051","error":"rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 34.118.224.26:50051: connect: connection refused\"","stacktrace":"github.com/argoproj/argo-events/pkg/eventsources/sources/generic.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/sources/generic/start.go:74\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:534\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:90\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:145\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:461\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:89\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:533"}
{"level":"info","ts":1747394320.5373504,"logger":"argo-events.eventsource","caller":"generic/start.go:71","msg":"dialing eventsource server...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
{"level":"error","ts":1747394320.5788677,"logger":"argo-events.eventsource","caller":"generic/start.go:74","msg":"failed to connect eventsource server, reconnecting in 5 seconds...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051","error":"rpc error: code = Unavailable desc = connection error: desc = \"transport: Error while dialing: dial tcp 34.118.224.26:50051: connect: connection refused\"","stacktrace":"github.com/argoproj/argo-events/pkg/eventsources/sources/generic.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/sources/generic/start.go:74\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:534\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:90\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:145\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:461\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:89\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:533"}
In Argo Events 1.9.3 we were seeing the following output in the event source on a restart of the event source server:
{"level":"error","ts":1747394025.5009372,"logger":"argo-events.eventsource","caller":"generic/start.go:81","msg":"failed to receive events from the event stream, reconnecting in 5 seconds...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051","error":"rpc error: code = Unavailable desc = error reading from server: EOF","stacktrace":"github.com/argoproj/argo-events/pkg/eventsources/sources/generic.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/sources/generic/start.go:81\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:534\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:90\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:145\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/backoff.go:461\ngithub.com/argoproj/argo-events/pkg/shared/util.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/pkg/shared/util/retry.go:89\ngithub.com/argoproj/argo-events/pkg/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/pkg/eventsources/eventing.go:533"}
{"level":"info","ts":1747394025.5013978,"logger":"argo-events.eventsource","caller":"generic/start.go:71","msg":"dialing eventsource server...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
{"level":"info","ts":1747394041.4183476,"logger":"argo-events.eventsource","caller":"generic/start.go:77","msg":"connected to eventsource server successfully, started event stream...","eventSourceName":"our-event-source","eventSourceType":"generic","eventName":"our-event-source","url":"our-event-producer:50051"}
It dialled, waited until the server was available again, then connected, and everything worked as expected.
Looking at the code changes between the two versions, the grpc library was upgraded and the call made to create the client went from:
opt = append(opt, grpc.WithBlock())
if el.GenericEventSource.Insecure {
    opt = append(opt, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.DialContext(context.Background(), el.GenericEventSource.URL, opt...)
to:
if el.GenericEventSource.Insecure {
    opt = append(opt, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.NewClient(el.GenericEventSource.URL, opt...)
WithBlock is deprecated in the grpc library, but the calling code here should then handle retrying better. I am not sure at this point why it currently only does 3 retries. My best guess is that the connection state returned by GetState changes to some other value not covered by this if statement, and the loop then just ticks indefinitely without doing anything.
Message from the maintainers:
If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.