Skip to content

client: add aimd limit for create #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions core/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,40 @@ func isRateLimitError(err error) bool {
return strings.Contains(err.Error(), "rate limit exceeded")
}

func newRateLimiters() map[string]*common.AIMDLimiter {
return map[string]*common.AIMDLimiter{
"flush": common.NewAIMDLimiter(0.01, 50, 5),
"createColl": common.NewAIMDLimiter(1, 100, 5),
type limiters struct {
flush *common.AIMDLimiter

createCollection *common.AIMDLimiter
createPartition *common.AIMDLimiter
createDatabase *common.AIMDLimiter
createIndex *common.AIMDLimiter
}

func newLimiters() limiters {
return limiters{
flush: common.NewAIMDLimiter(0.01, 50, 5),
createCollection: common.NewAIMDLimiter(1, 100, 5),
createPartition: common.NewAIMDLimiter(1, 100, 5),
createDatabase: common.NewAIMDLimiter(1, 100, 5),
createIndex: common.NewAIMDLimiter(1, 100, 5),
}
}

func (l *limiters) close() {
l.flush.Stop()
l.createCollection.Stop()
l.createPartition.Stop()
l.createDatabase.Stop()
l.createIndex.Stop()
}

var _ Grpc = (*GrpcClient)(nil)

type GrpcClient struct {
conn *grpc.ClientConn
srv milvuspb.MilvusServiceClient

limiters map[string]*common.AIMDLimiter
limiters limiters

user string
auth string
Expand Down Expand Up @@ -237,7 +257,7 @@ func NewGrpc(cfg *paramtable.MilvusConfig) (*GrpcClient, error) {
conn: conn,
srv: srv,

limiters: newRateLimiters(),
limiters: newLimiters(),

user: cfg.User,
auth: auth,
Expand Down Expand Up @@ -302,10 +322,7 @@ func (g *GrpcClient) connect(ctx context.Context) error {
}

func (g *GrpcClient) Close() error {
for _, limiter := range g.limiters {
limiter.Stop()
}

g.limiters.close()
return g.conn.Close()
}

Expand All @@ -320,15 +337,23 @@ func (g *GrpcClient) GetVersion(ctx context.Context) (string, error) {
}

func (g *GrpcClient) CreateDatabase(ctx context.Context, dbName string) error {
ctx = g.newCtx(ctx)
if g.hasFlags(disableDatabase) {
return errors.New("client: the server does not support database")
}

ctx = g.newCtx(ctx)
if err := g.limiters.createDatabase.Wait(ctx); err != nil {
return fmt.Errorf("client: create database wait: %w", err)
}

resp, err := g.srv.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{DbName: dbName})
if err := checkResponse(resp, err); err != nil {
if isRateLimitError(err) {
g.limiters.createDatabase.Failure()
}
return fmt.Errorf("client: create database failed: %w", err)
}
g.limiters.createDatabase.Success()

return nil
}
Expand Down Expand Up @@ -398,20 +423,19 @@ func (g *GrpcClient) GetPersistentSegmentInfo(ctx context.Context, db, collName

func (g *GrpcClient) Flush(ctx context.Context, db, collName string) (*milvuspb.FlushResponse, error) {
ctx = g.newCtxWithDB(ctx, db)
limiter := g.limiters["flush"]

if err := limiter.Wait(ctx); err != nil {
if err := g.limiters.flush.Wait(ctx); err != nil {
return nil, fmt.Errorf("client: flush wait: %w", err)
}

resp, err := g.srv.Flush(ctx, &milvuspb.FlushRequest{CollectionNames: []string{collName}})
if err := checkResponse(resp, err); err != nil {
if isRateLimitError(err) {
limiter.Failure()
g.limiters.flush.Failure()
}
return nil, fmt.Errorf("client: flush failed: %w", err)
}
limiter.Success()
g.limiters.flush.Success()

segmentIDs, has := resp.GetCollSegIDs()[collName]
ids := segmentIDs.GetData()
Expand Down Expand Up @@ -518,9 +542,8 @@ type CreateCollectionInput struct {

func (g *GrpcClient) CreateCollection(ctx context.Context, input CreateCollectionInput) error {
ctx = g.newCtxWithDB(ctx, input.DB)
limiter := g.limiters["createColl"]

if err := limiter.Wait(ctx); err != nil {
if err := g.limiters.createCollection.Wait(ctx); err != nil {
return fmt.Errorf("client: create collection wait: %w", err)
}

Expand All @@ -539,11 +562,11 @@ func (g *GrpcClient) CreateCollection(ctx context.Context, input CreateCollectio
resp, err := g.srv.CreateCollection(ctx, in)
if err := checkResponse(resp, err); err != nil {
if isRateLimitError(err) {
limiter.Failure()
g.limiters.createCollection.Failure()
}
return fmt.Errorf("client: call create collection rpc: %w", err)
}
limiter.Success()
g.limiters.createCollection.Success()

return nil
}
Expand All @@ -560,11 +583,20 @@ func (g *GrpcClient) DropCollection(ctx context.Context, db string, collectionNa

func (g *GrpcClient) CreatePartition(ctx context.Context, db, collName, partitionName string) error {
ctx = g.newCtxWithDB(ctx, db)

if err := g.limiters.createPartition.Wait(ctx); err != nil {
return fmt.Errorf("client: create partition wait: %w", err)
}

in := &milvuspb.CreatePartitionRequest{CollectionName: collName, PartitionName: partitionName}
resp, err := g.srv.CreatePartition(ctx, in)
if err := checkResponse(resp, err); err != nil {
if isRateLimitError(err) {
g.limiters.createPartition.Failure()
}
return fmt.Errorf("client: create partition failed: %w", err)
}
g.limiters.createPartition.Success()

return nil
}
Expand Down Expand Up @@ -598,17 +630,25 @@ type CreateIndexInput struct {

func (g *GrpcClient) CreateIndex(ctx context.Context, input CreateIndexInput) error {
ctx = g.newCtxWithDB(ctx, input.DB)

if err := g.limiters.createIndex.Wait(ctx); err != nil {
return fmt.Errorf("client: create index wait: %w", err)
}

in := &milvuspb.CreateIndexRequest{
CollectionName: input.CollectionName,
FieldName: input.FieldName,
IndexName: input.IndexName,
ExtraParams: mapKvPairs(input.Params),
}

resp, err := g.srv.CreateIndex(ctx, in)
if err := checkResponse(resp, err); err != nil {
if isRateLimitError(err) {
g.limiters.createIndex.Failure()
}
return fmt.Errorf("client: create index failed: %w", err)
}
g.limiters.createIndex.Success()

return nil
}
Expand Down