Skip to content

Commit 9c5b6cf

Browse files
committed
✨ Implement kline subscription handling: Add kline updater and context management for kline events
1 parent fa3d0cc commit 9c5b6cf

File tree

6 files changed

+299
-5
lines changed

6 files changed

+299
-5
lines changed

pkg/bbgo/environment.go

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/c9s/bbgo/pkg/envvar"
2424
"github.com/c9s/bbgo/pkg/exchange"
25+
"github.com/c9s/bbgo/pkg/exchange/coinbase"
2526
"github.com/c9s/bbgo/pkg/fixedpoint"
2627
"github.com/c9s/bbgo/pkg/interact"
2728
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
@@ -49,6 +50,7 @@ var BackTestService *service.BacktestService
4950
func SetBackTesting(s *service.BacktestService) {
5051
BackTestService = s
5152
IsBackTesting = s != nil
53+
coinbase.IsBackTesting = IsBackTesting
5254
}
5355

5456
var LoadedExchangeStrategies = make(map[string]SingleExchangeStrategy)

pkg/core/klinebuilder.go

+207
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package core
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/c9s/bbgo/pkg/fixedpoint"
10+
"github.com/c9s/bbgo/pkg/types"
11+
"github.com/sirupsen/logrus"
12+
)
13+
14+
// KLineBuilder is a kline builder for a symbol
15+
// It will accumulate trades and build klines
16+
//
17+
//go:generate callbackgen -type kLineBuilder
18+
type kLineBuilder struct {
19+
symbol string
20+
minInterval types.Interval
21+
intervals map[types.Interval]struct{}
22+
isBackTesting bool
23+
// klinesMap is a map from interval to accumulated kline of that interval
24+
klinesMap map[types.Interval]*types.KLine
25+
26+
kLineCallbacks []func(kline types.KLine)
27+
kLineClosedCallbacks []func(kline types.KLine)
28+
logger *logrus.Entry
29+
30+
// the following fields should be protected by lock when being updated
31+
mu sync.Mutex
32+
open, high, low, close, volume, quoteVolume, price fixedpoint.Value
33+
updateTime types.Time
34+
numTrades uint64
35+
}
36+
37+
// NewKLineBuilder: constructor of kLineBuilder
38+
// - symbol: symbol to trace on
39+
// - minInterval: unit interval, related to your signal timeframe.
40+
// All the supported intervals of the binding stream should be multiple of this interval.
41+
func NewKLineBuilder(symbol string, minInterval types.Interval, isBackTesting bool) *kLineBuilder {
42+
logger := logrus.WithField("symbol", symbol)
43+
return &kLineBuilder{
44+
symbol: symbol,
45+
minInterval: minInterval,
46+
intervals: make(map[types.Interval]struct{}),
47+
isBackTesting: isBackTesting,
48+
klinesMap: make(map[types.Interval]*types.KLine),
49+
logger: logger,
50+
}
51+
}
52+
53+
func (c *kLineBuilder) Subscribe(interval types.Interval) {
54+
c.intervals[interval] = struct{}{}
55+
}
56+
57+
func (c *kLineBuilder) BindStream(stream types.Stream) {
58+
if c.isBackTesting {
59+
stream.OnKLineClosed(c.handleKLineClosed)
60+
stream.OnKLine(c.handleKLine)
61+
return
62+
}
63+
stream.OnMarketTrade(c.handleMarketTrade)
64+
}
65+
66+
func (c *kLineBuilder) handleKLine(kline types.KLine) {
67+
c.EmitKLine(kline)
68+
}
69+
70+
func (c *kLineBuilder) handleKLineClosed(kline types.KLine) {
71+
c.EmitKLineClosed(kline)
72+
}
73+
74+
func (c *kLineBuilder) handleMarketTrade(trade types.Trade) {
75+
// the market data stream may subscribe to trades of multiple symbols
76+
// so we need to check if the trade is for the symbol we are interested in
77+
if trade.Symbol != c.symbol {
78+
return
79+
}
80+
c.mu.Lock()
81+
defer c.mu.Unlock()
82+
83+
c.price = trade.Price
84+
c.close = trade.Price
85+
if !c.high.IsZero() {
86+
if c.price.Compare(c.high) > 0 {
87+
c.high = c.price
88+
}
89+
} else {
90+
c.high = c.price
91+
}
92+
if !c.low.IsZero() {
93+
if c.price.Compare(c.low) < 0 {
94+
c.low = c.price
95+
}
96+
} else {
97+
c.low = c.price
98+
}
99+
if c.open.IsZero() {
100+
c.open = c.price
101+
}
102+
c.volume = c.volume.Add(trade.Quantity)
103+
c.quoteVolume = c.quoteVolume.Add(trade.QuoteQuantity)
104+
c.numTrades++
105+
c.updateTime = trade.Time
106+
}
107+
108+
func (c *kLineBuilder) Run(ctx context.Context) {
109+
if len(c.intervals) == 0 {
110+
return
111+
}
112+
minDuration := c.minInterval.Duration()
113+
for interval := range c.intervals {
114+
if interval.Duration()%minDuration != 0 {
115+
err := fmt.Errorf("interval %s is not a multiple of minInterval %s", interval, c.minInterval)
116+
c.logger.Error(err)
117+
panic(err)
118+
}
119+
}
120+
c.logger.Infof("kline updater started for %s (%+v)", c.symbol, c.intervals)
121+
go c.update(ctx)
122+
}
123+
124+
func (c *kLineBuilder) update(ctx context.Context) {
125+
// wait for the next interval
126+
startTime := time.Now()
127+
c.updateTime = types.Time(startTime) // kick-start update time
128+
startTimeRound := startTime.Round(c.minInterval.Duration())
129+
waitDuration := startTimeRound.Sub(startTime)
130+
if waitDuration > 0 {
131+
select {
132+
case <-time.After(waitDuration):
133+
case <-ctx.Done():
134+
return
135+
}
136+
}
137+
// start ticker
138+
intervalDuration := c.minInterval.Duration()
139+
ticker := time.NewTicker(intervalDuration)
140+
defer ticker.Stop()
141+
142+
for {
143+
select {
144+
case <-ctx.Done():
145+
return
146+
case tickTime := <-ticker.C:
147+
startTime := tickTime.Add(-1 * intervalDuration).Round(intervalDuration)
148+
149+
// construct kline and reset converter
150+
c.mu.Lock()
151+
lastTradeTime := c.updateTime
152+
klineMinInterval := types.KLine{
153+
Symbol: c.symbol,
154+
StartTime: types.Time(startTime),
155+
EndTime: types.Time(tickTime),
156+
Interval: c.minInterval,
157+
Closed: true,
158+
}
159+
klineMinInterval.Open = c.open
160+
klineMinInterval.Close = c.close
161+
klineMinInterval.High = c.high
162+
klineMinInterval.Low = c.low
163+
klineMinInterval.Volume = c.volume
164+
klineMinInterval.QuoteVolume = c.quoteVolume
165+
klineMinInterval.NumberOfTrades = c.numTrades
166+
c.open = fixedpoint.Zero
167+
c.close = fixedpoint.Zero
168+
c.high = fixedpoint.Zero
169+
c.low = fixedpoint.Zero
170+
c.volume = fixedpoint.Zero
171+
c.quoteVolume = fixedpoint.Zero
172+
c.numTrades = 0
173+
c.mu.Unlock()
174+
175+
// update klines map
176+
c.updateKLinesMap(&klineMinInterval, lastTradeTime)
177+
}
178+
}
179+
}
180+
181+
func (c *kLineBuilder) updateKLinesMap(klineMinInterval *types.KLine, lastUpdateTime types.Time) {
182+
for interval := range c.intervals {
183+
if interval == c.minInterval {
184+
c.EmitKLineClosed(*klineMinInterval)
185+
continue
186+
}
187+
kline, ok := c.klinesMap[interval]
188+
if !ok {
189+
// interval not in map and kline is nil
190+
kline = &types.KLine{}
191+
kline.Set(klineMinInterval)
192+
c.klinesMap[interval] = kline
193+
} else {
194+
kline.Merge(klineMinInterval)
195+
}
196+
kline.Interval = interval
197+
expectEndTime := kline.StartTime.Time().Add(interval.Duration())
198+
kline.Closed = expectEndTime.Before(klineMinInterval.EndTime.Time()) || lastUpdateTime.After(expectEndTime)
199+
200+
if kline.Closed {
201+
c.EmitKLineClosed(*kline)
202+
delete(c.klinesMap, interval)
203+
} else {
204+
c.EmitKLine(*kline)
205+
}
206+
}
207+
}

pkg/core/klinebuilder_callbacks.go

+27
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/exchange/coinbase/exchange.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
382382
candles = append(candles, *candle)
383383
}
384384

385-
klines := make([]types.KLine, 0, len(candles))
385+
klines := make([]types.KLine, 0, options.Limit)
386386
for _, candle := range candles {
387+
if len(klines) >= options.Limit {
388+
break
389+
}
387390
kline := toGlobalKline(symbol, interval, &candle)
388391
klines = append(klines, kline)
389392
}

pkg/exchange/coinbase/stream.go

+3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type Stream struct {
6262
workingOrdersMap map[string]types.Order
6363

6464
privateChannelSymbols []string
65+
66+
klineCtx context.Context
67+
klineCancel context.CancelFunc
6568
}
6669

6770
func NewStream(

pkg/exchange/coinbase/stream_handlers.go

+56-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strconv"
88
"time"
99

10+
"github.com/c9s/bbgo/pkg/core"
1011
"github.com/c9s/bbgo/pkg/fixedpoint"
1112
"github.com/c9s/bbgo/pkg/types"
1213
)
@@ -25,6 +26,20 @@ const (
2526
balanceChannel types.Channel = "balance"
2627
)
2728

29+
// minInterval: for the kline updater
30+
var minInterval = types.Interval("1s")
31+
var IsBackTesting = false
32+
33+
func init() {
34+
minSeconds := 0
35+
for interval, seconds := range supportedIntervalMap {
36+
if minSeconds == 0 || seconds <= minSeconds {
37+
minInterval = interval
38+
minSeconds = seconds
39+
}
40+
}
41+
}
42+
2843
type channelType struct {
2944
Name types.Channel `json:"name"`
3045
ProductIDs []string `json:"product_ids,omitempty"`
@@ -77,7 +92,10 @@ func (msg subscribeMsgType2) String() string {
7792
}
7893

7994
func (s *Stream) handleConnect() {
95+
s.klineCtx, s.klineCancel = context.WithCancel(context.Background())
96+
8097
subProductsMap := make(map[types.Channel][]string)
98+
klineOptionsMap := make(map[string][]types.SubscribeOptions)
8199

82100
// user data strea, subscribe to user channel for the user order/trade updates
83101
if !s.PublicOnly {
@@ -88,7 +106,13 @@ func (s *Stream) handleConnect() {
88106
// Once subscribe to the user channel, it will receive events for the following types:
89107
// - order life cycle events: receive, open, done, change, activate(for stop orders)
90108
// - order match
91-
subProductsMap[userChannel] = append(subProductsMap[userChannel], s.privateChannelSymbols...)
109+
if len(s.privateChannelSymbols) == 0 {
110+
panic("user channel requires private symbols")
111+
}
112+
for _, symbol := range s.privateChannelSymbols {
113+
localSymbol := toLocalSymbol(symbol)
114+
subProductsMap[userChannel] = append(subProductsMap[userChannel], localSymbol)
115+
}
92116
} else {
93117
// market data stream: subscribe to channels
94118
if len(s.Subscriptions) == 0 {
@@ -115,8 +139,10 @@ func (s *Stream) handleConnect() {
115139
subProductsMap[tickerChannel] = append(subProductsMap[tickerChannel], localSymbol)
116140
logStream.Infof("bridge %s to %s(%s)", sub.Channel, tickerChannel, localSymbol)
117141
case types.KLineChannel:
118-
// TODO: add support to kline channel
119142
// kline stream is available on Advanced Trade API only: https://docs.cdp.coinbase.com/advanced-trade/docs/ws-channels#candles-channel
143+
// We implement the subscription to kline channel by market trade feeds for now
144+
klineOptionsMap[sub.Symbol] = append(klineOptionsMap[sub.Symbol], sub.Options)
145+
subProductsMap[matchesChannel] = append(subProductsMap[matchesChannel], localSymbol)
120146
case types.AggTradeChannel, types.ForceOrderChannel, types.MarkPriceChannel, types.LiquidationOrderChannel, types.ContractInfoChannel:
121147
logStream.Warnf("coinbase stream does not support subscription to %s, skipped", sub.Channel)
122148
default:
@@ -129,7 +155,28 @@ func (s *Stream) handleConnect() {
129155
}
130156
}
131157
}
132-
158+
// kline events
159+
if len(klineOptionsMap) > 0 {
160+
for symbol, options := range klineOptionsMap {
161+
klConverter := core.NewKLineBuilder(symbol, minInterval, IsBackTesting)
162+
for _, option := range options {
163+
if !s.exchange.IsSupportedInterval(option.Interval) {
164+
logStream.Errorf("unsupported interval %s for symbol %s", option.Interval, symbol)
165+
panic(option.Interval)
166+
}
167+
logStream.Infof("subscribe for kline events of symbol %s with interval %s", symbol, option.Interval)
168+
klConverter.Subscribe(option.Interval)
169+
}
170+
klConverter.BindStream(s)
171+
klConverter.OnKLine(func(kline types.KLine) {
172+
s.EmitKLine(kline)
173+
})
174+
klConverter.OnKLineClosed(func(kline types.KLine) {
175+
s.EmitKLineClosed(kline)
176+
})
177+
klConverter.Run(s.klineCtx)
178+
}
179+
}
133180
// do subscription
134181
var subCmds []any
135182
signature, ts := s.generateSignature()
@@ -270,7 +317,7 @@ func (s *Stream) handleConnect() {
270317
if err != nil {
271318
logStream.WithError(err).Errorf("subscription error: %s", subCmd)
272319
} else {
273-
logStream.Infof("subscribed to %s", subCmd)
320+
logStream.Infof("subscribe command: %s", subCmd)
274321
}
275322
}
276323

@@ -316,6 +363,11 @@ func (s *Stream) handleDisconnect() {
316363
// clear sequence numbers
317364
s.clearSequenceNumber()
318365
s.clearWorkingOrders()
366+
367+
// stop all kline workers
368+
if s.klineCancel != nil {
369+
s.klineCancel()
370+
}
319371
}
320372

321373
// Local Handlers: handlers that deal with the messages from the Coinbase WebSocket

0 commit comments

Comments
 (0)