Skip to content

Commit c499ac7

Browse files
committed
Add implementation for the DPUB command
1 parent ff4eef9 commit c499ac7

File tree

4 files changed

+150
-0
lines changed

4 files changed

+150
-0
lines changed

command.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func ReadCommand(r *bufio.Reader) (cmd Command, err error) {
5757
return readPub(line[4:], r)
5858
}
5959

60+
if strings.HasPrefix(line, "DPUB ") {
61+
return readDPub(line[5:], r)
62+
}
63+
6064
if strings.HasPrefix(line, "MPUB ") {
6165
return readMPub(line[5:], r)
6266
}

dpub.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package nsq
2+
3+
import (
4+
"bufio"
5+
"encoding/binary"
6+
"io"
7+
"strconv"
8+
"time"
9+
10+
"github.com/pkg/errors"
11+
)
12+
13+
// DPub represents the DPUB command.
14+
type DPub struct {
15+
// Topic must be set to the name of the topic to which the message will be
16+
// published.
17+
Topic string
18+
19+
// Delay is the duration NSQ will defer the message before sending it to a
20+
// client.
21+
Delay time.Duration
22+
23+
// Message is the raw message to publish.
24+
Message []byte
25+
}
26+
27+
// Name returns the name of the command in order to satisfy the Command
28+
// interface.
29+
func (c DPub) Name() string {
30+
return "DPUB"
31+
}
32+
33+
// Write serializes the command to the given buffered output, satisfies the
34+
// Command interface.
35+
func (c DPub) Write(w *bufio.Writer) (err error) {
36+
for _, s := range [...]string{
37+
"DPUB ",
38+
c.Topic,
39+
" ",
40+
strconv.FormatUint(uint64(c.Delay/time.Millisecond), 10),
41+
"\n",
42+
} {
43+
if _, err = w.WriteString(s); err != nil {
44+
err = errors.Wrap(err, "writing DPUB command")
45+
return
46+
}
47+
}
48+
49+
if err = binary.Write(w, binary.BigEndian, uint32(len(c.Message))); err != nil {
50+
err = errors.Wrap(err, "writing DPUB message size")
51+
return
52+
}
53+
54+
if _, err = w.Write(c.Message); err != nil {
55+
err = errors.Wrap(err, "writing DPUB message data")
56+
return
57+
}
58+
59+
return
60+
}
61+
62+
func readDPub(line string, r *bufio.Reader) (cmd DPub, err error) {
63+
var topic string
64+
var delayStr string
65+
var delayMsecs uint64
66+
var size uint32
67+
var data []byte
68+
69+
topic, line = readNextWord(line)
70+
delayStr, line = readNextWord(line)
71+
72+
if len(topic) == 0 {
73+
err = errors.New("missing topic in DPUB command")
74+
return
75+
}
76+
77+
if len(delayStr) == 0 {
78+
err = errors.New("missing delay in DPUB command")
79+
return
80+
}
81+
82+
if len(line) != 0 {
83+
err = errors.New("too many arguments found in DPUB command")
84+
return
85+
}
86+
87+
if err = binary.Read(r, binary.BigEndian, &size); err != nil {
88+
err = errors.Wrap(err, "reading DPUB message size")
89+
return
90+
}
91+
92+
data = make([]byte, int(size))
93+
94+
if _, err = io.ReadFull(r, data); err != nil {
95+
err = errors.Wrap(err, "reading DPUB message data")
96+
return
97+
}
98+
99+
if delayMsecs, err = strconv.ParseUint(delayStr, 10, 64); err != nil {
100+
err = errors.Wrap(err, "reading DPUB delay")
101+
return
102+
}
103+
104+
cmd = DPub{
105+
Topic: topic,
106+
Delay: time.Duration(delayMsecs) * time.Millisecond,
107+
Message: data,
108+
}
109+
return
110+
}

dpub_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package nsq
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestDPub(t *testing.T) {
9+
tests := []struct {
10+
topic string
11+
delay time.Duration
12+
message string
13+
}{
14+
{
15+
topic: "A",
16+
delay: 0,
17+
message: "",
18+
},
19+
{
20+
topic: "B",
21+
delay: 1 * time.Second,
22+
message: "Hello World!",
23+
},
24+
}
25+
26+
for _, test := range tests {
27+
t.Run("topic:"+test.topic, func(t *testing.T) {
28+
testCommand(t, "DPUB", DPub{
29+
Topic: test.topic,
30+
Delay: test.delay,
31+
Message: []byte(test.message),
32+
})
33+
})
34+
}
35+
}

error.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
ErrBadChannel Error = "E_BAD_CHANNEL"
1919
ErrBadMessage Error = "E_BAD_MESSAGE"
2020
ErrPubFailed Error = "E_PUB_FAILED"
21+
ErrDPubFailed Error = "E_DPUB_FAILED"
2122
ErrMPubFailed Error = "E_MPUB_FAILED"
2223
ErrFinFailed Error = "E_FIN_FAILED"
2324
ErrReqFailed Error = "E_REQ_FAILED"

0 commit comments

Comments
 (0)