@@ -62,7 +62,7 @@ type leaseProviderImpl struct {
62
62
mutex sync.Mutex
63
63
opts ProviderOpts
64
64
65
- lastUpdatedAt * time.Time
65
+ lastUpdatedAt time.Time
66
66
67
67
acquired * Request
68
68
known map [string ]* Request
@@ -71,14 +71,14 @@ type leaseProviderImpl struct {
71
71
func NewLeaseProvider (opts ProviderOpts ) Provider {
72
72
return & leaseProviderImpl {
73
73
opts : opts ,
74
- lastUpdatedAt : nil ,
74
+ lastUpdatedAt : time . Now () ,
75
75
known : make (map [string ]* Request ),
76
76
}
77
77
}
78
78
79
79
func (lp * leaseProviderImpl ) MarshalJSON () ([]byte , error ) {
80
80
return json .Marshal (& struct {
81
- LastUpdatedAt * time.Time `json:"last_updated_at"`
81
+ LastUpdatedAt time.Time `json:"last_updated_at"`
82
82
Acquired * Request `json:"acquired"`
83
83
Known map [string ]* Request `json:"known"`
84
84
}{
@@ -148,7 +148,12 @@ func (lp *leaseProviderImpl) insert(ctx context.Context, leaseRequest *Request)
148
148
log .Ctx (ctx ).Debug ().EmbedObject (leaseRequest ).Msg ("Lease request is already existing" )
149
149
// Priority changed, update it
150
150
if existing .Priority != leaseRequest .Priority {
151
- log .Ctx (ctx ).Debug ().EmbedObject (leaseRequest ).Msgf ("Lease request priority has changed (previous: %d, new: %d)" , existing .Priority , leaseRequest .Priority )
151
+ log .Ctx (ctx ).
152
+ Debug ().
153
+ EmbedObject (leaseRequest ).
154
+ Int ("previous_priority" , existing .Priority ).
155
+ Int ("new_priority" , leaseRequest .Priority ).
156
+ Msg ("Lease request priority has changed" )
152
157
existing .Priority = leaseRequest .Priority
153
158
updated = true
154
159
}
@@ -160,7 +165,12 @@ func (lp *leaseProviderImpl) insert(ctx context.Context, leaseRequest *Request)
160
165
allowedTransition := existingStatus == StatusAcquired && (leaseRequestStatus == StatusSuccess || leaseRequestStatus == StatusFailure )
161
166
// condition
162
167
if statusMismatch && allowedTransition {
163
- log .Ctx (ctx ).Debug ().EmbedObject (leaseRequest ).Msgf ("Lease request status has changed (previous: %s, new: %s)" , existingStatus , leaseRequestStatus )
168
+ log .Ctx (ctx ).
169
+ Debug ().
170
+ EmbedObject (leaseRequest ).
171
+ Str ("previous_status" , existingStatus ).
172
+ Str ("new_status" , leaseRequestStatus ).
173
+ Msg ("Lease request status has changed" )
164
174
existing .Status = & leaseRequestStatus
165
175
updated = true
166
176
} else if statusMismatch {
@@ -172,9 +182,12 @@ func (lp *leaseProviderImpl) insert(ctx context.Context, leaseRequest *Request)
172
182
}
173
183
174
184
if updated {
175
- now := time .Now ()
176
- lp .lastUpdatedAt = & now
177
- log .Ctx (ctx ).Debug ().Msgf ("Provider last updated time bumped (new time: %s, StabilizeDuration now ends at %s)" , lp .lastUpdatedAt .Format (time .RFC3339 ), lp .lastUpdatedAt .Add (lp .opts .StabilizeDuration ).Format (time .RFC3339 ))
185
+ lp .lastUpdatedAt = time .Now ()
186
+ log .Ctx (ctx ).
187
+ Debug ().
188
+ Time ("new_last_updated_at" , lp .lastUpdatedAt ).
189
+ Time ("new_stabilize_ends_at" , lp .lastUpdatedAt .Add (lp .opts .StabilizeDuration )).
190
+ Msg ("Provider last updated time bumped" )
178
191
}
179
192
180
193
lp .evictTTL (ctx )
@@ -188,20 +201,40 @@ func (lp *leaseProviderImpl) evaluateRequest(ctx context.Context, req *Request)
188
201
189
202
if lp .acquired != nil && ! (pointer .StringDeref (lp .acquired .Status , StatusAcquired ) == StatusFailure ) {
190
203
// Lock already acquired
191
- log .Ctx (ctx ).Debug ().EmbedObject (req ).Msgf ("Lock already acquired (by sha %s, priority %d)" , lp .acquired .HeadSHA , lp .acquired .Priority )
204
+ log .Ctx (ctx ).
205
+ Debug ().
206
+ EmbedObject (req ).
207
+ Msgf ("Lock already acquired (by sha %s, priority %d)" , lp .acquired .HeadSHA , lp .acquired .Priority )
192
208
return req
193
209
}
194
210
// 1st: we reached the time limit -> lastUpdatedAt + StabilizeDuration > now
195
- passedStabilizeDuration := time .Since (* lp .lastUpdatedAt ) >= lp .opts .StabilizeDuration
196
- log .Ctx (ctx ).Debug ().Msg ("Now: " + time .Now ().Format (time .RFC3339 ))
197
- log .Ctx (ctx ).Debug ().EmbedObject (req ).Msgf ("Stabilize duration check: Duration config: %.0fs, Last updated at: %s, Stabilize duration end: %s, Stabilize duration passed: %t" , lp .opts .StabilizeDuration .Seconds (), lp .lastUpdatedAt .Format (time .RFC3339 ), lp .lastUpdatedAt .Add (lp .opts .StabilizeDuration ).Format (time .RFC3339 ), passedStabilizeDuration )
211
+ passedStabilizeDuration := time .Since (lp .lastUpdatedAt ) >= lp .opts .StabilizeDuration
212
+ log .Ctx (ctx ).
213
+ Debug ().
214
+ EmbedObject (req ).
215
+ Float64 ("config_stabilize_duration_sec" , lp .opts .StabilizeDuration .Seconds ()).
216
+ Time ("last_updated_at" , lp .lastUpdatedAt ).
217
+ Time ("stabilize_ends_at" , lp .lastUpdatedAt .Add (lp .opts .StabilizeDuration )).
218
+ Time ("current_time" , time .Now ()).
219
+ Bool ("stabilize_duration_passed" , passedStabilizeDuration ).
220
+ Msg ("Stabilize duration check" )
221
+
198
222
// 2nd: we received all requests and can take a decision
199
- // 3rd: there has been no previous failure
200
223
reachedExpectedRequestCount := len (lp .known ) >= lp .opts .ExpectedRequestCount
201
- log .Ctx (ctx ).Debug ().EmbedObject (req ).Msgf ("Expected request count check: config: %d, actual: %d, expected request count reached: %t" , lp .opts .ExpectedRequestCount , len (lp .known ), reachedExpectedRequestCount )
224
+ log .Ctx (ctx ).
225
+ Debug ().
226
+ EmbedObject (req ).
227
+ Int ("config_expected_request_count" , lp .opts .ExpectedRequestCount ).
228
+ Int ("actual_request_count" , len (lp .known )).
229
+ Bool ("expected_request_count_reached" , reachedExpectedRequestCount ).
230
+ Msg ("Expected request count check" )
202
231
232
+ // 3rd: there has been no previous failure
203
233
if lp .acquired == nil && (! passedStabilizeDuration && ! reachedExpectedRequestCount ) {
204
- log .Ctx (ctx ).Debug ().EmbedObject (req ).Msg ("Stabilize duration has not been met yet, or we're still waiting for more request to register" )
234
+ log .Ctx (ctx ).
235
+ Debug ().
236
+ EmbedObject (req ).
237
+ Msg ("Stabilize duration has not been met yet, or we're still waiting for more request to register" )
205
238
return req
206
239
}
207
240
@@ -217,8 +250,14 @@ func (lp *leaseProviderImpl) evaluateRequest(ctx context.Context, req *Request)
217
250
if req .Priority == maxPriority {
218
251
req .Status = pointer .String (StatusAcquired )
219
252
lp .acquired = req
220
- log .Ctx (ctx ).Debug ().EmbedObject (req ).Msg ("Current lease request has the higher priority. It then acquires the lock" )
221
- log .Ctx (ctx ).Info ().EmbedObject (req ).Msg ("Lock acquired" )
253
+ log .Ctx (ctx ).
254
+ Debug ().
255
+ EmbedObject (req ).
256
+ Msg ("Current lease request has the higher priority. It then acquires the lock" )
257
+ log .Ctx (ctx ).
258
+ Info ().
259
+ EmbedObject (req ).
260
+ Msg ("Lock acquired" )
222
261
}
223
262
224
263
return req
0 commit comments