Skip to content

More Robust Error Handling For Cache Issues #249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 95 additions & 23 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@ const (
columnDelimiter = ","
)

// ErrCacheInconsistent is the error reported when an operation would
// leave the cache in an inconsistent state.
type ErrCacheInconsistent struct {
	// details optionally describes the offending operation; it may be empty.
	details string
}

// Error implements the error interface. It returns "cache inconsistent",
// followed by ": " and the details when details is non-empty.
func (e *ErrCacheInconsistent) Error() string {
	const prefix = "cache inconsistent"
	if e.details == "" {
		return prefix
	}
	return prefix + ": " + e.details
}

// NewErrCacheInconsistent returns an ErrCacheInconsistent carrying the given
// details. An empty details string yields the bare "cache inconsistent"
// message.
func NewErrCacheInconsistent(details string) *ErrCacheInconsistent {
	inconsistent := new(ErrCacheInconsistent)
	inconsistent.details = details
	return inconsistent
}

// ErrIndexExists is returned when an item in the database cannot be inserted due to existing indexes
type ErrIndexExists struct {
Table string
Expand Down Expand Up @@ -115,7 +136,7 @@ func (r *RowCache) Create(uuid string, m model.Model, checkIndexes bool) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; ok {
return fmt.Errorf("row %s already exists", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot create row %s as it already exists", uuid))
}
if reflect.TypeOf(m) != r.dataType {
return fmt.Errorf("expected data of type %s, but got %s", r.dataType.String(), reflect.TypeOf(m).String())
Expand Down Expand Up @@ -153,7 +174,7 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; !ok {
return fmt.Errorf("row %s does not exist", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot update row %s as it does not exist in the cache", uuid))
}
oldRow := model.Clone(r.cache[uuid])
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
Expand Down Expand Up @@ -249,7 +270,7 @@ func (r *RowCache) Delete(uuid string) error {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.cache[uuid]; !ok {
return fmt.Errorf("row %s does not exist", uuid)
return NewErrCacheInconsistent(fmt.Sprintf("cannot delete row %s as it does not exist in the cache", uuid))
}
oldRow := r.cache[uuid]
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
Expand Down Expand Up @@ -409,6 +430,9 @@ type TableCache struct {
mapper *mapper.Mapper
dbModel *model.DBModel
schema *ovsdb.DatabaseSchema
updates chan ovsdb.TableUpdates
updates2 chan ovsdb.TableUpdates2
errorChan chan error
ovsdb.NotificationHandler
mutex sync.RWMutex
}
Expand Down Expand Up @@ -444,6 +468,9 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
mapper: mapper.NewMapper(schema),
dbModel: dbModel,
mutex: sync.RWMutex{},
updates: make(chan ovsdb.TableUpdates, bufferSize),
updates2: make(chan ovsdb.TableUpdates2, bufferSize),
errorChan: make(chan error),
}, nil
}

Expand Down Expand Up @@ -479,21 +506,23 @@ func (t *TableCache) Tables() []string {
}

// Update implements the update method of the NotificationHandler interface.
// It does not modify the cache itself: a non-empty set of updates is placed
// on the updates channel so it can be processed after the initial state has
// been populated. Empty updates are dropped.
//
// The send blocks while the channel's buffer is full.
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
	if len(tableUpdates) == 0 {
		return
	}
	// Queue only. Calling Populate here as well would apply the same update
	// a second time when the queued copy is processed, and would silently
	// discard the error Populate returns.
	t.updates <- tableUpdates
}

// Update2 implements the update2 method of the NotificationHandler interface.
// It does not modify the cache itself: a non-empty set of updates is placed
// on the updates2 channel so it can be processed after the initial state has
// been populated. Empty updates are dropped.
//
// The send blocks while the channel's buffer is full.
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) {
	if len(tableUpdates) == 0 {
		return
	}
	// Queue only. Calling Populate2 here as well would apply the same update
	// a second time when the queued copy is processed (a repeated insert or
	// delete is reported as a cache inconsistency), and would silently
	// discard the error Populate2 returns.
	t.updates2 <- tableUpdates
}

// Locked implements the locked method of the NotificationHandler interface
Expand All @@ -513,7 +542,7 @@ func (t *TableCache) Disconnected() {
}

// Populate adds data to the cache and places an event on the channel
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) error {
t.mutex.Lock()
defer t.mutex.Unlock()

Expand All @@ -527,40 +556,41 @@ func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
if row.New != nil {
newModel, err := t.CreateModel(table, row.New, uuid)
if err != nil {
panic(err)
return err
}
if existing := tCache.Row(uuid); existing != nil {
if !reflect.DeepEqual(newModel, existing) {
if err := tCache.Update(uuid, newModel, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(updateEvent, table, existing, newModel)
}
// no diff
continue
}
if err := tCache.Create(uuid, newModel, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, newModel)
continue
} else {
oldModel, err := t.CreateModel(table, row.Old, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Delete(uuid); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(deleteEvent, table, oldModel, nil)
continue
}
}
}
return nil
}

// Populate2 adds data to the cache and places an event on the channel
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) error {
t.mutex.Lock()
defer t.mutex.Unlock()
for table := range t.dbModel.Types() {
Expand All @@ -574,19 +604,19 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
case row.Initial != nil:
m, err := t.CreateModel(table, row.Initial, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Create(uuid, m, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, m)
case row.Insert != nil:
m, err := t.CreateModel(table, row.Insert, uuid)
if err != nil {
panic(err)
return err
}
if err := tCache.Create(uuid, m, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(addEvent, table, nil, m)
case row.Modify != nil:
Expand All @@ -597,11 +627,11 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
modified := tCache.Row(uuid)
err := t.ApplyModifications(table, modified, *row.Modify)
if err != nil {
panic(err)
return err
}
if !reflect.DeepEqual(modified, existing) {
if err := tCache.Update(uuid, modified, false); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(updateEvent, table, existing, modified)
}
Expand All @@ -615,12 +645,13 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
panic(fmt.Errorf("row with uuid %s does not exist", uuid))
}
if err := tCache.Delete(uuid); err != nil {
panic(err)
return err
}
t.eventProcessor.AddEvent(deleteEvent, table, m, nil)
}
}
}
return nil
}

// Purge drops all data in the cache and reinitializes it using the
Expand All @@ -639,9 +670,50 @@ func (t *TableCache) AddEventHandler(handler EventHandler) {
t.eventProcessor.AddEventHandler(handler)
}

// Run starts the update processing and event processing loops, each in its
// own goroutine, and blocks until both have returned. processUpdates returns
// once stopCh is closed; eventProcessor.Run is expected to do the same.
//
// After both loops have stopped, the updates/updates2 channels are replaced
// with fresh ones so that stale updates are not processed on a new
// connection.
func (t *TableCache) Run(stopCh <-chan struct{}) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		// Done must be signalled, otherwise wg.Wait below never returns.
		defer wg.Done()
		t.processUpdates(stopCh)
	}()
	go func() {
		defer wg.Done()
		t.eventProcessor.Run(stopCh)
	}()
	wg.Wait()

	// Drop anything still queued by swapping in empty channels.
	t.updates = make(chan ovsdb.TableUpdates, bufferSize)
	t.updates2 = make(chan ovsdb.TableUpdates2, bufferSize)
}

// Errors exposes, as a receive-only channel, the errors encountered while
// queued updates are applied to the cache. Errors are sent without blocking,
// so one raised while nothing is receiving from this channel is dropped.
func (t *TableCache) Errors() <-chan error {
	var errs <-chan error = t.errorChan
	return errs
}

// processUpdates applies queued updates to the cache until stopCh is closed.
// Entries from the updates channel go through Populate and entries from the
// updates2 channel go through Populate2. Any error returned is offered to
// errorChan without blocking.
func (t *TableCache) processUpdates(stopCh <-chan struct{}) {
	// report forwards a non-nil error to errorChan; when no receiver is
	// ready the error is dropped rather than stalling update processing.
	report := func(err error) {
		if err == nil {
			return
		}
		select {
		case t.errorChan <- err:
			// error sent to client
		default:
			// client not listening for errors
		}
	}

	for {
		select {
		case tableUpdates := <-t.updates:
			report(t.Populate(tableUpdates))
		case tableUpdates2 := <-t.updates2:
			report(t.Populate2(tableUpdates2))
		case <-stopCh:
			return
		}
	}
}

// newRowCache creates a new row cache with the provided data
Expand Down
30 changes: 20 additions & 10 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,8 @@ func TestTableCache_populate(t *testing.T) {
},
},
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -733,7 +734,8 @@ func TestTableCache_populate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -744,7 +746,8 @@ func TestTableCache_populate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -786,7 +789,8 @@ func TestTableCachePopulate(t *testing.T) {
},
},
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -798,7 +802,8 @@ func TestTableCachePopulate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -809,7 +814,8 @@ func TestTableCachePopulate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
err = tc.Populate(updates)
require.NoError(t, err)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -851,7 +857,8 @@ func TestTableCachePopulate2(t *testing.T) {
}

t.Log("Initial")
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)

Expand All @@ -865,7 +872,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got = tc.Table("Open_vSwitch").Row("test2")
assert.Equal(t, testRowModel2, got)

Expand All @@ -879,7 +887,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)

Expand All @@ -892,7 +901,8 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
err = tc.Populate2(updates)
require.NoError(t, err)
_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
}
Expand Down
Loading