Skip to content

Commit 3fc1a87

Browse files
committed
- Changed GET_MBRS_REQ to return all registrations if group==null (https://issues.redhat.com/browse/JGRP-2693)
- Moved LazyRemovalCache from blocks -> util - Created LazyRemovalList (+ test) - Changed RouterStub to return all registrations from GossipRouter if group==null
1 parent 6e5ce51 commit 3fc1a87

File tree

13 files changed

+430
-68
lines changed

13 files changed

+430
-68
lines changed

src/org/jgroups/Global.java

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public final class Global {
7171
public static final String MATCH_HOST="match-host";
7272
public static final String MATCH_INTF="match-interface";
7373

74+
// used to return all groups (https://issues.redhat.com/browse/JGRP-2693)
75+
public static final String ALL_GROUPS="<<all-groups>>";
76+
7477
public static final String PREFIX="org.jgroups.protocols.";
7578
public static final String DEFAULT_PROTOCOL_STACK="udp.xml";
7679

src/org/jgroups/protocols/FD_SOCK.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import org.jgroups.*;
44
import org.jgroups.annotations.*;
5-
import org.jgroups.blocks.LazyRemovalCache;
5+
import org.jgroups.util.LazyRemovalCache;
66
import org.jgroups.conf.AttributeType;
77
import org.jgroups.stack.IpAddress;
88
import org.jgroups.stack.Protocol;

src/org/jgroups/protocols/TP.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import org.jgroups.*;
55
import org.jgroups.annotations.*;
6-
import org.jgroups.blocks.LazyRemovalCache;
6+
import org.jgroups.util.LazyRemovalCache;
77
import org.jgroups.conf.AttributeType;
88
import org.jgroups.conf.ClassConfigurator;
99
import org.jgroups.conf.PropertyConverters;

src/org/jgroups/stack/GossipRouter.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
import java.util.function.BiConsumer;
2828
import java.util.stream.Collectors;
2929

30+
import static org.jgroups.Global.ALL_GROUPS;
31+
import static org.jgroups.stack.GossipType.GET_MBRS_RSP;
32+
import static org.jgroups.stack.GossipType.GET_MBRS_RSP_LAST;
33+
3034
/**
3135
* Router for TCP based group comunication (using layer TCP instead of UDP). Instead of the TCP
3236
* layer sending packets point-to-point to each other member, it sends the packet to the router
@@ -176,10 +180,9 @@ public int numRegisteredClusters() {
176180

177181
@ManagedAttribute(description="The number of registered client (all clusters)")
178182
public int numRegisteredClients() {
179-
return (int)address_mappings.values().stream().mapToLong(s -> s.keySet().size()).sum();
183+
return (int)address_mappings.values().stream().mapToLong(Map::size).sum();
180184
}
181185

182-
183186
public GossipRouter init() throws Exception {
184187
diag=new DiagnosticsHandler(log, socket_factory, thread_factory)
185188
.registerProbeHandler(this)
@@ -188,7 +191,6 @@ public GossipRouter init() throws Exception {
188191
return this;
189192
}
190193

191-
192194
/**
193195
* Lifecycle operation. Called after create(). When this method is called, the managed attributes
194196
* have already been set.<br>
@@ -485,37 +487,46 @@ protected void handleGetMembersRequest(Address sender, DataInput in) {
485487
GossipData req=readRequest(in, GossipType.GET_MBRS);
486488
if(req == null)
487489
return;
488-
GossipData rsp=new GossipData(GossipType.GET_MBRS_RSP, req.getGroup(), null);
489-
Map<Address,Entry> members=address_mappings.get(req.getGroup());
490-
if(members != null) {
491-
for(Map.Entry<Address,Entry> entry : members.entrySet()) {
492-
Address logical_addr=entry.getKey();
493-
PhysicalAddress phys_addr=entry.getValue().phys_addr;
494-
String logical_name=entry.getValue().logical_name;
495-
PingData data=new PingData(logical_addr, true, logical_name, phys_addr);
496-
rsp.addPingData(data);
490+
boolean all_groups=req.getGroup() == null;
491+
Collection<String> clusters=all_groups? address_mappings.keySet() : List.of(req.getGroup());
492+
int num_rsps=clusters.size();
493+
for(String cluster: clusters) {
494+
num_rsps--;
495+
Map<Address,Entry> members=address_mappings.get(cluster);
496+
if(members != null) {
497+
String cluster_name=all_groups? ALL_GROUPS + ":" + cluster : cluster;
498+
GossipData rsp=new GossipData(num_rsps <= 0? GET_MBRS_RSP_LAST : GET_MBRS_RSP, cluster_name, null);
499+
for(Map.Entry<Address,Entry> entry: members.entrySet()) {
500+
Address logical_addr=entry.getKey();
501+
PhysicalAddress phys_addr=entry.getValue().phys_addr;
502+
String logical_name=entry.getValue().logical_name;
503+
PingData data=new PingData(logical_addr, true, logical_name, phys_addr);
504+
rsp.addPingData(data);
505+
}
506+
sendMembersResponse(sender, rsp, cluster);
497507
}
498508
}
509+
}
499510

511+
protected void sendMembersResponse(Address to, GossipData rsp, String group) {
500512
if(dump_msgs == DumpMessages.ALL || log.isDebugEnabled()) {
501513
String rsps=rsp.ping_data == null? null
502514
: rsp.ping_data.stream().map(r -> String.format("%s (%s)", r.getLogicalName(), r.getPhysicalAddr()))
503515
.collect(Collectors.joining(", "));
504516
if(rsps != null) {
505517
if(log.isDebugEnabled())
506-
log.debug("get(%s) -> %s", req.getGroup(), rsps);
518+
log.debug("get(%s) -> %s", group, rsps);
507519
if(dump_msgs == DumpMessages.ALL)
508-
System.out.printf("get(%s) -> %s\n", req.getGroup(), rsps);
520+
System.out.printf("get(%s) -> %s\n", group, rsps);
509521
}
510522
}
511-
512523
ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(rsp.serializedSize());
513524
try {
514525
rsp.writeTo(out);
515-
server.send(sender, out.buffer(), 0, out.position());
526+
server.send(to, out.buffer(), 0, out.position());
516527
}
517528
catch(Exception ex) {
518-
log.error("failed sending %s to %s: %s", GossipType.GET_MBRS_RSP, sender, ex);
529+
log.error("failed sending %s to %s: %s", rsp.type, to, ex);
519530
}
520531
}
521532

src/org/jgroups/stack/GossipType.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ public enum GossipType {
1212
GET_MBRS_RSP,
1313
MESSAGE,
1414
HEARTBEAT, // request *and* response
15-
SUSPECT
15+
SUSPECT,
16+
GET_MBRS_RSP_LAST // the last response
1617
}

src/org/jgroups/stack/RouterStub.java

+52-25
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package org.jgroups.stack;
22

33
import org.jgroups.Address;
4+
import org.jgroups.Global;
45
import org.jgroups.PhysicalAddress;
56
import org.jgroups.annotations.GuardedBy;
7+
import org.jgroups.util.*;
68
import org.jgroups.blocks.cs.*;
79
import org.jgroups.logging.Log;
810
import org.jgroups.logging.LogFactory;
911
import org.jgroups.protocols.PingData;
10-
import org.jgroups.util.ByteArrayDataInputStream;
11-
import org.jgroups.util.ByteArrayDataOutputStream;
12-
import org.jgroups.util.SocketFactory;
13-
import org.jgroups.util.Util;
1412

1513
import java.io.DataInput;
1614
import java.net.InetAddress;
@@ -19,6 +17,7 @@
1917

2018
import static java.lang.System.currentTimeMillis;
2119
import static java.util.concurrent.TimeUnit.MILLISECONDS;
20+
import static org.jgroups.stack.GossipType.GET_MBRS_RSP_LAST;
2221
import static org.jgroups.util.Util.printTime;
2322

2423

@@ -28,8 +27,11 @@
2827
*/
2928
public class RouterStub extends ReceiverAdapter implements Comparable<RouterStub>, ConnectionListener {
3029
public interface StubReceiver {void receive(GossipData data);}
31-
public interface MembersNotification {void members(List<PingData> mbrs);}
3230
public interface CloseListener {void closed(RouterStub stub);}
31+
public interface MembersNotification {
32+
void members(List<PingData> mbrs);
33+
default void members(String group, List<PingData> mbrs, boolean last) {members(mbrs);}
34+
}
3335

3436
protected BaseServer client;
3537
protected IpAddress local; // bind address
@@ -55,9 +57,15 @@ public interface CloseListener {void closed(RouterStub stub);}
5557
// When sending and non_blocking, how many messages to queue max
5658
protected int max_send_queue=128;
5759

60+
// the max members in the get_all_members_list
61+
protected int max_cache_size=10;
62+
63+
// the max age of elements in the get_all_members_list
64+
protected long max_cache_age=5000;
65+
5866
// map to correlate GET_MBRS requests and responses
5967
protected final Map<String,List<MembersNotification>> get_members_map=new HashMap<>();
60-
68+
protected final LazyRemovalList<MembersNotification> get_all_members_list;
6169

6270
public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boolean use_nio, CloseListener l, SocketFactory sf) {
6371
this(local_sa, remote_sa, use_nio, l, sf, -1);
@@ -93,6 +101,7 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
93101
this.max_send_queue=max_send_queue;
94102
if(resolveRemoteAddress()) // sets remote
95103
client=createClient(sf);
104+
get_all_members_list=new LazyRemovalList<>(max_cache_size, max_cache_age);
96105
}
97106

98107

@@ -118,8 +127,10 @@ public RouterStub(InetSocketAddress local_sa, InetSocketAddress remote_sa, boole
118127
public RouterStub nonBlockingSends(boolean b) {this.non_blocking_sends=b; return this;}
119128
public int maxSendQueue() {return max_send_queue;}
120129
public RouterStub maxSendQueue(int s) {this.max_send_queue=s; return this;}
121-
122-
130+
public int maxCacheSize() {return max_cache_size;}
131+
public RouterStub maxCacheSize(int max_cache_size) {this.max_cache_size=max_cache_size; return this;}
132+
public long maxCacheAge() {return max_cache_age;}
133+
public RouterStub maxCacheAge(long max_cache_age) {this.max_cache_age=max_cache_age; return this;}
123134

124135
/**
125136
* Registers mbr with the GossipRouter under the given group, with the given logical name and physical address.
@@ -180,10 +191,14 @@ public void destroy() {
180191
public void getMembers(final String group, MembersNotification callback) throws Exception {
181192
if(callback == null)
182193
return;
183-
// if(!isConnected()) throw new Exception ("not connected");
184-
synchronized(get_members_map) {
185-
List<MembersNotification> set=get_members_map.computeIfAbsent(group, k -> new ArrayList<>());
186-
set.add(callback);
194+
if(group == null) {
195+
get_all_members_list.add(callback);
196+
}
197+
else {
198+
synchronized(get_members_map) {
199+
List<MembersNotification> list=get_members_map.computeIfAbsent(group, __ -> new ArrayList<>());
200+
list.add(callback);
201+
}
187202
}
188203
try {
189204
writeRequest(new GossipData(GossipType.GET_MBRS, group, null));
@@ -230,12 +245,16 @@ public void receive(Address sender, DataInput in, int length) {
230245
receiver.receive(data);
231246
break;
232247
case GET_MBRS_RSP:
233-
notifyResponse(data.getGroup(), data.getPingData());
248+
case GET_MBRS_RSP_LAST:
249+
List<PingData> ping_data=data.getPingData();
250+
if(ping_data != null)
251+
notifyResponse(data.getGroup(), data.getPingData(), data.getType() == GET_MBRS_RSP_LAST);
234252
break;
235253
}
236254
if(handle_heartbeats)
237255
last_heartbeat=currentTimeMillis();
238-
} catch(Exception ex) {
256+
}
257+
catch(Exception ex) {
239258
log.error(Util.getMessage("FailedReadingData"), ex);
240259
}
241260
}
@@ -299,28 +318,36 @@ public void writeRequest(GossipData req) throws Exception {
299318
}
300319

301320
protected void removeResponse(String group, MembersNotification notif) {
321+
if(group == null || group.equals(Global.ALL_GROUPS)) {
322+
get_all_members_list.remove(notif);
323+
return;
324+
}
302325
synchronized(get_members_map) {
303-
List<MembersNotification> set=get_members_map.get(group);
304-
if(set == null || set.isEmpty()) {
326+
List<MembersNotification> list=get_members_map.get(group);
327+
if(list == null || list.isEmpty()) {
305328
get_members_map.remove(group);
306329
return;
307330
}
308-
if(set.remove(notif) && set.isEmpty())
331+
if(list.remove(notif) && list.isEmpty())
309332
get_members_map.remove(group);
310333
}
311334
}
312335

313-
protected void notifyResponse(String group, List<PingData> list) {
314-
if(group == null)
336+
protected void notifyResponse(String group, final List<PingData> list, boolean last) {
337+
if(group.startsWith(Global.ALL_GROUPS)) {
338+
int index=group.indexOf(':');
339+
final String grp=group.substring(index+1);
340+
get_all_members_list.forEach(n -> n.members(grp, list, last));
341+
if(last)
342+
get_all_members_list.clear(false);
315343
return;
316-
if(list == null)
317-
list=Collections.emptyList();
344+
}
318345
synchronized(get_members_map) {
319-
List<MembersNotification> set=get_members_map.get(group);
320-
while(set != null && !set.isEmpty()) {
346+
List<MembersNotification> l=get_members_map.get(group);
347+
while(l != null && !l.isEmpty()) {
321348
try {
322-
MembersNotification rsp=set.remove(0);
323-
rsp.members(list);
349+
MembersNotification rsp=l.remove(0);
350+
rsp.members(group, list, last);
324351
}
325352
catch(Throwable t) {
326353
log.error("failed notifying %s: %s", group, t);

src/org/jgroups/blocks/LazyRemovalCache.java renamed to src/org/jgroups/util/LazyRemovalCache.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
package org.jgroups.blocks;
2-
3-
import org.jgroups.util.Util;
1+
package org.jgroups.util;
42

53
import java.util.*;
64
import java.util.concurrent.ConcurrentMap;
@@ -51,6 +49,19 @@ public boolean addIfAbsent(K key, V val) {
5149
return add(key, val, true);
5250
}
5351

52+
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
53+
Objects.requireNonNull(key);
54+
Objects.requireNonNull(mappingFunction);
55+
V v=get(key);
56+
if(v != null)
57+
return v;
58+
if((v=mappingFunction.apply(key)) != null) {
59+
add(key, v);
60+
return v;
61+
}
62+
return v;
63+
}
64+
5465
public void addAll(Map<K,V> m) {
5566
addAll(m, false);
5667
}
@@ -75,7 +86,6 @@ public boolean containsKeys(Collection<K> keys) {
7586
return true;
7687
}
7788

78-
7989
public V get(K key) {
8090
if(key == null)
8191
return null;

0 commit comments

Comments
 (0)