Skip to content

Commit ab0445b

Browse files
authored
Merge pull request #898 from NilFoundation/sync-committee/rpc-refactoring
Sync Committee: RPC Refactoring
2 parents 4952763 + 60c3bac commit ab0445b

21 files changed

+500
-502
lines changed

nil/cmd/proof_provider/main.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ func addFlags(cmd *cobra.Command, cfg *cmdConfig) {
7575
"sync committee rpc endpoint",
7676
)
7777
cmd.Flags().StringVar(
78-
&cfg.TaskListenerRpcEndpoint,
78+
&cfg.OwnRpcEndpoint,
7979
"own-endpoint",
80-
cfg.TaskListenerRpcEndpoint,
80+
cfg.OwnRpcEndpoint,
8181
"own rpc server endpoint",
8282
)
8383
cmd.Flags().StringVar(

nil/cmd/sync_committee/main.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,14 @@ func loadConfig() (*cmdConfig, error) {
6767
func addFlags(cmd *cobra.Command, cfg *cmdConfig) {
6868
cobrax.AddConfigFlag(cmd.Flags())
6969
cmd.Flags().StringVar(
70-
&cfg.RpcEndpoint,
70+
&cfg.NilRpcEndpoint,
7171
"endpoint",
72-
cfg.RpcEndpoint,
72+
cfg.NilRpcEndpoint,
7373
"rpc endpoint")
7474
cmd.Flags().StringVar(
75-
&cfg.TaskListenerRpcEndpoint,
75+
&cfg.OwnRpcEndpoint,
7676
"own-endpoint",
77-
cfg.TaskListenerRpcEndpoint,
77+
cfg.OwnRpcEndpoint,
7878
"own rpc server endpoint")
7979
cmd.Flags().DurationVar(
8080
&cfg.AggregatorConfig.RpcPollingInterval,

nil/cmd/sync_committee_cli/internal/commands/executor_params.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const MinRefreshInterval = 100 * time.Millisecond
2020

2121
func DefaultExecutorParams() *ExecutorParams {
2222
return &ExecutorParams{
23-
DebugRpcEndpoint: core.DefaultTaskRpcEndpoint,
23+
DebugRpcEndpoint: core.DefaultOwnRpcEndpoint,
2424
AutoRefresh: false,
2525
RefreshInterval: 5 * time.Second,
2626
}

nil/services/synccommittee/Makefile.inc

-11
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ sync_committee_generate_mocks: \
6060
$(root_sc)/internal/api/task_state_change_handler_generated_mock.go \
6161
$(root_sc)/internal/api/task_request_handler_generated_mock.go \
6262
$(root_sc)/core/state_reset_launcher_generated_mock.go \
63-
$(root_sc)/internal/scheduler/task_scheduler_generated_mock.go \
6463
$(root_sc)/internal/l1client/eth_client_generated_mock.go \
6564
$(root_sc)/internal/srv/worker_generated_mock.go \
6665
$(root_sc)/internal/test_utils/op_context_generated_mock.go \
@@ -89,16 +88,6 @@ $(root_sc)/core/state_reset_launcher_generated_mock.go: \
8988
$(root_sc)/core/task_state_change_handler.go
9089
go generate $(root_sc)/core/task_state_change_handler.go
9190

92-
$(root_sc)/internal/scheduler/task_scheduler_generated_mock.go: \
93-
$(root_sc)/internal/scheduler/task_scheduler.go \
94-
$(root_sc)/internal/api/task_request_handler.go \
95-
$(root_sc)/internal/srv/worker.go \
96-
$(root_sc)/internal/types/prover_tasks.go \
97-
$(root_sc)/internal/types/task_result.go \
98-
$(root_sc)/public/task_debug_api.go \
99-
$(root_sc)/public/task_view.go
100-
go generate $(root_sc)/internal/scheduler
101-
10291
$(root_sc)/internal/srv/worker_generated_mock.go: \
10392
$(root_sc)/internal/srv/worker.go
10493
go generate $(root_sc)/internal/srv/worker.go

nil/services/synccommittee/core/block_tasks_integration_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type BlockTasksIntegrationTestSuite struct {
2828
taskStorage *storage.TaskStorage
2929
blockStorage *storage.BlockStorage
3030

31-
scheduler scheduler.TaskScheduler
31+
requestHandler api.TaskRequestHandler
3232
}
3333

3434
func TestBlockTasksTestSuite(t *testing.T) {
@@ -51,7 +51,7 @@ func (s *BlockTasksIntegrationTestSuite) SetupSuite() {
5151
s.taskStorage = storage.NewTaskStorage(s.db, s.clock, metricsHandler, logger)
5252
s.blockStorage = storage.NewBlockStorage(s.db, storage.DefaultBlockStorageConfig(), s.clock, metricsHandler, logger)
5353

54-
s.scheduler = scheduler.New(
54+
s.requestHandler = scheduler.New(
5555
s.taskStorage,
5656
newTaskStateChangeHandler(s.blockStorage, &StateResetLauncherMock{}, logger),
5757
metricsHandler,
@@ -86,19 +86,19 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Success_R
8686
executorId := types.NewRandomExecutorId()
8787

8888
// requesting batch proof task for execution
89-
taskToExecute, err := s.scheduler.GetTask(s.ctx, api.NewTaskRequest(executorId))
89+
taskToExecute, err := s.requestHandler.GetTask(s.ctx, api.NewTaskRequest(executorId))
9090
s.Require().NoError(err)
9191
s.Require().NotNil(taskToExecute)
9292
s.Require().Equal(types.ProofBatch, taskToExecute.TaskType)
9393

9494
// no new tasks available yet
95-
nonAvailableTask, err := s.scheduler.GetTask(s.ctx, api.NewTaskRequest(executorId))
95+
nonAvailableTask, err := s.requestHandler.GetTask(s.ctx, api.NewTaskRequest(executorId))
9696
s.Require().NoError(err)
9797
s.Require().Nil(nonAvailableTask)
9898

9999
// successfully completing batch proof task
100100
batchProofResult := newTestSuccessProviderResult(taskToExecute, executorId)
101-
err = s.scheduler.SetTaskResult(s.ctx, batchProofResult)
101+
err = s.requestHandler.SetTaskResult(s.ctx, batchProofResult)
102102
s.Require().NoError(err)
103103

104104
// once top-level task is completed, proposal data for the main block should become available
@@ -126,7 +126,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R
126126
executorId := types.NewRandomExecutorId()
127127

128128
// requesting batch proof task
129-
taskToExecute, err := s.scheduler.GetTask(s.ctx, api.NewTaskRequest(executorId))
129+
taskToExecute, err := s.requestHandler.GetTask(s.ctx, api.NewTaskRequest(executorId))
130130
s.Require().NoError(err)
131131
s.Require().NotNil(taskToExecute)
132132
s.Require().Equal(types.ProofBatch, taskToExecute.TaskType)
@@ -138,7 +138,7 @@ func (s *BlockTasksIntegrationTestSuite) Test_Provide_Tasks_And_Handle_Failure_R
138138
types.NewTaskExecError(types.TaskErrProofGenerationFailed, "batch proof generation failed"),
139139
)
140140

141-
err = s.scheduler.SetTaskResult(s.ctx, batchProofFailed)
141+
err = s.requestHandler.SetTaskResult(s.ctx, batchProofFailed)
142142
s.Require().NoError(err)
143143

144144
// proposal data should not become available

nil/services/synccommittee/core/config.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ import (
88
)
99

1010
const (
11-
DefaultTaskRpcEndpoint = "tcp://127.0.0.1:8530"
11+
DefaultNilRpcEndpoint = "tcp://127.0.0.1:8529"
12+
DefaultOwnRpcEndpoint = "tcp://127.0.0.1:8530"
1213
)
1314

1415
type Config struct {
15-
RpcEndpoint string `yaml:"endpoint,omitempty"`
16-
TaskListenerRpcEndpoint string `yaml:"ownEndpoint,omitempty"`
16+
NilRpcEndpoint string `yaml:"endpoint,omitempty"`
17+
OwnRpcEndpoint string `yaml:"ownEndpoint,omitempty"`
1718
AggregatorConfig fetching.AggregatorConfig `yaml:",inline"`
1819
ProposerParams ProposerConfig `yaml:"-"`
1920
ContractWrapperConfig rollupcontract.WrapperConfig `yaml:",inline"`
@@ -24,12 +25,12 @@ type Config struct {
2425

2526
func NewDefaultConfig() *Config {
2627
return &Config{
27-
RpcEndpoint: "tcp://127.0.0.1:8529",
28-
TaskListenerRpcEndpoint: DefaultTaskRpcEndpoint,
29-
AggregatorConfig: fetching.NewDefaultAggregatorConfig(),
30-
ProposerParams: NewDefaultProposerConfig(),
31-
ContractWrapperConfig: rollupcontract.NewDefaultWrapperConfig(),
32-
L1FeeUpdateConfig: feeupdater.DefaultConfig(),
28+
NilRpcEndpoint: DefaultNilRpcEndpoint,
29+
OwnRpcEndpoint: DefaultOwnRpcEndpoint,
30+
AggregatorConfig: fetching.NewDefaultAggregatorConfig(),
31+
ProposerParams: NewDefaultProposerConfig(),
32+
ContractWrapperConfig: rollupcontract.NewDefaultWrapperConfig(),
33+
L1FeeUpdateConfig: feeupdater.DefaultConfig(),
3334
Telemetry: &telemetry.Config{
3435
ServiceName: "sync_committee",
3536
},

nil/services/synccommittee/core/service.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func New(ctx context.Context, cfg *Config, database db.DB) (*SyncCommittee, erro
3838
return nil, fmt.Errorf("error initializing metrics: %w", err)
3939
}
4040

41-
logger.Info().Msgf("Use RPC endpoint %v", cfg.RpcEndpoint)
41+
logger.Info().Msgf("Use RPC endpoint %v", cfg.NilRpcEndpoint)
4242
fetcher := fetching.NewFetcher(
43-
rpc.NewRetryClient(cfg.RpcEndpoint, logger),
43+
rpc.NewRetryClient(cfg.NilRpcEndpoint, logger),
4444
logger,
4545
)
4646

@@ -113,10 +113,11 @@ func New(ctx context.Context, cfg *Config, database db.DB) (*SyncCommittee, erro
113113
logger,
114114
)
115115

116-
taskListener := rpc.NewTaskListener(
117-
&rpc.TaskListenerConfig{HttpEndpoint: cfg.TaskListenerRpcEndpoint},
118-
taskScheduler,
116+
rpcServer := rpc.NewServerWithTasks(
117+
rpc.NewServerConfig(cfg.OwnRpcEndpoint),
119118
logger,
119+
taskScheduler,
120+
scheduler.NewTaskDebugger(taskStorage, logger),
120121
)
121122

122123
feeUpdaterMetrics, err := metrics.NewFeeUpdaterMetrics()
@@ -150,8 +151,7 @@ func New(ctx context.Context, cfg *Config, database db.DB) (*SyncCommittee, erro
150151
syncCommittee.Service = srv.NewServiceWithHeartbeat(
151152
metricsHandler,
152153
logger,
153-
syncRunner, proposer, agg, lagTracker, taskScheduler, taskListener,
154-
feeUpdater,
154+
syncRunner, proposer, agg, lagTracker, taskScheduler, feeUpdater, rpcServer,
155155
)
156156

157157
return syncCommittee, nil

nil/services/synccommittee/core/service_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (s *SyncCommitteeTestSuite) newService() *SyncCommittee {
7676
s.T().Helper()
7777

7878
cfg := NewDefaultConfig()
79-
cfg.RpcEndpoint = s.url
79+
cfg.NilRpcEndpoint = s.url
8080
cfg.ContractWrapperConfig.DisableL1 = true
8181

8282
cfg.L1FeeUpdateContractConfig.ContractAddress = "0xaddr"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
6+
"github.com/NilFoundation/nil/nil/common/logging"
7+
"github.com/NilFoundation/nil/nil/services/rpc"
8+
"github.com/NilFoundation/nil/nil/services/rpc/httpcfg"
9+
"github.com/NilFoundation/nil/nil/services/rpc/transport"
10+
"github.com/rs/zerolog"
11+
)
12+
13+
type ServerConfig struct {
14+
Endpoint string
15+
}
16+
17+
func NewServerConfig(endpoint string) ServerConfig {
18+
return ServerConfig{
19+
Endpoint: endpoint,
20+
}
21+
}
22+
23+
type Handler struct {
24+
Namespace string
25+
Service any
26+
}
27+
28+
func NewHandler(namespace string, service any) Handler {
29+
return Handler{
30+
Namespace: namespace,
31+
Service: service,
32+
}
33+
}
34+
35+
type server struct {
36+
logger logging.Logger
37+
config ServerConfig
38+
handlers []Handler
39+
}
40+
41+
func NewServer(
42+
config ServerConfig,
43+
logger logging.Logger,
44+
handlers ...Handler,
45+
) *server {
46+
return &server{
47+
logger: logger,
48+
config: config,
49+
handlers: handlers,
50+
}
51+
}
52+
53+
func (s *server) Name() string {
54+
return "rpc_server"
55+
}
56+
57+
func (s *server) Run(context context.Context, started chan<- struct{}) error {
58+
httpConfig := &httpcfg.HttpCfg{
59+
HttpURL: s.config.Endpoint,
60+
HttpCompression: true,
61+
TraceRequests: s.logger.GetLevel() < zerolog.InfoLevel, // debug or trace
62+
HTTPTimeouts: httpcfg.DefaultHTTPTimeouts,
63+
}
64+
65+
apiList := make([]transport.API, 0, len(s.handlers))
66+
for _, handler := range s.handlers {
67+
apiList = append(apiList, transport.API{
68+
Namespace: handler.Namespace,
69+
Public: true,
70+
Service: handler.Service,
71+
Version: "1.0",
72+
})
73+
}
74+
75+
return rpc.StartRpcServer(context, httpConfig, apiList, s.logger, started)
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rpc
2+
3+
import (
4+
"github.com/NilFoundation/nil/nil/common/logging"
5+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
6+
"github.com/NilFoundation/nil/nil/services/synccommittee/public"
7+
)
8+
9+
func NewServerWithTasks(
10+
config ServerConfig,
11+
logger logging.Logger,
12+
requestHandler api.TaskRequestHandler,
13+
debugApi public.TaskDebugApi,
14+
additionalHandlers ...Handler,
15+
) *server {
16+
handlers := make([]Handler, 0, len(additionalHandlers)+2)
17+
handlers = append(handlers, NewHandler(api.TaskRequestHandlerNamespace, requestHandler))
18+
handlers = append(handlers, NewHandler(public.DebugNamespace, debugApi))
19+
handlers = append(handlers, additionalHandlers...)
20+
return NewServer(config, logger, handlers...)
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package rpc
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/NilFoundation/nil/nil/common/logging"
8+
"github.com/NilFoundation/nil/nil/internal/db"
9+
"github.com/NilFoundation/nil/nil/services/rpc"
10+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/api"
11+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
12+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/scheduler"
13+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage"
14+
"github.com/NilFoundation/nil/nil/services/synccommittee/internal/testaide"
15+
"github.com/jonboulle/clockwork"
16+
"github.com/stretchr/testify/suite"
17+
)
18+
19+
type RpcServerTestSuite struct {
20+
suite.Suite
21+
22+
context context.Context
23+
cancellation context.CancelFunc
24+
clock *clockwork.FakeClock
25+
26+
logger logging.Logger
27+
metricsHandler *metrics.SyncCommitteeMetricsHandler
28+
29+
database db.DB
30+
storage *storage.TaskStorage
31+
32+
serverEndpoint string
33+
}
34+
35+
func (s *RpcServerTestSuite) SetupSuite() {
36+
s.context, s.cancellation = context.WithCancel(context.Background())
37+
s.clock = testaide.NewTestClock()
38+
s.logger = logging.NewLogger("rpc_server_test")
39+
40+
var err error
41+
s.database, err = db.NewBadgerDbInMemory()
42+
s.Require().NoError(err)
43+
s.metricsHandler, err = metrics.NewSyncCommitteeMetrics()
44+
s.Require().NoError(err)
45+
s.storage = storage.NewTaskStorage(s.database, s.clock, s.metricsHandler, s.logger)
46+
47+
started := make(chan struct{})
48+
s.serverEndpoint = rpc.GetSockPath(s.T())
49+
go func() {
50+
err := s.runRpcServer(started)
51+
s.NoError(err)
52+
}()
53+
err = testaide.WaitFor(s.context, started, 10*time.Second)
54+
s.Require().NoError(err, "rpc server did not start in time")
55+
}
56+
57+
func (s *RpcServerTestSuite) TearDownSuite() {
58+
s.cancellation()
59+
}
60+
61+
func (s *RpcServerTestSuite) TearDownTest() {
62+
err := s.database.DropAll()
63+
s.Require().NoError(err)
64+
}
65+
66+
func (s *RpcServerTestSuite) runRpcServer(started chan<- struct{}) error {
67+
noopStateHandler := &api.TaskStateChangeHandlerMock{}
68+
taskScheduler := scheduler.New(s.storage, noopStateHandler, s.metricsHandler, s.logger)
69+
taskDebugger := scheduler.NewTaskDebugger(s.storage, s.logger)
70+
71+
rpcServer := NewServerWithTasks(
72+
NewServerConfig(s.serverEndpoint),
73+
s.logger,
74+
taskScheduler,
75+
taskDebugger,
76+
)
77+
78+
return rpcServer.Run(s.context, started)
79+
}

0 commit comments

Comments
 (0)