1
1
/*
2
2
* dCache - http://www.dcache.org/
3
3
*
4
- * Copyright (C) 2016 - 2021 Deutsches Elektronen-Synchrotron
4
+ * Copyright (C) 2016 - 2024 Deutsches Elektronen-Synchrotron
5
5
*
6
6
* This program is free software: you can redistribute it and/or modify
7
7
* it under the terms of the GNU Affero General Public License as
20
20
package dmg .cells .services ;
21
21
22
22
import static com .google .common .base .Preconditions .checkArgument ;
23
- import static java .nio .charset .StandardCharsets .UTF_8 ;
24
23
25
- import com .google .common .base .Joiner ;
26
24
import com .google .common .collect .ImmutableMap ;
27
- import com .google .common .collect .ImmutableSet ;
28
- import com .google .common .collect .Sets ;
29
25
import com .google .common .net .HostAndPort ;
30
26
import dmg .cells .network .LocationManagerConnector ;
31
27
import dmg .cells .nucleus .CellAdapter ;
36
32
import dmg .cells .zookeeper .LmPersistentNode ;
37
33
import dmg .cells .zookeeper .LmPersistentNode .PersistentNodeException ;
38
34
import dmg .util .CommandException ;
39
- import dmg .util .command .Argument ;
40
35
import dmg .util .command .Command ;
41
36
import java .io .ByteArrayInputStream ;
42
37
import java .io .ByteArrayOutputStream ;
43
38
import java .io .Closeable ;
44
39
import java .io .DataInputStream ;
45
40
import java .io .DataOutputStream ;
46
41
import java .io .IOException ;
47
- import java .net .InetAddress ;
48
42
import java .net .URI ;
49
43
import java .net .UnknownHostException ;
50
44
import java .util .Arrays ;
57
51
import java .util .Set ;
58
52
import java .util .concurrent .Callable ;
59
53
import java .util .concurrent .ExecutionException ;
60
- import java .util .function .BiConsumer ;
61
54
import java .util .function .Consumer ;
62
55
import java .util .stream .Collectors ;
63
56
import javax .net .SocketFactory ;
64
57
import org .apache .curator .framework .CuratorFramework ;
65
58
import org .apache .curator .framework .recipes .cache .ChildData ;
66
- import org .apache .curator .framework .recipes .cache .NodeCache ;
67
- import org .apache .curator .framework .recipes .cache .NodeCacheListener ;
68
59
import org .apache .curator .framework .recipes .cache .PathChildrenCache ;
69
60
import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
70
61
import org .apache .curator .utils .CloseableUtils ;
71
62
import org .apache .curator .utils .ZKPaths ;
72
- import org .apache .zookeeper .CreateMode ;
73
- import org .apache .zookeeper .KeeperException .ConnectionLossException ;
74
- import org .apache .zookeeper .data .Stat ;
75
63
import org .dcache .ssl .CanlSslSocketCreator ;
76
64
import org .dcache .util .Args ;
77
65
import org .dcache .util .ColumnWriter ;
@@ -88,10 +76,8 @@ public class LocationManager extends CellAdapter {
88
76
LoggerFactory .getLogger (LocationManager .class );
89
77
90
78
private static final String ZK_CORES_URI = "/dcache/lm/cores-uri" ;
91
- private static final String ZK_CORE_CONFIG = "/dcache/lm/core-config" ;
92
79
93
80
private final CoreDomains coreDomains ;
94
- private final CoreConfig coreConfig ;
95
81
private final Args args ;
96
82
private final CellDomainRole role ;
97
83
private final Client client ;
@@ -137,10 +123,6 @@ enum Mode {
137
123
138
124
private final String _mode ;
139
125
140
- private static final ImmutableSet <Mode > tls = ImmutableSet .of (Mode .TLS );
141
- private static final ImmutableSet <Mode > plain = ImmutableSet .of (Mode .PLAIN );
142
- private static final ImmutableSet <Mode > plainAndTls = ImmutableSet .of (Mode .PLAIN , Mode .TLS );
143
-
144
126
Mode (String mode ) {
145
127
_mode = mode ;
146
128
}
@@ -154,17 +136,6 @@ public String getMode() {
154
136
return this ._mode ;
155
137
}
156
138
157
- public Set <Mode > getModeAsSet () {
158
- switch (this ) {
159
- case PLAIN_TLS :
160
- return plainAndTls ;
161
- case TLS :
162
- return tls ;
163
- default :
164
- return plain ;
165
- }
166
- }
167
-
168
139
public static Mode fromString (String mode ) {
169
140
String m = filterAndSort (mode );
170
141
@@ -176,17 +147,6 @@ public static Mode fromString(String mode) {
176
147
throw new IllegalArgumentException ("No Mode of type: " + mode );
177
148
}
178
149
179
- public static boolean isValid (String mode ) {
180
- String m = filterAndSort (mode );
181
-
182
- for (Mode b : Mode .values ()) {
183
- if (b ._mode .equalsIgnoreCase (m )) {
184
- return true ;
185
- }
186
- }
187
- return false ;
188
- }
189
-
190
150
private static String filterAndSort (String mode ) {
191
151
return Arrays .stream (mode .split ("," ))
192
152
.map (String ::trim )
@@ -394,102 +354,6 @@ Map<String, CoreDomainInfo> cores() {
394
354
}
395
355
}
396
356
397
- private class CoreConfig implements NodeCacheListener , Closeable {
398
-
399
- private final CuratorFramework _curator ;
400
-
401
- /**
402
- * Current modes extracted from the CoreDomain configuration.
403
- */
404
- private Mode _mode = Mode .PLAIN ;
405
-
406
- /**
407
- * Cache of the ZooKeeper node identified by {@code _node}.
408
- */
409
- private final NodeCache _cache ;
410
-
411
- /**
412
- * Stat of the last value loaded from the ZooKeeper node identified by {@code _node}.
413
- */
414
- private Stat _current ;
415
-
416
- /**
417
- * Callable to reset the CoreDomains when a change in config is detected
418
- */
419
- private final BiConsumer <Mode , State > _reset ;
420
-
421
- CoreConfig (CuratorFramework curator , BiConsumer <Mode , State > f ) {
422
- _curator = curator ;
423
- _cache = new NodeCache (_curator , ZK_CORE_CONFIG );
424
- _reset = f ;
425
- }
426
-
427
- public Mode getMode () {
428
- return _mode ;
429
- }
430
-
431
- synchronized void start () throws Exception {
432
- _cache .getListenable ().addListener (this );
433
- try {
434
- _cache .start (true );
435
- apply (_cache .getCurrentData ());
436
- } catch (ConnectionLossException e ) {
437
- LOGGER .warn (
438
- "Failed to connect to zookeeper, using mode {} until connection reestablished" ,
439
- _mode );
440
- }
441
- }
442
-
443
- private void apply (ChildData currentData ) {
444
- if (currentData == null ) {
445
- _current = null ;
446
- _mode = Mode .PLAIN ;
447
- LOGGER .info (
448
- "CoreDomain config node " + ZK_CORE_CONFIG + " not present; assuming mode {}" ,
449
- _mode );
450
- } else if (_current == null
451
- || currentData .getStat ().getVersion () > _current .getVersion ()) {
452
- _mode = Mode .fromString (new String (currentData .getData (), UTF_8 ));
453
- LOGGER .info ("CoreDomain config node " + ZK_CORE_CONFIG + " switching to mode {}" ,
454
- _mode );
455
- _current = currentData .getStat ();
456
- } else {
457
- LOGGER .info (
458
- "Ignoring spurious CoreDomain config node " + ZK_CORE_CONFIG + " updated" );
459
- }
460
- }
461
-
462
- @ Override
463
- public synchronized void nodeChanged () throws Exception {
464
- Set <Mode > oldModes = _mode .getModeAsSet ();
465
- apply (_cache .getCurrentData ());
466
- Set <Mode > curModes = _mode .getModeAsSet ();
467
-
468
- // old cur down up
469
- // none,tls - tls = none
470
- // none,tls - none = tls
471
- // none - tls = none tls
472
- // tls - none = tls none
473
- // none - none,tls = tls
474
- // tls - none,tls = none
475
-
476
- Set <Mode > up = Sets .difference (curModes , oldModes ).copyInto (new HashSet <>());
477
- LOGGER .info ("Following modes from CoreDomain are being brought up: [{}]" ,
478
- Joiner .on (',' ).join (up ));
479
- up .stream ().forEach (u -> _reset .accept (u , State .BRING_UP ));
480
-
481
- Set <Mode > down = Sets .difference (oldModes , curModes ).copyInto (new HashSet <>());
482
- LOGGER .info ("Following modes from CoreDomain are being taken down: [{}]" ,
483
- Joiner .on (',' ).join (down ));
484
- down .stream ().forEach (d -> _reset .accept (d , State .TEAR_DOWN ));
485
- }
486
-
487
- @ Override
488
- public void close () {
489
- CloseableUtils .closeQuietly (_cache );
490
- }
491
- }
492
-
493
357
/**
494
358
* Client component of the location manager for satellite domains.
495
359
* <p>
@@ -607,16 +471,21 @@ public class CoreClient extends Client {
607
471
608
472
private LoginManager lmTls ;
609
473
private LoginManager lmPlain ;
474
+ private final Mode connectionType ;
610
475
private volatile CoreDomainInfo info = new CoreDomainInfo ();
611
476
477
+ public CoreClient (Mode connectionType ) {
478
+ this .connectionType = connectionType ;
479
+ }
480
+
612
481
@ Override
613
482
protected boolean shouldConnectTo (String domain ) {
614
483
return domain .compareTo (getCellDomainName ()) < 0 ;
615
484
}
616
485
617
486
@ Override
618
487
public void start () throws Exception {
619
- switch (coreConfig . getMode () ) {
488
+ switch (connectionType ) {
620
489
case PLAIN :
621
490
startListenerWithTcp ();
622
491
break ;
@@ -628,16 +497,11 @@ public void start() throws Exception {
628
497
break ;
629
498
default :
630
499
throw new IllegalArgumentException (
631
- "Mode " + coreConfig . getMode () + "not supported for Core Domain" );
500
+ "Mode " + connectionType + "not supported for Core Domain" );
632
501
}
633
502
coreDomains .setCoreDomainInfoUri (info );
634
503
}
635
504
636
- @ Override
637
- public void close () {
638
- coreConfig .close ();
639
- }
640
-
641
505
@ Override
642
506
public synchronized void reset (Mode mode , State state ) {
643
507
try {
@@ -737,6 +601,7 @@ public LocationManager(String name, String args)
737
601
super (name , "System" , args );
738
602
this .args = getArgs ();
739
603
604
+ Mode connectionType = Mode .fromString (this .args .getOption ("mode" ));
740
605
coreDomains = CoreDomains .createWithMode (getCellDomainName (), getCuratorFramework (),
741
606
this .args .getOpt ("mode" ));
742
607
@@ -745,30 +610,24 @@ public LocationManager(String name, String args)
745
610
switch (role ) {
746
611
case CORE :
747
612
checkArgument (this .args .argc () >= 1 , "Listening port is required." );
748
- client = new CoreClient ();
613
+ client = new CoreClient (connectionType );
749
614
coreDomains .onChange (client ::update );
750
- coreConfig = new CoreConfig (getCuratorFramework (), client ::reset );
751
615
break ;
752
616
default :
753
617
client = new Client ();
754
618
coreDomains .onChange (client ::update );
755
- coreConfig = null ;
756
619
break ;
757
620
}
758
621
} else {
759
622
role = null ;
760
623
client = null ;
761
- coreConfig = null ;
762
624
}
763
625
}
764
626
765
627
@ Override
766
628
protected void started () {
767
629
try {
768
630
coreDomains .start ();
769
- if (coreConfig != null ) {
770
- coreConfig .start ();
771
- }
772
631
if (client != null ) {
773
632
client .start ();
774
633
}
@@ -906,60 +765,4 @@ public String call() {
906
765
return writer .toString ();
907
766
}
908
767
}
909
-
910
- @ Command (name = "set core-config" , hint = "set operating mode for CoreDomain" ,
911
- description = "Specify the mode to be none, tls or none,tls in which the CoreDomain should run" )
912
- class SetCoreConfigCommand implements Callable <String > {
913
-
914
- @ Argument (index = 0 ,
915
- usage = "Mode in which CoreDomain should run." )
916
- String _modes ;
917
-
918
- @ Override
919
- public synchronized String call () throws Exception {
920
- CuratorFramework curator = getCuratorFramework ();
921
- Set <String > modes = Sets .newHashSet (_modes .split ("," ))
922
- .stream ()
923
- .map (String ::trim )
924
- .filter (s -> !s .isEmpty ())
925
- .collect (Collectors .toSet ());
926
-
927
- if (modes .stream ().allMatch (Mode ::isValid )) {
928
- String config = Joiner .on ("," ).join (modes );
929
- byte [] data = config .getBytes (UTF_8 );
930
-
931
- if (curator .checkExists ().forPath (ZK_CORE_CONFIG ) != null ) {
932
- curator .setData ()
933
- .forPath (ZK_CORE_CONFIG , data );
934
- } else {
935
- curator .create ()
936
- .creatingParentContainersIfNeeded ()
937
- .withMode (CreateMode .PERSISTENT )
938
- .forPath (ZK_CORE_CONFIG , data );
939
- }
940
-
941
- if (Arrays .equals (curator .getData ().forPath (ZK_CORE_CONFIG ), data )) {
942
- return "Successfully updated CoreDomain mode configuration to " + config ;
943
- } else {
944
- return "Could not change CoreDomain configuration to " + config ;
945
- }
946
- }
947
-
948
- throw new BadConfigException ("Invalid Modes provided for CoreDomain configuration. " +
949
- "Valid modes are \" none\" , \" tls\" or \" none,tls\" " );
950
- }
951
- }
952
-
953
- @ Command (name = "get core-config" , hint = "get current mode of operation for CoreDomain" ,
954
- description =
955
- "Get the current operating modes of the CoreDomain. It should be one of the following "
956
- +
957
- "\" none\" , \" tls\" or \" none,tls\" ." )
958
- class GetCoreConfigCommand implements Callable <String > {
959
-
960
- @ Override
961
- public String call () throws Exception {
962
- return new String (getCuratorFramework ().getData ().forPath (ZK_CORE_CONFIG ), UTF_8 );
963
- }
964
- }
965
768
}
0 commit comments