Skip to content

Commit 11e142d

Browse files
mcamouayazabbas
andauthored
Parallelize account and publisher sync (#43)
* Parallelize account and publisher sync * lint * Try to get concurrency working * lintg * lint * limit concurrency to 50 parallel txs --------- Co-authored-by: Ayaz Abbas <[email protected]>
1 parent edf8567 commit 11e142d

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

program_admin/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
import os
34
from pathlib import Path
@@ -47,6 +48,8 @@
4748
"pythtest": "https://api.pythtest.pyth.network",
4849
}
4950

51+
MAX_CONCURRENT_TRANSACTIONS = 50
52+
5053

5154
class ProgramAdmin:
5255
network: Network
@@ -260,6 +263,8 @@ async def sync(
260263

261264
# Sync product/price accounts
262265

266+
product_transactions: List[asyncio.Task[None]] = []
267+
263268
product_updates: bool = False
264269

265270
for jump_symbol, _price_account_map in ref_permissions.items():
@@ -278,12 +283,28 @@ async def sync(
278283

279284
instructions.extend(product_instructions)
280285
if send_transactions:
281-
await self.send_transaction(product_instructions, product_keypairs)
286+
product_transactions.append(
287+
asyncio.create_task(
288+
self.send_transaction(
289+
product_instructions, product_keypairs
290+
)
291+
)
292+
)
293+
294+
if len(product_transactions) == MAX_CONCURRENT_TRANSACTIONS:
295+
await asyncio.gather(*product_transactions)
296+
product_transactions = []
297+
298+
if product_transactions:
299+
await asyncio.gather(*product_transactions)
282300

283301
if product_updates:
284302
await self.refresh_program_accounts()
285303

286304
# Sync publishers
305+
306+
publisher_transactions = []
307+
287308
for jump_symbol, _price_account_map in ref_permissions.items():
288309
ref_product = ref_products[jump_symbol] # type: ignore
289310

@@ -297,7 +318,18 @@ async def sync(
297318
if price_instructions:
298319
instructions.extend(price_instructions)
299320
if send_transactions:
300-
await self.send_transaction(price_instructions, price_keypairs)
321+
publisher_transactions.append(
322+
asyncio.create_task(
323+
self.send_transaction(price_instructions, price_keypairs)
324+
)
325+
)
326+
327+
if len(publisher_transactions) == MAX_CONCURRENT_TRANSACTIONS:
328+
await asyncio.gather(*publisher_transactions)
329+
publisher_transactions = []
330+
331+
if publisher_transactions:
332+
await asyncio.gather(*publisher_transactions)
301333

302334
return instructions
303335

0 commit comments

Comments
 (0)