Skip to content

Commit 8a26747

Browse files
committed
✨ Implement kline subscription handling: Add kline updater and context management for kline events
1 parent fa3d0cc commit 8a26747

File tree

5 files changed

+268
-5
lines changed

5 files changed

+268
-5
lines changed

pkg/core/trade_kline_converter.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package core
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/c9s/bbgo/pkg/fixedpoint"
9+
"github.com/c9s/bbgo/pkg/types"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
//go:generate callbackgen -type marketTradeKLineConverter -output trade_kline_converter_callbacks.go
14+
type marketTradeKLineConverter struct {
15+
symbol string
16+
minInterval types.Interval
17+
intervals map[types.Interval]struct{}
18+
// klinesMap is a map from interval to accumulated kline of that interval
19+
klinesMap map[types.Interval]*types.KLine
20+
21+
kLineCallbacks []func(kline types.KLine)
22+
kLineClosedCallbacks []func(kline types.KLine)
23+
logger *logrus.Entry
24+
25+
// the following fields should be protected by lock when being updated
26+
mu sync.Mutex
27+
open, high, low, close, volume, quoteVolume, price fixedpoint.Value
28+
numTrades uint64
29+
}
30+
31+
// NewMarketKLineConverter: constructor of kLineUpdater
32+
// - symbol: symbol to trace on
33+
// - minInterval: unit interval, related to your signal timeframe.
34+
// All the supported intervals of the binding stream should be multiple of this interval.
35+
func NewMarketKLineConverter(symbol string, minInterval types.Interval) *marketTradeKLineConverter {
36+
logger := logrus.WithField("symbol", symbol)
37+
return &marketTradeKLineConverter{
38+
symbol: symbol,
39+
minInterval: minInterval,
40+
intervals: make(map[types.Interval]struct{}),
41+
klinesMap: make(map[types.Interval]*types.KLine),
42+
logger: logger,
43+
}
44+
}
45+
46+
func (c *marketTradeKLineConverter) Subscribe(interval types.Interval) {
47+
c.intervals[interval] = struct{}{}
48+
}
49+
50+
func (c *marketTradeKLineConverter) BindStream(stream types.Stream) {
51+
stream.OnMarketTrade(c.handleMarketTrade)
52+
}
53+
54+
func (c *marketTradeKLineConverter) handleMarketTrade(trade types.Trade) {
55+
c.mu.Lock()
56+
defer c.mu.Unlock()
57+
58+
c.price = trade.Price
59+
c.close = trade.Price
60+
if !c.high.IsZero() {
61+
if c.price.Compare(c.high) > 0 {
62+
c.high = c.price
63+
}
64+
} else {
65+
c.high = c.price
66+
}
67+
if !c.low.IsZero() {
68+
if c.price.Compare(c.low) < 0 {
69+
c.low = c.price
70+
}
71+
} else {
72+
c.low = c.price
73+
}
74+
if c.open.IsZero() {
75+
c.open = c.price
76+
}
77+
c.volume = c.volume.Add(trade.Quantity)
78+
c.quoteVolume = c.quoteVolume.Add(trade.QuoteQuantity)
79+
c.numTrades++
80+
}
81+
82+
func (c *marketTradeKLineConverter) Run(ctx context.Context) {
83+
if len(c.intervals) == 0 {
84+
return
85+
}
86+
minDuration := c.minInterval.Duration()
87+
for interval := range c.intervals {
88+
if interval.Duration()%minDuration != 0 {
89+
c.logger.Errorf("interval %s is not a multiple of minInterval %s", interval, c.minInterval)
90+
panic(interval)
91+
}
92+
}
93+
c.logger.Infof("kline updater started for %s (%+v)", c.symbol, c.intervals)
94+
go c.update(ctx)
95+
}
96+
97+
func (c *marketTradeKLineConverter) update(ctx context.Context) {
98+
// wait for the next interval
99+
startTime := time.Now()
100+
startTimeRound := startTime.Round(c.minInterval.Duration())
101+
waitDuration := startTimeRound.Sub(startTime)
102+
if waitDuration > 0 {
103+
select {
104+
case <-time.After(waitDuration):
105+
case <-ctx.Done():
106+
return
107+
}
108+
}
109+
// start ticker
110+
duration := c.minInterval.Duration()
111+
ticker := time.NewTicker(duration)
112+
defer ticker.Stop()
113+
114+
for {
115+
select {
116+
case <-ctx.Done():
117+
return
118+
case tickTime := <-ticker.C:
119+
startTime := tickTime.Add(-1 * duration).Truncate(duration)
120+
121+
// construct kline and reset converter
122+
c.mu.Lock()
123+
klineMinInterval := types.KLine{
124+
Symbol: c.symbol,
125+
StartTime: types.Time(startTime),
126+
EndTime: types.Time(tickTime),
127+
Interval: c.minInterval,
128+
Closed: true,
129+
}
130+
klineMinInterval.Open = c.open
131+
klineMinInterval.Close = c.close
132+
klineMinInterval.High = c.high
133+
klineMinInterval.Low = c.low
134+
klineMinInterval.Volume = c.volume
135+
klineMinInterval.QuoteVolume = c.quoteVolume
136+
klineMinInterval.NumberOfTrades = c.numTrades
137+
c.open = fixedpoint.Zero
138+
c.close = fixedpoint.Zero
139+
c.high = fixedpoint.Zero
140+
c.low = fixedpoint.Zero
141+
c.volume = fixedpoint.Zero
142+
c.quoteVolume = fixedpoint.Zero
143+
c.numTrades = 0
144+
c.mu.Unlock()
145+
146+
// update klines map
147+
c.updateKLinesMap(&klineMinInterval)
148+
}
149+
}
150+
}
151+
152+
func (c *marketTradeKLineConverter) updateKLinesMap(klineMinInterval *types.KLine) {
153+
for interval := range c.intervals {
154+
if interval == c.minInterval {
155+
c.EmitKLineClosed(*klineMinInterval)
156+
continue
157+
}
158+
kline, ok := c.klinesMap[interval]
159+
if !ok {
160+
// interval not in map and kline is nil
161+
kline = &types.KLine{}
162+
kline.Set(klineMinInterval)
163+
c.klinesMap[interval] = kline
164+
} else {
165+
kline.Merge(klineMinInterval)
166+
}
167+
kline.Interval = interval
168+
169+
expectEndTime := kline.StartTime.Time().Add(interval.Duration())
170+
kline.Closed = expectEndTime.Before(klineMinInterval.EndTime.Time())
171+
172+
if kline.Closed {
173+
c.EmitKLineClosed(*kline)
174+
delete(c.klinesMap, interval)
175+
} else {
176+
c.EmitKline(*kline)
177+
}
178+
}
179+
}

pkg/core/trade_kline_converter_callbacks.go

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/exchange/coinbase/exchange.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
382382
candles = append(candles, *candle)
383383
}
384384

385-
klines := make([]types.KLine, 0, len(candles))
385+
klines := make([]types.KLine, 0, options.Limit)
386386
for _, candle := range candles {
387+
if len(klines) >= options.Limit {
388+
break
389+
}
387390
kline := toGlobalKline(symbol, interval, &candle)
388391
klines = append(klines, kline)
389392
}

pkg/exchange/coinbase/stream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type Stream struct {
6262
workingOrdersMap map[string]types.Order
6363

6464
privateChannelSymbols []string
65+
66+
klineCtx context.Context
67+
klineCancel context.CancelFunc
6568
}
6669

6770
func NewStream(

pkg/exchange/coinbase/stream_handlers.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strconv"
88
"time"
99

10+
"github.com/c9s/bbgo/pkg/core"
1011
"github.com/c9s/bbgo/pkg/fixedpoint"
1112
"github.com/c9s/bbgo/pkg/types"
1213
)
@@ -25,6 +26,19 @@ const (
2526
balanceChannel types.Channel = "balance"
2627
)
2728

29+
// minInterval: for the kline updater
30+
var minInterval = types.Interval("1s")
31+
32+
func init() {
33+
minSeconds := 0
34+
for interval, seconds := range supportedIntervalMap {
35+
if minSeconds == 0 || seconds <= minSeconds {
36+
minInterval = interval
37+
minSeconds = seconds
38+
}
39+
}
40+
}
41+
2842
type channelType struct {
2943
Name types.Channel `json:"name"`
3044
ProductIDs []string `json:"product_ids,omitempty"`
@@ -77,7 +91,10 @@ func (msg subscribeMsgType2) String() string {
7791
}
7892

7993
func (s *Stream) handleConnect() {
94+
s.klineCtx, s.klineCancel = context.WithCancel(context.Background())
95+
8096
subProductsMap := make(map[types.Channel][]string)
97+
klineOptionsMap := make(map[string][]types.SubscribeOptions)
8198

8299
// user data strea, subscribe to user channel for the user order/trade updates
83100
if !s.PublicOnly {
@@ -88,7 +105,13 @@ func (s *Stream) handleConnect() {
88105
// Once subscribe to the user channel, it will receive events for the following types:
89106
// - order life cycle events: receive, open, done, change, activate(for stop orders)
90107
// - order match
91-
subProductsMap[userChannel] = append(subProductsMap[userChannel], s.privateChannelSymbols...)
108+
if len(s.privateChannelSymbols) == 0 {
109+
panic("user channel requires private symbols")
110+
}
111+
for _, symbol := range s.privateChannelSymbols {
112+
localSymbol := toLocalSymbol(symbol)
113+
subProductsMap[userChannel] = append(subProductsMap[userChannel], localSymbol)
114+
}
92115
} else {
93116
// market data stream: subscribe to channels
94117
if len(s.Subscriptions) == 0 {
@@ -115,8 +138,10 @@ func (s *Stream) handleConnect() {
115138
subProductsMap[tickerChannel] = append(subProductsMap[tickerChannel], localSymbol)
116139
logStream.Infof("bridge %s to %s(%s)", sub.Channel, tickerChannel, localSymbol)
117140
case types.KLineChannel:
118-
// TODO: add support to kline channel
119141
// kline stream is available on Advanced Trade API only: https://docs.cdp.coinbase.com/advanced-trade/docs/ws-channels#candles-channel
142+
// We implement the subscription to kline channel by market trade feeds for now
143+
klineOptionsMap[sub.Symbol] = append(klineOptionsMap[sub.Symbol], sub.Options)
144+
subProductsMap[matchesChannel] = append(subProductsMap[matchesChannel], localSymbol)
120145
case types.AggTradeChannel, types.ForceOrderChannel, types.MarkPriceChannel, types.LiquidationOrderChannel, types.ContractInfoChannel:
121146
logStream.Warnf("coinbase stream does not support subscription to %s, skipped", sub.Channel)
122147
default:
@@ -129,7 +154,28 @@ func (s *Stream) handleConnect() {
129154
}
130155
}
131156
}
132-
157+
// kline events
158+
if len(klineOptionsMap) > 0 {
159+
for symbol, options := range klineOptionsMap {
160+
klUpdater := core.NewMarketKLineConverter(symbol, minInterval)
161+
for _, option := range options {
162+
if !s.exchange.IsSupportedInterval(option.Interval) {
163+
logStream.Errorf("unsupported interval %s for symbol %s", option.Interval, symbol)
164+
panic(option.Interval)
165+
}
166+
logStream.Infof("subscribe for kline events of symbol %s with interval %s", symbol, option.Interval)
167+
klUpdater.Subscribe(option.Interval)
168+
}
169+
klUpdater.BindStream(s)
170+
klUpdater.OnKline(func(kline types.KLine) {
171+
s.EmitKLine(kline)
172+
})
173+
klUpdater.OnKLineClosed(func(kline types.KLine) {
174+
s.EmitKLineClosed(kline)
175+
})
176+
klUpdater.Run(s.klineCtx)
177+
}
178+
}
133179
// do subscription
134180
var subCmds []any
135181
signature, ts := s.generateSignature()
@@ -270,7 +316,7 @@ func (s *Stream) handleConnect() {
270316
if err != nil {
271317
logStream.WithError(err).Errorf("subscription error: %s", subCmd)
272318
} else {
273-
logStream.Infof("subscribed to %s", subCmd)
319+
logStream.Infof("subscribe command: %s", subCmd)
274320
}
275321
}
276322

@@ -316,6 +362,11 @@ func (s *Stream) handleDisconnect() {
316362
// clear sequence numbers
317363
s.clearSequenceNumber()
318364
s.clearWorkingOrders()
365+
366+
// stop all kline workers
367+
if s.klineCtx != nil {
368+
s.klineCancel()
369+
}
319370
}
320371

321372
// Local Handlers: handlers that deal with the messages from the Coinbase WebSocket

0 commit comments

Comments
 (0)