Hi, thanks for an awesome piece of software. I have a problem with consuming messages on demand in batches, and I'm unsure if it's a bug in the nats-io libs or in my code. The documentation of the JetStream API says it's OK to call `consumer.next()` in a loop. However, I've found this way to be extremely slow compared to using `consumer.fetch()`.

Details: I need to consume messages on demand in batches. This means that I want to get all available messages, but not more than N. It also means that simply setting up a `consume()` callback doesn't fit. That being said, I have an implementation based on `consumer.next()`:

**Implementation based on `consumer.next()`**

```ts
this.logger.debug("Listening persistent batch", { target });
const { entity, id, op, modifier } = target;
const natsSubject = BrokerUnit.#targetToNatsSubject(target, "EVENT");
const natsConsumerName = [scope, "LISTEN", entity, id, op, modifier]
  .filter((v) => v && v !== "*")
  .join("_");
const natsConsumer = await this.#ensureNatsConsumer({
  streamName: entity,
  consumerName: natsConsumerName,
  consumerConfig: {
    durable_name: natsConsumerName,
    deliver_group: natsConsumerName,
    ack_policy: JetStreamAckPolicy.Explicit,
    ack_wait: 3_000_000_000,
    max_ack_pending: -1,
    max_deliver: BrokerUnit.persistentMessageDropThreshold + 1,
    filter_subject: natsSubject,
  },
});
const handleJetStreamSubscription = async () => {
  using jetStreamMessagePool = new JetStreamMessagePool<T>();
  while (
    !this.#natsConnection.isClosed() &&
    !this.#natsConnection.isDraining() &&
    !signal?.aborted
  ) {
    let jetStreamMessage: JetStreamMessage | undefined;
    try {
      // Poll briefly if a batch is already buffered, otherwise long-poll.
      jetStreamMessage = await natsConsumer.next({
        expires: jetStreamMessagePool.size ? 1_000 : 30_000,
      }) || undefined;
    } catch (e) {
      this.logger.error(e, { scope });
    }
    if (jetStreamMessage) {
      const jetStreamMessagePoolItem = {
        jetStreamMessage,
        target: BrokerUnit.#natsSubjectToTarget(jetStreamMessage.subject),
        data: BrokerUnit.#natsDataToData(jetStreamMessage.data, raw) as T,
        headers: BrokerUnit.#natsHeadersToHeaders(jetStreamMessage.headers),
      };
      if (
        jetStreamMessage.redelivered &&
        jetStreamMessage.info.redeliveryCount >=
          BrokerUnit.persistentMessageDropThreshold
      ) {
        this.logger.error(
          `Dropping JetStream message #${jetStreamMessage.seq}` +
            ` as it was redelivered` +
            ` ${BrokerUnit.persistentMessageDropThreshold} times`,
          { scope, target: jetStreamMessagePoolItem.target },
        );
        jetStreamMessage.ack();
        continue;
      }
      if (jetStreamMessagePool.has(jetStreamMessage.seq)) {
        this.logger.warn(
          `JetStream message #${jetStreamMessage.seq} is already in active batch`,
          { scope, target: jetStreamMessagePoolItem.target },
        );
        continue;
      }
      jetStreamMessagePool.push(jetStreamMessagePoolItem);
    }
    console.log("jetStreamMessagePool.size", jetStreamMessagePool.size);
    // Flush when the batch is full, the poll timed out, or nothing is pending.
    if (
      jetStreamMessagePool.size &&
      (
        jetStreamMessagePool.size >=
          BrokerUnit.persistentMessageBatchSizeMax ||
        !jetStreamMessage ||
        !jetStreamMessage.info.pending
      )
    ) {
      try {
        this.logger.debug(
          `Processing persistent batch of size ${jetStreamMessagePool.size}`,
          { scope },
        );
        const { errorIndexes = [] } =
          await process?.(jetStreamMessagePool.batch) ?? {};
        jetStreamMessagePool.ack(errorIndexes);
      } catch (e) {
        this.logger.error(`Unable to process persistent batch`, { scope });
        this.logger.error(e, { scope });
      } finally {
        jetStreamMessagePool.clear();
      }
    }
  }
};
handleJetStreamSubscription();
```

It works correctly, but it's slow. I also have an implementation based on `consumer.fetch()`:

**Implementation based on `consumer.fetch()`**

```ts
this.logger.debug("Listening persistent batch", { target });
const { entity, id, op, modifier } = target;
const natsSubject = BrokerUnit.#targetToNatsSubject(target, "EVENT");
const natsConsumerName = [scope, "LISTEN", entity, id, op, modifier]
  .filter((v) => v && v !== "*")
  .join("_");
const natsConsumer = await this.#ensureNatsConsumer({
  streamName: entity,
  consumerName: natsConsumerName,
  consumerConfig: {
    durable_name: natsConsumerName,
    deliver_group: natsConsumerName,
    ack_policy: JetStreamAckPolicy.Explicit,
    ack_wait: 3_000_000_000,
    max_ack_pending: -1,
    max_deliver: BrokerUnit.persistentMessageDropThreshold + 1,
    filter_subject: natsSubject,
  },
});
const handleJetStreamSubscription = async () => {
  using jetStreamMessagePool = new JetStreamMessagePool<T>();
  const isAborted = () => (
    this.#natsConnection.isClosed() ||
    this.#natsConnection.isDraining() ||
    signal?.aborted
  );
  while (!isAborted()) {
    // delay(0);
    try {
      const reader = await natsConsumer.fetch({ max_messages: 1_000 });
      for await (const jetStreamMessage of reader) {
        if (isAborted()) {
          break;
        }
        const jetStreamMessagePoolItem = {
          jetStreamMessage,
          target: BrokerUnit.#natsSubjectToTarget(jetStreamMessage.subject),
          data: BrokerUnit.#natsDataToData(jetStreamMessage.data, raw) as T,
          headers: BrokerUnit.#natsHeadersToHeaders(jetStreamMessage.headers),
        };
        if (
          jetStreamMessage.redelivered &&
          jetStreamMessage.info.redeliveryCount >=
            BrokerUnit.persistentMessageDropThreshold
        ) {
          this.logger.error(
            `Dropping JetStream message #${jetStreamMessage.seq}` +
              ` as it was redelivered` +
              ` ${BrokerUnit.persistentMessageDropThreshold} times`,
            { scope, target: jetStreamMessagePoolItem.target },
          );
          jetStreamMessage.ack();
          continue;
        }
        if (jetStreamMessagePool.has(jetStreamMessage.seq)) {
          this.logger.warn(
            `JetStream message #${jetStreamMessage.seq} is already in active batch`,
            { scope, target: jetStreamMessagePoolItem.target },
          );
          continue;
        }
        jetStreamMessagePool.push(jetStreamMessagePoolItem);
        // Flush when the batch is full, or neither the server nor the
        // reader's local buffer has more messages pending.
        if (
          jetStreamMessagePool.size &&
          (
            jetStreamMessagePool.size >=
              BrokerUnit.persistentMessageBatchSizeMax ||
            !jetStreamMessage.info.pending ||
            !reader.getPending()
          )
        ) {
          try {
            this.logger.debug(
              `Processing persistent batch of size ${jetStreamMessagePool.size}`,
              { scope },
            );
            const { errorIndexes = [] } =
              await process?.(jetStreamMessagePool.batch) ?? {};
            jetStreamMessagePool.ack(errorIndexes);
          } catch (e) {
            this.logger.error(`Unable to process persistent batch`, { scope });
            this.logger.error(e, { scope });
          } finally {
            jetStreamMessagePool.clear();
          }
        }
      }
      // ?
      reader.stop();
    } catch (e) {
      this.logger.error(e, { scope });
    }
  }
};
handleJetStreamSubscription();
```

**Implementation of `JetStreamMessagePool`**

There's no real need to read this one; I just wanted to show that the pool sends "working" heartbeats for its messages and is cleared properly.

```ts
interface JetStreamMessagePoolItem<T> {
  jetStreamMessage: JetStreamMessage;
  target: Target;
  data: T;
  headers: Record<string, string>;
}

class JetStreamMessagePool<T> {
  get batch(): Message<T>[] {
    return this.#items.map((v) => ({
      target: v.target,
      data: v.data,
      headers: v.headers,
    }));
  }

  get size(): number {
    return this.#items.length;
  }

  #items: JetStreamMessagePoolItem<T>[] = [];
  #seqSet = new Set<number>();
  #workingIntervalIdSet = new Set<number>();

  has(seq: number) {
    return this.#seqSet.has(seq);
  }

  push(item: JetStreamMessagePoolItem<T>) {
    if (this.#seqSet.has(item.jetStreamMessage.seq)) {
      return;
    }
    this.#seqSet.add(item.jetStreamMessage.seq);
    // Send a "working" heartbeat every second so ack_wait doesn't elapse
    // while the batch is being assembled and processed.
    const workingIntervalId = setInterval(
      () => item.jetStreamMessage.working(),
      1_000,
    );
    this.#workingIntervalIdSet.add(workingIntervalId);
    this.#items.push(item);
  }

  ack(errorIndexes: number[]) {
    // Ack everything except the items the processor reported as failed.
    this.#items
      .filter((_, i) => !errorIndexes.includes(i))
      .forEach((v) => v.jetStreamMessage.ack());
  }

  clear() {
    this.#items = [];
    this.#seqSet.clear();
    for (const id of this.#workingIntervalIdSet.values()) {
      clearInterval(id);
    }
    this.#workingIntervalIdSet.clear();
  }

  [Symbol.dispose]() {
    this.clear();
  }
}
```

Is there anything obvious to fix? Also, why is it so slow? I see the following in the docs:
The problem is that if I close the reader by breaking the async loop or calling `reader.stop()`, …
Replies: 1 comment
The correct use for your use case is `fetch()`. Fetch will start giving you messages instantly, and it will wait for additional messages to arrive if you asked for more than are currently available; the reason for the wait is that otherwise you'd keep asking the server for more messages when it has none. The mitigating case for you is that you want to buffer but bail out early: if you look at the num pending, you can decide whether to continue or stop your loop. If you don't get any messages, there's nothing for you to process, so the call will simply wait for messages.

Here's something that does what you are asking:
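A minimal sketch of that pattern, assuming the nats.js consumer API (`consumers.get()`, `fetch()` with `max_messages` and `expires`, `info.pending`, `stop()`); the `"events"` stream, `"batcher"` consumer, and `processBatch` helper are hypothetical stand-ins:

```ts
import { connect } from "nats";
import type { JsMsg } from "nats";

// Hypothetical stand-in for your batch processor.
async function processBatch(batch: unknown[]): Promise<void> {
  console.log(`processing ${batch.length} messages`);
}

const nc = await connect({ servers: "localhost:4222" });
const js = nc.jetstream();
// Assumes a stream "events" with a durable consumer "batcher" already exists.
const consumer = await js.consumers.get("events", "batcher");

while (!nc.isClosed()) {
  // Ask for up to N messages; fetch() yields them as they arrive and
  // otherwise waits up to `expires` ms for more to show up.
  const messages = await consumer.fetch({ max_messages: 1_000, expires: 5_000 });
  const batch: JsMsg[] = [];
  for await (const msg of messages) {
    batch.push(msg);
    // info.pending is how many messages remain on the server for this
    // consumer; once it hits 0 there is nothing left to wait for, so
    // stop the iterator instead of sitting out the full `expires`.
    if (msg.info.pending === 0) {
      messages.stop();
    }
  }
  if (batch.length) {
    await processBatch(batch.map((m) => m.json()));
    batch.forEach((m) => m.ack());
  }
}
```

Calling `stop()` once `info.pending` reaches zero is what gives the early bailout: you get everything that was available, capped at `max_messages`, without waiting out the remainder of the `expires` window.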