Skip to content

Commit 526aa5a

Browse files
committed
make data a atomic link
1 parent 941468f commit 526aa5a

File tree

7 files changed

+54
-58
lines changed

7 files changed

+54
-58
lines changed

pkg/lmd/a_helper_test.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -353,11 +353,10 @@ func StartTestPeerExtra(numPeers, numHosts, numServices int, extraConfig string)
353353
for {
354354
err := peer.InitAllTables(context.TODO())
355355
if err == nil {
356-
peer.lock.RLock()
357-
peer.data.lock.RLock()
358-
gotPeers := len(peer.data.tables[TableStatus].Data)
359-
peer.data.lock.RUnlock()
360-
peer.lock.RUnlock()
356+
data := peer.data.Load()
357+
data.lock.RLock()
358+
gotPeers := len(data.tables[TableStatus].Data)
359+
data.lock.RUnlock()
361360
if gotPeers == numPeers {
362361
break
363362
}

pkg/lmd/benchmark_test.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ func BenchmarkParseResultJSON(b *testing.B) {
1717
PauseTestPeers(peer)
1818

1919
columns := make([]string, 0)
20-
for i := range peer.data.tables[TableServices].Table.Columns {
21-
col := peer.data.tables[TableServices].Table.Columns[i]
20+
data := peer.data.Load()
21+
for i := range data.tables[TableServices].Table.Columns {
22+
col := data.tables[TableServices].Table.Columns[i]
2223
if col.StorageType == LocalStore && col.Optional == NoFlags {
2324
columns = append(columns, col.Name)
2425
}
@@ -57,8 +58,9 @@ func BenchmarkParseResultWrappedJSON(b *testing.B) {
5758
PauseTestPeers(peer)
5859

5960
columns := make([]string, 0)
60-
for i := range peer.data.tables[TableServices].Table.Columns {
61-
col := peer.data.tables[TableServices].Table.Columns[i]
61+
data := peer.data.Load()
62+
for i := range data.tables[TableServices].Table.Columns {
63+
col := data.tables[TableServices].Table.Columns[i]
6264
if col.StorageType == LocalStore && col.Optional == NoFlags {
6365
columns = append(columns, col.Name)
6466
}
@@ -98,8 +100,9 @@ func BenchmarkPeerUpdate(b *testing.B) {
98100

99101
ctx := context.TODO()
100102
b.StartTimer()
103+
data := peer.data.Load()
101104
for range b.N {
102-
err := peer.data.UpdateFull(ctx, Objects.UpdateTables)
105+
err := data.UpdateFull(ctx, Objects.UpdateTables)
103106
if err != nil {
104107
panic("Update failed")
105108
}
@@ -115,7 +118,8 @@ func BenchmarkPeerUpdateServiceInsert(b *testing.B) {
115118
peer, cleanup, _ := StartTestPeer(1, 1000, 10000)
116119
PauseTestPeers(peer)
117120

118-
table := peer.data.tables[TableServices]
121+
data := peer.data.Load()
122+
table := data.tables[TableServices]
119123
req := &Request{
120124
Table: table.Table.Name,
121125
Columns: table.DynamicColumnNamesCache,
@@ -130,7 +134,7 @@ func BenchmarkPeerUpdateServiceInsert(b *testing.B) {
130134

131135
b.StartTimer()
132136
for range b.N {
133-
res := peer.data.insertDeltaDataResult(2, res, meta, table)
137+
res := data.insertDeltaDataResult(2, res, meta, table)
134138
if res != nil {
135139
panic("Update failed")
136140
}

pkg/lmd/datastoreset_test.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ func TestDSHasChanged(t *testing.T) {
4444
peer, cleanup, _ := StartTestPeer(1, 10, 10)
4545
PauseTestPeers(peer)
4646

47-
err := peer.data.reloadIfNumberOfObjectsChanged(context.TODO())
47+
data := peer.data.Load()
48+
err := data.reloadIfNumberOfObjectsChanged(context.TODO())
4849
require.NoError(t, err)
4950

5051
err = cleanup()
@@ -57,12 +58,13 @@ func TestDSFullUpdate(t *testing.T) {
5758

5859
peer.statusSetLocked(LastUpdate, float64(0))
5960
peer.statusSetLocked(LastFullServiceUpdate, float64(0))
60-
err := peer.data.UpdateDeltaServices(context.TODO(), fmt.Sprintf("Filter: host_name = %s\nFilter: description = %s\n", "test", "test"), false, 0)
61+
data := peer.data.Load()
62+
err := data.UpdateDeltaServices(context.TODO(), fmt.Sprintf("Filter: host_name = %s\nFilter: description = %s\n", "test", "test"), false, 0)
6163
require.NoError(t, err)
6264

6365
peer.statusSetLocked(LastUpdate, float64(0))
6466
peer.statusSetLocked(LastFullServiceUpdate, float64(0))
65-
err = peer.data.UpdateDeltaServices(context.TODO(), fmt.Sprintf("Filter: host_name = %s\nFilter: description = %s\n", "test", "test"), true, time.Now().Unix())
67+
err = data.UpdateDeltaServices(context.TODO(), fmt.Sprintf("Filter: host_name = %s\nFilter: description = %s\n", "test", "test"), true, time.Now().Unix())
6668
require.NoError(t, err)
6769

6870
err = cleanup()
@@ -73,10 +75,11 @@ func TestDSDowntimesComments(t *testing.T) {
7375
peer, cleanup, _ := StartTestPeer(1, 10, 10)
7476
PauseTestPeers(peer)
7577

76-
err := peer.data.buildDowntimeCommentsList(TableComments)
78+
data := peer.data.Load()
79+
err := data.buildDowntimeCommentsList(TableComments)
7780
require.NoError(t, err)
7881

79-
err = peer.data.buildDowntimeCommentsList(TableDowntimes)
82+
err = data.buildDowntimeCommentsList(TableDowntimes)
8083
require.NoError(t, err)
8184

8285
err = cleanup()

pkg/lmd/importer.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ func initializePeersWithImport(lmd *Daemon, importFile string) (err error) {
4747
peer := peers[i]
4848

4949
// finish peer import
50-
err = peer.data.SetReferences()
50+
data := peer.data.Load()
51+
err = data.SetReferences()
5152
if err != nil {
5253
return fmt.Errorf("failed to set references: %s", err.Error())
5354
}
@@ -247,7 +248,7 @@ func importData(peers []*Peer, table *Table, rows ResultSet, columns []string, l
247248
peer.LastOnline = interface2float64(rows[0][colIndex["last_online"]])
248249
peer.Queries = interface2int64(rows[0][colIndex["queries"]])
249250
peer.ResponseTime = interface2float64(rows[0][colIndex["response_time"]])
250-
peer.data = NewDataStoreSet(peer)
251+
peer.data.Store(NewDataStoreSet(peer))
251252

252253
flags := NoFlags
253254
flags.Load(con.Flags)
@@ -259,7 +260,7 @@ func importData(peers []*Peer, table *Table, rows ResultSet, columns []string, l
259260

260261
if peer != nil && peer.isOnline() {
261262
store := NewDataStore(table, peer)
262-
store.DataSet = peer.data
263+
store.DataSet = peer.data.Load()
263264
columnsList := ColumnList{}
264265
for _, name := range columns {
265266
col := store.GetColumn(name)
@@ -276,7 +277,8 @@ func importData(peers []*Peer, table *Table, rows ResultSet, columns []string, l
276277
if err != nil {
277278
return peers, fmt.Errorf("failed to insert data: %s", err.Error())
278279
}
279-
peer.data.Set(table.Name, store)
280+
data := peer.data.Load()
281+
data.Set(table.Name, store)
280282
}
281283

282284
return peers, nil

pkg/lmd/peer.go

+17-32
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,16 @@ type Peer struct { //nolint:govet // not fieldalignment relevant
7777
waitGroup *sync.WaitGroup // wait group used to wait on shutdowns
7878
ConfigTool *string
7979
ThrukExtras *string
80-
lock *deadlock.RWMutex // must be used for Peer.* access
81-
data *DataStoreSet // the cached remote data tables
82-
lmd *Daemon // reference to main lmd instance
83-
Config *Connection // reference to the peer configuration from the config file
84-
SubPeerStatus map[string]interface{} // cached /sites result for sub peer
85-
shutdownChannel chan bool // channel used to wait to finish shutdown
86-
Name string // Name of this peer, aka peer_name
87-
ID string // ID for this peer, aka peer_key
88-
PeerAddr string // Address of the peer
89-
ParentID string // ID of parent Peer
80+
lock *deadlock.RWMutex // must be used for Peer.* access
81+
data atomic.Pointer[DataStoreSet] // the cached remote data tables
82+
lmd *Daemon // reference to main lmd instance
83+
Config *Connection // reference to the peer configuration from the config file
84+
SubPeerStatus map[string]interface{} // cached /sites result for sub peer
85+
shutdownChannel chan bool // channel used to wait to finish shutdown
86+
Name string // Name of this peer, aka peer_name
87+
ID string // ID for this peer, aka peer_key
88+
PeerAddr string // Address of the peer
89+
ParentID string // ID of parent Peer
9090
PeerParent string
9191
LastError string
9292
Section string
@@ -397,9 +397,7 @@ func (p *Peer) periodicUpdate(ctx context.Context) (ok bool, err error) {
397397
lastUpdate := interface2float64(p.LastUpdate.Load())
398398
lastFullUpdate := interface2float64(p.LastFullUpdate.Load())
399399
lastTimeperiodUpdateMinute := p.LastTimeperiodUpdateMinute.Load()
400-
p.lock.RLock()
401-
data := p.data
402-
p.lock.RUnlock()
400+
data := p.data.Load()
403401

404402
lastStatus := PeerStatus(p.PeerState.Load())
405403

@@ -767,8 +765,8 @@ func (p *Peer) InitAllTables(ctx context.Context) (err error) {
767765

768766
duration := time.Since(time1)
769767
peerStatus := PeerStatus(p.PeerState.Load())
768+
p.data.Store(data)
770769
p.lock.Lock()
771-
p.SetDataStoreSet(data, false)
772770
p.ResponseTime = duration.Seconds()
773771
p.lock.Unlock()
774772
logWith(p).Infof("objects created in: %s", duration.String())
@@ -2760,39 +2758,26 @@ func (p *Peer) setQueryOptions(req *Request) {
27602758
}
27612759

27622760
// GetDataStoreSet returns table data or error.
2763-
func (p *Peer) GetDataStoreSet() (data *DataStoreSet, err error) {
2764-
p.lock.RLock()
2765-
data = p.data
2766-
p.lock.RUnlock()
2767-
if data == nil {
2761+
func (p *Peer) GetDataStoreSet() (store *DataStoreSet, err error) {
2762+
store = p.data.Load()
2763+
if store == nil {
27682764
err = fmt.Errorf("peer is down: %s", p.getError())
27692765
}
27702766

27712767
return
27722768
}
27732769

2774-
// SetDataStoreSet resets the data table.
2775-
func (p *Peer) SetDataStoreSet(data *DataStoreSet, lock bool) {
2776-
if lock {
2777-
p.lock.Lock()
2778-
defer p.lock.Unlock()
2779-
}
2780-
p.data = data
2781-
}
2782-
27832770
// ClearData resets the data table.
27842771
func (p *Peer) ClearData(lock bool) {
27852772
if lock {
27862773
p.lock.Lock()
27872774
defer p.lock.Unlock()
27882775
}
2789-
p.data = nil
2776+
p.data.Store(nil)
27902777
}
27912778

27922779
func (p *Peer) ResumeFromIdle(ctx context.Context) (err error) {
2793-
p.lock.RLock()
2794-
data := p.data
2795-
p.lock.RUnlock()
2780+
data := p.data.Load()
27962781
state := PeerStatus(p.PeerState.Load())
27972782
p.statusSetLocked(Idling, false)
27982783
logWith(p).Infof("switched back to normal update interval")

pkg/lmd/peer_test.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ func TestPeerUpdate(t *testing.T) {
143143
peer, cleanup, _ := StartTestPeer(1, 10, 10)
144144
PauseTestPeers(peer)
145145

146-
err := peer.data.UpdateFull(context.TODO(), Objects.UpdateTables)
146+
store := peer.data.Load()
147+
err := store.UpdateFull(context.TODO(), Objects.UpdateTables)
147148
require.NoError(t, err)
148149

149150
// fake some last_update entries
@@ -171,7 +172,7 @@ func TestPeerUpdate(t *testing.T) {
171172
_, err = peer.periodicUpdate(context.TODO())
172173
require.NoError(t, err)
173174

174-
err = peer.periodicTimeperiodsUpdate(context.TODO(), peer.data)
175+
err = peer.periodicTimeperiodsUpdate(context.TODO(), store)
175176
require.NoError(t, err)
176177

177178
peer.statusSetLocked(LastUpdate, float64(0))
@@ -188,7 +189,8 @@ func TestPeerDeltaUpdate(t *testing.T) {
188189
peer, cleanup, _ := StartTestPeer(1, 10, 10)
189190
PauseTestPeers(peer)
190191

191-
err := peer.data.UpdateDelta(context.TODO(), 0, 0)
192+
store := peer.data.Load()
193+
err := store.UpdateDelta(context.TODO(), 0, 0)
192194
require.NoError(t, err)
193195

194196
err = cleanup()
@@ -211,7 +213,8 @@ func TestPeerInitSerial(t *testing.T) {
211213
peer, cleanup, _ := StartTestPeer(1, 10, 10)
212214
PauseTestPeers(peer)
213215

214-
err := peer.initAllTablesSerial(context.TODO(), peer.data)
216+
store := peer.data.Load()
217+
err := peer.initAllTablesSerial(context.TODO(), store)
215218
require.NoError(t, err)
216219

217220
err = cleanup()

pkg/lmd/virtstore.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func GetGroupByData(table *Table, peer *Peer) *DataStore {
7373
return nil
7474
}
7575
store := NewDataStore(table, peer)
76-
store.DataSet = peer.data
76+
store.DataSet = peer.data.Load()
7777
data := make(ResultSet, 0)
7878
dataSet := store.DataSet
7979
dataSet.lock.RLock()

0 commit comments

Comments
 (0)