-
-
Notifications
You must be signed in to change notification settings - Fork 307
FEATURE: [core] Implement KLine builder, emitting kline events by market trades #2002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
63c04cf
to
a1c3e7f
Compare
I just be told we have |
ccab19d
to
9572241
Compare
It's ready for review. |
6125959
to
bef40b1
Compare
1d6371a
to
2df742e
Compare
pkg/core/klinebuilder.go
Outdated
for interval := range c.intervals { | ||
if interval.Duration()%minDuration != 0 { | ||
c.logger.Errorf("interval %s is not a multiple of minInterval %s", interval, c.minInterval) | ||
panic(interval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use fmt.Errorf and pass the error to panic instead of Errorf logging here
it's a logical exception (user configuration faults)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
pkg/core/klinebuilder.go
Outdated
} | ||
} | ||
c.logger.Infof("kline updater started for %s (%+v)", c.symbol, c.intervals) | ||
c.updateTime = types.Time(time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can't use time.Now when in replay mode (back testing), it could mess up the "real" data time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is just a kick-start value of the update time.
I'll move this line to update
.
And in backtest mode, I will not start the update
goroutine so it won't mess up the real data then.
9c5b6cf
to
f5786c6
Compare
pkg/core/klinebuilder.go
Outdated
if !c.high.IsZero() { | ||
if c.price.Compare(c.high) > 0 { | ||
c.high = c.price | ||
} | ||
} else { | ||
c.high = c.price | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be simplified
c.high = math.Max(c.high, c.price) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yeah.
Just like my comment below, the high
will be zero when the builder is reset.
I was thinking it's more clear to do the update with a if
here.
But you are right, it can be simplified.
pkg/core/klinebuilder.go
Outdated
if !c.low.IsZero() { | ||
if c.price.Compare(c.low) < 0 { | ||
c.low = c.price | ||
} | ||
} else { | ||
c.low = c.price | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c.low = math.Min(c.low, c.price)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice.
Fixing it.
pkg/core/klinebuilder.go
Outdated
c.low = c.price | ||
} | ||
} else { | ||
c.low = c.price |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is "low" equal to 0??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
low
, high
, ...etc will be zero when the builder is being "reset".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the builder is reset, we should use whatever the price is as its low.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add a comment on it for clarity.
@@ -25,6 +26,20 @@ const ( | |||
balanceChannel types.Channel = "balance" | |||
) | |||
|
|||
// minInterval: for the kline updater | |||
var minInterval = types.Interval("1s") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use const
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the init()
, it will be updated according to supported intervals on Coinbase.
In this way, if there is any updates to the supported intervals on Coinbase, we won't need to change the min interval here.
It will be updated accordingly.
pkg/core/klinebuilder.go
Outdated
klineMinInterval := types.KLine{ | ||
Symbol: c.symbol, | ||
StartTime: types.Time(startTime), | ||
EndTime: types.Time(tickTime), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tickTime.Add(-1 millisecond)?
The start time and end time cannot overlap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to do that.
startTime := tickTime.Add(-1 * intervalDuration).Round(intervalDuration)
Since .Round(...)
will round the time to it's neareast multiple of the given duration, startTime
will not overlap with tickTime
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I don't realize what you mean until now.
You mean the next kline start time should not overlap with the end time of the previous one right?
I will update the end time as expected end time minus one millisecond.
pkg/core/klinebuilder.go
Outdated
kline.Merge(klineMinInterval) | ||
} | ||
kline.Interval = interval | ||
expectEndTime := kline.StartTime.Time().Add(interval.Duration()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-1 time.millisecond?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we don't need to do that here.
Ideally, kline
should end at a timestamp which is a multiple of the interval
in the loop.
expectEndTime
here is the ideal end time of the kline
.
So basically, kline.Closed = ...
is checking:
- if the
kline.EndTime
, which in fact is the ticker time, is afterexpectEndTime
. - or the
lastUpdateTime
, which is the time of the last market trade, is afterexpectEndTime
.
Either case, the kline
should be closed for that interval.
93c7052
to
86695dd
Compare
1076b32
to
e870a77
Compare
e870a77
to
b839fd0
Compare
pkg/bbgo/environment.go
Outdated
@@ -49,6 +50,7 @@ var BackTestService *service.BacktestService | |||
func SetBackTesting(s *service.BacktestService) { | |||
BackTestService = s | |||
IsBackTesting = s != nil | |||
coinbase.IsBackTesting = IsBackTesting |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why adding a IsBackTestung flag in coinbase?
// - symbol: symbol to trace on | ||
// - minInterval: unit interval, related to your signal timeframe. | ||
// All the supported intervals of the binding stream should be multiple of this interval. | ||
func NewKLineBuilder(symbol string, minInterval types.Interval, isBackTesting bool) *kLineBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need to pass isBackTesting to this builder, we just need to make this component backtest-able by ensuring not using async in the process
} | ||
} | ||
|
||
func (kb *kLineBuilder) Subscribe(interval types.Interval) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this Subscribe as "AddInterval" and let it return error
} | ||
|
||
func (kb *kLineBuilder) BindStream(stream types.Stream) { | ||
if kb.isBackTesting { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't handle backtest like this, would prefer pull out this logic to another layer
Close: trade.Price, | ||
High: trade.Price, | ||
Low: trade.Price, | ||
EndTime: types.Time(time.Now()), // use current time to fix the timezone issue (temporary) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the back test mode, we don't have use "time.Now()" because it's a simulation
queryEndTime := queryStartTime.Add(2 * kline.Interval.Duration()) | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() | ||
klines, err := kb.exchange.QueryKLines( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can't call QueryKlines in the back-test mode, and it seems like you're mixing 2 difference scenario in the same component?
queryStartTime := kline.StartTime.Time().Add(-kline.Interval.Duration()) | ||
queryEndTime := queryStartTime.Add(2 * kline.Interval.Duration()) | ||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
defer cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
be careful of the defer, it will not be called at the end of loop, it's executed at the end of function call.
func (kb *kLineBuilder) run(ctx context.Context) { | ||
startTime := time.Now() | ||
startTimeTruncated := startTime.Truncate(kb.minInterval.Duration()) | ||
intervalDuration := kb.minInterval.Duration() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since minInterval's purpose is different (for the ticker), we can rename it to tickerInterval, and ensures all the intervals we used are greater than the ticker interval.
} | ||
} | ||
|
||
// Reset will reset the kline of the given interval to a new open kline with the given start time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doc comment uses simple form, "Reset resets"
|
||
} | ||
|
||
func updateKlineInPlace(existing, delta *types.KLine) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not just pass types.Trade ? because you only need the price and the volume
return lastKLine, ok | ||
} | ||
|
||
func (kb *kLineBuilderState) AddTrade(trade types.Trade) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when AddTrade is called, you can return the updated klines to the caller, and these updated klines can be pushed to the stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
e.g.
itv1 --------|---itv2-------|---itv3
td1-td2-----|---td3-------|---td4
itv* is the ticker interval, not the kline interval, the ticker interval should be smaller than all the kline intervals, which is for pushing the updated kline periodically.
when td3 is received, we need to return "closed k1" and "new k2"
No description provided.