From 39f33a1572b26b32a477ef75b9987bfa17bd3f33 Mon Sep 17 00:00:00 2001 From: Seoester Date: Fri, 4 Oct 2019 12:46:43 +0200 Subject: [PATCH 1/3] Add implementation of the DPUB command --- command.go | 4 ++ dpub.go | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ dpub_test.go | 35 ++++++++++++++++ error.go | 1 + error_test.go | 1 + 5 files changed, 151 insertions(+) create mode 100644 dpub.go create mode 100644 dpub_test.go diff --git a/command.go b/command.go index cf72996..f74ed23 100644 --- a/command.go +++ b/command.go @@ -57,6 +57,10 @@ func ReadCommand(r *bufio.Reader) (cmd Command, err error) { return readPub(line[4:], r) } + if strings.HasPrefix(line, "DPUB ") { + return readDPub(line[5:], r) + } + if strings.HasPrefix(line, "MPUB ") { return readMPub(line[5:], r) } diff --git a/dpub.go b/dpub.go new file mode 100644 index 0000000..74e3b50 --- /dev/null +++ b/dpub.go @@ -0,0 +1,110 @@ +package nsq + +import ( + "bufio" + "encoding/binary" + "io" + "strconv" + "time" + + "github.com/pkg/errors" +) + +// DPub represents the DPUB command. +type DPub struct { + // Topic must be set to the name of the topic to which the message will be + // published. + Topic string + + // Delay is the duration NSQ will defer the message before sending it to a + // client. + Delay time.Duration + + // Message is the raw message to publish. + Message []byte +} + +// Name returns the name of the command in order to satisfy the Command +// interface. +func (c DPub) Name() string { + return "DPUB" +} + +// Write serializes the command to the given buffered output, satisfies the +// Command interface. +func (c DPub) Write(w *bufio.Writer) (err error) { + for _, s := range [...]string{ + "DPUB ", + c.Topic, + " ", + strconv.FormatUint(uint64(c.Delay/time.Millisecond), 10), + "\n", + } { + if _, err = w.WriteString(s); err != nil { + err = errors.Wrap(err, "writing DPUB command") + return + } + } + + if err = binary.Write(w, binary.BigEndian, uint32(len(c.Message))); err != nil { + err = errors.Wrap(err, "writing DPUB message size") + return + } + + if _, err = w.Write(c.Message); err != nil { + err = errors.Wrap(err, "writing DPUB message data") + return + } + + return +} + +func readDPub(line string, r *bufio.Reader) (cmd DPub, err error) { + var topic string + var delayStr string + var delayMsecs uint64 + var size uint32 + var data []byte + + topic, line = readNextWord(line) + delayStr, line = readNextWord(line) + + if len(topic) == 0 { + err = errors.New("missing topic in DPUB command") + return + } + + if len(delayStr) == 0 { + err = errors.New("missing delay in DPUB command") + return + } + + if len(line) != 0 { + err = errors.New("too many arguments found in DPUB command") + return + } + + if err = binary.Read(r, binary.BigEndian, &size); err != nil { + err = errors.Wrap(err, "reading DPUB message size") + return + } + + data = make([]byte, int(size)) + + if _, err = io.ReadFull(r, data); err != nil { + err = errors.Wrap(err, "reading DPUB message data") + return + } + + if delayMsecs, err = strconv.ParseUint(delayStr, 10, 64); err != nil { + err = errors.Wrap(err, "reading DPUB delay") + return + } + + cmd = DPub{ + Topic: topic, + Delay: time.Duration(delayMsecs) * time.Millisecond, + Message: data, + } + return +} diff --git a/dpub_test.go b/dpub_test.go new file mode 100644 index 0000000..e0fb76a --- /dev/null +++ b/dpub_test.go @@ -0,0 +1,35 @@ +package nsq + +import ( + "testing" + "time" +) + +func TestDPub(t *testing.T) { + tests := []struct { + topic string + delay time.Duration + message string + }{ + { + topic: "A", + delay: 0, + message: "", + }, + { + topic: "B", + delay: 1 * time.Second, + message: "Hello World!", + }, + } + + for _, test := range tests { + t.Run("topic:"+test.topic, func(t *testing.T) { + testCommand(t, "DPUB", DPub{ + Topic: test.topic, + Delay: test.delay, + Message: []byte(test.message), + }) + }) + } +} diff --git a/error.go b/error.go index f617c2d..17285a4 100644 --- a/error.go +++ b/error.go @@ -18,6 +18,7 @@ const ( ErrBadChannel Error = "E_BAD_CHANNEL" ErrBadMessage Error = "E_BAD_MESSAGE" ErrPubFailed Error = "E_PUB_FAILED" + ErrDPubFailed Error = "E_DPUB_FAILED" ErrMPubFailed Error = "E_MPUB_FAILED" ErrFinFailed Error = "E_FIN_FAILED" ErrReqFailed Error = "E_REQ_FAILED" diff --git a/error_test.go b/error_test.go index 6bf913e..28f1738 100644 --- a/error_test.go +++ b/error_test.go @@ -10,6 +10,7 @@ func TestError(t *testing.T) { ErrBadChannel, ErrBadMessage, ErrPubFailed, + ErrDPubFailed, ErrMPubFailed, ErrFinFailed, ErrReqFailed, From e6cc0de2bb2869c714ba6710cca97532dbd8bd8c Mon Sep 17 00:00:00 2001 From: Seoester Date: Fri, 4 Oct 2019 12:56:38 +0200 Subject: [PATCH 2/3] Add support for deferred publishing via Client (HTTP) --- nsq.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/nsq.go b/nsq.go index 77f62ec..45a3cad 100644 --- a/nsq.go +++ b/nsq.go @@ -6,6 +6,8 @@ import ( "io/ioutil" "net/http" "net/url" + "strconv" + "time" "github.com/pkg/errors" ) @@ -28,6 +30,16 @@ func (c *Client) Publish(topic string, message []byte) (err error) { return } +func (c *Client) DeferredPublish(topic string, delay time.Duration, message []byte) (err error) { + delayStr := strconv.FormatUint(uint64(delay/time.Millisecond), 10) + + _, err = c.do("POST", "/pub", url.Values{ + "topic": []string{topic}, + "defer": []string{delayStr}, + }, message) + return +} + func (c *Client) MutliPublish(topic string, messages ...[]byte) (err error) { _, err = c.do("POST", "/mpub", url.Values{ "topic": []string{topic}, From 84725cb9bec8ee22255b40f0aaec43ec057eaf14 Mon Sep 17 00:00:00 2001 From: Seoester Date: Fri, 4 Oct 2019 14:38:39 +0200 Subject: [PATCH 3/3] Add support for deferred publishing via Producer (TCP) --- producer.go | 52 +++++++++++++++--- producer_test.go | 137 +++++++++++++++++++++++++++++++++++------------ 2 files changed, 147 insertions(+), 42 deletions(-) diff --git a/producer.go b/producer.go index 9c56eb5..378af87 100644 --- a/producer.go +++ b/producer.go @@ -67,6 +67,7 @@ type Producer struct { // producers. type ProducerRequest struct { Topic string + Delay time.Duration Message []byte Response chan<- error Deadline time.Time @@ -157,7 +158,7 @@ func (p *Producer) Stop() { // Note that no retry is done internally, the producer will fail after the // first unsuccessful attempt to publish the message. It is the responsibility // of the caller to retry if necessary. -func (p *Producer) Publish(message []byte) (err error) { +func (p *Producer) Publish(message []byte) error { return p.PublishTo(p.topic, message) } @@ -168,7 +169,33 @@ func (p *Producer) Publish(message []byte) (err error) { // Note that no retry is done internally, the producer will fail after the // first unsuccessful attempt to publish the message. It is the responsibility // of the caller to retry if necessary. -func (p *Producer) PublishTo(topic string, message []byte) (err error) { +func (p *Producer) PublishTo(topic string, message []byte) error { + return p.sendProducerRequest(topic, 0, message) +} + +// DeferredPublish sends a deferred message using the producer p, returning an +// error if it was already closed or if an error occurred while publishing the +// message. +// +// Note that no retry is done internally, the producer will fail after the +// first unsuccessful attempt to publish the message. It is the responsibility +// of the caller to retry if necessary. +func (p *Producer) DeferredPublish(delay time.Duration, message []byte) error { + return p.DeferredPublishTo(p.topic, delay, message) +} + +// DeferredPublishTo sends a deferred message to a specific topic using the +// producer p, returning an error if it was already closed or if an error +// occurred while publishing the message. +// +// Note that no retry is done internally, the producer will fail after the +// first unsuccessful attempt to publish the message. It is the responsibility +// of the caller to retry if necessary. +func (p *Producer) DeferredPublishTo(topic string, delay time.Duration, message []byte) error { + return p.sendProducerRequest(topic, delay, message) +} + +func (p *Producer) sendProducerRequest(topic string, delay time.Duration, message []byte) (err error) { defer func() { if recover() != nil { err = errors.New("publishing to a producer that was already stopped") @@ -186,6 +213,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) { // it up. p.reqs <- ProducerRequest{ Topic: topic, + Delay: delay, Message: message, Response: response, Deadline: deadline, @@ -297,7 +325,7 @@ func (p *Producer) run() { continue } - if err := p.publish(conn, req.Topic, req.Message); err != nil { + if err := p.publishMessage(conn, req.Topic, req.Delay, req.Message); err != nil { req.complete(err) shutdown(err) continue @@ -365,11 +393,19 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) { return } -func (p *Producer) publish(conn *Conn, topic string, message []byte) error { - return p.write(conn, Pub{ - Topic: topic, - Message: message, - }) +func (p *Producer) publishMessage(conn *Conn, topic string, delay time.Duration, message []byte) error { + if delay == 0 { + return p.write(conn, Pub{ + Topic: topic, + Message: message, + }) + } else { + return p.write(conn, DPub{ + Topic: topic, + Delay: delay, + Message: message, + }) + } } func (p *Producer) ping(conn *Conn) error { diff --git a/producer_test.go b/producer_test.go index 4ce984d..a6fe99f 100644 --- a/producer_test.go +++ b/producer_test.go @@ -7,28 +7,67 @@ import ( "time" ) -func TestProducer(t *testing.T) { - for _, n := range []int{1, 10, 100, 1000} { +func mustStartConsumerAndProducer(address, topic string) (*Consumer, *Producer) { + c, err := StartConsumer(ConsumerConfig{ + Topic: topic, + Channel: "channel", + Address: address, + }) + if err != nil { + panic(err) + } + + // Give some time for the consumer to connect. + time.Sleep(100 * time.Millisecond) + + p, _ := StartProducer(ProducerConfig{ + Address: address, + Topic: topic, + MaxConcurrency: 3, + }) + if err != nil { + c.Stop() + panic(err) + } + + return c, p +} + +func consumeAndCheckMessages(c *Consumer, count int, deadline *time.Timer) error { + buckets := make([]int, count) + + for i := 0; i != count; i++ { + select { + case msg := <-c.Messages(): + b, err := strconv.Atoi(string(msg.Body)) + if err != nil { + return err + } + buckets[b]++ + msg.Finish() + case <-deadline.C: + return fmt.Errorf("timeout") + } + } + + for i, b := range buckets { + if b != 1 { + return fmt.Errorf("bucket at index %d has value %d", i, b) + } + } + + return nil +} + +func TestProducerPublish(t *testing.T) { + for _, n := range [...]int{1, 10, 100, 1000} { count := n - topic := fmt.Sprintf("test-publisher-%d", n) + topic := fmt.Sprintf("test-publish-%d", n) t.Run(topic, func(t *testing.T) { t.Parallel() - c, _ := StartConsumer(ConsumerConfig{ - Topic: topic, - Channel: "channel", - Address: "localhost:4150", - }) + c, p := mustStartConsumerAndProducer("localhost:4150", topic) defer c.Stop() - - // Give some time for the consumer to connect. - time.Sleep(100 * time.Millisecond) - - p, _ := StartProducer(ProducerConfig{ - Address: "localhost:4150", - Topic: topic, - MaxConcurrency: 3, - }) defer p.Stop() for i := 0; i != count; i++ { @@ -38,30 +77,60 @@ func TestProducer(t *testing.T) { } } - buckets := make([]int, count) - - deadline := time.NewTimer(10 * time.Second) + // Margin of error: 5*time.Second + deadline := time.NewTimer(5 * time.Second) defer deadline.Stop() + err := consumeAndCheckMessages(c, count, deadline) + if err != nil { + t.Error(err) + } + }) + } +} + +func TestProducerDeferredPublish(t *testing.T) { + delay := 10 * time.Second + + for _, n := range [...]int{1, 10, 100, 1000} { + count := n + topic := fmt.Sprintf("test-deferred-publish-%d", n) + t.Run(topic, func(t *testing.T) { + t.Parallel() + + c, p := mustStartConsumerAndProducer("localhost:4150", topic) + defer c.Stop() + defer p.Stop() + + publishStart := time.Now() + for i := 0; i != count; i++ { - select { - case msg := <-c.Messages(): - b, err := strconv.Atoi(string(msg.Body)) - if err != nil { - t.Error(err) - } - buckets[b]++ - msg.Finish() - case <-deadline.C: - t.Error("timeout") + if err := p.DeferredPublish(delay, []byte(strconv.Itoa(i))); err != nil { + t.Error(err) return } } - for i, b := range buckets { - if b != 1 { - t.Errorf("bucket at index %d has value %d", i, b) - } + publishEnd := time.Now() + + // Margin of error: 1*time.Second + delayTimer := time.NewTimer(delay - publishEnd.Sub(publishStart) - 1*time.Second) + defer delayTimer.Stop() + + select { + case _ = <-c.Messages(): + t.Error("received deferred message early (before delay time passed)") + return + case <-delayTimer.C: + } + + // Margin of error: 5*time.Second + deadline := time.NewTimer(delay - time.Now().Sub(publishEnd) + 5*time.Second) + defer deadline.Stop() + + err := consumeAndCheckMessages(c, count, deadline) + if err != nil { + t.Error(err) } }) }