Skip to content

Commit 0c5f27f

Browse files
Jason YellickGerrit Code Review
Jason Yellick
authored and
Gerrit Code Review
committed
Merge changes I16b47247,Ibbd47540
* changes: FAB-16544 Override orderer endpoints if specified FAB-16544 Add orderer overrides to config
2 parents 6707428 + abd9e4f commit 0c5f27f

File tree

9 files changed

+279
-27
lines changed

9 files changed

+279
-27
lines changed

core/deliverservice/config.go

+54
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ SPDX-License-Identifier: Apache-2.0
77
package deliverservice
88

99
import (
10+
"crypto/x509"
1011
"io/ioutil"
1112
"time"
1213

1314
"github.com/hyperledger/fabric/core/comm"
1415
"github.com/hyperledger/fabric/core/config"
16+
"github.com/hyperledger/fabric/internal/pkg/peer/orderers"
1517

1618
"github.com/pkg/errors"
1719
"github.com/spf13/viper"
@@ -38,6 +40,16 @@ type DeliverServiceConfig struct {
3840
KeepaliveOptions comm.KeepaliveOptions
3941
// SecOpts provides the TLS info for connections
4042
SecOpts comm.SecureOptions
43+
44+
// OrdererEndpointOverrides is a map of orderer addresses which should be
45+
// re-mapped to a different orderer endpoint.
46+
OrdererEndpointOverrides map[string]*orderers.Endpoint
47+
}
48+
49+
type AddressOverride struct {
50+
From string `mapstructure:"from"`
51+
To string `mapstructure:"to"`
52+
CACertsFile string `mapstructure:"caCertsFile"`
4153
}
4254

4355
// GlobalConfig obtains a set of configuration from viper, build and returns the config struct.
@@ -47,6 +59,41 @@ func GlobalConfig() *DeliverServiceConfig {
4759
return c
4860
}
4961

62+
func LoadOverridesMap() (map[string]*orderers.Endpoint, error) {
63+
var overrides []AddressOverride
64+
err := viper.UnmarshalKey("peer.deliveryclient.addressOverrides", &overrides)
65+
if err != nil {
66+
return nil, errors.WithMessage(err, "could not unmarshal peer.deliveryclient.addressOverrides")
67+
}
68+
69+
if len(overrides) == 0 {
70+
return nil, nil
71+
}
72+
73+
overrideMap := map[string]*orderers.Endpoint{}
74+
for _, override := range overrides {
75+
certPool := x509.NewCertPool()
76+
if override.CACertsFile != "" {
77+
pem, err := ioutil.ReadFile(override.CACertsFile)
78+
if err != nil {
79+
logger.Warningf("could not read file '%s' specified for caCertsFile of orderer endpoint override from '%s' to '%s': %s", override.CACertsFile, override.From, override.To, err)
80+
continue
81+
}
82+
success := certPool.AppendCertsFromPEM(pem)
83+
if !success {
84+
logger.Warningf("Attempted to create a cert pool for override of orderer address '%s' to '%s' but did not find any valid certs in '%s'", override.From, override.To, override.CACertsFile)
85+
continue
86+
}
87+
}
88+
overrideMap[override.From] = &orderers.Endpoint{
89+
Address: override.To,
90+
CertPool: certPool,
91+
}
92+
}
93+
94+
return overrideMap, nil
95+
}
96+
5097
func (c *DeliverServiceConfig) loadDeliverServiceConfig() {
5198
c.PeerTLSEnabled = viper.GetBool("peer.tls.enabled")
5299

@@ -100,4 +147,11 @@ func (c *DeliverServiceConfig) loadDeliverServiceConfig() {
100147
}
101148
c.SecOpts.Certificate = certPEM
102149
}
150+
151+
overridesMap, err := LoadOverridesMap()
152+
if err != nil {
153+
panic(err)
154+
}
155+
156+
c.OrdererEndpointOverrides = overridesMap
103157
}

core/deliverservice/config_test.go

+100
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
77
package deliverservice_test
88

99
import (
10+
"bytes"
1011
"io/ioutil"
1112
"os"
1213
"path/filepath"
@@ -127,3 +128,102 @@ func TestGlobalConfigDefault(t *testing.T) {
127128

128129
assert.Equal(t, expectedConfig, coreConfig)
129130
}
131+
132+
func TestLoadOverridesMap(t *testing.T) {
133+
defer viper.Reset()
134+
135+
t.Run("GreenPath", func(t *testing.T) {
136+
config := `
137+
peer:
138+
deliveryclient:
139+
addressOverrides:
140+
- from: addressFrom1
141+
to: addressTo1
142+
caCertsFile: testdata/cert.pem
143+
- from: addressFrom2
144+
to: addressTo2
145+
caCertsFile: testdata/cert.pem
146+
`
147+
148+
viper.Reset()
149+
viper.SetConfigType("yaml")
150+
viper.ReadConfig(bytes.NewBuffer([]byte(config)))
151+
res, err := deliverservice.LoadOverridesMap()
152+
require.NoError(t, err)
153+
require.Len(t, res, 2)
154+
ep1, ok := res["addressFrom1"]
155+
require.True(t, ok)
156+
assert.Equal(t, "addressTo1", ep1.Address)
157+
ep2, ok := res["addressFrom2"]
158+
require.True(t, ok)
159+
assert.Equal(t, "addressTo2", ep2.Address)
160+
})
161+
162+
t.Run("MissingCAFiles", func(t *testing.T) {
163+
config := `
164+
peer:
165+
deliveryclient:
166+
addressOverrides:
167+
- from: addressFrom1
168+
to: addressTo1
169+
caCertsFile: missing/cert.pem
170+
- from: addressFrom2
171+
to: addressTo2
172+
caCertsFile: testdata/cert.pem
173+
`
174+
175+
viper.Reset()
176+
viper.SetConfigType("yaml")
177+
viper.ReadConfig(bytes.NewBuffer([]byte(config)))
178+
res, err := deliverservice.LoadOverridesMap()
179+
require.NoError(t, err)
180+
assert.Len(t, res, 1)
181+
})
182+
183+
t.Run("EmptyCAFiles", func(t *testing.T) {
184+
config := `
185+
peer:
186+
deliveryclient:
187+
addressOverrides:
188+
- from: addressFrom1
189+
to: addressTo1
190+
- from: addressFrom2
191+
to: addressTo2
192+
`
193+
194+
viper.Reset()
195+
viper.SetConfigType("yaml")
196+
viper.ReadConfig(bytes.NewBuffer([]byte(config)))
197+
res, err := deliverservice.LoadOverridesMap()
198+
require.NoError(t, err)
199+
assert.Len(t, res, 2)
200+
})
201+
202+
t.Run("BadYaml", func(t *testing.T) {
203+
config := `
204+
peer:
205+
deliveryclient:
206+
addressOverrides: foo
207+
`
208+
209+
viper.Reset()
210+
viper.SetConfigType("yaml")
211+
viper.ReadConfig(bytes.NewBuffer([]byte(config)))
212+
_, err := deliverservice.LoadOverridesMap()
213+
require.EqualError(t, err, "could not unmarshal peer.deliveryclient.addressOverrides: '': source data must be an array or slice, got string")
214+
})
215+
216+
t.Run("EmptyYaml", func(t *testing.T) {
217+
config := `
218+
peer:
219+
deliveryclient:
220+
`
221+
222+
viper.Reset()
223+
viper.SetConfigType("yaml")
224+
viper.ReadConfig(bytes.NewBuffer([]byte(config)))
225+
res, err := deliverservice.LoadOverridesMap()
226+
require.NoError(t, err)
227+
assert.Nil(t, res)
228+
})
229+
}

core/peer/peer.go

+9-8
Original file line numberDiff line numberDiff line change
@@ -174,13 +174,14 @@ func (c *configSupport) GetChannelConfig(cid string) cc.Config {
174174

175175
// A Peer holds references to subsystems and channels associated with a Fabric peer.
176176
type Peer struct {
177-
Server *comm.GRPCServer
178-
ServerConfig comm.ServerConfig
179-
CredentialSupport *comm.CredentialSupport
180-
StoreProvider transientstore.StoreProvider
181-
GossipService *gossipservice.GossipService
182-
LedgerMgr *ledgermgmt.LedgerMgr
183-
CryptoProvider bccsp.BCCSP
177+
Server *comm.GRPCServer
178+
ServerConfig comm.ServerConfig
179+
CredentialSupport *comm.CredentialSupport
180+
StoreProvider transientstore.StoreProvider
181+
GossipService *gossipservice.GossipService
182+
LedgerMgr *ledgermgmt.LedgerMgr
183+
OrdererEndpointOverrides map[string]*orderers.Endpoint
184+
CryptoProvider bccsp.BCCSP
184185

185186
// validationWorkersSemaphore is used to limit the number of concurrent validation
186187
// go routines.
@@ -290,7 +291,7 @@ func (p *Peer) createChannel(
290291

291292
osLogger := flogging.MustGetLogger("peer.orderers")
292293
namedOSLogger := osLogger.With("channel", cid)
293-
ordererSource := orderers.NewConnectionSource(namedOSLogger)
294+
ordererSource := orderers.NewConnectionSource(namedOSLogger, p.OrdererEndpointOverrides)
294295

295296
ordererSourceCallback := func(bundle *channelconfig.Bundle) {
296297
globalAddresses := bundle.ChannelConfig().OrdererAddresses()

gossip/service/gossip_service_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func TestLeaderElectionWithDeliverClient(t *testing.T) {
192192
gossips[i].deliveryFactory = deliverServiceFactory
193193
deliverServiceFactory.service.running[channelName] = false
194194

195-
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
195+
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
196196
Committer: &mockLedgerInfo{1},
197197
})
198198
service, exist := gossips[i].leaderElection[channelName]
@@ -256,7 +256,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
256256
for i := 0; i < n; i++ {
257257
gossips[i].deliveryFactory = deliverServiceFactory
258258
deliverServiceFactory.service.running[channelName] = false
259-
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
259+
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
260260
Committer: &mockLedgerInfo{1},
261261
})
262262
}
@@ -269,7 +269,7 @@ func TestWithStaticDeliverClientLeader(t *testing.T) {
269269
channelName = "chanB"
270270
for i := 0; i < n; i++ {
271271
deliverServiceFactory.service.running[channelName] = false
272-
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
272+
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
273273
Committer: &mockLedgerInfo{1},
274274
})
275275
}
@@ -317,7 +317,7 @@ func TestWithStaticDeliverClientNotLeader(t *testing.T) {
317317
for i := 0; i < n; i++ {
318318
gossips[i].deliveryFactory = deliverServiceFactory
319319
deliverServiceFactory.service.running[channelName] = false
320-
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
320+
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
321321
Committer: &mockLedgerInfo{1},
322322
})
323323
}
@@ -365,7 +365,7 @@ func TestWithStaticDeliverClientBothStaticAndLeaderElection(t *testing.T) {
365365
for i := 0; i < n; i++ {
366366
gossips[i].deliveryFactory = deliverServiceFactory
367367
assert.Panics(t, func() {
368-
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
368+
gossips[i].InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
369369
Committer: &mockLedgerInfo{1},
370370
})
371371
}, "Dynamic leader election based and static connection to ordering service can't exist simultaneously")
@@ -943,7 +943,7 @@ func TestInvalidInitialization(t *testing.T) {
943943
go grpcServer.Serve(socket)
944944
defer grpcServer.Stop()
945945

946-
dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), &naiveCryptoService{})
946+
dc := gService.deliveryFactory.Service(gService, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), &naiveCryptoService{})
947947
assert.NotNil(t, dc)
948948
}
949949

gossip/service/integration_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func TestLeaderYield(t *testing.T) {
126126
},
127127
deliverGRPCClient: grpcClient,
128128
}}
129-
gs.InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers")), store.Store, Support{
129+
gs.InitializeChannel(channelName, orderers.NewConnectionSource(flogging.MustGetLogger("peer.orderers"), nil), store.Store, Support{
130130
Committer: &mockLedgerInfo{1},
131131
})
132132
return gs

internal/peer/node/start.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,16 @@ func serve(args []string) error {
252252
if err != nil {
253253
return errors.WithMessage(err, "failed to open transient store")
254254
}
255+
256+
deliverServiceConfig := deliverservice.GlobalConfig()
257+
255258
peerInstance := &peer.Peer{
256-
Server: peerServer,
257-
ServerConfig: serverConfig,
258-
CredentialSupport: cs,
259-
StoreProvider: transientStoreProvider,
260-
CryptoProvider: factory.GetDefault(),
259+
Server: peerServer,
260+
ServerConfig: serverConfig,
261+
CredentialSupport: cs,
262+
StoreProvider: transientStoreProvider,
263+
CryptoProvider: factory.GetDefault(),
264+
OrdererEndpointOverrides: deliverServiceConfig.OrdererEndpointOverrides,
261265
}
262266

263267
localMSP := mgmt.GetLocalMSP(factory.GetDefault())
@@ -267,8 +271,6 @@ func serve(args []string) error {
267271
}
268272
policyMgr := policies.PolicyManagerGetterFunc(peerInstance.GetPolicyManager)
269273

270-
deliverServiceConfig := deliverservice.GlobalConfig()
271-
272274
deliverGRPCClient, err := comm.NewGRPCClient(comm.ClientConfig{
273275
Timeout: deliverServiceConfig.ConnectionTimeout,
274276
KaOpts: deliverServiceConfig.KeepaliveOptions,

internal/pkg/peer/orderers/connection.go

+24-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type ConnectionSource struct {
2424
allEndpoints []*Endpoint
2525
orgToEndpointsHash map[string][]byte
2626
logger *flogging.FabricLogger
27+
overrides map[string]*Endpoint
2728
}
2829

2930
type Endpoint struct {
@@ -37,10 +38,11 @@ type OrdererOrg struct {
3738
RootCerts [][]byte
3839
}
3940

40-
func NewConnectionSource(logger *flogging.FabricLogger) *ConnectionSource {
41+
func NewConnectionSource(logger *flogging.FabricLogger, overrides map[string]*Endpoint) *ConnectionSource {
4142
return &ConnectionSource{
4243
orgToEndpointsHash: map[string][]byte{},
4344
logger: logger,
45+
overrides: overrides,
4446
}
4547
}
4648

@@ -132,7 +134,7 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
132134
// Alert any existing consumers that have a reference to the old endpoints
133135
// that their reference is now stale and they should get a new one.
134136
// This is done even for endpoints which have the same TLS certs and address
135-
// but this is deseriable to help load balance. For instance if only
137+
// but this is desirable to help load balance. For instance if only
136138
// one orderer were defined, and the config is updated to include 4 more, we
137139
// want the peers to disconnect from that original orderer and reconnect
138140
// evenly across the now five.
@@ -161,6 +163,16 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
161163
// Note, if !hasOrgEndpoints, this for loop is a no-op, so
162164
// certPool is never referenced.
163165
for _, address := range org.Addresses {
166+
overrideEndpoint, ok := cs.overrides[address]
167+
if ok {
168+
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
169+
Address: overrideEndpoint.Address,
170+
CertPool: overrideEndpoint.CertPool,
171+
Refreshed: make(chan struct{}),
172+
})
173+
continue
174+
}
175+
164176
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
165177
Address: address,
166178
CertPool: certPool,
@@ -177,6 +189,16 @@ func (cs *ConnectionSource) Update(globalAddrs []string, orgs map[string]Orderer
177189
}
178190

179191
for _, address := range globalAddrs {
192+
overrideEndpoint, ok := cs.overrides[address]
193+
if ok {
194+
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
195+
Address: overrideEndpoint.Address,
196+
CertPool: overrideEndpoint.CertPool,
197+
Refreshed: make(chan struct{}),
198+
})
199+
continue
200+
}
201+
180202
cs.allEndpoints = append(cs.allEndpoints, &Endpoint{
181203
Address: address,
182204
CertPool: globalCertPool,

0 commit comments

Comments
 (0)