@@ -7,28 +7,67 @@ import (
7
7
"time"
8
8
)
9
9
10
- func TestProducer (t * testing.T ) {
11
- for _ , n := range []int {1 , 10 , 100 , 1000 } {
10
+ func mustStartConsumerAndProducer (address , topic string ) (* Consumer , * Producer ) {
11
+ c , err := StartConsumer (ConsumerConfig {
12
+ Topic : topic ,
13
+ Channel : "channel" ,
14
+ Address : address ,
15
+ })
16
+ if err != nil {
17
+ panic (err )
18
+ }
19
+
20
+ // Give some time for the consumer to connect.
21
+ time .Sleep (100 * time .Millisecond )
22
+
23
+ p , _ := StartProducer (ProducerConfig {
24
+ Address : address ,
25
+ Topic : topic ,
26
+ MaxConcurrency : 3 ,
27
+ })
28
+ if err != nil {
29
+ c .Stop ()
30
+ panic (err )
31
+ }
32
+
33
+ return c , p
34
+ }
35
+
36
+ func consumeAndCheckMessages (c * Consumer , count int , deadline * time.Timer ) error {
37
+ buckets := make ([]int , count )
38
+
39
+ for i := 0 ; i != count ; i ++ {
40
+ select {
41
+ case msg := <- c .Messages ():
42
+ b , err := strconv .Atoi (string (msg .Body ))
43
+ if err != nil {
44
+ return err
45
+ }
46
+ buckets [b ]++
47
+ msg .Finish ()
48
+ case <- deadline .C :
49
+ return fmt .Errorf ("timeout" )
50
+ }
51
+ }
52
+
53
+ for i , b := range buckets {
54
+ if b != 1 {
55
+ return fmt .Errorf ("bucket at index %d has value %d" , i , b )
56
+ }
57
+ }
58
+
59
+ return nil
60
+ }
61
+
62
+ func TestProducerPublish (t * testing.T ) {
63
+ for _ , n := range [... ]int {1 , 10 , 100 , 1000 } {
12
64
count := n
13
- topic := fmt .Sprintf ("test-publisher -%d" , n )
65
+ topic := fmt .Sprintf ("test-publish -%d" , n )
14
66
t .Run (topic , func (t * testing.T ) {
15
67
t .Parallel ()
16
68
17
- c , _ := StartConsumer (ConsumerConfig {
18
- Topic : topic ,
19
- Channel : "channel" ,
20
- Address : "localhost:4150" ,
21
- })
69
+ c , p := mustStartConsumerAndProducer ("localhost:4150" , topic )
22
70
defer c .Stop ()
23
-
24
- // Give some time for the consumer to connect.
25
- time .Sleep (100 * time .Millisecond )
26
-
27
- p , _ := StartProducer (ProducerConfig {
28
- Address : "localhost:4150" ,
29
- Topic : topic ,
30
- MaxConcurrency : 3 ,
31
- })
32
71
defer p .Stop ()
33
72
34
73
for i := 0 ; i != count ; i ++ {
@@ -38,30 +77,60 @@ func TestProducer(t *testing.T) {
38
77
}
39
78
}
40
79
41
- buckets := make ([]int , count )
42
-
43
- deadline := time .NewTimer (10 * time .Second )
80
+ // Margin of error: 5*time.Second
81
+ deadline := time .NewTimer (5 * time .Second )
44
82
defer deadline .Stop ()
45
83
84
+ err := consumeAndCheckMessages (c , count , deadline )
85
+ if err != nil {
86
+ t .Error (err )
87
+ }
88
+ })
89
+ }
90
+ }
91
+
92
+ func TestProducerDeferredPublish (t * testing.T ) {
93
+ delay := 10 * time .Second
94
+
95
+ for _ , n := range [... ]int {1 , 10 , 100 , 1000 } {
96
+ count := n
97
+ topic := fmt .Sprintf ("test-deferred-publish-%d" , n )
98
+ t .Run (topic , func (t * testing.T ) {
99
+ t .Parallel ()
100
+
101
+ c , p := mustStartConsumerAndProducer ("localhost:4150" , topic )
102
+ defer c .Stop ()
103
+ defer p .Stop ()
104
+
105
+ publishStart := time .Now ()
106
+
46
107
for i := 0 ; i != count ; i ++ {
47
- select {
48
- case msg := <- c .Messages ():
49
- b , err := strconv .Atoi (string (msg .Body ))
50
- if err != nil {
51
- t .Error (err )
52
- }
53
- buckets [b ]++
54
- msg .Finish ()
55
- case <- deadline .C :
56
- t .Error ("timeout" )
108
+ if err := p .DeferredPublish (delay , []byte (strconv .Itoa (i ))); err != nil {
109
+ t .Error (err )
57
110
return
58
111
}
59
112
}
60
113
61
- for i , b := range buckets {
62
- if b != 1 {
63
- t .Errorf ("bucket at index %d has value %d" , i , b )
64
- }
114
+ publishEnd := time .Now ()
115
+
116
+ // Margin of error: 1*time.Second
117
+ delayTimer := time .NewTimer (delay - publishEnd .Sub (publishStart ) - 1 * time .Second )
118
+ defer delayTimer .Stop ()
119
+
120
+ select {
121
+ case _ = <- c .Messages ():
122
+ t .Error ("received deferred message early (before delay time passed)" )
123
+ return
124
+ case <- delayTimer .C :
125
+ }
126
+
127
+ // Margin of error: 5*time.Second
128
+ deadline := time .NewTimer (delay - time .Now ().Sub (publishEnd ) + 5 * time .Second )
129
+ defer deadline .Stop ()
130
+
131
+ err := consumeAndCheckMessages (c , count , deadline )
132
+ if err != nil {
133
+ t .Error (err )
65
134
}
66
135
})
67
136
}
0 commit comments