Skip to content

Commit 84725cb

Browse files
committed
Add support for deferred publishing via Producer (TCP)
1 parent e6cc0de commit 84725cb

File tree

2 files changed

+147
-42
lines changed

2 files changed

+147
-42
lines changed

producer.go

+44-8
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type Producer struct {
6767
// producers.
6868
type ProducerRequest struct {
6969
Topic string
70+
Delay time.Duration
7071
Message []byte
7172
Response chan<- error
7273
Deadline time.Time
@@ -157,7 +158,7 @@ func (p *Producer) Stop() {
157158
// Note that no retry is done internally, the producer will fail after the
158159
// first unsuccessful attempt to publish the message. It is the responsibility
159160
// of the caller to retry if necessary.
160-
func (p *Producer) Publish(message []byte) (err error) {
161+
func (p *Producer) Publish(message []byte) error {
161162
return p.PublishTo(p.topic, message)
162163
}
163164

@@ -168,7 +169,33 @@ func (p *Producer) Publish(message []byte) (err error) {
168169
// Note that no retry is done internally, the producer will fail after the
169170
// first unsuccessful attempt to publish the message. It is the responsibility
170171
// of the caller to retry if necessary.
171-
func (p *Producer) PublishTo(topic string, message []byte) (err error) {
172+
func (p *Producer) PublishTo(topic string, message []byte) error {
173+
return p.sendProducerRequest(topic, 0, message)
174+
}
175+
176+
// DeferredPublish sends a deferred message using the producer p, returning an
177+
// error if it was already closed or if an error occurred while publishing the
178+
// message.
179+
//
180+
// Note that no retry is done internally, the producer will fail after the
181+
// first unsuccessful attempt to publish the message. It is the responsibility
182+
// of the caller to retry if necessary.
183+
func (p *Producer) DeferredPublish(delay time.Duration, message []byte) error {
184+
return p.DeferredPublishTo(p.topic, delay, message)
185+
}
186+
187+
// DeferredPublishTo sends a deferred message to a specific topic using the
188+
// producer p, returning an error if it was already closed or if an error
189+
// occurred while publishing the message.
190+
//
191+
// Note that no retry is done internally, the producer will fail after the
192+
// first unsuccessful attempt to publish the message. It is the responsibility
193+
// of the caller to retry if necessary.
194+
func (p *Producer) DeferredPublishTo(topic string, delay time.Duration, message []byte) error {
195+
return p.sendProducerRequest(topic, delay, message)
196+
}
197+
198+
func (p *Producer) sendProducerRequest(topic string, delay time.Duration, message []byte) (err error) {
172199
defer func() {
173200
if recover() != nil {
174201
err = errors.New("publishing to a producer that was already stopped")
@@ -186,6 +213,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) {
186213
// it up.
187214
p.reqs <- ProducerRequest{
188215
Topic: topic,
216+
Delay: delay,
189217
Message: message,
190218
Response: response,
191219
Deadline: deadline,
@@ -297,7 +325,7 @@ func (p *Producer) run() {
297325
continue
298326
}
299327

300-
if err := p.publish(conn, req.Topic, req.Message); err != nil {
328+
if err := p.publishMessage(conn, req.Topic, req.Delay, req.Message); err != nil {
301329
req.complete(err)
302330
shutdown(err)
303331
continue
@@ -365,11 +393,19 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) {
365393
return
366394
}
367395

368-
func (p *Producer) publish(conn *Conn, topic string, message []byte) error {
369-
return p.write(conn, Pub{
370-
Topic: topic,
371-
Message: message,
372-
})
396+
func (p *Producer) publishMessage(conn *Conn, topic string, delay time.Duration, message []byte) error {
397+
if delay == 0 {
398+
return p.write(conn, Pub{
399+
Topic: topic,
400+
Message: message,
401+
})
402+
} else {
403+
return p.write(conn, DPub{
404+
Topic: topic,
405+
Delay: delay,
406+
Message: message,
407+
})
408+
}
373409
}
374410

375411
func (p *Producer) ping(conn *Conn) error {

producer_test.go

+103-34
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,67 @@ import (
77
"time"
88
)
99

10-
func TestProducer(t *testing.T) {
11-
for _, n := range []int{1, 10, 100, 1000} {
10+
func mustStartConsumerAndProducer(address, topic string) (*Consumer, *Producer) {
11+
c, err := StartConsumer(ConsumerConfig{
12+
Topic: topic,
13+
Channel: "channel",
14+
Address: address,
15+
})
16+
if err != nil {
17+
panic(err)
18+
}
19+
20+
// Give some time for the consumer to connect.
21+
time.Sleep(100 * time.Millisecond)
22+
23+
p, _ := StartProducer(ProducerConfig{
24+
Address: address,
25+
Topic: topic,
26+
MaxConcurrency: 3,
27+
})
28+
if err != nil {
29+
c.Stop()
30+
panic(err)
31+
}
32+
33+
return c, p
34+
}
35+
36+
func consumeAndCheckMessages(c *Consumer, count int, deadline *time.Timer) error {
37+
buckets := make([]int, count)
38+
39+
for i := 0; i != count; i++ {
40+
select {
41+
case msg := <-c.Messages():
42+
b, err := strconv.Atoi(string(msg.Body))
43+
if err != nil {
44+
return err
45+
}
46+
buckets[b]++
47+
msg.Finish()
48+
case <-deadline.C:
49+
return fmt.Errorf("timeout")
50+
}
51+
}
52+
53+
for i, b := range buckets {
54+
if b != 1 {
55+
return fmt.Errorf("bucket at index %d has value %d", i, b)
56+
}
57+
}
58+
59+
return nil
60+
}
61+
62+
func TestProducerPublish(t *testing.T) {
63+
for _, n := range [...]int{1, 10, 100, 1000} {
1264
count := n
13-
topic := fmt.Sprintf("test-publisher-%d", n)
65+
topic := fmt.Sprintf("test-publish-%d", n)
1466
t.Run(topic, func(t *testing.T) {
1567
t.Parallel()
1668

17-
c, _ := StartConsumer(ConsumerConfig{
18-
Topic: topic,
19-
Channel: "channel",
20-
Address: "localhost:4150",
21-
})
69+
c, p := mustStartConsumerAndProducer("localhost:4150", topic)
2270
defer c.Stop()
23-
24-
// Give some time for the consumer to connect.
25-
time.Sleep(100 * time.Millisecond)
26-
27-
p, _ := StartProducer(ProducerConfig{
28-
Address: "localhost:4150",
29-
Topic: topic,
30-
MaxConcurrency: 3,
31-
})
3271
defer p.Stop()
3372

3473
for i := 0; i != count; i++ {
@@ -38,30 +77,60 @@ func TestProducer(t *testing.T) {
3877
}
3978
}
4079

41-
buckets := make([]int, count)
42-
43-
deadline := time.NewTimer(10 * time.Second)
80+
// Margin of error: 5*time.Second
81+
deadline := time.NewTimer(5 * time.Second)
4482
defer deadline.Stop()
4583

84+
err := consumeAndCheckMessages(c, count, deadline)
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
})
89+
}
90+
}
91+
92+
func TestProducerDeferredPublish(t *testing.T) {
93+
delay := 10 * time.Second
94+
95+
for _, n := range [...]int{1, 10, 100, 1000} {
96+
count := n
97+
topic := fmt.Sprintf("test-deferred-publish-%d", n)
98+
t.Run(topic, func(t *testing.T) {
99+
t.Parallel()
100+
101+
c, p := mustStartConsumerAndProducer("localhost:4150", topic)
102+
defer c.Stop()
103+
defer p.Stop()
104+
105+
publishStart := time.Now()
106+
46107
for i := 0; i != count; i++ {
47-
select {
48-
case msg := <-c.Messages():
49-
b, err := strconv.Atoi(string(msg.Body))
50-
if err != nil {
51-
t.Error(err)
52-
}
53-
buckets[b]++
54-
msg.Finish()
55-
case <-deadline.C:
56-
t.Error("timeout")
108+
if err := p.DeferredPublish(delay, []byte(strconv.Itoa(i))); err != nil {
109+
t.Error(err)
57110
return
58111
}
59112
}
60113

61-
for i, b := range buckets {
62-
if b != 1 {
63-
t.Errorf("bucket at index %d has value %d", i, b)
64-
}
114+
publishEnd := time.Now()
115+
116+
// Margin of error: 1*time.Second
117+
delayTimer := time.NewTimer(delay - publishEnd.Sub(publishStart) - 1*time.Second)
118+
defer delayTimer.Stop()
119+
120+
select {
121+
case _ = <-c.Messages():
122+
t.Error("received deferred message early (before delay time passed)")
123+
return
124+
case <-delayTimer.C:
125+
}
126+
127+
// Margin of error: 5*time.Second
128+
deadline := time.NewTimer(delay - time.Now().Sub(publishEnd) + 5*time.Second)
129+
defer deadline.Stop()
130+
131+
err := consumeAndCheckMessages(c, count, deadline)
132+
if err != nil {
133+
t.Error(err)
65134
}
66135
})
67136
}

0 commit comments

Comments
 (0)