Skip to content

FEATURE: [core] Implement KLine builder, emitting kline events by market trades #2002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

dboyliao
Copy link
Collaborator

No description provided.

@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 2 times, most recently from 63c04cf to a1c3e7f Compare April 24, 2025 05:27
@dboyliao
Copy link
Collaborator Author

I just be told we have SerialMarketDataStore in bbgo.
I will update this PR with it.
Don't review it now.

@dboyliao dboyliao changed the title FEATURE: [coinbase] Implement K-Line subscription handling via RESTful API (WIP) FEATURE: [coinbase] Implement K-Line subscription handling via RESTful API Apr 24, 2025
@dboyliao dboyliao changed the title (WIP) FEATURE: [coinbase] Implement K-Line subscription handling via RESTful API WIP: [coinbase] Implement K-Line subscription handling via RESTful API Apr 24, 2025
@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 3 times, most recently from ccab19d to 9572241 Compare April 24, 2025 13:48
@dboyliao dboyliao requested a review from c9s as a code owner April 24, 2025 13:48
@dboyliao dboyliao changed the title WIP: [coinbase] Implement K-Line subscription handling via RESTful API FEATURE: [coinbase] Implement K-Line subscription handling via RESTful API Apr 24, 2025
@dboyliao
Copy link
Collaborator Author

It's ready for review.

@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 11 times, most recently from 6125959 to bef40b1 Compare April 25, 2025 00:02
@dboyliao dboyliao changed the title FEATURE: [coinbase] Implement K-Line subscription handling via RESTful API FEATURE: [coinbase] Implement K-Line subscription handling via Market Trade Feeds Apr 25, 2025
@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 2 times, most recently from 1d6371a to 2df742e Compare April 25, 2025 00:12
for interval := range c.intervals {
if interval.Duration()%minDuration != 0 {
c.logger.Errorf("interval %s is not a multiple of minInterval %s", interval, c.minInterval)
panic(interval)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use fmt.Errorf and pass the error to panic instead of Errorf logging here

it's a logical exception (user configuration faults)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
}
c.logger.Infof("kline updater started for %s (%+v)", c.symbol, c.intervals)
c.updateTime = types.Time(time.Now())
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't use time.Now when in replay mode (back testing), it could mess up the "real" data time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is just a kick-start value of the update time.
I'll move this line to update.
And in backtest mode, I will not start the update goroutine so it won't mess up the real data then.

@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 2 times, most recently from 9c5b6cf to f5786c6 Compare April 25, 2025 00:43
Comment on lines 85 to 96
if !c.high.IsZero() {
if c.price.Compare(c.high) > 0 {
c.high = c.price
}
} else {
c.high = c.price
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be simplified

c.high = math.Max(c.high, c.price) ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah.
Just like my comment below, the high will be zero when the builder is reset.
I was thinking it's more clear to do the update with a if here.
But you are right, it can be simplified.

Comment on lines 92 to 96
if !c.low.IsZero() {
if c.price.Compare(c.low) < 0 {
c.low = c.price
}
} else {
c.low = c.price
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c.low = math.Min(c.low, c.price)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice.
Fixing it.

c.low = c.price
}
} else {
c.low = c.price
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is "low" equal to 0??

Copy link
Collaborator Author

@dboyliao dboyliao Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

low, high, ...etc will be zero when the builder is being "reset".

Copy link
Collaborator Author

@dboyliao dboyliao Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the builder is reset, we should use whatever the price is as its low.

Copy link
Collaborator Author

@dboyliao dboyliao Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a comment on it for clarity.

@@ -25,6 +26,20 @@ const (
balanceChannel types.Channel = "balance"
)

// minInterval: for the kline updater
var minInterval = types.Interval("1s")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use const

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the init(), it will be updated according to supported intervals on Coinbase.
In this way, if there is any updates to the supported intervals on Coinbase, we won't need to change the min interval here.
It will be updated accordingly.

klineMinInterval := types.KLine{
Symbol: c.symbol,
StartTime: types.Time(startTime),
EndTime: types.Time(tickTime),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tickTime.Add(-1 millisecond)?

The start time and end time cannot overlap

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to do that.
startTime := tickTime.Add(-1 * intervalDuration).Round(intervalDuration)
Since .Round(...) will round the time to it's neareast multiple of the given duration, startTime will not overlap with tickTime.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I don't realize what you mean until now.
You mean the next kline start time should not overlap with the end time of the previous one right?
I will update the end time as expected end time minus one millisecond.

kline.Merge(klineMinInterval)
}
kline.Interval = interval
expectEndTime := kline.StartTime.Time().Add(interval.Duration())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-1 time.millisecond?

Copy link
Collaborator Author

@dboyliao dboyliao Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we don't need to do that here.

Ideally, kline should end at a timestamp which is a multiple of the interval in the loop.
expectEndTime here is the ideal end time of the kline.

So basically, kline.Closed = ... is checking:

  1. if the kline.EndTime, which in fact is the ticker time, is after expectEndTime.
  2. or the lastUpdateTime, which is the time of the last market trade, is after expectEndTime.

Either case, the kline should be closed for that interval.

@dboyliao dboyliao requested a review from c9s April 25, 2025 04:04
@dboyliao dboyliao requested a review from bailantaotao April 25, 2025 04:04
@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 12 times, most recently from 93c7052 to 86695dd Compare May 1, 2025 14:32
@dboyliao dboyliao changed the title FEATURE: [coinbase] Implement K-Line subscription handling via Market Trade Feeds FEATURE: [core] Implement KLine builder, emitting kline events by market trades May 1, 2025
@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch 4 times, most recently from 1076b32 to e870a77 Compare May 1, 2025 15:29
@dboyliao dboyliao force-pushed the dboy/coinbase-kline branch from e870a77 to b839fd0 Compare May 2, 2025 03:32
@@ -49,6 +50,7 @@ var BackTestService *service.BacktestService
func SetBackTesting(s *service.BacktestService) {
BackTestService = s
IsBackTesting = s != nil
coinbase.IsBackTesting = IsBackTesting
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why adding a IsBackTestung flag in coinbase?

// - symbol: symbol to trace on
// - minInterval: unit interval, related to your signal timeframe.
// All the supported intervals of the binding stream should be multiple of this interval.
func NewKLineBuilder(symbol string, minInterval types.Interval, isBackTesting bool) *kLineBuilder {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't need to pass isBackTesting to this builder, we just need to make this component backtest-able by ensuring not using async in the process

}
}

func (kb *kLineBuilder) Subscribe(interval types.Interval) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this Subscribe as "AddInterval" and let it return error

}

func (kb *kLineBuilder) BindStream(stream types.Stream) {
if kb.isBackTesting {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't handle backtest like this, would prefer pull out this logic to another layer

Close: trade.Price,
High: trade.Price,
Low: trade.Price,
EndTime: types.Time(time.Now()), // use current time to fix the timezone issue (temporary)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the back test mode, we don't have use "time.Now()" because it's a simulation

queryEndTime := queryStartTime.Add(2 * kline.Interval.Duration())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
klines, err := kb.exchange.QueryKLines(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't call QueryKlines in the back-test mode, and it seems like you're mixing 2 difference scenario in the same component?

queryStartTime := kline.StartTime.Time().Add(-kline.Interval.Duration())
queryEndTime := queryStartTime.Add(2 * kline.Interval.Duration())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be careful of the defer, it will not be called at the end of loop, it's executed at the end of function call.

func (kb *kLineBuilder) run(ctx context.Context) {
startTime := time.Now()
startTimeTruncated := startTime.Truncate(kb.minInterval.Duration())
intervalDuration := kb.minInterval.Duration()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since minInterval's purpose is different (for the ticker), we can rename it to tickerInterval, and ensures all the intervals we used are greater than the ticker interval.

}
}

// Reset will reset the kline of the given interval to a new open kline with the given start time
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc comment uses simple form, "Reset resets"


}

func updateKlineInPlace(existing, delta *types.KLine) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just pass types.Trade ? because you only need the price and the volume

return lastKLine, ok
}

func (kb *kLineBuilderState) AddTrade(trade types.Trade) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when AddTrade is called, you can return the updated klines to the caller, and these updated klines can be pushed to the stream

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.g.

itv1 --------|---itv2-------|---itv3
td1-td2-----|---td3-------|---td4

itv* is the ticker interval, not the kline interval, the ticker interval should be smaller than all the kline intervals, which is for pushing the updated kline periodically.

when td3 is received, we need to return "closed k1" and "new k2"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants