Skip to content

Commit 3867a47

Browse files
denyeartGerrit Code Review
authored and
Gerrit Code Review
committed
Merge "[FAB-15389] Fix private data dissemination"
2 parents 2448b5e + b97ca73 commit 3867a47

File tree

4 files changed

+165
-46
lines changed

4 files changed

+165
-46
lines changed

gossip/privdata/distributor.go

+70-16
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
178178
return nil, errors.WithStack(err)
179179
}
180180

181+
logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
181182
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
182183
if err != nil {
183184
return nil, errors.WithStack(err)
@@ -215,20 +216,37 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
215216
return nil, err
216217
}
217218

219+
m := pvtDataMsg.GetPrivateData().Payload
220+
218221
eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
219222
identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)
220223

221-
// Select one representative from each org
224+
peerEndpoints := map[string]string{}
225+
for _, peer := range eligiblePeers {
226+
epToAdd := peer.Endpoint
227+
if epToAdd == "" {
228+
epToAdd = peer.InternalEndpoint
229+
}
230+
peerEndpoints[string(peer.PKIid)] = epToAdd
231+
}
232+
222233
maximumPeerCount := colAP.MaximumPeerCount()
223234
requiredPeerCount := colAP.RequiredPeerCount()
224235

236+
remainingPeers := []api.PeerIdentityInfo{}
237+
selectedPeerEndpoints := []string{}
238+
239+
rand.Seed(time.Now().Unix())
240+
// Select one representative from each org
225241
if maximumPeerCount > 0 {
226242
for _, selectionPeers := range identitySets {
227243
required := 1
228244
if requiredPeerCount == 0 {
229245
required = 0
230246
}
231-
peer2SendPerOrg := selectionPeers[rand.Intn(len(selectionPeers))]
247+
selectedPeerIndex := rand.Intn(len(selectionPeers))
248+
peer2SendPerOrg := selectionPeers[selectedPeerIndex]
249+
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
232250
sc := gossipgossip.SendCriteria{
233251
Timeout: d.pushAckTimeout,
234252
Channel: gossipCommon.ChannelID(d.chainID),
@@ -246,34 +264,70 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
246264
},
247265
})
248266

267+
// Add unselected peers to remainingPeers
268+
for i, peer := range selectionPeers {
269+
if i != selectedPeerIndex {
270+
remainingPeers = append(remainingPeers, peer)
271+
}
272+
}
273+
249274
if requiredPeerCount > 0 {
250275
requiredPeerCount--
251276
}
252277

253278
maximumPeerCount--
254279
if maximumPeerCount == 0 {
280+
logger.Debug("MaximumPeerCount satisfied")
281+
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
255282
return disseminationPlan, nil
256283
}
257284
}
258285
}
259286

260-
// criteria to select remaining peers to satisfy colAP.MaximumPeerCount()
261-
// collection policy parameters
262-
sc := gossipgossip.SendCriteria{
263-
Timeout: d.pushAckTimeout,
264-
Channel: gossipCommon.ChannelID(d.chainID),
265-
MaxPeers: maximumPeerCount,
266-
MinAck: requiredPeerCount,
267-
IsEligible: func(member discovery.NetworkMember) bool {
268-
return routingFilter(member)
269-
},
287+
// criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
288+
// unselected peers remaining for dissemination
289+
numPeersToSelect := maximumPeerCount
290+
if len(remainingPeers) < maximumPeerCount {
291+
numPeersToSelect = len(remainingPeers)
270292
}
293+
if numPeersToSelect > 0 {
294+
logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
295+
}
296+
for maximumPeerCount > 0 && len(remainingPeers) > 0 {
297+
required := 1
298+
if requiredPeerCount == 0 {
299+
required = 0
300+
}
301+
selectedPeerIndex := rand.Intn(len(remainingPeers))
302+
peer2Send := remainingPeers[selectedPeerIndex]
303+
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
304+
sc := gossipgossip.SendCriteria{
305+
Timeout: d.pushAckTimeout,
306+
Channel: gossipCommon.ChannelID(d.chainID),
307+
MaxPeers: 1,
308+
MinAck: required,
309+
IsEligible: func(member discovery.NetworkMember) bool {
310+
return bytes.Equal(member.PKIid, peer2Send.PKIId)
311+
},
312+
}
313+
disseminationPlan = append(disseminationPlan, &dissemination{
314+
criteria: sc,
315+
msg: &protoext.SignedGossipMessage{
316+
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
317+
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
318+
},
319+
})
320+
if requiredPeerCount > 0 {
321+
requiredPeerCount--
322+
}
271323

272-
disseminationPlan = append(disseminationPlan, &dissemination{
273-
criteria: sc,
274-
msg: pvtDataMsg,
275-
})
324+
maximumPeerCount--
325+
326+
// remove the selected peer from remaining peers
327+
remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
328+
}
276329

330+
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
277331
return disseminationPlan, nil
278332
}
279333

gossip/privdata/distributor_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func TestDistributor(t *testing.T) {
270270
},
271271
}, 0)
272272
assert.Error(t, err)
273-
assert.Contains(t, err.Error(), "Failed disseminating 4 out of 4 private dissemination plans")
273+
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private dissemination plans")
274274

275275
assert.Equal(t,
276276
[]string{"channel", channelID},

integration/pvtdata/pvtdata_test.go

+76-29
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,7 @@ var _ bool = Describe("PrivateData", func() {
6565
)
6666

6767
BeforeEach(func() {
68-
testDir, network, process, orderer, allPeers = initThreeOrgsSetup()
69-
helper = &testHelper{
70-
networkHelper: &networkHelper{
71-
Network: network,
72-
orderer: orderer,
73-
peers: allPeers,
74-
testDir: testDir,
75-
channelID: "testchannel",
76-
},
77-
}
78-
68+
testDir, network = initThreeOrgsSetup()
7969
legacyChaincode = nwo.Chaincode{
8070
Name: "marblesp",
8171
Version: "1.0",
@@ -101,12 +91,61 @@ var _ bool = Describe("PrivateData", func() {
10191
}
10292
})
10393

94+
JustBeforeEach(func() {
95+
process, orderer, allPeers = startNetwork(network)
96+
helper = &testHelper{
97+
networkHelper: &networkHelper{
98+
Network: network,
99+
orderer: orderer,
100+
peers: allPeers,
101+
testDir: testDir,
102+
channelID: "testchannel",
103+
},
104+
}
105+
})
106+
104107
AfterEach(func() {
105108
testCleanup(testDir, network, process)
106109
})
107110

108-
Describe("Reconciliation", func() {
109-
BeforeEach(func() {
111+
Describe("Dissemination", func() {
112+
When("pulling is disabled by setting the pull retry threshold to 0", func() {
113+
BeforeEach(func() {
114+
// set pull retry threshold to 0
115+
peers := []*nwo.Peer{
116+
network.Peer("org1", "peer0"),
117+
network.Peer("org2", "peer0"),
118+
network.Peer("org3", "peer0"),
119+
}
120+
for _, p := range peers {
121+
core := network.ReadPeerConfig(p)
122+
core.Peer.Gossip.PvtData.PullRetryThreshold = 0
123+
network.WritePeerConfig(p, core)
124+
}
125+
})
126+
127+
JustBeforeEach(func() {
128+
By("deploying legacy chaincode and adding marble1")
129+
testChaincode = chaincode{
130+
Chaincode: legacyChaincode,
131+
isLegacy: true,
132+
}
133+
helper.deployChaincode(testChaincode)
134+
helper.addMarble(testChaincode.Name,
135+
`{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`,
136+
network.Peer("org1", "peer0"),
137+
)
138+
})
139+
140+
It("disseminates private data per collections_config1", func() {
141+
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1")
142+
})
143+
})
144+
145+
})
146+
147+
Describe("Reconciliation and pulling", func() {
148+
JustBeforeEach(func() {
110149
By("deploying legacy chaincode and adding marble1")
111150
testChaincode = chaincode{
112151
Chaincode: legacyChaincode,
@@ -125,7 +164,7 @@ var _ bool = Describe("PrivateData", func() {
125164
})
126165

127166
When("org3 is added to collectionMarbles via chaincode upgrade with collections_config2", func() {
128-
BeforeEach(func() {
167+
JustBeforeEach(func() {
129168
// collections_config2.json defines the access as follows:
130169
// 1. collectionMarbles - Org1, Org2 and Org3 have access to this collection
131170
// 2. collectionMarblePrivateDetails - Org2 and Org3 have access to this collection
@@ -151,22 +190,22 @@ var _ bool = Describe("PrivateData", func() {
151190
var (
152191
newPeer *nwo.Peer
153192
)
154-
BeforeEach(func() {
193+
JustBeforeEach(func() {
155194
newPeer = network.Peer("org1", "peer1")
156195
helper.addPeer(newPeer)
157196
allPeers = append(allPeers, newPeer)
158197
helper.installChaincode(testChaincode, newPeer)
159198
network.VerifyMembership(allPeers, "testchannel", "marblesp")
160199
})
161200

162-
It("causes the new peer to receive the existing private data only for collectionMarbles", func() {
201+
It("causes the new peer to pull the existing private data only for collectionMarbles", func() {
163202
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1", newPeer)
164203
})
165204
})
166205
}
167206

168207
Context("chaincode in legacy lifecycle", func() {
169-
BeforeEach(func() {
208+
JustBeforeEach(func() {
170209
testChaincode = chaincode{
171210
Chaincode: legacyChaincode,
172211
isLegacy: true,
@@ -176,7 +215,7 @@ var _ bool = Describe("PrivateData", func() {
176215
})
177216

178217
Context("chaincode is migrated from legacy to new lifecycle with same collection config", func() {
179-
BeforeEach(func() {
218+
JustBeforeEach(func() {
180219
testChaincode = chaincode{
181220
Chaincode: newLifecycleChaincode,
182221
isLegacy: false,
@@ -228,7 +267,7 @@ var _ bool = Describe("PrivateData", func() {
228267
}
229268

230269
Context("chaincode in legacy lifecycle", func() {
231-
BeforeEach(func() {
270+
JustBeforeEach(func() {
232271
testChaincode = chaincode{
233272
Chaincode: legacyChaincode,
234273
isLegacy: true,
@@ -238,7 +277,7 @@ var _ bool = Describe("PrivateData", func() {
238277
})
239278

240279
Context("chaincode in new lifecycle", func() {
241-
BeforeEach(func() {
280+
JustBeforeEach(func() {
242281
testChaincode = chaincode{
243282
Chaincode: newLifecycleChaincode,
244283
isLegacy: false,
@@ -270,7 +309,7 @@ var _ bool = Describe("PrivateData", func() {
270309
}
271310

272311
Context("chaincode in legacy lifecycle", func() {
273-
BeforeEach(func() {
312+
JustBeforeEach(func() {
274313
testChaincode = chaincode{
275314
Chaincode: legacyChaincode,
276315
isLegacy: true,
@@ -280,7 +319,7 @@ var _ bool = Describe("PrivateData", func() {
280319
})
281320

282321
Context("chaincode in new lifecycle", func() {
283-
BeforeEach(func() {
322+
JustBeforeEach(func() {
284323
testChaincode = chaincode{
285324
Chaincode: newLifecycleChaincode,
286325
isLegacy: false,
@@ -292,7 +331,7 @@ var _ bool = Describe("PrivateData", func() {
292331
})
293332

294333
Describe("Collection Config Updates", func() {
295-
BeforeEach(func() {
334+
JustBeforeEach(func() {
296335
By("deploying legacy chaincode")
297336
testChaincode = chaincode{
298337
Chaincode: legacyChaincode,
@@ -302,7 +341,7 @@ var _ bool = Describe("PrivateData", func() {
302341
})
303342

304343
When("migrating a chaincode from legacy lifecycle to new lifecycle", func() {
305-
BeforeEach(func() {
344+
JustBeforeEach(func() {
306345
nwo.EnableCapabilities(network, "testchannel", "Application", "V2_0", orderer, allPeers...)
307346
newLifecycleChaincode.CollectionsConfig = collectionConfig("short_btl_config.json")
308347
newLifecycleChaincode.PackageID = "test-package-id"
@@ -433,7 +472,7 @@ var _ bool = Describe("PrivateData", func() {
433472
}
434473

435474
Context("chaincode in legacy lifecycle", func() {
436-
BeforeEach(func() {
475+
JustBeforeEach(func() {
437476
testChaincode = chaincode{
438477
Chaincode: legacyChaincode,
439478
isLegacy: true,
@@ -449,7 +488,7 @@ var _ bool = Describe("PrivateData", func() {
449488
})
450489

451490
Context("chaincode in new lifecycle", func() {
452-
BeforeEach(func() {
491+
JustBeforeEach(func() {
453492
testChaincode = chaincode{
454493
Chaincode: newLifecycleChaincode,
455494
isLegacy: false,
@@ -468,7 +507,7 @@ var _ bool = Describe("PrivateData", func() {
468507

469508
})
470509

471-
func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
510+
func initThreeOrgsSetup() (string, *nwo.Network) {
472511
var err error
473512
testDir, err := ioutil.TempDir("", "e2e-pvtdata")
474513
Expect(err).NotTo(HaveOccurred())
@@ -485,8 +524,12 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []
485524

486525
n := nwo.New(networkConfig, testDir, client, StartPort(), components)
487526
n.GenerateConfigTree()
488-
n.Bootstrap()
489527

528+
return testDir, n
529+
}
530+
531+
func startNetwork(n *nwo.Network) (ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
532+
n.Bootstrap()
490533
networkRunner := n.NetworkGroupRunner()
491534
process := ifrit.Invoke(networkRunner)
492535
Eventually(process.Ready(), n.EventuallyTimeout).Should(BeClosed())
@@ -504,7 +547,7 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []
504547
By("verifying membership")
505548
n.VerifyMembership(expectedPeers, "testchannel")
506549

507-
return testDir, n, process, orderer, expectedPeers
550+
return process, orderer, expectedPeers
508551
}
509552

510553
func testCleanup(testDir string, network *nwo.Network, process ifrit.Process) {
@@ -734,6 +777,10 @@ func (th *testHelper) assertPvtdataPresencePerCollectionConfig2(chaincodeName, m
734777
}
735778
}
736779

780+
func (th *testHelper) assertPvtdataPresencePerCollectionConfig5(chaincodeName, marbleName string, peers ...*nwo.Peer) {
781+
th.assertPvtdataPresencePerCollectionConfig1(chaincodeName, marbleName, peers...)
782+
}
783+
737784
// assertPresentInCollectionM asserts that the private data for given marble is present in collection
738785
// 'readMarble' at the given peers
739786
func (th *testHelper) assertPresentInCollectionM(chaincodeName, marbleName string, peerList ...*nwo.Peer) {

0 commit comments

Comments
 (0)