Skip to content

Commit 4e36efc

Browse files
feat: add logs subscription (#16045)
* feat: logs subscription * fix: address review comments * fix: use processed commitment * fix: sleep before triggering log transaction
1 parent 03b1acd commit 4e36efc

File tree

2 files changed

+158
-1
lines changed

2 files changed

+158
-1
lines changed

src/connection.ts

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1556,6 +1556,54 @@ type RootSubscriptionInfo = {
15561556
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
15571557
};
15581558

1559+
/**
1560+
* @internal
1561+
*/
1562+
const LogsResult = pick({
1563+
err: TransactionErrorResult,
1564+
logs: array(string()),
1565+
signature: string(),
1566+
});
1567+
1568+
/**
1569+
* Logs result.
1570+
*
1571+
* @typedef {Object} Logs.
1572+
*/
1573+
export type Logs = {
1574+
err: TransactionError | null;
1575+
logs: string[];
1576+
signature: string;
1577+
};
1578+
1579+
/**
1580+
* Expected JSON RPC response for the "logsNotification" message.
1581+
*/
1582+
const LogsNotificationResult = pick({
1583+
result: notificationResultAndContext(LogsResult),
1584+
subscription: number(),
1585+
});
1586+
1587+
/**
1588+
* Filter for log subscriptions.
1589+
*/
1590+
export type LogsFilter = PublicKey | 'all' | 'allWithVotes';
1591+
1592+
/**
1593+
* Callback function for log notifications.
1594+
*/
1595+
export type LogsCallback = (logs: Logs, ctx: Context) => void;
1596+
1597+
/**
1598+
* @private
1599+
*/
1600+
type LogsSubscriptionInfo = {
1601+
callback: LogsCallback;
1602+
filter: LogsFilter;
1603+
subscriptionId: SubscriptionId | null; // null when there's no current server subscription id
1604+
commitment?: Commitment;
1605+
};
1606+
15591607
/**
15601608
* Signature result
15611609
*
@@ -1669,6 +1717,11 @@ export class Connection {
16691717
[id: number]: SlotSubscriptionInfo;
16701718
} = {};
16711719

1720+
/** @internal */ _logsSubscriptionCounter: number = 0;
1721+
/** @internal */ _logsSubscriptions: {
1722+
[id: number]: LogsSubscriptionInfo;
1723+
} = {};
1724+
16721725
/**
16731726
* Establish a JSON RPC connection
16741727
*
@@ -1730,6 +1783,10 @@ export class Connection {
17301783
'rootNotification',
17311784
this._wsOnRootNotification.bind(this),
17321785
);
1786+
this._rpcWebSocket.on(
1787+
'logsNotification',
1788+
this._wsOnLogsNotification.bind(this),
1789+
);
17331790
}
17341791

17351792
/**
@@ -2991,12 +3048,14 @@ export class Connection {
29913048
const slotKeys = Object.keys(this._slotSubscriptions).map(Number);
29923049
const signatureKeys = Object.keys(this._signatureSubscriptions).map(Number);
29933050
const rootKeys = Object.keys(this._rootSubscriptions).map(Number);
3051+
const logsKeys = Object.keys(this._logsSubscriptions).map(Number);
29943052
if (
29953053
accountKeys.length === 0 &&
29963054
programKeys.length === 0 &&
29973055
slotKeys.length === 0 &&
29983056
signatureKeys.length === 0 &&
2999-
rootKeys.length === 0
3057+
rootKeys.length === 0 &&
3058+
logsKeys.length === 0
30003059
) {
30013060
if (this._rpcWebSocketConnected) {
30023061
this._rpcWebSocketConnected = false;
@@ -3053,6 +3112,21 @@ export class Connection {
30533112
const sub = this._rootSubscriptions[id];
30543113
this._subscribe(sub, 'rootSubscribe', []);
30553114
}
3115+
3116+
for (let id of logsKeys) {
3117+
const sub = this._logsSubscriptions[id];
3118+
let filter;
3119+
if (typeof sub.filter === 'object') {
3120+
filter = {mentions: [sub.filter.toString()]};
3121+
} else {
3122+
filter = sub.filter;
3123+
}
3124+
this._subscribe(
3125+
sub,
3126+
'logsSubscribe',
3127+
this._buildArgs([filter], sub.commitment),
3128+
);
3129+
}
30563130
}
30573131

30583132
/**
@@ -3169,6 +3243,55 @@ export class Connection {
31693243
}
31703244
}
31713245

3246+
/**
3247+
* Registers a callback to be invoked whenever logs are emitted.
3248+
*/
3249+
onLogs(
3250+
filter: LogsFilter,
3251+
callback: LogsCallback,
3252+
commitment?: Commitment,
3253+
): number {
3254+
const id = ++this._logsSubscriptionCounter;
3255+
this._logsSubscriptions[id] = {
3256+
filter,
3257+
callback,
3258+
commitment,
3259+
subscriptionId: null,
3260+
};
3261+
this._updateSubscriptions();
3262+
return id;
3263+
}
3264+
3265+
/**
3266+
* Deregister a logs callback.
3267+
*
3268+
* @param id subscription id to deregister.
3269+
*/
3270+
async removeOnLogsListener(id: number): Promise<void> {
3271+
if (!this._logsSubscriptions[id]) {
3272+
throw new Error(`Unknown logs id: ${id}`);
3273+
}
3274+
const subInfo = this._logsSubscriptions[id];
3275+
delete this._logsSubscriptions[id];
3276+
await this._unsubscribe(subInfo, 'logsUnsubscribe');
3277+
this._updateSubscriptions();
3278+
}
3279+
3280+
/**
3281+
* @internal
3282+
*/
3283+
_wsOnLogsNotification(notification: Object) {
3284+
const res = create(notification, LogsNotificationResult);
3285+
const keys = Object.keys(this._logsSubscriptions).map(Number);
3286+
for (let id of keys) {
3287+
const sub = this._logsSubscriptions[id];
3288+
if (sub.subscriptionId === res.subscription) {
3289+
sub.callback(res.result.value, res.result.context);
3290+
return;
3291+
}
3292+
}
3293+
}
3294+
31723295
/**
31733296
* @internal
31743297
*/

test/connection.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2268,6 +2268,40 @@ describe('Connection', () => {
22682268
await connection.removeRootChangeListener(subscriptionId);
22692269
});
22702270

2271+
it('logs notification', async () => {
2272+
let listener: number | undefined;
2273+
const owner = new Account();
2274+
const [logsRes, ctx] = await new Promise(resolve => {
2275+
listener = connection.onLogs(
2276+
'all',
2277+
(logs, ctx) => {
2278+
resolve([logs, ctx]);
2279+
},
2280+
'processed',
2281+
);
2282+
// Sleep to allow the subscription time to be setup.
2283+
//
2284+
// Without this, there's a race condition between setting up the log
2285+
// subscription and executing the transaction to trigger the log.
2286+
// If the transaction to trigger the log executes before the
2287+
// subscription is setup, the log event listener never fires and so the
2288+
// promise never resolves.
2289+
sleep(1000).then(() => {
2290+
// Execute a transaction so that we can pickup its logs.
2291+
connection.requestAirdrop(owner.publicKey, 1);
2292+
});
2293+
});
2294+
expect(ctx.slot).to.be.greaterThan(0);
2295+
expect(logsRes.logs.length).to.eq(2);
2296+
expect(logsRes.logs[0]).to.eq(
2297+
'Program 11111111111111111111111111111111 invoke [1]',
2298+
);
2299+
expect(logsRes.logs[1]).to.eq(
2300+
'Program 11111111111111111111111111111111 success',
2301+
);
2302+
await connection.removeOnLogsListener(listener!);
2303+
});
2304+
22712305
it('https request', async () => {
22722306
const connection = new Connection('https://devnet.solana.com');
22732307
const version = await connection.getVersion();

0 commit comments

Comments
 (0)