Skip to content

Commit b6cb9b0

Browse files
committed
Force close all open connections at shutdown
1 parent 9712e0e commit b6cb9b0

File tree

2 files changed

+28
-8
lines changed

2 files changed

+28
-8
lines changed

server/internal/interface/grpc/handlers/arkservice.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ type handler struct {
3333
transactionsListenerHandler *listenerHanlder[*arkv1.GetTransactionsStreamResponse]
3434
addressSubsHandler *listenerHanlder[*arkv1.SubscribeForAddressResponse]
3535

36-
stopCh <-chan struct{}
36+
stopCh <-chan struct{}
37+
stopRoundEventsCh chan struct{}
38+
stopTransactionEventsCh chan struct{}
39+
stopAddressEventsCh chan struct{}
3740
}
3841

3942
func NewHandler(version string, service application.Service, stopCh <-chan struct{}) service {
@@ -44,8 +47,12 @@ func NewHandler(version string, service application.Service, stopCh <-chan struc
4447
transactionsListenerHandler: newListenerHandler[*arkv1.GetTransactionsStreamResponse](),
4548
addressSubsHandler: newListenerHandler[*arkv1.SubscribeForAddressResponse](),
4649
stopCh: stopCh,
50+
stopRoundEventsCh: make(chan struct{}, 1),
51+
stopTransactionEventsCh: make(chan struct{}, 1),
52+
stopAddressEventsCh: make(chan struct{}, 1),
4753
}
4854

55+
go h.listenToStop()
4956
go h.listenToEvents()
5057
go h.listenToTxEvents()
5158

@@ -304,7 +311,7 @@ func (h *handler) GetEventStream(
304311

305312
for {
306313
select {
307-
case <-h.stopCh:
314+
case <-h.stopRoundEventsCh:
308315
return nil
309316
case <-stream.Context().Done():
310317
return nil
@@ -456,6 +463,8 @@ func (h *handler) GetTransactionsStream(
456463

457464
for {
458465
select {
466+
case <-h.stopTransactionEventsCh:
467+
return nil
459468
case <-stream.Context().Done():
460469
return nil
461470
case ev := <-listener.ch:
@@ -524,6 +533,8 @@ func (h *handler) SubscribeForAddress(
524533

525534
for {
526535
select {
536+
case <-h.stopAddressEventsCh:
537+
return nil
527538
case <-stream.Context().Done():
528539
return nil
529540
case ev := <-listener.ch:
@@ -534,6 +545,14 @@ func (h *handler) SubscribeForAddress(
534545
}
535546
}
536547

548+
func (h *handler) listenToStop() {
549+
<-h.stopCh
550+
h.stopRoundEventsCh <- struct{}{}
551+
h.stopTransactionEventsCh <- struct{}{}
552+
h.stopAddressEventsCh <- struct{}{}
553+
554+
}
555+
537556
// listenToEvents forwards events from the application layer to the set of listeners
538557
func (h *handler) listenToEvents() {
539558
channel := h.svc.GetEventsChannel(context.Background())

server/internal/interface/grpc/service.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import (
44
"context"
55
"crypto/tls"
66
"fmt"
7+
"net/http"
8+
"path/filepath"
9+
"strings"
10+
711
arkv1 "github.com/ark-network/ark/api-spec/protobuf/gen/ark/v1"
812
"github.com/ark-network/ark/server/internal/config"
913
"github.com/ark-network/ark/server/internal/core/application"
@@ -23,9 +27,6 @@ import (
2327
"google.golang.org/grpc/credentials/insecure"
2428
grpchealth "google.golang.org/grpc/health/grpc_health_v1"
2529
"google.golang.org/protobuf/encoding/protojson"
26-
"net/http"
27-
"path/filepath"
28-
"strings"
2930
)
3031

3132
const (
@@ -151,9 +152,6 @@ func (s *service) start(withAppSvc bool) error {
151152
}
152153

153154
func (s *service) stop(withAppSvc bool) {
154-
//nolint:all
155-
s.server.Shutdown(context.Background())
156-
log.Info("stopped grpc server")
157155
if withAppSvc {
158156
s.stopCh <- struct{}{}
159157
appSvc, _ := s.appConfig.AppService()
@@ -162,6 +160,9 @@ func (s *service) stop(withAppSvc bool) {
162160
log.Info("stopped app service")
163161
}
164162
}
163+
//nolint:all
164+
s.server.Shutdown(context.Background())
165+
log.Info("stopped grpc server")
165166
}
166167

167168
func (s *service) newServer(tlsConfig *tls.Config, withAppSvc bool) error {

0 commit comments

Comments
 (0)