
Commit 1f42c35

Introduce bulk sync for the PRISM node to sync PRISM operations more efficiently
Signed-off-by: mineme0110 <[email protected]>
1 parent 2855da2 commit 1f42c35

File tree

3 files changed: +4 -104 lines

src/main/scala/io/iohk/atala/prism/node/services/BlockProcessingService.scala

-2 lines

@@ -57,13 +57,11 @@ class BlockProcessingServiceImpl(applyOperationConfig: ApplyOperationConfig) ext
       blocks: List[BlockProcessingInfo]
   ): ConnectionIO[Boolean] = {
     val methodName = "processBlockBatch"
-    println(s"processBlockBatch: Processing batch of ${blocks.size} blocks")
     val (allInvalid, allValid) = blocks
       .flatMap { blockInfo =>
         val operations = blockInfo.block.operations.toList
         val operationsWithSeqNumbers = operations.zipWithIndex
         operationsWithSeqNumbers.map { case (signedOperation, index) =>
-          println(s"processBlockBatch: Processing operation ${index} of ${blockInfo.transactionId}")
           parseOperation(
             signedOperation,
             LedgerData(

src/main/scala/io/iohk/atala/prism/node/services/CardanoLedgerService.scala

+1 -8 lines

@@ -257,14 +257,7 @@ class CardanoLedgerService[F[_]] private[services] (
     }
 
     val batchSize = 5000 // TODO: make it configurable
-
-    // notifications.grouped(batchSize).toList.traverse_(onAtalaObjectBulk)
-    notifications.grouped(batchSize).toList.traverse_ { batch =>
-      println(
-        s"Processing ${batch.size} notifications in batches of size: ${batch.size}"
-      )
-      onAtalaObjectBulk(batch)
-    }
+    notifications.grouped(batchSize).toList.traverse_(onAtalaObjectBulk)
   }
 
   // Sync blocks in the given range.
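
The retained call splits the notification list into fixed-size batches with grouped and hands each batch to onAtalaObjectBulk via cats' traverse_. A minimal, self-contained sketch of that pattern follows; Notification and processBulk are hypothetical stand-ins for the PRISM types, not the actual code:

import cats.effect.{IO, IOApp}
import cats.syntax.all._ // provides traverse_ on List

object BulkSyncSketch extends IOApp.Simple {
  // Hypothetical stand-in for AtalaObjectNotification.
  final case class Notification(id: Int)

  // Hypothetical stand-in for onAtalaObjectBulk: one effect per batch of notifications.
  def processBulk(batch: List[Notification]): IO[Unit] =
    IO.println(s"processing a batch of ${batch.size} notifications")

  val batchSize = 5000

  def run: IO[Unit] = {
    val notifications = (1 to 12000).map(Notification(_)).toList
    // grouped splits the list into chunks of at most batchSize elements;
    // traverse_ runs processBulk on each chunk in order and discards the results.
    notifications.grouped(batchSize).toList.traverse_(processBulk)
  }
}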

src/main/scala/io/iohk/atala/prism/node/services/ObjectManagementService.scala

+3 -94 lines

@@ -42,13 +42,14 @@ import io.iohk.atala.prism.node.services.models.AtalaObjectNotification
 import io.iohk.atala.prism.node.utils.syntax.DBConnectionOps
 import io.iohk.atala.prism.protos.node_models
 import io.iohk.atala.prism.protos.node_models.SignedAtalaOperation
+import org.slf4j.LoggerFactory
 import tofu.higherKind.Mid
 import tofu.logging.Logs
 import tofu.logging.ServiceLogging
 import tofu.logging.derivation.loggable
 import tofu.syntax.feither._
 import tofu.syntax.monadic._
-import org.slf4j.LoggerFactory
+
 import java.time.Instant
 
 @derive(applyK)
@@ -98,98 +99,6 @@ private final class ObjectManagementServiceImpl[F[_]: MonadCancelThrow](
 ) extends ObjectManagementService[F] {
   private val logger = LoggerFactory.getLogger(getClass)
 
-  def saveObjects2(
-      notifications: List[AtalaObjectNotification]
-  ): F[Either[SaveObjectError, Boolean]] = {
-    if (notifications.isEmpty) {
-      true.asRight[SaveObjectError].pure[F]
-    } else {
-
-      def applyTransactions(atalaObjectInfos: List[AtalaObjectInfo]): F[Either[SaveObjectError, Boolean]] = {
-        for {
-          // Process all objects and apply them to the state
-          transactions <- atalaObjectInfos.traverse { case (obj) =>
-            Monad[F].pure(processObject(obj))
-          }
-          result <- transactions.sequence.flatTraverse { txs =>
-            txs
-              .traverse(_.logSQLErrorsV2("Transaction saving objects").attemptSql)
-              .transact(xa)
-              .map(_.sequence)
-              .map(_.map(_ => true))
-              .leftMapIn(err => SaveObjectError(err.getMessage))
-          }
-
-        } yield result
-      }
-
-      def processObjectList(
-          notifications: List[AtalaObjectNotification]
-      ): F[Either[SaveObjectError, Boolean]] = {
-        val pairedInserts: List[
-          (AtalaObjectId, AtalaObjectsDAO.AtalaObjectCreateData, AtalaObjectsDAO.AtalaObjectSetTransactionInfo)
-        ] =
-          notifications.map { notification =>
-            val objectBytes = notification.atalaObject.toByteArray
-            val objId = AtalaObjectId.of(objectBytes)
-            (
-              objId,
-              AtalaObjectsDAO.AtalaObjectCreateData(
-                objId,
-                objectBytes,
-                AtalaObjectStatus.Processed
-              ),
-              AtalaObjectsDAO.AtalaObjectSetTransactionInfo(
-                objId,
-                notification.transaction
-              )
-            )
-          }
-
-        val (objectIds, objectInserts, transactionInserts) = pairedInserts.unzip3
-
-        // Bulk database operations
-        val bulkQuery = for {
-          count1 <- AtalaObjectsDAO.insertMany(objectInserts)
-          count2 <- AtalaObjectsDAO.setManyTransactionInfo(transactionInserts)
-          count3 <- AtalaObjectTransactionSubmissionsDAO
-            .updateStatusBatch(
-              transactionInserts.map(d =>
-                (
-                  d.transactionInfo.ledger,
-                  d.transactionInfo.transactionId,
-                  AtalaObjectTransactionSubmissionStatus.InLedger
-                )
-              )
-            )
-          atalaObjectsInfo <- AtalaObjectsDAO.getAtalaObjectsInfo(objectIds)
-        } yield (count1, count2, count3, atalaObjectsInfo)
-
-        bulkQuery
-          .logSQLErrorsV2("bulk processing atala objects")
-          .attemptSql
-          .transact(xa)
-          .flatMap {
-            case Left(err) => SaveObjectError(err.getMessage).asLeft[Boolean].pure[F]
-            case Right((count1, count2, count3, atalaObjectsInfo)) =>
-              if (
-                count1 != objectInserts.size || count2 != transactionInserts.size || count3 != transactionInserts.size
-              ) {
-                logger.info(
-                  s"Count mismatches: Create(exp=${objectInserts.size},got=$count1), " +
-                    s"TxInfo(exp=${transactionInserts.size},got=$count2), " +
-                    s"Status(exp=${transactionInserts.size},got=$count3)"
-                )
-              }
-              // Apply transactions to the state finally this sequencial operation similar to applyTransaction
-              applyTransactions(atalaObjectsInfo)
-          }
-      }
-
-      processObjectList(notifications)
-    }
-  }
-
   def saveObjects(
       notifications: List[AtalaObjectNotification]
   ): F[Either[SaveObjectError, Boolean]] = {
@@ -262,7 +171,7 @@ private final class ObjectManagementServiceImpl[F[_]: MonadCancelThrow](
           if (
             count1 != objectInserts.size || count2 != transactionInserts.size || count3 != transactionInserts.size
           ) {
-            println(
+            logger.info(
              s"Count mismatches: Create(exp=${objectInserts.size},got=$count1), " +
                s"TxInfo(exp=${transactionInserts.size},got=$count2), " +
                s"Status(exp=${transactionInserts.size},got=$count3)"
