Skip to content

Deferred publishing #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func ReadCommand(r *bufio.Reader) (cmd Command, err error) {
return readPub(line[4:], r)
}

if strings.HasPrefix(line, "DPUB ") {
return readDPub(line[5:], r)
}

if strings.HasPrefix(line, "MPUB ") {
return readMPub(line[5:], r)
}
Expand Down
110 changes: 110 additions & 0 deletions dpub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package nsq

import (
"bufio"
"encoding/binary"
"io"
"strconv"
"time"

"github.com/pkg/errors"
)

// DPub represents the DPUB command.
type DPub struct {
// Topic must be set to the name of the topic to which the message will be
// published.
Topic string

// Delay is the duration NSQ will defer the message before sending it to a
// client.
Delay time.Duration

// Message is the raw message to publish.
Message []byte
}

// Name returns the name of the command in order to satisfy the Command
// interface.
func (c DPub) Name() string {
return "DPUB"
}

// Write serializes the command to the given buffered output, satisfies the
// Command interface.
func (c DPub) Write(w *bufio.Writer) (err error) {
for _, s := range [...]string{
"DPUB ",
c.Topic,
" ",
strconv.FormatUint(uint64(c.Delay/time.Millisecond), 10),
"\n",
} {
if _, err = w.WriteString(s); err != nil {
err = errors.Wrap(err, "writing DPUB command")
return
}
}

if err = binary.Write(w, binary.BigEndian, uint32(len(c.Message))); err != nil {
err = errors.Wrap(err, "writing DPUB message size")
return
}

if _, err = w.Write(c.Message); err != nil {
err = errors.Wrap(err, "writing DPUB message data")
return
}

return
}

func readDPub(line string, r *bufio.Reader) (cmd DPub, err error) {
var topic string
var delayStr string
var delayMsecs uint64
var size uint32
var data []byte

topic, line = readNextWord(line)
delayStr, line = readNextWord(line)

if len(topic) == 0 {
err = errors.New("missing topic in DPUB command")
return
}

if len(delayStr) == 0 {
err = errors.New("missing delay in DPUB command")
return
}

if len(line) != 0 {
err = errors.New("too many arguments found in DPUB command")
return
}

if err = binary.Read(r, binary.BigEndian, &size); err != nil {
err = errors.Wrap(err, "reading DPUB message size")
return
}

data = make([]byte, int(size))

if _, err = io.ReadFull(r, data); err != nil {
err = errors.Wrap(err, "reading DPUB message data")
return
}

if delayMsecs, err = strconv.ParseUint(delayStr, 10, 64); err != nil {
err = errors.Wrap(err, "reading DPUB delay")
return
}

cmd = DPub{
Topic: topic,
Delay: time.Duration(delayMsecs) * time.Millisecond,
Message: data,
}
return
}
35 changes: 35 additions & 0 deletions dpub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package nsq

import (
"testing"
"time"
)

func TestDPub(t *testing.T) {
tests := []struct {
topic string
delay time.Duration
message string
}{
{
topic: "A",
delay: 0,
message: "",
},
{
topic: "B",
delay: 1 * time.Second,
message: "Hello World!",
},
}

for _, test := range tests {
t.Run("topic:"+test.topic, func(t *testing.T) {
testCommand(t, "DPUB", DPub{
Topic: test.topic,
Delay: test.delay,
Message: []byte(test.message),
})
})
}
}
1 change: 1 addition & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ErrBadChannel Error = "E_BAD_CHANNEL"
ErrBadMessage Error = "E_BAD_MESSAGE"
ErrPubFailed Error = "E_PUB_FAILED"
ErrDPubFailed Error = "E_DPUB_FAILED"
ErrMPubFailed Error = "E_MPUB_FAILED"
ErrFinFailed Error = "E_FIN_FAILED"
ErrReqFailed Error = "E_REQ_FAILED"
Expand Down
1 change: 1 addition & 0 deletions error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ func TestError(t *testing.T) {
ErrBadChannel,
ErrBadMessage,
ErrPubFailed,
ErrDPubFailed,
ErrMPubFailed,
ErrFinFailed,
ErrReqFailed,
Expand Down
12 changes: 12 additions & 0 deletions nsq.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"github.com/pkg/errors"
)
Expand All @@ -28,6 +30,16 @@ func (c *Client) Publish(topic string, message []byte) (err error) {
return
}

func (c *Client) DeferredPublish(topic string, delay time.Duration, message []byte) (err error) {
delayStr := strconv.FormatUint(uint64(delay/time.Millisecond), 10)

_, err = c.do("POST", "/pub", url.Values{
"topic": []string{topic},
"defer": []string{delayStr},
}, message)
return
}

func (c *Client) MutliPublish(topic string, messages ...[]byte) (err error) {
_, err = c.do("POST", "/mpub", url.Values{
"topic": []string{topic},
Expand Down
52 changes: 44 additions & 8 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Producer struct {
// producers.
type ProducerRequest struct {
Topic string
Delay time.Duration
Message []byte
Response chan<- error
Deadline time.Time
Expand Down Expand Up @@ -157,7 +158,7 @@ func (p *Producer) Stop() {
// Note that no retry is done internally, the producer will fail after the
// first unsuccessful attempt to publish the message. It is the responsibility
// of the caller to retry if necessary.
func (p *Producer) Publish(message []byte) (err error) {
func (p *Producer) Publish(message []byte) error {
return p.PublishTo(p.topic, message)
}

Expand All @@ -168,7 +169,33 @@ func (p *Producer) Publish(message []byte) (err error) {
// Note that no retry is done internally, the producer will fail after the
// first unsuccessful attempt to publish the message. It is the responsibility
// of the caller to retry if necessary.
func (p *Producer) PublishTo(topic string, message []byte) (err error) {
func (p *Producer) PublishTo(topic string, message []byte) error {
return p.sendProducerRequest(topic, 0, message)
}

// DeferredPublish sends a deferred message using the producer p, returning an
// error if it was already closed or if an error occurred while publishing the
// message.
//
// Note that no retry is done internally, the producer will fail after the
// first unsuccessful attempt to publish the message. It is the responsibility
// of the caller to retry if necessary.
func (p *Producer) DeferredPublish(delay time.Duration, message []byte) error {
return p.DeferredPublishTo(p.topic, delay, message)
}

// DeferredPublishTo sends a deferred message to a specific topic using the
// producer p, returning an error if it was already closed or if an error
// occurred while publishing the message.
//
// Note that no retry is done internally, the producer will fail after the
// first unsuccessful attempt to publish the message. It is the responsibility
// of the caller to retry if necessary.
func (p *Producer) DeferredPublishTo(topic string, delay time.Duration, message []byte) error {
return p.sendProducerRequest(topic, delay, message)
}

func (p *Producer) sendProducerRequest(topic string, delay time.Duration, message []byte) (err error) {
defer func() {
if recover() != nil {
err = errors.New("publishing to a producer that was already stopped")
Expand All @@ -186,6 +213,7 @@ func (p *Producer) PublishTo(topic string, message []byte) (err error) {
// it up.
p.reqs <- ProducerRequest{
Topic: topic,
Delay: delay,
Message: message,
Response: response,
Deadline: deadline,
Expand Down Expand Up @@ -297,7 +325,7 @@ func (p *Producer) run() {
continue
}

if err := p.publish(conn, req.Topic, req.Message); err != nil {
if err := p.publishMessage(conn, req.Topic, req.Delay, req.Message); err != nil {
req.complete(err)
shutdown(err)
continue
Expand Down Expand Up @@ -365,11 +393,19 @@ func (p *Producer) write(conn *Conn, cmd Command) (err error) {
return
}

func (p *Producer) publish(conn *Conn, topic string, message []byte) error {
return p.write(conn, Pub{
Topic: topic,
Message: message,
})
func (p *Producer) publishMessage(conn *Conn, topic string, delay time.Duration, message []byte) error {
if delay == 0 {
return p.write(conn, Pub{
Topic: topic,
Message: message,
})
} else {
return p.write(conn, DPub{
Topic: topic,
Delay: delay,
Message: message,
})
}
}

func (p *Producer) ping(conn *Conn) error {
Expand Down
Loading