Skip to content

Adds support for XPENDING IDLE parameter #2822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ Current package versions:
| [![StackExchange.Redis](https://img.shields.io/nuget/v/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis](https://img.shields.io/nuget/vpre/StackExchange.Redis.svg)](https://www.nuget.org/packages/StackExchange.Redis/) | [![StackExchange.Redis MyGet](https://img.shields.io/myget/stackoverflow/vpre/StackExchange.Redis.svg)](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |

## Unreleased
No pending unreleased changes.

- Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822))


## 2.8.22

Expand Down
20 changes: 18 additions & 2 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2649,6 +2649,21 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);

/// <summary>
/// View information about each pending message.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="groupName">The name of the consumer group.</param>
/// <param name="count">The maximum number of pending messages to return.</param>
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. Pass null to read from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. Pass null to read to the end of the stream.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);

/// <summary>
/// View information about each pending message.
/// </summary>
Expand All @@ -2658,11 +2673,12 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
/// <remarks>Equivalent of calling XPENDING key group IDLE min-idle-time start-id end-id count consumer-name.</remarks>
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to add an overload here, i.e. a second method with different parameters; otherwise, this is a hard binary break - we try very hard not to do that. If the compiler complains about two methods with optional parameters, we can work around that

(yes: technically adding methods to the interface is also problematic, but: it is problematic in different ways, and in reality we don't expect custom implementations of the IDatabase etc APIs)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this. I'd leave the existing public method signatures as-is, and add new methods with additional required params as needed.

Copy link
Author

@david-brink-talogy david-brink-talogy Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a suggestion on the overloads? This Redis API can satisfy a number of use cases depending on the parameters passed. minIdleTimeInMs can be used with various combinations of consumerName, minId, and maxId. If new overloads do not have defaults, several overloads would likely be needed to satisfy all use cases which complicates the public API.

With this many arguments I'd generally lean toward an class like StreamPendingMessagesArgs, but that seems to run afoul of the existing design.

The PR as it currently exists seemed like a reasonable compromise between backwards compatibility and adhering to the design.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking something like this (but @mgravell and @NickCraver are really the experts):

StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, long minIdleTimeInMs = null, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mgravell, would you like me to amend the PR to restore the existing overload and add a new one that places minIdleTimeInMs before min/maxId?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to try one last time. @mgravell , @NickCraver, what would you like to see? I'm happy to amend the PR if I can get some direction on the desired arguments.


/// <summary>
/// Read a stream using the given range of IDs.
Expand Down
5 changes: 4 additions & 1 deletion src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,10 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);

/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, long?, CommandFlags)"/>
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamRange(RedisKey, RedisValue?, RedisValue?, int?, Order, CommandFlags)"/>
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);
Expand Down
5 changes: 4 additions & 1 deletion src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,12 @@ public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingAsync(ToInner(key), groupName, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,12 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPending(ToInner(key), groupName, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);

public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);

Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,8 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key,
StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo
StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
Expand Down Expand Up @@ -971,7 +972,8 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi
StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamInfo>!
StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingInfo>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!
Expand Down
45 changes: 35 additions & 10 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2803,7 +2803,10 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
}

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
StreamPendingMessages(key, groupName, count, consumerName, minId, maxId, null, flags);

public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2812,12 +2815,16 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
}

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
StreamPendingMessagesAsync(key, groupName, count, consumerName, minId, maxId, null, flags);

public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamPendingMessagesMessage(
key,
Expand All @@ -2826,6 +2833,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
maxId,
count,
consumerName,
minIdleTimeInMs,
flags);

return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
Expand Down Expand Up @@ -4300,9 +4308,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro
/// Gets a message for <see href="https://redis.io/commands/xpending/"/>.
/// </summary>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags)
{
// > XPENDING mystream mygroup - + 10 [consumer name]
// > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name]
// 1) 1) 1526569498055 - 0
// 2) "Bob"
// 3) (integer)74170458
Expand All @@ -4316,16 +4324,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
}

var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5];
var valuesLength = 4;
if (consumerName != RedisValue.Null)
{
valuesLength++;
}

values[0] = groupName;
values[1] = minId ?? StreamConstants.ReadMinValue;
values[2] = maxId ?? StreamConstants.ReadMaxValue;
values[3] = count;
if (minIdleTimeInMs is not null)
{
valuesLength += 2;
}
var values = new RedisValue[valuesLength];

var offset = 0;

values[offset++] = groupName;
if (minIdleTimeInMs is not null)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

{
values[offset++] = "IDLE";
values[offset++] = minIdleTimeInMs;
}
values[offset++] = minId ?? StreamConstants.ReadMinValue;
values[offset++] = maxId ?? StreamConstants.ReadMaxValue;
values[offset++] = count;

if (consumerName != RedisValue.Null)
{
values[4] = consumerName;
values[offset++] = consumerName;
}

return Message.Create(
Expand Down
Loading
Loading