@@ -8,16 +8,22 @@ package integration
8
8
9
9
import (
10
10
"context"
11
+ "net"
11
12
"os"
12
13
"runtime"
13
14
"testing"
14
15
"time"
15
16
17
+ "go.mongodb.org/mongo-driver/bson/primitive"
18
+ "go.mongodb.org/mongo-driver/event"
16
19
"go.mongodb.org/mongo-driver/internal/assert"
17
20
"go.mongodb.org/mongo-driver/internal/handshake"
21
+ "go.mongodb.org/mongo-driver/internal/require"
22
+ "go.mongodb.org/mongo-driver/mongo/address"
18
23
"go.mongodb.org/mongo-driver/mongo/description"
19
24
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
20
25
"go.mongodb.org/mongo-driver/mongo/options"
26
+ "go.mongodb.org/mongo-driver/x/mongo/driver/topology"
21
27
)
22
28
23
29
func TestSDAMProse (t * testing.T ) {
@@ -173,3 +179,62 @@ func TestSDAMProse(t *testing.T) {
173
179
174
180
})
175
181
}
182
+
183
+ func TestServerHeartbeatStartedEvent (t * testing.T ) {
184
+ t .Run ("emits the first HeartbeatStartedEvent before the monitoring socket was created" , func (t * testing.T ) {
185
+ t .Parallel ()
186
+
187
+ const address = address .Address ("localhost:9999" )
188
+ expectedEvents := []string {
189
+ "serverHeartbeatStartedEvent" ,
190
+ "client connected" ,
191
+ "client hello received" ,
192
+ "serverHeartbeatFailedEvent" ,
193
+ }
194
+
195
+ events := make (chan string )
196
+
197
+ listener , err := net .Listen ("tcp" , address .String ())
198
+ assert .NoError (t , err )
199
+ defer listener .Close ()
200
+ go func () {
201
+ conn , err := listener .Accept ()
202
+ assert .NoError (t , err )
203
+ defer conn .Close ()
204
+
205
+ events <- "client connected"
206
+ _ , _ = conn .Read (nil )
207
+ events <- "client hello received"
208
+ }()
209
+
210
+ server := topology .NewServer (
211
+ address ,
212
+ primitive .NewObjectID (),
213
+ topology .WithServerMonitor (func (* event.ServerMonitor ) * event.ServerMonitor {
214
+ return & event.ServerMonitor {
215
+ ServerHeartbeatStarted : func (e * event.ServerHeartbeatStartedEvent ) {
216
+ events <- "serverHeartbeatStartedEvent"
217
+ },
218
+ ServerHeartbeatFailed : func (e * event.ServerHeartbeatFailedEvent ) {
219
+ events <- "serverHeartbeatFailedEvent"
220
+ },
221
+ }
222
+ }),
223
+ )
224
+ require .NoError (t , server .Connect (nil ))
225
+
226
+ ticker := time .NewTicker (5 * time .Second )
227
+ defer ticker .Stop ()
228
+
229
+ actualEvents := make ([]string , 0 , len (expectedEvents ))
230
+ for len (actualEvents ) < len (expectedEvents ) {
231
+ select {
232
+ case event := <- events :
233
+ actualEvents = append (actualEvents , event )
234
+ case <- ticker .C :
235
+ assert .FailNow (t , "timed out for incoming event" )
236
+ }
237
+ }
238
+ assert .Equal (t , expectedEvents , actualEvents )
239
+ })
240
+ }
0 commit comments