Skip to content

Commit f423be8

Browse files
committed
Add redis topic prefixing
1 parent 86773a7 commit f423be8

File tree

2 files changed

+85
-3
lines changed

2 files changed

+85
-3
lines changed

src/HotChocolate/Core/src/Subscriptions.Redis/RedisPubSub.cs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ internal sealed class RedisPubSub : DefaultPubSub
1313
private readonly string _completed;
1414
private readonly int _topicBufferCapacity;
1515
private readonly TopicBufferFullMode _topicBufferFullMode;
16+
private readonly string? _topicPrefix;
17+
1618
public RedisPubSub(
1719
IConnectionMultiplexer connection,
1820
IMessageSerializer serializer,
@@ -25,6 +27,7 @@ public RedisPubSub(
2527
_topicBufferCapacity = options.TopicBufferCapacity;
2628
_topicBufferFullMode = options.TopicBufferFullMode;
2729
_completed = serializer.CompleteMessage;
30+
_topicPrefix = options.TopicPrefix;
2831
}
2932

3033
protected override async ValueTask OnSendAsync<TMessage>(
@@ -37,26 +40,36 @@ protected override async ValueTask OnSendAsync<TMessage>(
3740
// The object returned from GetSubscriber is a cheap pass-thru object that does not need
3841
// to be stored.
3942
var subscriber = _connection.GetSubscriber();
40-
await subscriber.PublishAsync(formattedTopic, serialized).ConfigureAwait(false);
43+
await subscriber.PublishAsync(GetPrefixedTopic(formattedTopic), serialized).ConfigureAwait(false);
4144
}
4245

4346
protected override async ValueTask OnCompleteAsync(string formattedTopic)
4447
{
4548
// The object returned from GetSubscriber is a cheap pass-thru object that does not need
4649
// to be stored.
4750
var subscriber = _connection.GetSubscriber();
48-
await subscriber.PublishAsync(formattedTopic, _completed).ConfigureAwait(false);
51+
await subscriber.PublishAsync(GetPrefixedTopic(formattedTopic), _completed).ConfigureAwait(false);
4952
}
5053

5154
protected override DefaultTopic<TMessage> OnCreateTopic<TMessage>(
5255
string formattedTopic,
5356
int? bufferCapacity,
5457
TopicBufferFullMode? bufferFullMode)
5558
=> new RedisTopic<TMessage>(
56-
formattedTopic,
59+
GetPrefixedTopic(formattedTopic),
5760
_connection,
5861
_serializer,
5962
bufferCapacity ?? _topicBufferCapacity,
6063
bufferFullMode ?? _topicBufferFullMode,
6164
DiagnosticEvents);
65+
66+
private string GetPrefixedTopic(string topic)
67+
{
68+
if (string.IsNullOrWhiteSpace(_topicPrefix))
69+
{
70+
return topic;
71+
}
72+
73+
return $"{_topicPrefix}{topic}";
74+
}
6275
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using HotChocolate.Execution;
2+
using HotChocolate.Execution.Configuration;
3+
using Microsoft.Extensions.DependencyInjection;
4+
using Squadron;
5+
using StackExchange.Redis;
6+
using Xunit.Abstractions;
7+
8+
namespace HotChocolate.Subscriptions.Redis;
9+
10+
public class RedisTopicPrefixIntegrationTests(RedisResource redisResource, ITestOutputHelper output)
11+
: SubscriptionIntegrationTestBase(output), IClassFixture<RedisResource>
12+
{
13+
private const string TopicPrefix = "prefix:";
14+
15+
[Fact]
16+
public override Task Subscribe_Infer_Topic()
17+
=> base.Subscribe_Infer_Topic();
18+
19+
[Fact]
20+
public override Task Subscribe_Static_Topic()
21+
=> base.Subscribe_Static_Topic();
22+
23+
[Fact]
24+
public override Task Subscribe_Topic_With_Arguments()
25+
=> base.Subscribe_Topic_With_Arguments();
26+
27+
[Fact]
28+
public override Task Subscribe_Topic_With_Arguments_2_Subscriber()
29+
=> base.Subscribe_Topic_With_Arguments_2_Subscriber();
30+
31+
[Fact]
32+
public override Task Subscribe_Topic_With_Arguments_2_Topics()
33+
=> base.Subscribe_Topic_With_Arguments_2_Topics();
34+
35+
[Fact]
36+
public override Task Subscribe_Topic_With_2_Arguments()
37+
=> base.Subscribe_Topic_With_2_Arguments();
38+
39+
[Fact]
40+
public override Task Subscribe_And_Complete_Topic()
41+
=> base.Subscribe_And_Complete_Topic();
42+
43+
[Fact]
44+
public override Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()
45+
=> base.Subscribe_And_Complete_Topic_With_ValueTypeMessage();
46+
47+
[Fact]
48+
public async Task Subscribe_Should_Create_Channel_With_Prefix()
49+
{
50+
using var cts = new CancellationTokenSource(Timeout);
51+
await using var services = CreateServer<Subscription>();
52+
53+
await using var result = await services.ExecuteRequestAsync(
54+
"subscription { onMessage }",
55+
cancellationToken: cts.Token);
56+
57+
var activeChannels = await GetActiveChannelsAsync();
58+
59+
Assert.Contains(activeChannels, channel => channel.ToString()!.StartsWith(TopicPrefix));
60+
}
61+
62+
private async Task<RedisResult[]> GetActiveChannelsAsync()
63+
{
64+
return (RedisResult[])(await redisResource.GetConnection().GetDatabase().ExecuteAsync("PUBSUB", "CHANNELS"))!;
65+
}
66+
67+
protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder)
68+
=> graphqlBuilder.AddRedisSubscriptions(_ => redisResource.GetConnection(), new SubscriptionOptions { TopicPrefix = TopicPrefix });
69+
}

0 commit comments

Comments
 (0)