@@ -3106,35 +3106,53 @@ export default function abstractTest(server, config, ports) {
3106
3106
timeout : 4000 ,
3107
3107
} ,
3108
3108
function _test ( t , done ) {
3109
- const client = connect ( { reconnectPeriod : 200 } )
3109
+ t . after ( ( ) => {
3110
+ // close client if not closed
3111
+ if ( client ) {
3112
+ client . end ( true )
3113
+ }
3114
+ } )
3115
+ let client = connect ( { reconnectPeriod : 100 } )
3110
3116
let serverPublished = false
3111
3117
let clientCalledBack = false
3112
3118
3119
+ // client is connected the first time
3113
3120
server . once ( 'client' , ( serverClient ) => {
3114
- serverClient . on ( 'connect' , ( ) => {
3121
+ // destroy the stream before the publish is acknowledged
3122
+ serverClient . once ( 'connect' , ( ) => {
3115
3123
setImmediate ( ( ) => {
3116
3124
serverClient . stream . destroy ( )
3117
3125
} )
3118
3126
} )
3119
3127
3128
+ // after 100ms the client should reconnect
3120
3129
server . once ( 'client' , ( serverClientNew ) => {
3121
3130
serverClientNew . on ( 'publish' , ( ) => {
3122
3131
serverPublished = true
3123
- check ( )
3124
3132
} )
3125
3133
} )
3126
3134
} )
3127
3135
3136
+ // ensure that on first reconnect the publish is still not acknowledged
3137
+ client . once ( 'reconnect' , ( ) => {
3138
+ // client callback should not be triggered on first connection
3139
+ assert . isFalse ( clientCalledBack )
3140
+ } )
3141
+
3128
3142
client . publish ( 'hello' , 'world' , { qos : 1 } , ( ) => {
3129
3143
clientCalledBack = true
3130
- check ( )
3131
3144
} )
3132
3145
3133
- function check ( ) {
3134
- if ( serverPublished && clientCalledBack ) {
3135
- client . end ( true , done )
3146
+ client . on ( 'packetreceive' , ( packet ) => {
3147
+ if ( packet . cmd === 'puback' ) {
3148
+ assert . isTrue ( serverPublished )
3149
+ setImmediate ( ( ) => {
3150
+ assert . isTrue ( clientCalledBack )
3151
+ client . end ( true , done )
3152
+ client = null
3153
+ } )
3136
3154
}
3137
- }
3155
+ } )
3138
3156
} ,
3139
3157
)
3140
3158
@@ -3143,7 +3161,7 @@ export default function abstractTest(server, config, ports) {
3143
3161
let serverPublished = false
3144
3162
let clientCalledBack = false
3145
3163
server . once ( 'client' , ( serverClient ) => {
3146
- serverClient . on ( 'connect' , ( ) => {
3164
+ serverClient . once ( 'connect' , ( ) => {
3147
3165
setImmediate ( ( ) => {
3148
3166
serverClient . stream . destroy ( )
3149
3167
client . end ( true , ( err ) => {
@@ -3164,42 +3182,65 @@ export default function abstractTest(server, config, ports) {
3164
3182
} )
3165
3183
} )
3166
3184
3167
- it ( 'should resend in-flight QoS 2 publish messages from the client' , function _test ( t , done ) {
3168
- const client = connect ( { reconnectPeriod : 200 } )
3169
- let serverPublished = false
3170
- let clientCalledBack = false
3185
+ it (
3186
+ 'should resend in-flight QoS 2 publish messages from the client' ,
3187
+ {
3188
+ timeout : 4000 ,
3189
+ } ,
3190
+ function _test ( t , done ) {
3191
+ t . after ( ( ) => {
3192
+ // close client if not closed
3193
+ if ( client ) {
3194
+ client . end ( true )
3195
+ }
3196
+ } )
3171
3197
3172
- server . once ( 'client' , ( serverClient ) => {
3173
- // ignore errors
3174
- serverClient . on ( 'error' , ( ) => { } )
3175
- serverClient . on ( 'publish' , ( ) => {
3176
- setImmediate ( ( ) => {
3177
- serverClient . stream . destroy ( )
3198
+ let client = connect ( { reconnectPeriod : 100 } )
3199
+ let serverPublished = false
3200
+ let clientCalledBack = false
3201
+
3202
+ server . once ( 'client' , ( serverClient ) => {
3203
+ // ignore errors
3204
+ serverClient . on ( 'error' , ( ) => { } )
3205
+ serverClient . on ( 'publish' , ( ) => {
3206
+ setImmediate ( ( ) => {
3207
+ serverClient . stream . destroy ( )
3208
+ } )
3178
3209
} )
3179
- } )
3180
3210
3181
- server . once ( 'client' , ( serverClientNew ) => {
3182
- serverClientNew . on ( 'pubrel' , ( ) => {
3183
- serverPublished = true
3184
- check ( )
3211
+ server . once ( 'client' , ( serverClientNew ) => {
3212
+ serverClientNew . on ( 'pubrel' , ( ) => {
3213
+ serverPublished = true
3214
+ } )
3185
3215
} )
3186
3216
} )
3187
- } )
3188
3217
3189
- client . publish ( 'hello' , 'world' , { qos : 2 } , ( ) => {
3190
- clientCalledBack = true
3191
- check ( )
3192
- } )
3218
+ client . publish ( 'hello' , 'world' , { qos : 2 } , ( ) => {
3219
+ clientCalledBack = true
3220
+ } )
3193
3221
3194
- function check ( ) {
3195
- if ( serverPublished && clientCalledBack ) {
3196
- client . end ( true , done )
3197
- }
3198
- }
3199
- } )
3222
+ client . on ( 'packetreceive' , ( packet ) => {
3223
+ if ( packet . cmd === 'pubcomp' ) {
3224
+ assert . isTrue ( serverPublished )
3225
+ setImmediate ( ( ) => {
3226
+ assert . isTrue ( clientCalledBack )
3227
+ client . end ( true , done )
3228
+ client = null
3229
+ } )
3230
+ }
3231
+ } )
3232
+ } ,
3233
+ )
3200
3234
3201
3235
it ( 'should not resend in-flight QoS 1 removed publish messages from the client' , function _test ( t , done ) {
3202
- const client = connect ( { reconnectPeriod : 200 } )
3236
+ t . after ( ( ) => {
3237
+ // close client if not closed
3238
+ if ( client ) {
3239
+ client . end ( true )
3240
+ }
3241
+ } )
3242
+
3243
+ let client = connect ( { reconnectPeriod : 100 } )
3203
3244
let clientCalledBack = false
3204
3245
3205
3246
server . once ( 'client' , ( serverClient ) => {
@@ -3211,8 +3252,7 @@ export default function abstractTest(server, config, ports) {
3211
3252
3212
3253
server . once ( 'client' , ( serverClientNew ) => {
3213
3254
serverClientNew . on ( 'publish' , ( ) => {
3214
- fail ( )
3215
- done ( )
3255
+ done ( Error ( 'should not have received publish' ) )
3216
3256
} )
3217
3257
} )
3218
3258
} )
@@ -3234,6 +3274,7 @@ export default function abstractTest(server, config, ports) {
3234
3274
assert . isTrue ( clientCalledBack )
3235
3275
client . end ( true , ( err ) => {
3236
3276
done ( err )
3277
+ client = null
3237
3278
} )
3238
3279
} )
3239
3280
@@ -3250,8 +3291,7 @@ export default function abstractTest(server, config, ports) {
3250
3291
3251
3292
server . once ( 'client' , ( serverClientNew ) => {
3252
3293
serverClientNew . on ( 'publish' , ( ) => {
3253
- fail ( )
3254
- done ( )
3294
+ done ( Error ( 'should not have received publish' ) )
3255
3295
} )
3256
3296
} )
3257
3297
} )
0 commit comments