From 9efc689d1acfa6d99e599e7caddb72d8541db436 Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 14 Aug 2024 12:39:11 +0200 Subject: [PATCH 1/6] Parallelize account and publisher sync --- program_admin/__init__.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index 36b62a2..02f92c5 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -1,7 +1,9 @@ +from asyncio import Future +import asyncio import json import os from pathlib import Path -from typing import Dict, List, Literal, Optional, Tuple +from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple from loguru import logger from solana import system_program @@ -260,6 +262,8 @@ async def sync( # Sync product/price accounts + transactions: list[Coroutine[Any, Any, None]] = [] + product_updates: bool = False for jump_symbol, _price_account_map in ref_permissions.items(): @@ -278,12 +282,17 @@ async def sync( instructions.extend(product_instructions) if send_transactions: - await self.send_transaction(product_instructions, product_keypairs) + transactions.append(self.send_transaction(product_instructions, product_keypairs)) + + await asyncio.gather(*transactions) if product_updates: await self.refresh_program_accounts() # Sync publishers + + transactions = [] + for jump_symbol, _price_account_map in ref_permissions.items(): ref_product = ref_products[jump_symbol] # type: ignore @@ -297,7 +306,9 @@ async def sync( if price_instructions: instructions.extend(price_instructions) if send_transactions: - await self.send_transaction(price_instructions, price_keypairs) + transactions.append(self.send_transaction(price_instructions, price_keypairs)) + + await asyncio.gather(*transactions) return instructions From ee1beffe31ac9eb8fa5de1938852dd1663e93dd7 Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 14 Aug 2024 12:44:19 +0200 Subject: [PATCH 2/6] lint --- program_admin/__init__.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index 02f92c5..6176933 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -1,4 +1,3 @@ -from asyncio import Future import asyncio import json import os @@ -282,7 +281,9 @@ async def sync( instructions.extend(product_instructions) if send_transactions: - transactions.append(self.send_transaction(product_instructions, product_keypairs)) + transactions.append( + self.send_transaction(product_instructions, product_keypairs) + ) await asyncio.gather(*transactions) @@ -306,7 +307,9 @@ async def sync( if price_instructions: instructions.extend(price_instructions) if send_transactions: - transactions.append(self.send_transaction(price_instructions, price_keypairs)) + transactions.append( + self.send_transaction(price_instructions, price_keypairs) + ) await asyncio.gather(*transactions) From 16c64c674c6c644c56af25f2fc10dcbaeece076b Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 14 Aug 2024 18:30:22 +0200 Subject: [PATCH 3/6] Try to get concurrency working --- program_admin/__init__.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index 6176933..9611bc6 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -261,7 +261,7 @@ async def sync( # Sync product/price accounts - transactions: list[Coroutine[Any, Any, None]] = [] + transactions: List[asyncio.Task[None]] = [] product_updates: bool = False @@ -282,7 +282,9 @@ async def sync( instructions.extend(product_instructions) if send_transactions: transactions.append( - self.send_transaction(product_instructions, product_keypairs) + asyncio.create_task( + self.send_transaction(product_instructions, product_keypairs) + ) ) await asyncio.gather(*transactions) @@ -308,7 +310,9 @@ async def sync( instructions.extend(price_instructions) if send_transactions: transactions.append( - self.send_transaction(price_instructions, price_keypairs) + asyncio.create_task( + self.send_transaction(price_instructions, price_keypairs) + ) ) await asyncio.gather(*transactions) From d8aa9974a5608cf2e5d5beaa52669fea5dd2f6c2 Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 14 Aug 2024 18:43:32 +0200 Subject: [PATCH 4/6] lintg --- program_admin/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index 9611bc6..abdb8c6 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -283,7 +283,8 @@ async def sync( if send_transactions: transactions.append( asyncio.create_task( - self.send_transaction(product_instructions, product_keypairs) + self.send_transaction( + product_instructions, product_keypairs) ) ) @@ -311,7 +312,9 @@ async def sync( if send_transactions: transactions.append( asyncio.create_task( - self.send_transaction(price_instructions, price_keypairs) + self.send_transaction( + price_instructions, price_keypairs + ) ) ) From b36969445fed3a913e5104a49aea0e0c77215605 Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 14 Aug 2024 18:47:54 +0200 Subject: [PATCH 5/6] lint --- program_admin/__init__.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index abdb8c6..dd9e005 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -2,7 +2,7 @@ import json import os from pathlib import Path -from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple +from typing import Dict, List, Literal, Optional, Tuple from loguru import logger from solana import system_program @@ -284,7 +284,8 @@ async def sync( transactions.append( asyncio.create_task( self.send_transaction( - product_instructions, product_keypairs) + product_instructions, product_keypairs + ) ) ) @@ -312,9 +313,7 @@ async def sync( if send_transactions: transactions.append( asyncio.create_task( - self.send_transaction( - price_instructions, price_keypairs - ) + self.send_transaction(price_instructions, price_keypairs) ) ) From 760ab22c20cd3a98f13f304d051c3ebf8f98d79d Mon Sep 17 00:00:00 2001 From: Ayaz Abbas Date: Thu, 15 Aug 2024 10:49:22 +0100 Subject: [PATCH 6/6] limit concurrency to 50 parallel txs --- program_admin/__init__.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/program_admin/__init__.py b/program_admin/__init__.py index dd9e005..4561252 100644 --- a/program_admin/__init__.py +++ b/program_admin/__init__.py @@ -48,6 +48,8 @@ "pythtest": "https://api.pythtest.pyth.network", } +MAX_CONCURRENT_TRANSACTIONS = 50 + class ProgramAdmin: network: Network @@ -261,7 +263,7 @@ async def sync( # Sync product/price accounts - transactions: List[asyncio.Task[None]] = [] + product_transactions: List[asyncio.Task[None]] = [] product_updates: bool = False @@ -281,7 +283,7 @@ async def sync( instructions.extend(product_instructions) if send_transactions: - transactions.append( + product_transactions.append( asyncio.create_task( self.send_transaction( product_instructions, product_keypairs @@ -289,14 +291,19 @@ async def sync( ) ) - await asyncio.gather(*transactions) + if len(product_transactions) == MAX_CONCURRENT_TRANSACTIONS: + await asyncio.gather(*product_transactions) + product_transactions = [] + + if product_transactions: + await asyncio.gather(*product_transactions) if product_updates: await self.refresh_program_accounts() # Sync publishers - transactions = [] + publisher_transactions = [] for jump_symbol, _price_account_map in ref_permissions.items(): ref_product = ref_products[jump_symbol] # type: ignore @@ -311,13 +318,18 @@ async def sync( if price_instructions: instructions.extend(price_instructions) if send_transactions: - transactions.append( + publisher_transactions.append( asyncio.create_task( self.send_transaction(price_instructions, price_keypairs) ) ) - await asyncio.gather(*transactions) + if len(publisher_transactions) == MAX_CONCURRENT_TRANSACTIONS: + await asyncio.gather(*publisher_transactions) + publisher_transactions = [] + + if publisher_transactions: + await asyncio.gather(*publisher_transactions) return instructions