Skip to content

Commit 38a0433

Browse files
committed
Add support for deferred publishing via Producer (TCP)
1 parent ea18c2e commit 38a0433

File tree

2 files changed

+147
-42
lines changed

2 files changed

+147
-42
lines changed

producer.go

+44-8
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type Producer struct {
6565
// producers.
6666
type ProducerRequest struct {
6767
Topic string
68+
Delay time.Duration
6869
Message []byte
6970
Response chan<- error
7071
Deadline time.Time
@@ -138,7 +139,7 @@ func (p *Producer) Stop() {
138139
// Note that no retry is done internally, the producer will fail after the
139140
// first unsuccessful attempt to publish the message. It is the responsibility
140141
// of the caller to retry if necessary.
141-
func (p *Producer) Publish(message []byte) (err error) {
142+
func (p *Producer) Publish(message []byte) error {
142143
return p.PublishTo(p.topic, message)
143144
}
144145

@@ -149,7 +150,33 @@ func (p *Producer) Publish(message []byte) (err error) {
149150
// Note that no retry is done internally, the producer will fail after the
150151
// first unsuccessful attempt to publish the message. It is the responsibility
151152
// of the caller to retry if necessary.
152-
func (p *Producer) PublishTo(topic string, message []byte) (err error) {
153+
func (p *Producer) PublishTo(topic string, message []byte) error {
154+
return p.sendProducerRequest(topic, 0, message)
155+
}
156+
157+
// DeferredPublish sends a deferred message using the producer p, returning an
158+
// error if it was already closed or if an error occurred while publishing the
159+
// message.
160+
//
161+
// Note that no retry is done internally, the producer will fail after the
162+
// first unsuccessful attempt to publish the message. It is the responsibility
163+
// of the caller to retry if necessary.
164+
func (p *Producer) DeferredPublish(delay time.Duration, message []byte) error {
165+
return p.DeferredPublishTo(p.topic, delay, message)
166+
}
167+
168+
// DeferredPublishTo sends a deferred message to a specific topic using the
169+
// producer p, returning an error if it was already closed or if an error
170+
// occurred while publishing the message.
171+
//
172+
// Note that no retry is done internally, the producer will fail after the
173+
// first unsuccessful attempt to publish the message. It is the responsibility
174+
// of the caller to retry if necessary.
175+
func (p *Producer) DeferredPublishTo(topic string, delay time.Duration, message []byte) error {
176+
return p.sendProducerRequest(topic, delay, message)
177+
}
178+
179+
func (p *Producer) sendProducerRequest(topic string, delay time.Duration, message []byte) (err error) {
153180
defer func() {
154181
if recover() != nil {
155182
err = errors.New("publishing to a producer that was already stopped")
@@ -167,6 +194,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) {
167194
// it up.
168195
p.reqs <- ProducerRequest{
169196
Topic: topic,
197+
Delay: delay,
170198
Message: message,
171199
Response: response,
172200
Deadline: deadline,
@@ -265,7 +293,7 @@ func (p *Producer) run() {
265293
return
266294
}
267295

268-
if err := p.publish(conn, req.Topic, req.Message); err != nil {
296+
if err := p.publishMessage(conn, req.Topic, req.Delay, req.Message); err != nil {
269297
req.complete(err)
270298
shutdown(err)
271299
continue
@@ -333,11 +361,19 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) {
333361
return
334362
}
335363

336-
func (p *Producer) publish(conn *Conn, topic string, message []byte) error {
337-
return p.write(conn, Pub{
338-
Topic: topic,
339-
Message: message,
340-
})
364+
func (p *Producer) publishMessage(conn *Conn, topic string, delay time.Duration, message []byte) error {
365+
if delay == 0 {
366+
return p.write(conn, Pub{
367+
Topic: topic,
368+
Message: message,
369+
})
370+
} else {
371+
return p.write(conn, DPub{
372+
Topic: topic,
373+
Delay: delay,
374+
Message: message,
375+
})
376+
}
341377
}
342378

343379
func (p *Producer) ping(conn *Conn) error {

producer_test.go

+103-34
Original file line numberDiff line numberDiff line change
@@ -7,28 +7,67 @@ import (
77
"time"
88
)
99

10-
func TestProducer(t *testing.T) {
11-
for _, n := range []int{1, 10, 100, 1000} {
10+
func mustStartConsumerAndProducer(address, topic string) (*Consumer, *Producer) {
11+
c, err := StartConsumer(ConsumerConfig{
12+
Topic: topic,
13+
Channel: "channel",
14+
Address: address,
15+
})
16+
if err != nil {
17+
panic(err)
18+
}
19+
20+
// Give some time for the consumer to connect.
21+
time.Sleep(100 * time.Millisecond)
22+
23+
p, _ := StartProducer(ProducerConfig{
24+
Address: address,
25+
Topic: topic,
26+
MaxConcurrency: 3,
27+
})
28+
if err != nil {
29+
c.Stop()
30+
panic(err)
31+
}
32+
33+
return c, p
34+
}
35+
36+
func consumeAndCheckMessages(c *Consumer, count int, deadline *time.Timer) error {
37+
buckets := make([]int, count)
38+
39+
for i := 0; i != count; i++ {
40+
select {
41+
case msg := <-c.Messages():
42+
b, err := strconv.Atoi(string(msg.Body))
43+
if err != nil {
44+
return err
45+
}
46+
buckets[b]++
47+
msg.Finish()
48+
case <-deadline.C:
49+
return fmt.Errorf("timeout")
50+
}
51+
}
52+
53+
for i, b := range buckets {
54+
if b != 1 {
55+
return fmt.Errorf("bucket at index %d has value %d", i, b)
56+
}
57+
}
58+
59+
return nil
60+
}
61+
62+
func TestProducerPublish(t *testing.T) {
63+
for _, n := range [...]int{1, 10, 100, 1000} {
1264
count := n
13-
topic := fmt.Sprintf("test-publisher-%d", n)
65+
topic := fmt.Sprintf("test-publish-%d", n)
1466
t.Run(topic, func(t *testing.T) {
1567
t.Parallel()
1668

17-
c, _ := StartConsumer(ConsumerConfig{
18-
Topic: topic,
19-
Channel: "channel",
20-
Address: "localhost:4150",
21-
})
69+
c, p := mustStartConsumerAndProducer("localhost:4150", topic)
2270
defer c.Stop()
23-
24-
// Give some time for the consumer to connect.
25-
time.Sleep(100 * time.Millisecond)
26-
27-
p, _ := StartProducer(ProducerConfig{
28-
Address: "localhost:4150",
29-
Topic: topic,
30-
MaxConcurrency: 3,
31-
})
3271
defer p.Stop()
3372

3473
for i := 0; i != count; i++ {
@@ -38,30 +77,60 @@ func TestProducer(t *testing.T) {
3877
}
3978
}
4079

41-
buckets := make([]int, count)
42-
43-
deadline := time.NewTimer(10 * time.Second)
80+
// Margin of error: 5*time.Second
81+
deadline := time.NewTimer(5 * time.Second)
4482
defer deadline.Stop()
4583

84+
err := consumeAndCheckMessages(c, count, deadline)
85+
if err != nil {
86+
t.Error(err)
87+
}
88+
})
89+
}
90+
}
91+
92+
func TestProducerDeferredPublish(t *testing.T) {
93+
delay := 10 * time.Second
94+
95+
for _, n := range [...]int{1, 10, 100, 1000} {
96+
count := n
97+
topic := fmt.Sprintf("test-deferred-publish-%d", n)
98+
t.Run(topic, func(t *testing.T) {
99+
t.Parallel()
100+
101+
c, p := mustStartConsumerAndProducer("localhost:4150", topic)
102+
defer c.Stop()
103+
defer p.Stop()
104+
105+
publishStart := time.Now()
106+
46107
for i := 0; i != count; i++ {
47-
select {
48-
case msg := <-c.Messages():
49-
b, err := strconv.Atoi(string(msg.Body))
50-
if err != nil {
51-
t.Error(err)
52-
}
53-
buckets[b]++
54-
msg.Finish()
55-
case <-deadline.C:
56-
t.Error("timeout")
108+
if err := p.DeferredPublish(delay, []byte(strconv.Itoa(i))); err != nil {
109+
t.Error(err)
57110
return
58111
}
59112
}
60113

61-
for i, b := range buckets {
62-
if b != 1 {
63-
t.Errorf("bucket at index %d has value %d", i, b)
64-
}
114+
publishEnd := time.Now()
115+
116+
// Margin of error: 1*time.Second
117+
delayTimer := time.NewTimer(delay - publishEnd.Sub(publishStart) - 1*time.Second)
118+
defer delayTimer.Stop()
119+
120+
select {
121+
case _ = <-c.Messages():
122+
t.Error("received deferred message early (before delay time passed)")
123+
return
124+
case <-delayTimer.C:
125+
}
126+
127+
// Margin of error: 5*time.Second
128+
deadline := time.NewTimer(delay - time.Now().Sub(publishEnd) + 5*time.Second)
129+
defer deadline.Stop()
130+
131+
err := consumeAndCheckMessages(c, count, deadline)
132+
if err != nil {
133+
t.Error(err)
65134
}
66135
})
67136
}

0 commit comments

Comments
 (0)