Skip to content

Commit 3461010

Browse files
authored
feat: Detect broken observe-events call. (#107)
1 parent 6c6e976 commit 3461010

File tree

4 files changed

+61
-47
lines changed

4 files changed

+61
-47
lines changed

eventsourcingdb/observe_events.go

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/json"
77
"fmt"
88
"net/http"
9+
"time"
910

1011
"github.com/thenativeweb/goutils/v2/coreutils/contextutils"
1112

@@ -40,6 +41,9 @@ func newObserveEventsValue(item StoreItem) ObserveEventsResult {
4041
func (client *Client) ObserveEvents(ctx context.Context, subject string, recursive ObserveRecursivelyOption, options ...ObserveEventsOption) <-chan ObserveEventsResult {
4142
results := make(chan ObserveEventsResult, 1)
4243

44+
heartbeatInterval := 1 * time.Second
45+
heartbeatTimeout := heartbeatInterval * 3
46+
4347
go func() {
4448
defer close(results)
4549

@@ -94,52 +98,70 @@ func (client *Client) ObserveEvents(ctx context.Context, subject string, recursi
9498
}
9599
defer response.Body.Close()
96100

101+
heartbeatTimer := time.NewTimer(heartbeatTimeout)
102+
defer heartbeatTimer.Stop()
103+
97104
unmarshalContext, cancelUnmarshalling := context.WithCancel(ctx)
98105
defer cancelUnmarshalling()
99106

100107
unmarshalResults := ndjson.UnmarshalStream[ndjson.StreamItem](unmarshalContext, response.Body)
101108
for unmarshalResult := range unmarshalResults {
102-
data, err := unmarshalResult.GetData()
103-
if err != nil {
104-
if contextutils.IsContextTerminationError(err) {
105-
results <- newObserveEventsError(err)
106-
return
107-
}
108-
109+
select {
110+
case <-heartbeatTimer.C:
109111
results <- newObserveEventsError(
110-
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: %s", err.Error())),
112+
customErrors.NewServerError("heartbeat timeout"),
111113
)
112114
return
113-
}
114115

115-
switch data.Type {
116-
case "heartbeat":
117-
continue
118-
case "error":
119-
var serverError streamError
120-
if err := json.Unmarshal(data.Payload, &serverError); err != nil {
116+
default:
117+
data, err := unmarshalResult.GetData()
118+
if err != nil {
119+
if contextutils.IsContextTerminationError(err) {
120+
results <- newObserveEventsError(err)
121+
return
122+
}
123+
121124
results <- newObserveEventsError(
122-
customErrors.NewServerError(fmt.Sprintf("unsupported stream error encountered: %s", err.Error())),
125+
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: %s", err.Error())),
123126
)
124127
return
125128
}
126129

127-
results <- newObserveEventsError(customErrors.NewServerError(serverError.Error))
128-
case "item":
129-
var storeItem StoreItem
130-
if err := json.Unmarshal(data.Payload, &storeItem); err != nil {
130+
switch data.Type {
131+
case "heartbeat":
132+
if !heartbeatTimer.Stop() {
133+
<-heartbeatTimer.C
134+
}
135+
heartbeatTimer.Reset(heartbeatTimeout)
136+
137+
case "error":
138+
var serverError streamError
139+
if err := json.Unmarshal(data.Payload, &serverError); err != nil {
140+
results <- newObserveEventsError(
141+
customErrors.NewServerError(fmt.Sprintf("unsupported stream error encountered: %s", err.Error())),
142+
)
143+
return
144+
}
145+
146+
results <- newObserveEventsError(customErrors.NewServerError(serverError.Error))
147+
148+
case "item":
149+
var storeItem StoreItem
150+
if err := json.Unmarshal(data.Payload, &storeItem); err != nil {
151+
results <- newObserveEventsError(
152+
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: '%s' (trying to unmarshal '%+v')", err.Error(), data)),
153+
)
154+
return
155+
}
156+
157+
results <- newObserveEventsValue(storeItem)
158+
159+
default:
131160
results <- newObserveEventsError(
132-
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: '%s' (trying to unmarshal '%+v')", err.Error(), data)),
161+
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: '%+v' does not have a recognized type", data)),
133162
)
134163
return
135164
}
136-
137-
results <- newObserveEventsValue(storeItem)
138-
default:
139-
results <- newObserveEventsError(
140-
customErrors.NewServerError(fmt.Sprintf("unsupported stream item encountered: '%+v' does not have a recognized type", data)),
141-
)
142-
return
143165
}
144166
}
145167
}()

go.mod

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
module github.com/thenativeweb/eventsourcingdb-client-golang
22

3-
go 1.21
4-
5-
toolchain go1.21.0
3+
go 1.23.1
64

75
require (
86
github.com/Masterminds/semver v1.5.0
9-
github.com/google/uuid v1.3.0
10-
github.com/stretchr/testify v1.8.4
11-
github.com/thenativeweb/goutils/v2 v2.6.0
7+
github.com/google/uuid v1.6.0
8+
github.com/stretchr/testify v1.9.0
9+
github.com/thenativeweb/goutils/v2 v2.7.2
1210
)
1311

1412
require (

go.sum

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,19 @@ github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3Q
22
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
33
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
44
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5-
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
6-
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
5+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
6+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
77
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
88
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
99
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
1010
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
1111
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
1212
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1313
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
14-
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
15-
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
16-
github.com/thenativeweb/goutils/v2 v2.2.0 h1:J3sFbgzJgt2vE7PSZI1k1xLUl67NHDOp1Iv0URzogGY=
17-
github.com/thenativeweb/goutils/v2 v2.2.0/go.mod h1:9TcyFZFzQNoTrL8lNkewhNKodpZlfJTGNC8HI8LFCHY=
18-
github.com/thenativeweb/goutils/v2 v2.4.0 h1:oJgi5NZ+HOVh27H7otlm/fb/ECptIyYj94ew3okUmIA=
19-
github.com/thenativeweb/goutils/v2 v2.4.0/go.mod h1:/VHDjasi1pIqSB8M4x5nKS7sSNSCPqbXYeceJcfcUVU=
20-
github.com/thenativeweb/goutils/v2 v2.5.0 h1:heeZnWyv//WVcA9TqnEZu+f/z+Qgq3sSCo9sWFaIq8g=
21-
github.com/thenativeweb/goutils/v2 v2.5.0/go.mod h1:/VHDjasi1pIqSB8M4x5nKS7sSNSCPqbXYeceJcfcUVU=
22-
github.com/thenativeweb/goutils/v2 v2.6.0 h1:Bh8uHMh6Qq5CJZArG/wlcRNwFzmh6pZDqQejL8GDc9k=
23-
github.com/thenativeweb/goutils/v2 v2.6.0/go.mod h1:/VHDjasi1pIqSB8M4x5nKS7sSNSCPqbXYeceJcfcUVU=
14+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
15+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
16+
github.com/thenativeweb/goutils/v2 v2.7.2 h1:akXVEFQPRNJhueoFKLt1PSdoIUd7q5xPW2aGPe+im/o=
17+
github.com/thenativeweb/goutils/v2 v2.7.2/go.mod h1:SywpWk83y9P5a4+NPVFvoOua0ZxUMpunKVGLDw6yHWg=
2418
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
2519
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
2620
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
FROM ghcr.io/thenativeweb/eventsourcingdb:0.77.0
1+
FROM ghcr.io/thenativeweb/eventsourcingdb:0.78.0

0 commit comments

Comments
 (0)