15
15
*/
16
16
package org .jivesoftware .openfire .plugin .util .cache ;
17
17
18
- import com .hazelcast .core .Cluster ;
19
- import com .hazelcast .core .EntryEvent ;
20
- import com .hazelcast .core .EntryEventType ;
21
- import com .hazelcast .core .EntryListener ;
22
- import com .hazelcast .core .LifecycleEvent ;
23
- import com .hazelcast .core .LifecycleEvent .LifecycleState ;
24
- import com .hazelcast .core .LifecycleListener ;
25
- import com .hazelcast .core .MapEvent ;
26
- import com .hazelcast .core .Member ;
27
- import com .hazelcast .core .MemberAttributeEvent ;
28
- import com .hazelcast .core .MembershipEvent ;
29
- import com .hazelcast .core .MembershipListener ;
18
+ import java .nio .charset .StandardCharsets ;
19
+ import java .util .ArrayList ;
20
+ import java .util .Collection ;
21
+ import java .util .Collections ;
22
+ import java .util .HashMap ;
23
+ import java .util .HashSet ;
24
+ import java .util .Iterator ;
25
+ import java .util .List ;
26
+ import java .util .Map ;
27
+ import java .util .Set ;
28
+ import java .util .concurrent .ConcurrentHashMap ;
29
+ import java .util .concurrent .ConcurrentLinkedQueue ;
30
+ import java .util .concurrent .locks .Lock ;
31
+
30
32
import org .jivesoftware .openfire .PacketException ;
31
33
import org .jivesoftware .openfire .RoutingTable ;
32
34
import org .jivesoftware .openfire .SessionManager ;
53
55
import org .xmpp .packet .JID ;
54
56
import org .xmpp .packet .Presence ;
55
57
56
- import java .nio .charset .StandardCharsets ;
57
- import java .util .ArrayList ;
58
- import java .util .Collection ;
59
- import java .util .Collections ;
60
- import java .util .HashMap ;
61
- import java .util .HashSet ;
62
- import java .util .Iterator ;
63
- import java .util .List ;
64
- import java .util .Map ;
65
- import java .util .Set ;
66
- import java .util .concurrent .ConcurrentHashMap ;
67
- import java .util .concurrent .ConcurrentLinkedQueue ;
68
- import java .util .concurrent .locks .Lock ;
58
+ import com .hazelcast .core .Cluster ;
59
+ import com .hazelcast .core .EntryEvent ;
60
+ import com .hazelcast .core .EntryEventType ;
61
+ import com .hazelcast .core .EntryListener ;
62
+ import com .hazelcast .core .LifecycleEvent ;
63
+ import com .hazelcast .core .LifecycleEvent .LifecycleState ;
64
+ import com .hazelcast .core .LifecycleListener ;
65
+ import com .hazelcast .core .MapEvent ;
66
+ import com .hazelcast .core .Member ;
67
+ import com .hazelcast .core .MemberAttributeEvent ;
68
+ import com .hazelcast .core .MembershipEvent ;
69
+ import com .hazelcast .core .MembershipListener ;
69
70
70
71
/**
71
72
* ClusterListener reacts to membership changes in the cluster. It takes care of cleaning up the state
@@ -113,7 +114,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
113
114
private final Map <Cache <?,?>, EntryListener > entryListeners = new HashMap <>();
114
115
115
116
private final Cluster cluster ;
116
- private final Map <String , ClusterNodeInfo > clusterNodesInfo = new ConcurrentHashMap <>();
117
+ private final Map <NodeID , ClusterNodeInfo > clusterNodesInfo = new ConcurrentHashMap <>();
117
118
118
119
/**
119
120
* Flag that indicates if the listener has done all clean up work when noticed that the
@@ -130,7 +131,7 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
130
131
131
132
this .cluster = cluster ;
132
133
for (Member member : cluster .getMembers ()) {
133
- clusterNodesInfo .put (member . getUuid ( ),
134
+ clusterNodesInfo .put (ClusteredCacheFactory . getNodeID ( member ),
134
135
new HazelcastClusterNodeInfo (member , cluster .getClusterTime ()));
135
136
}
136
137
@@ -415,7 +416,7 @@ private class DirectedPresenceListener implements EntryListener<String, Collecti
415
416
416
417
@ Override
417
418
public void entryAdded (EntryEvent <String , Collection <DirectedPresence >> event ) {
418
- byte [] nodeID = event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 );
419
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID ( event .getMember ());
419
420
// Ignore events originated from this JVM
420
421
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
421
422
// Check if the directed presence was sent to an entity hosted by this JVM
@@ -429,10 +430,10 @@ public void entryAdded(EntryEvent<String, Collection<DirectedPresence>> event) {
429
430
}
430
431
}
431
432
if (!handlers .isEmpty ()) {
432
- Map <String , Collection <String >> senders = nodePresences .get (NodeID . getInstance ( nodeID ) );
433
+ Map <String , Collection <String >> senders = nodePresences .get (nodeID );
433
434
if (senders == null ) {
434
435
senders = new ConcurrentHashMap <>();
435
- nodePresences .put (NodeID . getInstance ( nodeID ) , senders );
436
+ nodePresences .put (nodeID , senders );
436
437
}
437
438
senders .put (sender , handlers );
438
439
}
@@ -441,7 +442,7 @@ public void entryAdded(EntryEvent<String, Collection<DirectedPresence>> event) {
441
442
442
443
@ Override
443
444
public void entryUpdated (EntryEvent <String , Collection <DirectedPresence >> event ) {
444
- byte [] nodeID = event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 );
445
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID ( event .getMember ());
445
446
// Ignore events originated from this JVM
446
447
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
447
448
// Check if the directed presence was sent to an entity hosted by this JVM
@@ -454,10 +455,10 @@ public void entryUpdated(EntryEvent<String, Collection<DirectedPresence>> event)
454
455
handlers .addAll (getReceivers (event , handler ));
455
456
}
456
457
}
457
- Map <String , Collection <String >> senders = nodePresences .get (NodeID . getInstance ( nodeID ) );
458
+ Map <String , Collection <String >> senders = nodePresences .get (nodeID );
458
459
if (senders == null ) {
459
460
senders = new ConcurrentHashMap <>();
460
- nodePresences .put (NodeID . getInstance ( nodeID ) , senders );
461
+ nodePresences .put (nodeID , senders );
461
462
}
462
463
if (!handlers .isEmpty ()) {
463
464
senders .put (sender , handlers );
@@ -475,10 +476,10 @@ public void entryRemoved(EntryEvent<String, Collection<DirectedPresence>> event)
475
476
// Nothing to remove
476
477
return ;
477
478
}
478
- byte [] nodeID = event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 );
479
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID ( event .getMember ());
479
480
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
480
481
String sender = event .getKey ();
481
- nodePresences .get (NodeID . getInstance ( nodeID ) ).remove (sender );
482
+ nodePresences .get (nodeID ).remove (sender );
482
483
}
483
484
}
484
485
@@ -509,7 +510,7 @@ public void entryEvicted(EntryEvent<String, Collection<DirectedPresence>> event)
509
510
}
510
511
511
512
private void mapClearedOrEvicted (MapEvent event ) {
512
- NodeID nodeID = NodeID . getInstance (event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 ));
513
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID (event .getMember ());
513
514
// ignore events which were triggered by this node
514
515
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
515
516
nodePresences .get (nodeID ).clear ();
@@ -579,7 +580,7 @@ public void entryEvicted(EntryEvent<String, Set<NodeID>> event) {
579
580
}
580
581
581
582
private void mapClearedOrEvicted (MapEvent event ) {
582
- NodeID nodeID = NodeID . getInstance (event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 ));
583
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID (event .getMember ());
583
584
// ignore events which were triggered by this node
584
585
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
585
586
Set <String > sessionJIDs = lookupJIDList (nodeID , componentsCache .getName ());
@@ -634,8 +635,8 @@ synchronized void joinCluster() {
634
635
if (seniorClusterMember ) {
635
636
ClusterManager .fireMarkedAsSeniorClusterMember ();
636
637
}
637
- logger .info ("Joined cluster as node: " + cluster . getLocalMember (). getUuid () + ". Senior Member: " +
638
- ( seniorClusterMember ? "YES" : "NO" ) );
638
+ logger .info ("Joined cluster. XMPPServer node={}, Hazelcast UUID={}, seniorClusterMember={}" ,
639
+ new Object []{ ClusteredCacheFactory . getNodeID ( cluster . getLocalMember ()), cluster . getLocalMember (). getUuid (), seniorClusterMember } );
639
640
done = false ;
640
641
}
641
642
@@ -650,6 +651,7 @@ private synchronized void leaveCluster() {
650
651
return ;
651
652
}
652
653
clusterMember = false ;
654
+ final boolean wasSeniorClusterMember = seniorClusterMember ;
653
655
seniorClusterMember = false ;
654
656
// Clean up all traces. This will set all remote sessions as unavailable
655
657
List <NodeID > nodeIDs = new ArrayList <>(nodeSessions .keySet ());
@@ -670,54 +672,55 @@ private synchronized void leaveCluster() {
670
672
// At this point c2s sessions are gone from the routing table so we can identify expired sessions
671
673
XMPPServer .getInstance ().getPresenceUpdateHandler ().removedExpiredPresences ();
672
674
}
673
- logger .info ("Left cluster as node: " + cluster .getLocalMember ().getUuid ());
675
+ logger .info ("Left cluster. XMPPServer node={}, Hazelcast UUID={}, wasSeniorClusterMember={}" ,
676
+ new Object []{ClusteredCacheFactory .getNodeID (cluster .getLocalMember ()), cluster .getLocalMember ().getUuid (), wasSeniorClusterMember });
674
677
done = true ;
675
678
}
676
679
677
680
@ Override
678
681
public void memberAdded (MembershipEvent event ) {
679
682
// local member only
683
+ final NodeID nodeID = ClusteredCacheFactory .getNodeID (event .getMember ());
680
684
if (event .getMember ().localMember ()) { // We left and re-joined the cluster
681
685
joinCluster ();
682
686
} else {
683
- nodePresences .put (NodeID .getInstance (event .getMember ().getUuid ().getBytes (StandardCharsets .UTF_8 )),
684
- new ConcurrentHashMap <>());
687
+ nodePresences .put (nodeID , new ConcurrentHashMap <>());
685
688
// Trigger event that a new node has joined the cluster
686
- ClusterManager .fireJoinedCluster (event . getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 ), true );
689
+ ClusterManager .fireJoinedCluster (nodeID . toByteArray ( ), true );
687
690
}
688
- clusterNodesInfo .put (event . getMember (). getUuid (),
691
+ clusterNodesInfo .put (nodeID ,
689
692
new HazelcastClusterNodeInfo (event .getMember (), cluster .getClusterTime ()));
690
693
}
691
694
692
695
@ Override
693
696
public void memberRemoved (MembershipEvent event ) {
694
- byte [] nodeID = event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 );
697
+ final NodeID nodeID = ClusteredCacheFactory . getNodeID ( event .getMember ());
695
698
696
699
if (event .getMember ().localMember ()) {
697
- logger .info ("Leaving cluster: " + new String ( nodeID , StandardCharsets . UTF_8 ) );
700
+ logger .info ("Leaving cluster: " + nodeID );
698
701
// This node may have realized that it got kicked out of the cluster
699
702
leaveCluster ();
700
703
} else {
701
704
// Trigger event that a node left the cluster
702
- ClusterManager .fireLeftCluster (nodeID );
705
+ ClusterManager .fireLeftCluster (nodeID . toByteArray () );
703
706
704
707
// Clean up directed presences sent from entities hosted in the leaving node to local entities
705
708
// Clean up directed presences sent to entities hosted in the leaving node from local entities
706
- cleanupDirectedPresences (NodeID . getInstance ( nodeID ) );
709
+ cleanupDirectedPresences (nodeID );
707
710
708
711
if (!seniorClusterMember && isSeniorClusterMember ()) {
709
712
seniorClusterMember = true ;
710
713
ClusterManager .fireMarkedAsSeniorClusterMember ();
711
714
}
712
- cleanupNode (NodeID . getInstance ( nodeID ) );
715
+ cleanupNode (nodeID );
713
716
714
717
// Remove traces of directed presences sent from local entities to handlers that no longer exist.
715
718
// At this point c2s sessions are gone from the routing table so we can identify expired sessions
716
719
XMPPServer .getInstance ().getPresenceUpdateHandler ().removedExpiredPresences ();
717
720
}
718
721
// Delete nodeID instance (release from memory)
719
- NodeID .deleteInstance (nodeID );
720
- clusterNodesInfo .remove (event . getMember (). getUuid ());
722
+ NodeID .deleteInstance (nodeID . toByteArray () );
723
+ clusterNodesInfo .remove (nodeID );
721
724
}
722
725
723
726
@ SuppressWarnings ("WeakerAccess" )
@@ -736,8 +739,8 @@ public void stateChanged(LifecycleEvent event) {
736
739
737
740
@ Override
738
741
public void memberAttributeChanged (MemberAttributeEvent event ) {
739
- ClusterNodeInfo priorNodeInfo = clusterNodesInfo .get (event .getMember (). getUuid ( ));
740
- clusterNodesInfo .put (event .getMember (). getUuid (),
742
+ ClusterNodeInfo priorNodeInfo = clusterNodesInfo .get (ClusteredCacheFactory . getNodeID ( event .getMember ()));
743
+ clusterNodesInfo .put (ClusteredCacheFactory . getNodeID ( event .getMember ()),
741
744
new HazelcastClusterNodeInfo (event .getMember (), priorNodeInfo .getJoinedTime ()));
742
745
}
743
746
@@ -766,7 +769,7 @@ public void entryEvicted(EntryEvent<DomainPair, byte[]> event) {
766
769
}
767
770
768
771
private void handleEntryEvent (EntryEvent <DomainPair , byte []> event , boolean removal ) {
769
- NodeID nodeID = NodeID . getInstance (event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 ));
772
+ NodeID nodeID = ClusteredCacheFactory . getNodeID (event .getMember ());
770
773
// ignore events which were triggered by this node
771
774
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
772
775
Set <DomainPair > sessionJIDS = nodeRoutes .get (nodeID );
@@ -782,7 +785,7 @@ private void handleEntryEvent(EntryEvent<DomainPair, byte[]> event, boolean remo
782
785
}
783
786
784
787
private void handleMapEvent (MapEvent event ) {
785
- NodeID nodeID = NodeID . getInstance (event .getMember (). getUuid (). getBytes ( StandardCharsets . UTF_8 ));
788
+ NodeID nodeID = ClusteredCacheFactory . getNodeID (event .getMember ());
786
789
// ignore events which were triggered by this node
787
790
if (!XMPPServer .getInstance ().getNodeID ().equals (nodeID )) {
788
791
Set <DomainPair > sessionJIDS = nodeRoutes .get (nodeID );
0 commit comments