Skip to content

Commit e15a4a0

Browse files
authored
[Docs] Dispose registry which contains rate limiters (#2578)
* Shift and lift * Add missing header * Add runnable to wordlist * Optimize sample code * Optimize sample code * Apply suggestions from code review
1 parent 68a8cd6 commit e15a4a0

File tree

3 files changed

+173
-0
lines changed

3 files changed

+173
-0
lines changed

.github/wordlist.txt

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ rethrow
5454
rethrows
5555
retryable
5656
reusability
57+
runnable
5758
runtime
5859
saas
5960
sdk

docs/pipelines/resilience-pipeline-registry.md

+90
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,96 @@ Both `AddReloadToken(...)` and `OnPipelineDisposed(...)` are used to implement t
207207

208208
Resource disposal occurs when the registry is disposed of or when the pipeline undergoes changes due to [dynamic reloads](#dynamic-reloads). Upon disposal, all callbacks registered through the `OnPipelineDisposed` method are invoked. However, actual resource disposal is deferred until the pipeline completes all outgoing executions. It's vital to note that dispose callbacks are associated only with a specific instance of the pipeline.
209209

210+
### Disposal of encapsulated rate limiters
211+
212+
If you are using custom rate limiters and want to dispose them on pipeline reload or when a registry is disposed, then you should use the `OnPipelineDisposed` callback.
213+
214+
Consider the following runnable example. It creates a registry with a concurrency strategy and a chained rate limiter strategy (which contains multiple rate limiters):
215+
216+
<!-- snippet: registry-ratelimiter-dispose -->
217+
```cs
218+
public static class Program
219+
{
220+
public static void Main()
221+
{
222+
using var registryAdapter = new PipelineRegistryAdapter();
223+
registryAdapter.GetOrCreateResiliencePipeline("Pipeline foo", 1, 10, 100, 1000);
224+
registryAdapter.GetOrCreateResiliencePipeline("Pipeline bar", 2, 20, 200, 2000);
225+
}
226+
}
227+
228+
public sealed class PipelineRegistryAdapter : IDisposable
229+
{
230+
private readonly ResiliencePipelineRegistry<string> _resiliencePipelineRegistry = new();
231+
private bool _disposed;
232+
233+
public void Dispose()
234+
{
235+
if (!_disposed)
236+
{
237+
_resiliencePipelineRegistry.Dispose();
238+
_disposed = true;
239+
}
240+
}
241+
242+
private static PartitionedRateLimiter<ResilienceContext> CreateConcurrencyLimiter(string partitionKey, int permitLimit) =>
243+
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
244+
RateLimitPartition.GetConcurrencyLimiter(
245+
partitionKey: partitionKey,
246+
factory: partitionKey => new ConcurrencyLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0 }));
247+
248+
private static PartitionedRateLimiter<ResilienceContext> CreateFixedWindowLimiter(string partitionKey, int permitLimit, TimeSpan window) =>
249+
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
250+
RateLimitPartition.GetFixedWindowLimiter(
251+
partitionKey: partitionKey,
252+
factory: partitionKey => new FixedWindowRateLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0, Window = window }));
253+
254+
public ResiliencePipeline GetOrCreateResiliencePipeline(string partitionKey, int maximumConcurrentThreads, int sendLimitPerSecond, int sendLimitPerHour, int sendLimitPerDay)
255+
{
256+
return _resiliencePipelineRegistry.GetOrAddPipeline(partitionKey, (builder, context) =>
257+
{
258+
PartitionedRateLimiter<ResilienceContext>? threadLimiter = null;
259+
PartitionedRateLimiter<ResilienceContext>? requestLimiter = null;
260+
261+
// outer strategy: limit threads
262+
builder.AddRateLimiter(new RateLimiterStrategyOptions
263+
{
264+
RateLimiter = args =>
265+
{
266+
threadLimiter = CreateConcurrencyLimiter(partitionKey, maximumConcurrentThreads);
267+
return threadLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
268+
}
269+
});
270+
271+
// inner strategy: limit requests (by second, hour, day)
272+
builder.AddRateLimiter(new RateLimiterStrategyOptions
273+
{
274+
RateLimiter = args =>
275+
{
276+
PartitionedRateLimiter<ResilienceContext>[] limiters = [
277+
CreateFixedWindowLimiter(partitionKey, sendLimitPerSecond, TimeSpan.FromSeconds(1)),
278+
CreateFixedWindowLimiter(partitionKey, sendLimitPerHour, TimeSpan.FromHours(1)),
279+
CreateFixedWindowLimiter(partitionKey, sendLimitPerDay, TimeSpan.FromDays(1)),
280+
];
281+
requestLimiter = PartitionedRateLimiter.CreateChained(limiters);
282+
return requestLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
283+
}
284+
});
285+
286+
// unlike other strategies, rate limiters disposed manually
287+
context.OnPipelineDisposed(() =>
288+
{
289+
threadLimiter?.Dispose();
290+
requestLimiter?.Dispose();
291+
});
292+
});
293+
}
294+
}
295+
```
296+
<!-- endSnippet -->
297+
298+
Notice how the rate limiters are disposed manually in the `OnPipelineDisposed` callback.
299+
210300
## Complex registry keys
211301

212302
Though the pipeline registry supports complex keys, we suggest you use them when defining pipelines with the [Dependency Injection](../advanced/dependency-injection.md) (DI) containers. For further information, see the [section on complex pipeline keys](../advanced/dependency-injection.md#complex-pipeline-keys).

src/Snippets/Docs/ResiliencePipelineRegistry.cs

+82
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
using System.Net.Http;
2+
using System.Threading.RateLimiting;
3+
using Polly.RateLimiting;
24
using Polly.Registry;
35
using Polly.Retry;
46

@@ -171,5 +173,85 @@ private static void RegisterCancellationSource(CancellationTokenSource cancellat
171173
{
172174
// Register the source
173175
}
176+
177+
#region registry-ratelimiter-dispose
178+
public static class Program
179+
{
180+
public static void Main()
181+
{
182+
using var registryAdapter = new PipelineRegistryAdapter();
183+
registryAdapter.GetOrCreateResiliencePipeline("Pipeline foo", 1, 10, 100, 1000);
184+
registryAdapter.GetOrCreateResiliencePipeline("Pipeline bar", 2, 20, 200, 2000);
185+
}
186+
}
187+
188+
public sealed class PipelineRegistryAdapter : IDisposable
189+
{
190+
private readonly ResiliencePipelineRegistry<string> _resiliencePipelineRegistry = new();
191+
private bool _disposed;
192+
193+
public void Dispose()
194+
{
195+
if (!_disposed)
196+
{
197+
_resiliencePipelineRegistry.Dispose();
198+
_disposed = true;
199+
}
200+
}
201+
202+
private static PartitionedRateLimiter<ResilienceContext> CreateConcurrencyLimiter(string partitionKey, int permitLimit) =>
203+
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
204+
RateLimitPartition.GetConcurrencyLimiter(
205+
partitionKey: partitionKey,
206+
factory: partitionKey => new ConcurrencyLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0 }));
207+
208+
private static PartitionedRateLimiter<ResilienceContext> CreateFixedWindowLimiter(string partitionKey, int permitLimit, TimeSpan window) =>
209+
PartitionedRateLimiter.Create<ResilienceContext, string>(context =>
210+
RateLimitPartition.GetFixedWindowLimiter(
211+
partitionKey: partitionKey,
212+
factory: partitionKey => new FixedWindowRateLimiterOptions { PermitLimit = permitLimit, QueueLimit = 0, Window = window }));
213+
214+
public ResiliencePipeline GetOrCreateResiliencePipeline(string partitionKey, int maximumConcurrentThreads, int sendLimitPerSecond, int sendLimitPerHour, int sendLimitPerDay)
215+
{
216+
return _resiliencePipelineRegistry.GetOrAddPipeline(partitionKey, (builder, context) =>
217+
{
218+
PartitionedRateLimiter<ResilienceContext>? threadLimiter = null;
219+
PartitionedRateLimiter<ResilienceContext>? requestLimiter = null;
220+
221+
// outer strategy: limit threads
222+
builder.AddRateLimiter(new RateLimiterStrategyOptions
223+
{
224+
RateLimiter = args =>
225+
{
226+
threadLimiter = CreateConcurrencyLimiter(partitionKey, maximumConcurrentThreads);
227+
return threadLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
228+
}
229+
});
230+
231+
// inner strategy: limit requests (by second, hour, day)
232+
builder.AddRateLimiter(new RateLimiterStrategyOptions
233+
{
234+
RateLimiter = args =>
235+
{
236+
PartitionedRateLimiter<ResilienceContext>[] limiters = [
237+
CreateFixedWindowLimiter(partitionKey, sendLimitPerSecond, TimeSpan.FromSeconds(1)),
238+
CreateFixedWindowLimiter(partitionKey, sendLimitPerHour, TimeSpan.FromHours(1)),
239+
CreateFixedWindowLimiter(partitionKey, sendLimitPerDay, TimeSpan.FromDays(1)),
240+
];
241+
requestLimiter = PartitionedRateLimiter.CreateChained(limiters);
242+
return requestLimiter.AcquireAsync(args.Context, permitCount: 1, args.Context.CancellationToken);
243+
}
244+
});
245+
246+
// unlike other strategies, rate limiters disposed manually
247+
context.OnPipelineDisposed(() =>
248+
{
249+
threadLimiter?.Dispose();
250+
requestLimiter?.Dispose();
251+
});
252+
});
253+
}
254+
}
255+
#endregion
174256
}
175257

0 commit comments

Comments
 (0)