Skip to content

Commit e97aa86

Browse files
authored
Merge pull request #1987 from c9s/dboy/coinbase-user-market-stream
REFACTOR: [coinbase] Revise User/Market data stream connection logic
2 parents 7708d52 + 6339ee1 commit e97aa86

File tree

2 files changed

+76
-51
lines changed

2 files changed

+76
-51
lines changed

pkg/exchange/coinbase/stream.go

+13
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ var logStream = logrus.WithFields(logrus.Fields{
2121
"module": "stream",
2222
})
2323

24+
var (
25+
// interface implementations compile-time check
26+
_ types.PrivateChannelSymbolSetter = (*Stream)(nil)
27+
_ types.Stream = (*Stream)(nil)
28+
)
29+
2430
//go:generate callbackgen -type Stream
2531
type Stream struct {
2632
types.StandardStream
@@ -54,6 +60,8 @@ type Stream struct {
5460

5561
lockWorkingOrderMap sync.Mutex // lock to protect lastOrderMap
5662
workingOrdersMap map[string]types.Order
63+
64+
privateChannelSymbols []string
5765
}
5866

5967
func NewStream(
@@ -95,6 +103,11 @@ func NewStream(
95103
return &s
96104
}
97105

106+
// types.PrivateChannelSymbolSetter
107+
func (s *Stream) SetPrivateChannelSymbols(symbols []string) {
108+
s.privateChannelSymbols = symbols
109+
}
110+
98111
func logSubscriptions(m *SubscriptionsMessage) {
99112
if m == nil {
100113
return

pkg/exchange/coinbase/stream_handlers.go

+63-51
Original file line numberDiff line numberDiff line change
@@ -77,58 +77,56 @@ func (msg subscribeMsgType2) String() string {
7777
}
7878

7979
func (s *Stream) handleConnect() {
80-
// subscribe to channels
81-
if len(s.Subscriptions) == 0 {
82-
return
83-
}
84-
// bridge bbgo channels to coinbase channels
85-
// auth required: level2, full, user
8680
subProductsMap := make(map[types.Channel][]string)
87-
allProductsMap := make(map[string]struct{})
88-
for _, sub := range s.Subscriptions {
89-
localSymbol := toLocalSymbol(sub.Symbol)
90-
switch sub.Channel {
91-
case types.BookChannel:
92-
// bridge to level2 channel, which provides order book snapshot and book updates
93-
logStream.Infof("bridge %s to level2_batch channel (%s)", sub.Channel, sub.Symbol)
94-
subProductsMap[level2BatchChannel] = append(subProductsMap[level2Channel], localSymbol)
95-
case types.MarketTradeChannel:
96-
// full: all orders/trades on Coinbase Exchange
97-
if !s.PublicOnly {
98-
panic("subscribe to market trade channel for a public stream is not allowed")
99-
}
100-
subProductsMap[matchesChannel] = append(subProductsMap[matchesChannel], localSymbol)
101-
logStream.Infof("bridge %s to %s(%s)", sub.Channel, matchesChannel, localSymbol)
102-
case types.BookTickerChannel:
103-
// ticker channel provides feeds on best bid/ask prices
104-
subProductsMap[tickerChannel] = append(subProductsMap[tickerChannel], localSymbol)
105-
logStream.Infof("bridge %s to %s(%s)", sub.Channel, tickerChannel, localSymbol)
106-
case types.KLineChannel:
107-
// TODO: add support to kline channel
108-
case types.AggTradeChannel, types.ForceOrderChannel, types.MarkPriceChannel, types.LiquidationOrderChannel, types.ContractInfoChannel:
109-
logStream.Warnf("coinbase stream does not support subscription to %s, skipped", sub.Channel)
110-
default:
111-
// rfqMatchChannel allow empty symbol
112-
if sub.Channel != rfqMatchChannel && sub.Channel != statusChannel && len(sub.Symbol) == 0 {
113-
logStream.Warnf("do not support subscription to %s without symbol, skipped", sub.Channel)
114-
continue
115-
}
116-
subProductsMap[sub.Channel] = append(subProductsMap[sub.Channel], localSymbol)
117-
}
118-
}
11981

120-
for _, products := range subProductsMap {
121-
for _, product := range products {
122-
allProductsMap[product] = struct{}{}
123-
}
124-
}
12582
// user data strea, subscribe to user channel for the user order/trade updates
12683
if !s.PublicOnly {
12784
if !s.authEnabled {
12885
panic("user channel requires authentication")
12986
}
130-
for product := range allProductsMap {
131-
subProductsMap[userChannel] = append(subProductsMap[userChannel], product)
87+
// subscribe private symbols to user channel
88+
// Once subscribe to the user channel, it will receive events for the following types:
89+
// - order life cycle events: receive, open, done, change, activate(for stop orders)
90+
// - order match
91+
subProductsMap[userChannel] = append(subProductsMap[userChannel], s.privateChannelSymbols...)
92+
} else {
93+
// market data stream: subscribe to channels
94+
if len(s.Subscriptions) == 0 {
95+
return
96+
}
97+
// bridge bbgo channels to coinbase channels
98+
// auth required: level2, full, user
99+
for _, sub := range s.Subscriptions {
100+
localSymbol := toLocalSymbol(sub.Symbol)
101+
switch sub.Channel {
102+
case types.BookChannel:
103+
// bridge to level2 channel, which provides order book snapshot and book updates
104+
logStream.Infof("bridge %s to level2_batch channel (%s)", sub.Channel, sub.Symbol)
105+
subProductsMap[level2BatchChannel] = append(subProductsMap[level2Channel], localSymbol)
106+
case types.MarketTradeChannel:
107+
// matches: all trades
108+
if !s.PublicOnly {
109+
panic("subscribe to market trade channel for a public stream is not allowed")
110+
}
111+
subProductsMap[matchesChannel] = append(subProductsMap[matchesChannel], localSymbol)
112+
logStream.Infof("bridge %s to %s(%s)", sub.Channel, matchesChannel, localSymbol)
113+
case types.BookTickerChannel:
114+
// ticker channel provides feeds on best bid/ask prices
115+
subProductsMap[tickerChannel] = append(subProductsMap[tickerChannel], localSymbol)
116+
logStream.Infof("bridge %s to %s(%s)", sub.Channel, tickerChannel, localSymbol)
117+
case types.KLineChannel:
118+
// TODO: add support to kline channel
119+
// kline stream is available on Advanced Trade API only: https://docs.cdp.coinbase.com/advanced-trade/docs/ws-channels#candles-channel
120+
case types.AggTradeChannel, types.ForceOrderChannel, types.MarkPriceChannel, types.LiquidationOrderChannel, types.ContractInfoChannel:
121+
logStream.Warnf("coinbase stream does not support subscription to %s, skipped", sub.Channel)
122+
default:
123+
// rfqMatchChannel allow empty symbol
124+
if sub.Channel != rfqMatchChannel && sub.Channel != statusChannel && len(sub.Symbol) == 0 {
125+
logStream.Warnf("do not support subscription to %s without symbol, skipped", sub.Channel)
126+
continue
127+
}
128+
subProductsMap[sub.Channel] = append(subProductsMap[sub.Channel], localSymbol)
129+
}
132130
}
133131
}
134132

@@ -254,13 +252,15 @@ func (s *Stream) handleConnect() {
254252
ProductIDs: productIDs,
255253
},
256254
},
257-
258-
authMsg: authMsg{
255+
}
256+
if v, _ := subCmd.(subscribeMsgType1); s.authEnabled {
257+
v.authMsg = authMsg{
259258
Signature: signature,
260259
Key: s.apiKey,
261260
Passphrase: s.passphrase,
262261
Timestamp: ts,
263-
},
262+
}
263+
subCmd = v
264264
}
265265
}
266266
subCmds = append(subCmds, subCmd)
@@ -277,11 +277,14 @@ func (s *Stream) handleConnect() {
277277
s.clearSequenceNumber()
278278
s.clearWorkingOrders()
279279
go func() {
280+
if s.PublicOnly {
281+
return
282+
}
280283
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
281284
defer cancel()
282285

283286
// emit balance snapshot on connection
284-
// query account balances
287+
// query account balances for user stream
285288
balances, err := s.exchange.QueryAccountBalances(ctx)
286289
if err != nil {
287290
logStream.WithError(err).Warn("failed to query account balances, the balance snapshot is initialized with empty balances")
@@ -305,6 +308,7 @@ func (s *Stream) handleConnect() {
305308
}
306309
s.updateWorkingOrders(openOrders...)
307310
}
311+
308312
}()
309313
}
310314

@@ -314,7 +318,10 @@ func (s *Stream) handleDisconnect() {
314318
s.clearWorkingOrders()
315319
}
316320

321+
// Local Handlers: handlers that deal with the messages from the Coinbase WebSocket
322+
317323
// ticker update (real-time price update when there is a match)
324+
// To receive ticker messages, you need to subscribe bbgo BookTickerChannel
318325
func (s *Stream) handleTickerMessage(msg *TickerMessage) {
319326
// ignore outdated messages
320327
if !s.checkAndUpdateSequenceNumber(msg.Type, msg.ProductID, msg.Sequence) {
@@ -331,6 +338,7 @@ func (s *Stream) handleTickerMessage(msg *TickerMessage) {
331338
}
332339

333340
// matches channel (or match message from full/user channel)
341+
// To receive match messages, you need to subscribe bbgo MarketTradeChannel on a public stream
334342
func (s *Stream) handleMatchMessage(msg *MatchMessage) {
335343
if msg.Type == "last_match" {
336344
// TODO: fetch missing trades from the REST API and emit them
@@ -350,7 +358,9 @@ func (s *Stream) handleMatchMessage(msg *MatchMessage) {
350358
}
351359
}
352360

353-
// level2 handlers
361+
// level2 handlers: order book snapshot and order book updates
362+
// To receive order book updates, you need to subscribe bbgo BookChannel
363+
// level2 order book snapshot handler
354364
func (s *Stream) handleOrderBookSnapshotMessage(msg *OrderBookSnapshotMessage) {
355365
symbol := toGlobalSymbol(msg.ProductID)
356366
var bids types.PriceVolumeSlice
@@ -382,6 +392,7 @@ func (s *Stream) handleOrderBookSnapshotMessage(msg *OrderBookSnapshotMessage) {
382392
s.EmitBookSnapshot(book)
383393
}
384394

395+
// level2 order book update handler
385396
func (s *Stream) handleOrderbookUpdateMessage(msg *OrderBookUpdateMessage) {
386397
var bids types.PriceVolumeSlice
387398
var asks types.PriceVolumeSlice
@@ -422,7 +433,8 @@ func (s *Stream) handleOrderbookUpdateMessage(msg *OrderBookUpdateMessage) {
422433
}
423434
}
424435

425-
// order update (full or user channel)
436+
// full or user channel: all order updates
437+
// a private stream will automatically subscribe to the user channel
426438
func (s *Stream) handleReceivedMessage(msg *ReceivedMessage) {
427439
if !s.checkAndUpdateSequenceNumber(msg.Type, msg.ProductID, msg.Sequence) {
428440
return

0 commit comments

Comments
 (0)