Skip to content

Commit 2855da2

Browse files
committed
nodeoptimisation
Signed-off-by: mineme0110 <[email protected]> nodeoptimisation Signed-off-by: mineme0110 <[email protected]> nodeoptimisation Signed-off-by: mineme0110 <[email protected]> nodeoptimisation Signed-off-by: mineme0110 <[email protected]> nodeoptimisation Signed-off-by: mineme0110 <[email protected]> nodeoptimisation Signed-off-by: mineme0110 <[email protected]>
1 parent 5eb399e commit 2855da2

19 files changed

+983
-120
lines changed

src/main/scala/io/iohk/atala/prism/node/NodeApp.scala

+26-8
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,31 @@
11
package io.iohk.atala.prism.node
22

3+
import cats.effect.ExitCode
4+
import cats.effect.IO
5+
import cats.effect.IOApp
6+
import cats.effect.Resource
37
import cats.effect.unsafe.IORuntime
4-
import cats.effect.{ExitCode, IO, IOApp, Resource}
58
import cats.implicits.toFunctorOps
6-
import com.typesafe.config.{Config, ConfigFactory}
9+
import com.typesafe.config.Config
10+
import com.typesafe.config.ConfigFactory
711
import doobie.hikari.HikariTransactor
8-
import io.grpc.{Server, ServerBuilder}
12+
import io.grpc.Server
13+
import io.grpc.ServerBuilder
14+
import io.iohk.atala.prism.node.cardano.CardanoClient
915
import io.iohk.atala.prism.node.logging.TraceId
1016
import io.iohk.atala.prism.node.logging.TraceId.IOWithTraceIdContext
17+
import io.iohk.atala.prism.node.metrics.NodeReporter
1118
import io.iohk.atala.prism.node.metrics.UptimeReporter
1219
import io.iohk.atala.prism.node.models.DidSuffix
13-
import io.iohk.atala.prism.node.cardano.CardanoClient
14-
import io.iohk.atala.prism.node.metrics.NodeReporter
1520
import io.iohk.atala.prism.node.operations.ApplyOperationConfig
21+
import io.iohk.atala.prism.node.repositories.SchemaMigrations
22+
import io.iohk.atala.prism.node.repositories.TransactorFactory
1623
import io.iohk.atala.prism.node.repositories._
1724
import io.iohk.atala.prism.node.services.CardanoLedgerService.CardanoBlockHandler
1825
import io.iohk.atala.prism.node.services._
19-
import io.iohk.atala.prism.node.services.models.AtalaObjectNotification
20-
import io.iohk.atala.prism.protos.node_api._
21-
import io.iohk.atala.prism.node.repositories.{SchemaMigrations, TransactorFactory}
26+
import io.iohk.atala.prism.node.services.models.{AtalaObjectBulkNotificationHandler, AtalaObjectNotification}
2227
import io.iohk.atala.prism.node.utils.IOUtils._
28+
import io.iohk.atala.prism.protos.node_api._
2329
import kamon.Kamon
2430
import kamon.module.Module
2531
import org.slf4j.LoggerFactory
@@ -84,12 +90,14 @@ class NodeApp(executionContext: ExecutionContext) { self =>
8490
)
8591
onCardanoBlock = onCardanoBlockOp(protocolVersionRepository)
8692
onAtalaObject = onAtalaObjectOp(objectManagementService)
93+
onAtalaObjectBulk = onAtalaObjectBulkOp(objectManagementService)
8794
keyValueService <- KeyValueService.resource(keyValuesRepository, logs)
8895
ledger <- createLedger(
8996
globalConfig,
9097
keyValueService,
9198
onCardanoBlock,
9299
onAtalaObject,
100+
onAtalaObjectBulk,
93101
logs
94102
)
95103
didDataRepository <- DIDDataRepository.resource(liftedTransactor, logs)
@@ -145,6 +153,7 @@ class NodeApp(executionContext: ExecutionContext) { self =>
145153
globalConfig: Config,
146154
onCardanoBlock: CardanoBlockHandler[IOWithTraceIdContext],
147155
onAtalaObject: AtalaObjectNotification => IOWithTraceIdContext[Unit],
156+
onAtalaObjectBulk: AtalaObjectBulkNotificationHandler[IOWithTraceIdContext],
148157
logs: Logs[IO, IOWithTraceIdContext]
149158
): Resource[IO, UnderlyingLedger[IOWithTraceIdContext]] = {
150159
val config = NodeConfig.cardanoConfig(globalConfig.getConfig("cardano"))
@@ -159,6 +168,7 @@ class NodeApp(executionContext: ExecutionContext) { self =>
159168
keyValueService,
160169
onCardanoBlock,
161170
onAtalaObject,
171+
onAtalaObjectBulk,
162172
logs
163173
)
164174
}
@@ -193,11 +203,18 @@ class NodeApp(executionContext: ExecutionContext) { self =>
193203
.void
194204
}
195205

206+
private def onAtalaObjectBulkOp(
207+
objectManagementService: ObjectManagementService[IOWithTraceIdContext]
208+
): AtalaObjectBulkNotificationHandler[IOWithTraceIdContext] = notifications => {
209+
objectManagementService.saveObjects(notifications).void
210+
}
211+
196212
private def createLedger(
197213
config: Config,
198214
keyValueService: KeyValueService[IOWithTraceIdContext],
199215
onCardanoBlock: CardanoBlockHandler[IOWithTraceIdContext],
200216
onAtalaObject: AtalaObjectNotification => IOWithTraceIdContext[Unit],
217+
onAtalaObjectBulk: AtalaObjectBulkNotificationHandler[IOWithTraceIdContext],
201218
logs: Logs[IO, IOWithTraceIdContext]
202219
): Resource[IO, UnderlyingLedger[IOWithTraceIdContext]] =
203220
config.getString("ledger") match {
@@ -207,6 +224,7 @@ class NodeApp(executionContext: ExecutionContext) { self =>
207224
config,
208225
onCardanoBlock,
209226
onAtalaObject,
227+
onAtalaObjectBulk,
210228
logs
211229
)
212230
case "in-memory" =>

src/main/scala/io/iohk/atala/prism/node/cardano/CardanoClient.scala

+18-14
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,34 @@
11
package io.iohk.atala.prism.node.cardano
22

3-
import cats.{Comonad, Functor}
3+
import cats.Comonad
4+
import cats.Functor
5+
import cats.MonadThrow
46
import cats.effect.Resource
7+
import cats.effect.kernel.Async
58
import cats.syntax.comonad._
6-
import io.iohk.atala.prism.node.models.{TransactionDetails, TransactionId, WalletDetails}
7-
import io.iohk.atala.prism.node.cardano.dbsync.CardanoDbSyncClient
8-
import io.iohk.atala.prism.node.cardano.models._
9-
import io.iohk.atala.prism.node.cardano.wallet.CardanoWalletApiClient
10-
import tofu.logging.{Logs, ServiceLogging}
11-
import tofu.syntax.monadic._
129
import cats.syntax.either._
1310
import derevo.derive
1411
import derevo.tagless.applyK
12+
import io.iohk.atala.prism.node.cardano.dbsync.CardanoDbSyncClient
1513
import io.iohk.atala.prism.node.cardano.logs.CardanoClientLogs
14+
import io.iohk.atala.prism.node.cardano.models._
15+
import io.iohk.atala.prism.node.cardano.wallet.CardanoWalletApiClient
1616
import io.iohk.atala.prism.node.metrics.TimeMeasureMetric
17+
import io.iohk.atala.prism.node.models.TransactionDetails
18+
import io.iohk.atala.prism.node.models.TransactionId
19+
import io.iohk.atala.prism.node.models.WalletDetails
1720
import tofu.higherKind.Mid
18-
import cats.MonadThrow
19-
import cats.effect.kernel.Async
21+
import tofu.logging.Logs
22+
import tofu.logging.ServiceLogging
23+
import tofu.syntax.monadic._
2024

2125
@derive(applyK)
2226
trait CardanoClient[F[_]] {
2327
def getFullBlock(blockNo: Int): F[Either[BlockError.NotFound, Block.Full]]
2428

2529
def getLatestBlock: F[Either[BlockError.NoneAvailable.type, Block.Canonical]]
26-
30+
def getAllPrismIndexBlocksWithTransactions(
31+
): F[Either[BlockError.NotFound, List[Block.Full]]]
2732
def postTransaction(
2833
walletId: WalletId,
2934
payments: List[Payment],
@@ -99,12 +104,11 @@ object CardanoClient {
99104
cardanoDbSyncClient: CardanoDbSyncClient[F],
100105
cardanoWalletApiClient: CardanoWalletApiClient[F]
101106
) extends CardanoClient[F] {
107+
def getAllPrismIndexBlocksWithTransactions(): F[Either[BlockError.NotFound, List[Block.Full]]] =
108+
cardanoDbSyncClient.getAllPrismIndexBlocksWithTransactions()
102109

103-
def getFullBlock(
104-
blockNo: Int
105-
): F[Either[BlockError.NotFound, Block.Full]] = {
110+
def getFullBlock(blockNo: Int): F[Either[BlockError.NotFound, Block.Full]] =
106111
cardanoDbSyncClient.getFullBlock(blockNo)
107-
}
108112

109113
def getLatestBlock: F[Either[BlockError.NoneAvailable.type, Block.Canonical]] =
110114
cardanoDbSyncClient.getLatestBlock

src/main/scala/io/iohk/atala/prism/node/cardano/dbsync/CardanoDbSyncClient.scala

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
package io.iohk.atala.prism.node.cardano.dbsync
22

33
import cats.Comonad
4-
import cats.effect.{Async, Resource}
4+
import cats.effect.Async
5+
import cats.effect.Resource
6+
import io.iohk.atala.prism.node.cardano.dbsync.repositories.CardanoBlockRepository
7+
import io.iohk.atala.prism.node.cardano.models.Block
8+
import io.iohk.atala.prism.node.cardano.models.BlockError
59
import io.iohk.atala.prism.node.metrics.TimeMeasureMetric
610
import io.iohk.atala.prism.node.repositories.TransactorFactory
7-
import io.iohk.atala.prism.node.cardano.dbsync.repositories.CardanoBlockRepository
8-
import io.iohk.atala.prism.node.cardano.models.{Block, BlockError}
911
import tofu.logging.Logs
1012

1113
trait CardanoDbSyncClient[F[_]] {
1214
def getFullBlock(blockNo: Int): F[Either[BlockError.NotFound, Block.Full]]
1315
def getLatestBlock: F[Either[BlockError.NoneAvailable.type, Block.Canonical]]
16+
def getAllPrismIndexBlocksWithTransactions(): F[Either[BlockError.NotFound, List[Block.Full]]]
1417
}
1518

1619
final class CardanoDbSyncClientImpl[F[_]](
@@ -21,6 +24,9 @@ final class CardanoDbSyncClientImpl[F[_]](
2124

2225
def getLatestBlock: F[Either[BlockError.NoneAvailable.type, Block.Canonical]] =
2326
cardanoBlockRepository.getLatestBlock
27+
28+
def getAllPrismIndexBlocksWithTransactions(): F[Either[BlockError.NotFound, List[Block.Full]]] =
29+
cardanoBlockRepository.getAllPrismIndexBlocksWithTransactions()
2430
}
2531

2632
object CardanoDbSyncClient {

src/main/scala/io/iohk/atala/prism/node/cardano/dbsync/repositories/CardanoBlockRepository.scala

+30-7
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,34 @@
11
package io.iohk.atala.prism.node.cardano.dbsync.repositories
22

3-
import cats.{Comonad, Functor}
3+
import cats.Comonad
4+
import cats.Functor
5+
import cats.effect.MonadCancelThrow
46
import cats.syntax.comonad._
57
import cats.syntax.either._
68
import cats.syntax.functor._
79
import derevo.derive
810
import derevo.tagless.applyK
911
import doobie.implicits._
1012
import doobie.util.transactor.Transactor
11-
import io.iohk.atala.prism.node.metrics.TimeMeasureMetric
12-
import io.iohk.atala.prism.node.utils.syntax.DBConnectionOps
13-
import io.iohk.atala.prism.node.cardano.dbsync.repositories.daos.{BlockDAO, TransactionDAO}
13+
import io.iohk.atala.prism.node.cardano.dbsync.repositories.daos.BlockDAO
14+
import io.iohk.atala.prism.node.cardano.dbsync.repositories.daos.TransactionDAO
1415
import io.iohk.atala.prism.node.cardano.dbsync.repositories.logs.CardanoBlockRepositoryLogs
1516
import io.iohk.atala.prism.node.cardano.dbsync.repositories.metrics.CardanoBlockRepositoryMetrics
16-
import io.iohk.atala.prism.node.cardano.models.{Block, BlockError}
17+
import io.iohk.atala.prism.node.cardano.models.Block
18+
import io.iohk.atala.prism.node.cardano.models.BlockError
19+
import io.iohk.atala.prism.node.metrics.TimeMeasureMetric
20+
import io.iohk.atala.prism.node.utils.syntax.DBConnectionOps
1721
import tofu.higherKind.Mid
18-
import tofu.logging.{Logs, ServiceLogging}
22+
import tofu.logging.Logs
23+
import tofu.logging.ServiceLogging
1924
import tofu.syntax.monoid.TofuSemigroupOps
20-
import cats.effect.MonadCancelThrow
2125

2226
@derive(applyK)
2327
trait CardanoBlockRepository[F[_]] {
2428
def getFullBlock(blockNo: Int): F[Either[BlockError.NotFound, Block.Full]]
2529
def getLatestBlock: F[Either[BlockError.NoneAvailable.type, Block.Canonical]]
30+
31+
def getAllPrismIndexBlocksWithTransactions(): F[Either[BlockError.NotFound, List[Block.Full]]]
2632
}
2733

2834
object CardanoBlockRepository {
@@ -76,4 +82,21 @@ private final class CardanoBlockRepositoryImpl[F[_]: MonadCancelThrow](
7682
)(header => Block.Canonical(header).asRight)
7783
)
7884
}
85+
86+
def getAllPrismIndexBlocksWithTransactions(): F[Either[BlockError.NotFound, List[Block.Full]]] = {
87+
BlockDAO
88+
.findAllPrismIndex()
89+
.map { results =>
90+
Option.when(results.nonEmpty) {
91+
results.map { case (header, transactions) =>
92+
Block.Full(header, transactions)
93+
}
94+
}
95+
}
96+
.logSQLErrorsV2("getting all prism index blocks with transactions")
97+
.transact(xa)
98+
.map { result =>
99+
result.toRight(BlockError.NotFound(0))
100+
}
101+
}
79102
}

src/main/scala/io/iohk/atala/prism/node/cardano/dbsync/repositories/daos/BlockDAO.scala

+55-1
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package io.iohk.atala.prism.node.cardano.dbsync.repositories.daos
22

33
import doobie.free.connection.ConnectionIO
44
import doobie.implicits._
5+
import doobie.util.log.LogHandler
6+
import io.iohk.atala.prism.node.cardano.models.AtalaObjectMetadata
57
import io.iohk.atala.prism.node.cardano.models.BlockHeader
6-
8+
import io.iohk.atala.prism.node.cardano.models.Transaction
79
private[repositories] object BlockDAO {
810
def find(blockNo: Int): ConnectionIO[Option[BlockHeader]] = {
911
sql"""
@@ -24,4 +26,56 @@ private[repositories] object BlockDAO {
2426
|LIMIT 1
2527
""".stripMargin.query[BlockHeader].option
2628
}
29+
30+
def findAllPrismIndex(): ConnectionIO[List[(BlockHeader, List[Transaction])]] = {
31+
32+
val query = sql"""
33+
WITH block_data AS (
34+
SELECT
35+
b.hash as block_hash,
36+
b.block_no,
37+
b.time as block_time,
38+
tx.hash as tx_hash,
39+
tx.block_index,
40+
tx_metadata.key as metadata_key,
41+
tx_metadata.json as metadata_json
42+
FROM block b
43+
LEFT JOIN tx ON tx.block_id = b.id
44+
INNER JOIN tx_metadata ON tx_metadata.tx_id = tx.id
45+
AND tx_metadata.key = ${AtalaObjectMetadata.METADATA_PRISM_INDEX}
46+
ORDER BY b.block_no, tx.block_index
47+
)
48+
SELECT * FROM block_data;
49+
""".stripMargin
50+
query
51+
.queryWithLogHandler[BlockTransactionData](LogHandler.jdkLogHandler)
52+
.to[List]
53+
.map { results =>
54+
results
55+
.groupBy(row =>
56+
BlockHeader(
57+
hash = row.blockHash,
58+
blockNo = row.blockNo,
59+
time = row.time,
60+
previousBlockHash = None // No previous block hash needed
61+
)
62+
)
63+
.map { case (header, rows) =>
64+
val transactions = rows
65+
.map(row =>
66+
Transaction(
67+
id = row.id,
68+
blockHash = row.blockHash,
69+
blockIndex = row.blockIndex,
70+
metadata = row.metadata
71+
)
72+
)
73+
.toList
74+
75+
(header, transactions)
76+
}
77+
.toList
78+
.sortBy(_._1.blockNo)
79+
}
80+
}
2781
}

0 commit comments

Comments
 (0)