Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch up all deletes/updates in an Update API call #111

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ type DbConnector interface {
Connect(context.Context) (pgx.Tx, CloseFunc, error)
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
}
294 changes: 202 additions & 92 deletions internal/db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"maps"
"slices"
"strings"

"github.com/gadget-inc/dateilager/internal/pb"
Expand All @@ -23,108 +25,180 @@ func UpdateLatestVersion(ctx context.Context, tx pgx.Tx, project int64, version
return nil
}

// NOTE(review): this span is a rendered diff without +/- markers. Where two
// near-identical lines appear back to back, the first looks like the
// pre-change line and the second its replacement — confirm against the PR.
//
// DeleteObjects (the batched form of the single-path DeleteObject) sets
// stop_version to version on every dl.objects row of the given project whose
// path is one of paths and whose stop_version is still NULL. An error from
// Exec is wrapped together with the project, version and paths.
func DeleteObject(ctx context.Context, tx pgx.Tx, project int64, version int64, path string) error {
func DeleteObjects(ctx context.Context, tx pgx.Tx, project int64, version int64, paths []string) error {
_, err := tx.Exec(ctx, `
UPDATE dl.objects
SET stop_version = $1
WHERE project = $2
AND path = $3
AND path = ANY($3)
AND stop_version IS NULL
`, version, project, path)
`, version, project, paths)
if err != nil {
return fmt.Errorf("delete object, project %v, version %v, path %v: %w", project, version, path, err)
return fmt.Errorf("delete objects, project %v, version %v, paths %v: %w", project, version, paths, err)
}

return nil
}

// UpdateObject returns true if content changed, false otherwise
func UpdateObject(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, object *pb.Object) (bool, error) {
content := object.Content
if content == nil {
content = []byte("")
// UpdateObjects returns true if content changed, false otherwise
//
// NOTE(review): this span is a rendered diff without +/- markers, so lines of
// the old single-object UpdateObject and the new batched UpdateObjects are
// interleaved — confirm which lines survive against the PR.
//
// Flow of the post-change lines, as far as this view shows:
//   - every object's content (nil is treated as empty) is encoded and hashed,
//     and one row per object is collected;
//   - the rows are bulk-loaded with CopyFrom into a temporary table named
//     __update_<project>_<version>, created with ON COMMIT DROP;
//   - the temporary table is copied into dl.contents and then dl.objects, both
//     with ON CONFLICT DO NOTHING; the dl.objects insert returns the paths it
//     actually inserted;
//   - if no path was returned the function reports false; otherwise every
//     returned path plus each of its parent directory prefixes ("a/", "a/b/")
//     has its other live row (stop_version IS NULL, start_version != version)
//     closed by setting stop_version = version, and the function reports true.
//
// NOTE(review): no rows.Err() check is visible after the rows.Next() loop —
// confirm iteration errors are not dropped.
// NOTE(review): conn appears unused by the post-change lines (the old
// conn.Exec insert is replaced by tx.CopyFrom) — confirm.
func UpdateObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, encoder *ContentEncoder, project int64, version int64, objects []*pb.Object) (bool, error) {
var objectColumnValues [][]any
for _, object := range objects {
content := object.Content
if content == nil {
content = []byte("")
}

encoded, err := encoder.Encode(content)
if err != nil {
return false, fmt.Errorf("encode updated content, project %v, version %v, path %v: %w", project, version, object.Path, err)
}

hash := HashContent(content)
objectColumnValues = append(objectColumnValues, []any{
hash,
encoded,
object.Path,
object.Mode,
object.Size,
})
}
hash := HashContent(content)

encoded, err := encoder.Encode(content)
tableName := fmt.Sprintf("__update_%d_%d", project, version)
_, err := tx.Exec(ctx, fmt.Sprintf(`
CREATE TEMPORARY TABLE
%s (hash hash, bytes bytea, path text, mode bigint, size bigint)
ON COMMIT DROP
`, tableName))
if err != nil {
return false, fmt.Errorf("encode updated content, project %v, version %v, path %v: %w", project, version, object.Path, err)
return false, fmt.Errorf("create temporary table for update failed: %w", err)
}

// insert the content outside the transaction to avoid deadlocks and to keep smaller transactions
_, err = conn.Exec(ctx, `
INSERT INTO dl.contents (hash, bytes)
VALUES (($1, $2), $3)
ON CONFLICT DO NOTHING
`, hash.H1, hash.H2, encoded)
_, err = tx.CopyFrom(ctx, pgx.Identifier{tableName}, []string{"hash", "bytes", "path", "mode", "size"}, pgx.CopyFromRows(objectColumnValues))
if err != nil {
return false, fmt.Errorf("insert objects content, hash %x-%x: %w", hash.H1, hash.H2, err)
return false, fmt.Errorf("insert objects content, %w", err)
}

rows, err := tx.Query(ctx, `
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
_, err = tx.Exec(ctx, fmt.Sprintf(`
INSERT INTO
dl.contents (hash, bytes)
SELECT
hash, bytes
FROM
%s
ON CONFLICT
DO NOTHING
RETURNING project
`, project, version, object.Path, hash.H1, hash.H2, object.Mode, object.Size, false)
DO NOTHING
`, tableName))
if err != nil {
return false, fmt.Errorf("insert new object, project %v, version %v, path %v: %w", project, version, object.Path, err)
return false, fmt.Errorf("insert into contents table failed, %w", err)
}

isIdentical := !rows.Next()
rows, err := tx.Query(ctx, fmt.Sprintf(`
INSERT INTO
dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
SELECT
$1 as project,
$2 as start_version,
NULL as stop_version,
path,
hash,
mode,
size,
false as packed
FROM
%s
ON CONFLICT
DO NOTHING
RETURNING
path
`, tableName), project, version)
if err != nil {
return false, fmt.Errorf("insert new object, project %v, version %v: %w", project, version, err)
}

previousPaths := make(map[string]bool)
for rows.Next() {
var path string
err = rows.Scan(&path)
if err != nil {
return false, fmt.Errorf("scan path, project %v, version %v: %w", project, version, err)
}

previousPaths[path] = true
pathChunks := strings.Split(path, "/")
for i := 1; i < len(pathChunks); i++ {
previousPaths[fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/"))] = true
}
}
rows.Close()

if isIdentical {
if len(previousPaths) == 0 {
return false, nil
}

previousPaths := []string{object.Path}
pathChunks := strings.Split(object.Path, "/")

for i := 1; i < len(pathChunks); i++ {
previousPaths = append(previousPaths, fmt.Sprintf("%s/", strings.Join(pathChunks[:i], "/")))
previousPathsSlice := make([]string, 0, len(previousPaths))
for path := range previousPaths {
previousPathsSlice = append(previousPathsSlice, path)
}

_, err = tx.Exec(ctx, `
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
UPDATE
dl.objects
SET
stop_version = $1
WHERE
project = $2
AND path = ANY($3)
AND stop_version IS NULL
AND start_version != $4
`, version, project, previousPaths, version)

`, version, project, previousPathsSlice, version)
if err != nil {
return false, fmt.Errorf("update previous object, project %v, version %v, path %v: %w", project, version, object.Path, err)
return false, fmt.Errorf("update previous object, project %v, version %v: %w", project, version, err)
}

return true, nil
}

// UpdatePackedObjects returns true if content changed, false otherwise
//
// NOTE(review): this span is a rendered diff without +/- markers, so lines of
// the old single-parent version and the new multi-parent version are
// interleaved; the "Expand All" line inside the body marks unchanged lines
// that the page elided — confirm details against the PR.
//
// Flow of the post-change lines, as far as this view shows:
//   - updates maps each packed parent path to the object updates for it;
//   - the live packed rows (packed IS true, stop_version IS NULL) for those
//     parents are loaded with their hash and bytes;
//   - each pack is rebuilt with updateObjects; ErrEmptyPack is tolerated and
//     means nothing is re-inserted for that parent;
//   - parents whose rebuilt hash differs from the stored hash are collected;
//     if there are none the function reports false;
//   - one batch closes the live packed rows of the changed parents
//     (stop_version = version) and, when there are non-empty rebuilt packs,
//     copies them from a temporary table named
//     __update_packed_<project>_<version> into dl.contents and dl.objects
//     (mode 0, packed true).
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, parent string, updates []*pb.Object) (bool, error) {
var hash Hash
var content []byte
func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, project int64, version int64, updates map[string][]*pb.Object) (bool, error) {
parents := slices.Collect(maps.Keys(updates))

rows, err := tx.Query(ctx, `
SELECT (o.hash).h1, (o.hash).h2, c.bytes
FROM dl.objects o
JOIN dl.contents c
SELECT
(c.hash).h1,
(c.hash).h2,
c.bytes,
o.path
FROM
dl.objects o
JOIN
dl.contents c
ON o.hash = c.hash
WHERE project = $1
AND path = $2
AND packed IS true
AND stop_version IS NULL
`, project, parent)
WHERE
o.project = $1
AND o.path = ANY($2)
AND o.packed IS true
AND o.stop_version IS NULL
`, project, parents)
if err != nil {
return false, fmt.Errorf("select existing packed object, project %v, version %v, parent %v: %w", project, version, parent, err)
return false, fmt.Errorf("select existing packed object, project %v, version %v: %w", project, version, err)
}

if rows.Next() {
err = rows.Scan(&hash.H1, &hash.H2, &content)
hashes := make(map[string]Hash)
contents := make(map[string][]byte)

for rows.Next() {
var parent string
var hash Hash
var content []byte

err = rows.Scan(&hash.H1, &hash.H2, &content, &parent)
if err != nil {
return false, fmt.Errorf("scan hash and packed content from existing object, project %v, version %v, parent %v: %w", project, version, parent, err)
return false, fmt.Errorf("scan hash and packed content from existing object, project %v, version %v: %w", project, version, err)
}

hashes[parent] = hash
contents[parent] = content
}
rows.Close()

Expand All @@ -133,65 +207,101 @@ func UpdatePackedObjects(ctx context.Context, tx pgx.Tx, conn DbConnector, proje
return false, fmt.Errorf("failed to iterate rows: %w", err)
}

shouldInsert := true
updated, err := updateObjects(content, updates)
if errors.Is(err, ErrEmptyPack) {
// If the newly packed object is empty, we only need to delete the old one.
shouldInsert = false
} else if err != nil {
return false, fmt.Errorf("update packed object: %w", err)
}
objectColumnValues := make([][]any, 0, len(updates))
changedParents := make([]string, 0, len(updates))

newHash := HashContent(updated)
for parent, updates := range updates {
updated, err := updateObjects(contents[parent], updates)
packIsEmpty := errors.Is(err, ErrEmptyPack)
if err != nil && !packIsEmpty {
return false, fmt.Errorf("update packed object: %w", err)
}

newHash := HashContent(updated)
if hashes[parent] != newHash {
changedParents = append(changedParents, parent)

if !packIsEmpty {
objectColumnValues = append(objectColumnValues, []any{
newHash,
updated,
parent,
int64(len(updated)),
})
}
}
}

if hash == newHash {
// content didn't change
if len(changedParents) == 0 {
return false, nil
}

batch := &pgx.Batch{}

batch.Queue(`
UPDATE dl.objects SET stop_version = $1
WHERE project = $2
AND path = $3
AND packed IS true
AND stop_version IS NULL
`, version, project, parent)

if shouldInsert {
// insert the content outside the transaction to avoid deadlocks and to keep smaller transactions
_, err = conn.Exec(ctx, `
INSERT INTO dl.contents (hash, bytes)
VALUES (($1, $2), $3)
ON CONFLICT DO NOTHING
`, newHash.H1, newHash.H2, updated)
UPDATE
dl.objects
SET
stop_version = $1
WHERE
project = $2
AND path = ANY($3)
AND packed IS true
AND stop_version IS NULL
`, version, project, changedParents)

if len(objectColumnValues) > 0 {
tableName := fmt.Sprintf("__update_packed_%d_%d", project, version)

_, err = tx.Exec(ctx, fmt.Sprintf(`
CREATE TEMPORARY TABLE
%s (hash hash, bytes bytea, path text, size bigint)
ON COMMIT DROP
`, tableName))
if err != nil {
return false, fmt.Errorf("create temporary table for update failed: %w", err)
}

_, err = tx.CopyFrom(ctx, pgx.Identifier{tableName}, []string{"hash", "bytes", "path", "size"}, pgx.CopyFromRows(objectColumnValues))
if err != nil {
return false, fmt.Errorf("insert packed content, hash %x-%x: %w", newHash.H1, newHash.H2, err)
return false, fmt.Errorf("insert packaged objects content, %w", err)
}

batch.Queue(`
INSERT INTO dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
VALUES ($1, $2, NULL, $3, ($4, $5), $6, $7, $8)
`, project, version, parent, newHash.H1, newHash.H2, 0, len(updated), true)
batch.Queue(fmt.Sprintf(`
INSERT INTO
dl.contents (hash, bytes)
SELECT
hash, bytes
FROM
%s
ON CONFLICT
DO NOTHING
`, tableName))

batch.Queue(fmt.Sprintf(`
INSERT INTO
dl.objects (project, start_version, stop_version, path, hash, mode, size, packed)
SELECT
$1 as project,
$2 as start_version,
NULL as stop_version,
path,
hash,
0 as mode,
size,
true as packed
FROM
%s
`, tableName), project, version)
}

// NOTE(review): up to three statements are queued above, but the post-change
// lines appear to read only one result with results.Exec(). Errors from the
// two INSERT statements would presumably surface only through the deferred
// results.Close(), whose return value is discarded — confirm.
results := tx.SendBatch(ctx, batch)
defer results.Close()

_, err = results.Exec()
if err != nil {
return false, fmt.Errorf("update existing object, project %v, version %v, parent %v: %w", project, version, parent, err)
}

if shouldInsert {
_, err = results.Exec()
if err != nil {
return false, fmt.Errorf("insert new packed object, project %v, version %v, parent %v: %w", project, version, parent, err)
}
return false, fmt.Errorf("update existing object, project %v, version %v: %w", project, version, err)
}

// content did change
return true, nil
}
Loading
Loading