Skip to content

Commit 2c1180c

Browse files
authored
Fix sqlite busy errors (#347)
Fix sqlite busy errors with session-based learning. There were a couple of different errors: * We were still enqueuing block ids and not sessions in some places * When we call Enqueue we should filter out sessions that aren't suitable for learning; this should minimize DB contention * Improve logging By default sqlite doesn't have any retries for SQLITE_BUSY errors. We could configure a timeout for retries using a PRAGMA; we should probably do that, but I want to hold off to see if this PR fixes the contention. * Filed #348 to follow up on this Use ko to create docker images as part of the development process * This is convenient when we want to build a docker image from our latest changes without doing a release. * Change the Dockerfile to not include any arguments so that it is compatible with the image ko builds
1 parent 93ff799 commit 2c1180c

12 files changed

+286
-129
lines changed

app/.ko.yaml

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# https://ko.build/configuration/#setting-build-flags-and-ldflags
2+
defaultFlags:
3+
# https://ko.build/configuration/#naming-images
4+
# --base-import-paths removes the md5hash from the image name
5+
# but it appends to the repo path a name based on the path of the module which in this case is "app"
6+
# TODO(jeremy): This doesn't seem to be working; I still have to set the flag when I run ko
7+
- --base-import-paths
8+
builds:
9+
- id: foyle
10+
ldflags:
11+
- -s
12+
- -w
13+
- -X 'github.com/jlewi/foyle/app/cmd.date={{.Date}}'
14+
- -X 'github.com/jlewi/foyle/app/cmd.version=dev'
15+
- -X 'github.com/jlewi/foyle/app/cmd.commit={{.Git.ShortCommit}}'

app/Dockerfile

+2-1
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ FROM ${RUNTIME_IMAGE}
2929

3030
COPY --from=builder /workspace/app/foyle /
3131

32-
ENTRYPOINT ["/foyle", "serve"]
32+
# N.B. Don't set any arguments because we want to be compatible with the images ko builds
33+
ENTRYPOINT ["/foyle"]

app/pkg/analyze/analyzer.go

+1-17
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ type blockItem struct {
153153
}
154154

155155
// PostSessionEvent interface for functions to post session events.
156-
type PostSessionEvent func(id string) error
156+
type PostSessionEvent func(session *logspb.Session) error
157157

158158
// Run runs the analyzer; continually processing logs.
159159
// learnNotifier is an optional function that will be called when a block is updated.
@@ -466,14 +466,6 @@ func (a *Analyzer) processLogEvent(ctx context.Context, entry *api.LogEntry) {
466466
}); err != nil {
467467
log.Error(err, "Failed to update block with execution", "blockId", bid)
468468
}
469-
// We need to enqueue the block for processing since it was executed.
470-
// The learner will decide whether the blockLog has all the information it needs otherwise it will
471-
// disregard the block item and wait for further events.
472-
if a.learnNotifier != nil {
473-
if err := a.learnNotifier(bid); err != nil {
474-
log.Error(err, "Error notifying block event", "blockId", bid)
475-
}
476-
}
477469
case v1alpha1.LogEventType_ACCEPTED:
478470
fallthrough
479471
case v1alpha1.LogEventType_REJECTED:
@@ -666,14 +658,6 @@ func (a *Analyzer) handleBlockEvents(ctx context.Context) {
666658
if err != nil {
667659
log.Error(err, "Error processing block", "blockId", blockItem.id)
668660
}
669-
// We need to enqueue the block for processing since it was executed.
670-
// The learner will decide whether the blockLog has all the information it needs otherwise it will
671-
// disregard the block item and wait for further events.
672-
if a.learnNotifier != nil {
673-
if err := a.learnNotifier(blockItem.id); err != nil {
674-
log.Error(err, "Error notifying block event", "blockId", blockItem.id)
675-
}
676-
}
677661
if a.signalBlockDone != nil {
678662
a.signalBlockDone <- blockItem.id
679663
}

app/pkg/analyze/analyzer_test.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,15 @@ type fakeNotifier struct {
174174
counts map[string]int
175175
}
176176

177-
func (f *fakeNotifier) PostBlockEvent(blockID string) error {
177+
func (f *fakeNotifier) PostSession(session *logspb.Session) error {
178178
if f.counts == nil {
179179
f.counts = make(map[string]int)
180180
}
181-
if _, ok := f.counts[blockID]; !ok {
182-
f.counts[blockID] = 0
181+
if _, ok := f.counts[session.GetContextId()]; !ok {
182+
f.counts[session.GetContextId()] = 0
183183

184184
}
185-
f.counts[blockID] += 1
185+
f.counts[session.GetContextId()] += 1
186186
return nil
187187
}
188188

@@ -276,7 +276,7 @@ func Test_Analyzer(t *testing.T) {
276276
a.signalBlockDone = blockProccessed
277277

278278
fakeNotifier := &fakeNotifier{}
279-
if err := a.Run(context.Background(), []string{rawDir}, fakeNotifier.PostBlockEvent); err != nil {
279+
if err := a.Run(context.Background(), []string{rawDir}, fakeNotifier.PostSession); err != nil {
280280
t.Fatalf("Analyze failed: %v", err)
281281
}
282282

@@ -323,11 +323,6 @@ func Test_Analyzer(t *testing.T) {
323323
t.Errorf("Expected ExecutedBlock to be set")
324324
}
325325

326-
// Check the block notifier was called twice; once after the generated block and once after the executed block
327-
if fakeNotifier.counts[expectedBlockID] != 2 {
328-
t.Errorf("Expected block notifier to be called twice but got %d", fakeNotifier.counts[expectedBlockID])
329-
}
330-
331326
// Now append some logs to the logFile and see that they get processed
332327
f, err := os.OpenFile(logFile, os.O_APPEND|os.O_WRONLY, 0644)
333328
if err != nil {

app/pkg/analyze/dbs.go

+17
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package analyze
22

33
import (
4+
"context"
5+
46
"github.com/cockroachdb/pebble"
57
"github.com/jlewi/foyle/app/pkg/dbutil"
8+
"github.com/jlewi/foyle/app/pkg/logs"
69
logspb "github.com/jlewi/foyle/protos/go/foyle/logs"
10+
"github.com/pkg/errors"
11+
"modernc.org/sqlite"
12+
sqlite3 "modernc.org/sqlite/lib"
713
)
814

915
// NewLockingBlocksDB helper function to create a new LockingDB for BlockLog.
@@ -39,3 +45,14 @@ func getLogEntriesVersion(m *logspb.LogEntries) string {
3945
func setLogEntriesVersion(m *logspb.LogEntries, version string) {
4046
m.ResourceVersion = version
4147
}
48+
49+
func logDBErrors(ctx context.Context, err error) {
50+
log := logs.FromContext(ctx)
51+
var sqlLiteErr *sqlite.Error
52+
if errors.As(err, &sqlLiteErr) {
53+
if sqlLiteErr.Code() == sqlite3.SQLITE_BUSY {
54+
sqlLiteBusyErrs.Inc()
55+
log.Error(err, "SQLITE_BUSY")
56+
}
57+
}
58+
}

app/pkg/analyze/session_builder.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,22 @@ func (p *sessionBuilder) processLogEvent(entry *api.LogEntry, notifier PostSessi
6565
return
6666
}
6767

68+
var session *logspb.Session
6869
updateFunc := func(s *logspb.Session) error {
69-
return updateSessionFromEvent(event, entry.Time(), s)
70+
err := updateSessionFromEvent(event, entry.Time(), s)
71+
// Keep a reference to the updated session (the pointer, not a deep copy) because we process it below
72+
session = s
73+
return err
7074
}
7175

7276
if err := p.sessions.Update(context.Background(), event.GetContextId(), updateFunc); err != nil {
73-
log.Error(err, "Failed to update session", "event", event)
77+
log.Error(err, "Failed to update session", "event", event, "contextId", event.GetContextId())
7478
return
7579
}
7680

7781
if event.Type == v1alpha1.LogEventType_SESSION_END {
78-
if err := notifier(event.GetContextId()); err != nil {
79-
log.Error(err, "Failed to send session process event")
82+
if err := notifier(session); err != nil {
83+
log.Error(err, "Failed to send session process event", "contextId", event.GetContextId())
8084
}
8185
}
8286
}
@@ -90,7 +94,7 @@ func (p *sessionBuilder) processLLMUsage(entry *api.LogEntry) {
9094
}
9195
contextId, ok := entry.GetString("contextId")
9296
if !ok {
93-
log.Error(errors.New("Failed to handle LLMUsage log entry"), "LLMUsage is missing contextId", "entry", entry)
97+
log.Error(errors.New("Failed to handle LLMUsage log entry"), "LLMUsage is missing contextId", "entry", entry, "contextId", contextId)
9498
return
9599
}
96100

@@ -99,7 +103,7 @@ func (p *sessionBuilder) processLLMUsage(entry *api.LogEntry) {
99103
}
100104

101105
if err := p.sessions.Update(context.Background(), contextId, updateFunc); err != nil {
102-
log.Error(err, "Failed to update session", "usage", usage)
106+
log.Error(err, "Failed to update session", "usage", usage, "contextId", contextId)
103107
}
104108
}
105109

app/pkg/analyze/session_builder_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func setup() (testTuple, error) {
7272
}
7373

7474
// Process the log entry
75-
func testNotifier(contextId string) error {
76-
fmt.Printf("Received session end event for context: %v", contextId)
75+
func testNotifier(session *logspb.Session) error {
76+
fmt.Printf("Received session end event for context: %v", session.GetContextId())
7777
return nil
7878
}
7979

app/pkg/analyze/session_manager.go

+87-53
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"os"
99
"path/filepath"
1010

11+
"github.com/prometheus/client_golang/prometheus"
12+
"github.com/prometheus/client_golang/prometheus/promauto"
13+
1114
"connectrpc.com/connect"
1215
"github.com/jlewi/foyle/app/pkg/logs"
1316
"github.com/jlewi/foyle/app/pkg/runme/converters"
@@ -28,6 +31,21 @@ const (
2831
SQLLiteDriver = "sqlite"
2932
)
3033

34+
var (
35+
sessCounter = promauto.NewCounterVec(
36+
prometheus.CounterOpts{
37+
Name: "session_updates",
38+
Help: "Number of sessions updated",
39+
},
40+
[]string{"status"},
41+
)
42+
43+
sqlLiteBusyErrs = promauto.NewCounter(prometheus.CounterOpts{
44+
Name: "sqlite_busy",
45+
Help: "Number of operations that failed because sqlite was busy",
46+
})
47+
)
48+
3149
// GetDDL return the DDL for the database.
3250
// This is a hack because the DDL statements for the sessions and eval results tables are in the same file and package.
3351
// The Evaluator needs to be able to get the DDL in order to create the eval results table. We should clean this up
@@ -68,6 +86,7 @@ func (db *SessionsManager) Get(ctx context.Context, contextID string) (*logspb.S
6886
sessRow, err := queries.GetSession(ctx, contextID)
6987

7088
if err != nil {
89+
logDBErrors(ctx, err)
7190
return nil, err
7291
}
7392

@@ -92,81 +111,96 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
92111
}
93112
log = log.WithValues("contextId", contextID)
94113

114+
sessCounter.WithLabelValues("start").Inc()
115+
95116
tx, err := db.db.BeginTx(ctx, &sql.TxOptions{})
96117
if err != nil {
118+
// DO NOT COMMIT
119+
sessCounter.WithLabelValues("failedstart").Inc()
97120
return errors.Wrapf(err, "Failed to start transaction")
98121
}
99122

100-
queries := db.queries.WithTx(tx)
101-
// Read the record
102-
sessRow, err := queries.GetSession(ctx, contextID)
123+
err = func() error {
124+
queries := db.queries.WithTx(tx)
125+
// Read the record
126+
sessRow, err := queries.GetSession(ctx, contextID)
103127

104-
// If the session doesn't exist then we do nothing because session is initialized to an empty session
105-
session := &logspb.Session{
106-
ContextId: contextID,
107-
}
108-
if err != nil {
109-
if err != sql.ErrNoRows {
110-
if txErr := tx.Rollback(); txErr != nil {
111-
log.Error(txErr, "Failed to rollback transaction")
112-
}
113-
return errors.Wrapf(err, "Failed to get session with id %v", contextID)
128+
// If the session doesn't exist then we do nothing because session is initialized to an empty session
129+
session := &logspb.Session{
130+
ContextId: contextID,
114131
}
115-
} else {
116-
// Deserialize the proto
117-
if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
118-
if txErr := tx.Rollback(); txErr != nil {
119-
log.Error(txErr, "Failed to rollback transaction")
132+
if err != nil {
133+
logDBErrors(ctx, err)
134+
if err != sql.ErrNoRows {
135+
// DO NOT COMMIT
136+
sessCounter.WithLabelValues("failedget").Inc()
137+
return errors.Wrapf(err, "Failed to get session with id %v", contextID)
138+
}
139+
// ErrNoRows means the session doesn't exist so we just continue with the empty session
140+
} else {
141+
// Deserialize the proto
142+
if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
143+
return errors.Wrapf(err, "Failed to deserialize session")
120144
}
121-
return errors.Wrapf(err, "Failed to deserialize session")
122145
}
123-
}
124146

125-
if err := updateFunc(session); err != nil {
126-
if txErr := tx.Rollback(); txErr != nil {
127-
log.Error(txErr, "Failed to rollback transaction")
147+
// DO NOT COMMIT
148+
sessCounter.WithLabelValues("callupdatefunc").Inc()
149+
150+
if err := updateFunc(session); err != nil {
151+
return errors.Wrapf(err, "Failed to update session")
128152
}
129-
return errors.Wrapf(err, "Failed to update session")
130-
}
131153

132-
newRow, err := protoToRow(session)
133-
if err != nil {
134-
if txErr := tx.Rollback(); txErr != nil {
135-
log.Error(txErr, "Failed to rollback transaction")
154+
newRow, err := protoToRow(session)
155+
if err != nil {
156+
return errors.Wrapf(err, "Failed to convert session proto to table row")
136157
}
137-
return errors.Wrapf(err, "Failed to convert session proto to table row")
138-
}
139158

140-
if newRow.Contextid != contextID {
141-
if txErr := tx.Rollback(); txErr != nil {
142-
log.Error(txErr, "Failed to rollback transaction")
159+
if newRow.Contextid != contextID {
160+
return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
143161
}
144-
return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
145-
}
146162

147-
update := fsql.UpdateSessionParams{
148-
Contextid: contextID,
149-
Proto: newRow.Proto,
150-
Starttime: newRow.Starttime,
151-
Endtime: newRow.Endtime,
152-
Selectedid: newRow.Selectedid,
153-
Selectedkind: newRow.Selectedkind,
154-
TotalInputTokens: newRow.TotalInputTokens,
155-
TotalOutputTokens: newRow.TotalOutputTokens,
156-
NumGenerateTraces: newRow.NumGenerateTraces,
157-
}
163+
update := fsql.UpdateSessionParams{
164+
Contextid: contextID,
165+
Proto: newRow.Proto,
166+
Starttime: newRow.Starttime,
167+
Endtime: newRow.Endtime,
168+
Selectedid: newRow.Selectedid,
169+
Selectedkind: newRow.Selectedkind,
170+
TotalInputTokens: newRow.TotalInputTokens,
171+
TotalOutputTokens: newRow.TotalOutputTokens,
172+
NumGenerateTraces: newRow.NumGenerateTraces,
173+
}
158174

159-
if err := queries.UpdateSession(ctx, update); err != nil {
175+
// DO NOT COMMIT
176+
sessCounter.WithLabelValues("callupdatesession").Inc()
177+
if err := queries.UpdateSession(ctx, update); err != nil {
178+
logDBErrors(ctx, err)
179+
return errors.Wrapf(err, "Failed to update session")
180+
}
181+
return nil
182+
}()
183+
184+
if err == nil {
185+
if err := tx.Commit(); err != nil {
186+
logDBErrors(ctx, err)
187+
log.Error(err, "Failed to commit transaction")
188+
sessCounter.WithLabelValues("commitfail").Inc()
189+
return errors.Wrapf(err, "Failed to commit transaction")
190+
}
191+
sessCounter.WithLabelValues("success").Inc()
192+
} else {
193+
logDBErrors(ctx, err)
194+
sessCounter.WithLabelValues("fail").Inc()
195+
log.Error(err, "Failed to update session")
160196
if txErr := tx.Rollback(); txErr != nil {
161197
log.Error(txErr, "Failed to rollback transaction")
162198
}
163-
return errors.Wrapf(err, "Failed to update session")
164-
}
165-
166-
if err := tx.Commit(); err != nil {
167-
return errors.Wrapf(err, "Failed to commit transaction")
199+
return err
168200
}
169201

202+
// DO NOT COMMIT
203+
sessCounter.WithLabelValues("done").Inc()
170204
return nil
171205
}
172206

0 commit comments

Comments
 (0)