Skip to content

Split other methods in txn into separate files #19826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions server/etcdserver/txn/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"context"

"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "delete_range",
traceutil.Field{Key: "key", Value: string(dr.Key)},
traceutil.Field{Key: "range_end", Value: string(dr.RangeEnd)},
)
txnWrite := kv.Write(trace)
defer txnWrite.End()
resp, err = deleteRange(ctx, txnWrite, dr)
return resp, trace, err
}

func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
resp := &pb.DeleteRangeResponse{}
resp.Header = &pb.ResponseHeader{}
end := mkGteRange(dr.RangeEnd)

if dr.PrevKv {
rr, err := txnWrite.Range(ctx, dr.Key, end, mvcc.RangeOptions{})
if err != nil {
return nil, err

Check warning on line 47 in server/etcdserver/txn/delete.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/delete.go#L47

Added line #L47 was not covered by tests
}
if rr != nil {
resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
resp.PrevKvs[i] = &rr.KVs[i]
}
}
}

resp.Deleted, resp.Header.Revision = txnWrite.DeleteRange(dr.Key, end)
return resp, nil
}

// mkGteRange determines if the range end is a >= range. This works around grpc
// sending empty byte strings as nil; >= is encoded in the range end as '\0'.
// If it is a GTE range, then []byte{} is returned to indicate the empty byte
// string (vs nil being no byte string).
func mkGteRange(rangeEnd []byte) []byte {
if len(rangeEnd) == 1 && rangeEnd[0] == 0 {
return []byte{}
}
return rangeEnd
}
114 changes: 114 additions & 0 deletions server/etcdserver/txn/put.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"context"

"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "put",
traceutil.Field{Key: "key", Value: string(p.Key)},
traceutil.Field{Key: "req_size", Value: p.Size()},
)
err = checkLease(lessor, p)
if err != nil {
return nil, trace, err
}
txnWrite := kv.Write(trace)
defer txnWrite.End()
prevKV, err := checkAndGetPrevKV(trace, txnWrite, p)
if err != nil {
return nil, trace, err
}
return put(ctx, txnWrite, p, prevKV), trace, nil
}

func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest, prevKV *mvcc.RangeResult) *pb.PutResponse {
trace := traceutil.Get(ctx)
resp := &pb.PutResponse{}
resp.Header = &pb.ResponseHeader{}
val, leaseID := p.Value, lease.LeaseID(p.Lease)

if p.IgnoreValue {
val = prevKV.KVs[0].Value
}
if p.IgnoreLease {
leaseID = lease.LeaseID(prevKV.KVs[0].Lease)
}
if p.PrevKv {
if prevKV != nil && len(prevKV.KVs) != 0 {
resp.PrevKv = &prevKV.KVs[0]
}
}

resp.Header.Revision = txnWrite.Put(p.Key, val, leaseID)
trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
return resp
}

func checkPut(trace *traceutil.Trace, txnWrite mvcc.ReadView, lessor lease.Lessor, p *pb.PutRequest) error {
err := checkLease(lessor, p)
if err != nil {
return err
}
_, err = checkAndGetPrevKV(trace, txnWrite, p)
return err
}

func checkLease(lessor lease.Lessor, p *pb.PutRequest) error {
leaseID := lease.LeaseID(p.Lease)
if leaseID != lease.NoLease {
if l := lessor.Lookup(leaseID); l == nil {
return lease.ErrLeaseNotFound
}
}
return nil
}

func checkAndGetPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) {
prevKV, err = getPrevKV(trace, txnWrite, p)
if err != nil {
return nil, err

Check warning on line 92 in server/etcdserver/txn/put.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/put.go#L92

Added line #L92 was not covered by tests
}
if p.IgnoreValue || p.IgnoreLease {
if prevKV == nil || len(prevKV.KVs) == 0 {
// ignore_{lease,value} flag expects previous key-value pair
return nil, errors.ErrKeyNotFound
}
}
return prevKV, nil
}

func getPrevKV(trace *traceutil.Trace, txnWrite mvcc.ReadView, p *pb.PutRequest) (prevKV *mvcc.RangeResult, err error) {
if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
trace.StepWithFunction(func() {
prevKV, err = txnWrite.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
}, "get previous kv pair")

if err != nil {
return nil, err

Check warning on line 110 in server/etcdserver/txn/put.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/put.go#L110

Added line #L110 was not covered by tests
}
}
return prevKV, nil
}
203 changes: 203 additions & 0 deletions server/etcdserver/txn/range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import (
"bytes"
"context"
"sort"
"time"

"go.uber.org/zap"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx, trace = ensureTrace(ctx, lg, "range")
defer func(start time.Time) {
success := err == nil
RangeSecObserve(success, time.Since(start))
}(time.Now())
txnRead := kv.Read(mvcc.ConcurrentReadTxMode, trace)
defer txnRead.End()
resp, err = executeRange(ctx, lg, txnRead, r)
return resp, trace, err
}

func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
// fetch everything; sort and truncate afterwards
limit = 0
}
if limit > 0 {
// fetch one extra for 'more' flag
limit = limit + 1
}

ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}

if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
}
if r.MinModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
pruneKVs(rr, f)
}
if r.MaxCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
pruneKVs(rr, f)
}
if r.MinCreateRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
pruneKVs(rr, f)
}

sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexiographically ascending order,
// sort ASCEND by default only when target is not 'KEY'
sortOrder = pb.RangeRequest_ASCEND
} else if r.SortTarget == pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_ASCEND {
// Since current mvcc.Range implementation returns results
// sorted by keys in lexiographically ascending order,
// don't re-sort when target is 'KEY' and order is ASCEND
sortOrder = pb.RangeRequest_NONE
}
if sortOrder != pb.RangeRequest_NONE {
var sorter sort.Interface
switch {
case r.SortTarget == pb.RangeRequest_KEY:
sorter = &kvSortByKey{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VERSION:
sorter = &kvSortByVersion{&kvSort{rr.KVs}}

Check warning on line 107 in server/etcdserver/txn/range.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/range.go#L106-L107

Added lines #L106 - L107 were not covered by tests
case r.SortTarget == pb.RangeRequest_CREATE:
sorter = &kvSortByCreate{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_MOD:
sorter = &kvSortByMod{&kvSort{rr.KVs}}
case r.SortTarget == pb.RangeRequest_VALUE:
sorter = &kvSortByValue{&kvSort{rr.KVs}}
default:
lg.Panic("unexpected sort target", zap.Int32("sort-target", int32(r.SortTarget)))

Check warning on line 115 in server/etcdserver/txn/range.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/range.go#L112-L115

Added lines #L112 - L115 were not covered by tests
}
switch {
case sortOrder == pb.RangeRequest_ASCEND:
sort.Sort(sorter)
case sortOrder == pb.RangeRequest_DESCEND:
sort.Sort(sort.Reverse(sorter))
}
}

if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}

func checkRange(rv mvcc.ReadView, req *pb.RangeRequest) error {
switch {
case req.Revision == 0:
return nil
case req.Revision > rv.Rev():
return mvcc.ErrFutureRev
case req.Revision < rv.FirstRev():
return mvcc.ErrCompacted
}
return nil
}

func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
j := 0
for i := range rr.KVs {
rr.KVs[j] = rr.KVs[i]
if !isPrunable(&rr.KVs[i]) {
j++
}
}
rr.KVs = rr.KVs[:j]
}

type kvSort struct{ kvs []mvccpb.KeyValue }

func (s *kvSort) Swap(i, j int) {
t := s.kvs[i]
s.kvs[i] = s.kvs[j]
s.kvs[j] = t
}
func (s *kvSort) Len() int { return len(s.kvs) }

type kvSortByKey struct{ *kvSort }

func (s *kvSortByKey) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
}

type kvSortByVersion struct{ *kvSort }

func (s *kvSortByVersion) Less(i, j int) bool {
return (s.kvs[i].Version - s.kvs[j].Version) < 0

Check warning on line 184 in server/etcdserver/txn/range.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/range.go#L183-L184

Added lines #L183 - L184 were not covered by tests
}

type kvSortByCreate struct{ *kvSort }

func (s *kvSortByCreate) Less(i, j int) bool {
return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
}

type kvSortByMod struct{ *kvSort }

func (s *kvSortByMod) Less(i, j int) bool {
return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
}

type kvSortByValue struct{ *kvSort }

func (s *kvSortByValue) Less(i, j int) bool {
return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0

Check warning on line 202 in server/etcdserver/txn/range.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/txn/range.go#L201-L202

Added lines #L201 - L202 were not covered by tests
}
Loading