Skip to content

Commit 697363f

Browse files
authored
feature(persistence): update persistence (#27)
* feature(persistence): update persistence * feature(persistence): update persistence * feature(persistence): update persistence * test: fix tests * feature(persistence): update persistence * feature(persistence): update persistence
1 parent 6b67b66 commit 697363f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1370
-1728
lines changed

cmd/lighthouse/config.yaml

+35-1
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,43 @@ log:
66
filename: log.log
77
persistence:
88
session:
9-
type: memory
9+
type: redis
10+
# The redis configuration only take effect when type == redis.
11+
redis:
12+
# redis server address
13+
addr: "127.0.0.1:6379"
14+
# the maximum number of idle connections in the redis connection pool.
15+
max_idle: 1000
16+
# the maximum number of connections allocated by the redis connection pool at a given time.
17+
# If zero, there is no limit on the number of connections in the pool.
18+
max_active: 0
19+
# the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed.
20+
idle_timeout: 240s
21+
password: ""
22+
# the number of the redis database.
23+
database: 0
24+
25+
timeout: 240s
1026
queue:
1127
type: memory
28+
subscription:
29+
type: redis
30+
redis:
31+
# redis server address
32+
addr: "127.0.0.1:6379"
33+
# the maximum number of idle connections in the redis connection pool.
34+
max_idle: 1000
35+
# the maximum number of connections allocated by the redis connection pool at a given time.
36+
# If zero, there is no limit on the number of connections in the pool.
37+
max_active: 0
38+
# the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed.
39+
idle_timeout: 240s
40+
password: ""
41+
# the number of the redis database.
42+
database: 0
43+
44+
timeout: 240s
45+
1246
trace:
1347
name: lighthouse
1448
endpoint: http://localhost:14268/api/traces

cmd/lighthouse/main.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"github.com/yunqi/lighthouse/config"
77
_ "github.com/yunqi/lighthouse/internal/persistence/session/memory"
88
_ "github.com/yunqi/lighthouse/internal/persistence/session/redis"
9+
_ "github.com/yunqi/lighthouse/internal/persistence/subscription/memory"
10+
_ "github.com/yunqi/lighthouse/internal/persistence/subscription/redis"
911
"github.com/yunqi/lighthouse/internal/server"
1012
"github.com/yunqi/lighthouse/internal/xlog"
1113
"github.com/yunqi/lighthouse/internal/xtrace"

config/persistence.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import "time"
44

55
type (
66
Persistence struct {
7-
Session StoreType `yaml:"session"`
8-
Queue StoreType `yaml:"queue"`
7+
Session StoreType `yaml:"session"`
8+
Subscription StoreType `yaml:"subscription"`
9+
Queue StoreType `yaml:"queue"`
910
}
1011

1112
StoreType struct {

internal/packet/fixed_header.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type (
2222

2323
// FixedHeader represents the FixedHeader of the MQTT packet
2424
FixedHeader struct {
25-
PacketType PacketType
25+
PacketType Type
2626
Flags byte
2727
RemainLength int
2828
}

internal/packet/packet.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ const (
3434
// SubscribeFailure 订阅失败
3535
SubscribeFailure = 0x80
3636

37-
MaxPacketID PacketId = 65535
38-
MinPacketID PacketId = 1
37+
MaxPacketID Id = 65535
38+
MinPacketID Id = 1
3939
// UTF8EncodedStringsMaxLen There is a limit on the size of a string that can be passed in one of these UTF-8 encoded string components; you cannot use a string that would encode to more than 65535 bytes.
4040
// http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.2_-
4141
// 2 byte = uint16
@@ -54,7 +54,7 @@ const (
5454
//Packet type
5555
const (
5656
// RESERVED Forbidden
57-
RESERVED PacketType = iota
57+
RESERVED Type = iota
5858
// CONNECT Client request to connect to Server
5959
CONNECT
6060
// CONNACK Connect acknowledgment
@@ -153,10 +153,10 @@ type (
153153
Version byte
154154
// QoS 消息质量
155155
QoS = byte
156-
// PacketId 数据包ID
157-
PacketId = uint16
156+
// Id 数据包ID
157+
Id = uint16
158158

159-
PacketType = byte
159+
Type = byte
160160

161161
// Packet defines the interface for structs intended to hold
162162
// decoded MQTT packets, either from being read or before being
@@ -168,7 +168,6 @@ type (
168168
Decode(r io.Reader) (err error)
169169
// String is mainly used in logging, debugging and testing.
170170
String() string
171-
//Context() context.Context
172171
}
173172
)
174173

internal/packet/pub.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
type BasePub struct {
2626
Version Version
2727
FixedHeader *FixedHeader
28-
PacketId PacketId
28+
PacketId Id
2929
}
3030

3131
func (bp *BasePub) decode(r io.Reader) (err error) {

internal/packet/puback.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type (
3131
Puback struct {
3232
Version Version
3333
FixedHeader *FixedHeader
34-
PacketId PacketId
34+
PacketId Id
3535
}
3636
)
3737

internal/packet/pubcomp.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
Pubcomp struct {
2828
Version Version
2929
FixedHeader *FixedHeader
30-
PacketId PacketId
30+
PacketId Id
3131
}
3232
)
3333

internal/packet/publish.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type (
3737
QoS uint8 //qos等级
3838
Retain bool //是否保留消息
3939
TopicName []byte //主题名
40-
PacketId //报文标识符
40+
PacketId Id //报文标识符
4141
Payload []byte
4242
}
4343
)

internal/packet/pubrec.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type (
3131
Pubrec struct {
3232
Version Version
3333
FixedHeader *FixedHeader
34-
PacketId PacketId
34+
PacketId Id
3535
}
3636
)
3737

internal/packet/pubrel.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
Pubrel struct {
2828
Version Version
2929
FixedHeader *FixedHeader
30-
PacketId PacketId
30+
PacketId Id
3131
}
3232
)
3333

internal/packet/suback.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type (
3030
Suback struct {
3131
Version Version
3232
FixedHeader *FixedHeader
33-
PacketId PacketId
33+
PacketId Id
3434
Payload []code.Code
3535
}
3636
)

internal/packet/subscribe.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
Subscribe struct {
2828
Version Version
2929
FixedHeader *FixedHeader
30-
PacketId PacketId
30+
PacketId Id
3131
Topics []*Topic //suback响应之前填充
3232
}
3333
)

internal/packet/unsuback.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type (
2828
Unsuback struct {
2929
Version Version
3030
FixedHeader *FixedHeader
31-
PacketId PacketId
31+
PacketId Id
3232
Payload []code.Code
3333
}
3434
)

internal/packet/unsubscribe.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type (
2727
Unsubscribe struct {
2828
Version Version
2929
FixedHeader *FixedHeader
30-
PacketId PacketId
30+
PacketId Id
3131
Topics []string
3232
}
3333
)

internal/persistence/message/binary/binary.go renamed to internal/persistence/message/encoding/binary.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package binary
17+
package encoding
1818

1919
import (
2020
"bytes"

internal/persistence/message/binary/binary_test.go renamed to internal/persistence/message/encoding/binary_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package binary
17+
package encoding
1818

1919
import (
2020
"bytes"
@@ -31,7 +31,7 @@ func TestEncodeMessage(t *testing.T) {
3131
Retained: false,
3232
Topic: "test",
3333
Payload: []byte("payload"),
34-
PacketId: packet.PacketId(11),
34+
PacketId: packet.Id(11),
3535
ContentType: "context/json",
3636
CorrelationData: []byte("1"),
3737
MessageExpiry: 1,

internal/persistence/message/message.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type (
2828
Retained bool
2929
Topic string
3030
Payload []byte
31-
PacketId packet.PacketId
31+
PacketId packet.Id
3232

3333
ContentType string
3434
CorrelationData []byte

internal/persistence/persistence.go

+16-10
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ package persistence
1818

1919
import (
2020
"github.com/yunqi/lighthouse/internal/persistence/session"
21+
"github.com/yunqi/lighthouse/internal/persistence/subscription"
22+
)
23+
24+
const (
25+
Memory = "memory"
26+
Redis = "redis"
2127
)
2228

2329
var (
24-
sessionStores = map[string]session.NewStore{}
25-
//queueStores = map[string]SessionPersistence{}
30+
sessionStores = map[string]session.NewStore{}
31+
subscriptionStores = map[string]subscription.NewStore{}
2632
)
2733

2834
func RegisterSessionStore(name string, store session.NewStore) {
@@ -33,11 +39,11 @@ func GetSessionStore(name string) (store session.NewStore, ok bool) {
3339
return s, ok
3440
}
3541

36-
//
37-
//func RegisterQueueStore(name string, store queue.Store) {
38-
// queueStores[name] = store
39-
//}
40-
//func GetQueueStore(name string) (store queue.Store, ok bool) {
41-
// s, ok := queueStores[name]
42-
// return s, ok
43-
//}
42+
func RegisterSubscriptionStore(name string, store subscription.NewStore) {
43+
subscriptionStores[name] = store
44+
}
45+
46+
func GetSubscriptionStore(name string) (store subscription.NewStore, ok bool) {
47+
s, ok := subscriptionStores[name]
48+
return s, ok
49+
}

internal/persistence/queue/elem.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package queue
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"errors"
7+
"github.com/chenquan/go-pkg/xbinary"
8+
"github.com/yunqi/lighthouse/internal/packet"
9+
"github.com/yunqi/lighthouse/internal/persistence/message"
10+
"github.com/yunqi/lighthouse/internal/persistence/message/encoding"
11+
"time"
12+
)
13+
14+
type Message interface {
15+
Id() packet.Id
16+
SetId(id packet.Id)
17+
}
18+
19+
type Publish struct {
20+
*message.Message
21+
}
22+
23+
func (p *Publish) Id() packet.Id {
24+
return p.PacketId
25+
}
26+
func (p *Publish) SetId(id packet.Id) {
27+
p.PacketId = id
28+
}
29+
30+
type Pubrel struct {
31+
PacketID packet.Id
32+
}
33+
34+
func (p *Pubrel) Id() packet.Id {
35+
return p.PacketID
36+
}
37+
func (p *Pubrel) SetId(id packet.Id) {
38+
p.PacketID = id
39+
}
40+
41+
// Element represents the element store in the queue.
42+
type Element struct {
43+
// At represents the entry time.
44+
At time.Time
45+
// Expiry represents the expiry time.
46+
// Empty means never expire.
47+
Expiry time.Time
48+
Message
49+
}
50+
51+
// Encode encodes the publish structure into bytes and write it to the buffer
52+
func (p *Publish) Encode(b *bytes.Buffer) {
53+
encoding.EncodeMessage(p.Message, b)
54+
}
55+
56+
func (p *Publish) Decode(b *bytes.Buffer) (err error) {
57+
msg, err := encoding.DecodeMessage(bytes.NewReader(b.Bytes()))
58+
if err != nil {
59+
return err
60+
}
61+
p.Message = msg
62+
return nil
63+
}
64+
65+
// Encode encode the pubrel structure into bytes.
66+
func (p *Pubrel) Encode(b *bytes.Buffer) {
67+
_ = xbinary.WriteUint16(b, p.PacketID)
68+
}
69+
70+
func (p *Pubrel) Decode(b *bytes.Buffer) (err error) {
71+
p.PacketID, err = xbinary.ReadUint16(b)
72+
return
73+
}
74+
75+
// Encode encode the elem structure into bytes.
76+
// Format: 8 byte timestamp | 1 byte identifier| data
77+
func (e *Element) Encode() []byte {
78+
b := bytes.NewBuffer(make([]byte, 0, 100))
79+
rs := make([]byte, 19)
80+
binary.BigEndian.PutUint64(rs[0:9], uint64(e.At.Unix()))
81+
binary.BigEndian.PutUint64(rs[9:18], uint64(e.Expiry.Unix()))
82+
switch m := e.Message.(type) {
83+
case *Publish:
84+
rs[18] = 0
85+
b.Write(rs)
86+
m.Encode(b)
87+
case *Pubrel:
88+
rs[18] = 1
89+
b.Write(rs)
90+
m.Encode(b)
91+
}
92+
return b.Bytes()
93+
}
94+
95+
func (e *Element) Decode(b []byte) (err error) {
96+
if len(b) < 19 {
97+
return errors.New("invalid input length")
98+
}
99+
e.At = time.Unix(int64(binary.BigEndian.Uint64(b[0:9])), 0)
100+
e.Expiry = time.Unix(int64(binary.BigEndian.Uint64(b[9:19])), 0)
101+
switch b[18] {
102+
case 0: // publish
103+
p := &Publish{}
104+
buf := bytes.NewBuffer(b[19:])
105+
err = p.Decode(buf)
106+
e.Message = p
107+
case 1: // pubrel
108+
p := &Pubrel{}
109+
buf := bytes.NewBuffer(b[19:])
110+
err = p.Decode(buf)
111+
e.Message = p
112+
default:
113+
return errors.New("invalid identifier")
114+
}
115+
return
116+
}

0 commit comments

Comments
 (0)