16
16
import org .reactivecommons .async .rabbit .config .ConnectionFactoryProvider ;
17
17
import org .reactivecommons .async .rabbit .config .RabbitProperties ;
18
18
import org .reactivecommons .async .rabbit .config .props .AsyncProps ;
19
+ import org .reactivecommons .async .rabbit .config .spring .RabbitPropertiesBase ;
19
20
import org .springframework .boot .context .properties .PropertyMapper ;
20
21
import reactor .core .publisher .Mono ;
21
22
import reactor .rabbitmq .ChannelPool ;
29
30
import reactor .rabbitmq .Utils ;
30
31
import reactor .util .retry .Retry ;
31
32
33
+ import javax .net .ssl .KeyManager ;
34
+ import javax .net .ssl .KeyManagerFactory ;
35
+ import javax .net .ssl .SSLContext ;
36
+ import javax .net .ssl .TrustManager ;
37
+ import javax .net .ssl .TrustManagerFactory ;
38
+ import javax .net .ssl .X509TrustManager ;
39
+ import java .io .FileInputStream ;
40
+ import java .io .IOException ;
41
+ import java .io .InputStream ;
42
+ import java .security .KeyManagementException ;
43
+ import java .security .KeyStore ;
44
+ import java .security .KeyStoreException ;
45
+ import java .security .NoSuchAlgorithmException ;
46
+ import java .security .SecureRandom ;
47
+ import java .security .UnrecoverableKeyException ;
48
+ import java .security .cert .CertificateException ;
32
49
import java .time .Duration ;
50
+ import java .util .Arrays ;
33
51
import java .util .logging .Level ;
34
52
35
53
@ Log
36
54
@ UtilityClass
37
55
public class RabbitMQSetupUtils {
38
56
private static final String LISTENER_TYPE = "listener" ;
39
57
private static final String SENDER_TYPE = "sender" ;
58
+ private static final String DEFAULT_PROTOCOL ;
59
+ public static final int START_INTERVAL = 300 ;
60
+ public static final int MAX_BACKOFF_INTERVAL = 3000 ;
61
+
62
+ static {
63
+ String protocol = "TLSv1.1" ;
64
+ try {
65
+ String [] protocols = SSLContext .getDefault ().getSupportedSSLParameters ().getProtocols ();
66
+ for (String prot : protocols ) {
67
+ if ("TLSv1.2" .equals (prot )) {
68
+ protocol = "TLSv1.2" ;
69
+ break ;
70
+ }
71
+ }
72
+ } catch (NoSuchAlgorithmException e ) {
73
+ // nothing
74
+ }
75
+ DEFAULT_PROTOCOL = protocol ;
76
+ }
40
77
41
78
@ SneakyThrows
42
79
public static ConnectionFactoryProvider connectionFactoryProvider (RabbitProperties properties ) {
@@ -48,9 +85,7 @@ public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperti
48
85
map .from (properties ::determinePassword ).whenNonNull ().to (factory ::setPassword );
49
86
map .from (properties ::determineVirtualHost ).whenNonNull ().to (factory ::setVirtualHost );
50
87
factory .useNio ();
51
- if (properties .getSsl () != null && properties .getSsl ().isEnabled ()) {
52
- factory .useSslProtocol ();
53
- }
88
+ setUpSSL (factory , properties );
54
89
return () -> factory ;
55
90
}
56
91
@@ -118,9 +153,95 @@ private static Mono<Connection> createConnectionMono(ConnectionFactory factory,
118
153
log .log (Level .SEVERE , "Error creating connection to RabbitMQ Broker in host" +
119
154
factory .getHost () + ". Starting retry process..." , err )
120
155
)
121
- .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (300 ))
122
- .maxBackoff (Duration .ofMillis (3000 )))
156
+ .retryWhen (Retry .backoff (Long .MAX_VALUE , Duration .ofMillis (START_INTERVAL ))
157
+ .maxBackoff (Duration .ofMillis (MAX_BACKOFF_INTERVAL )))
123
158
.cache ();
124
159
}
125
160
161
+ // SSL based on RabbitConnectionFactoryBean
162
+ // https://github.com/spring-projects/spring-amqp/blob/main/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitConnectionFactoryBean.java
163
+
164
+ private static void setUpSSL (ConnectionFactory factory , RabbitProperties properties )
165
+ throws NoSuchAlgorithmException , KeyManagementException , KeyStoreException , UnrecoverableKeyException ,
166
+ CertificateException , IOException {
167
+ var ssl = properties .getSsl ();
168
+ if (ssl != null && ssl .isEnabled ()) {
169
+ var keyManagers = configureKeyManagers (ssl );
170
+ var trustManagers = configureTrustManagers (ssl );
171
+ var secureRandom = SecureRandom .getInstanceStrong ();
172
+
173
+ if (log .isLoggable (Level .FINE )) {
174
+ log .fine ("Initializing SSLContext with KM: " + Arrays .toString (keyManagers ) +
175
+ ", TM: " + Arrays .toString (trustManagers ) + ", random: " + secureRandom );
176
+ }
177
+ var context = createSSLContext (ssl );
178
+ context .init (keyManagers , trustManagers , secureRandom );
179
+ factory .useSslProtocol (context );
180
+
181
+ logDetails (trustManagers );
182
+
183
+ if (ssl .getVerifyHostname ()) {
184
+ factory .enableHostnameVerification ();
185
+ }
186
+ }
187
+ }
188
+
189
+ private static KeyManager [] configureKeyManagers (RabbitPropertiesBase .Ssl ssl ) throws KeyStoreException ,
190
+ IOException ,
191
+ NoSuchAlgorithmException ,
192
+ CertificateException , UnrecoverableKeyException {
193
+ KeyManager [] keyManagers = null ;
194
+ if (ssl .getKeyStore () != null ) {
195
+ var ks = KeyStore .getInstance (ssl .getKeyStoreType ());
196
+ char [] keyPassphrase = null ;
197
+ if (ssl .getKeyStorePassword () != null ) {
198
+ keyPassphrase = ssl .getKeyStorePassword ().toCharArray ();
199
+ }
200
+ try (var inputStream = new FileInputStream (ssl .getKeyStore ())) {
201
+ ks .load (inputStream , keyPassphrase );
202
+ }
203
+ var kmf = KeyManagerFactory .getInstance (KeyManagerFactory .getDefaultAlgorithm ());
204
+ kmf .init (ks , keyPassphrase );
205
+ keyManagers = kmf .getKeyManagers ();
206
+ }
207
+ return keyManagers ;
208
+ }
209
+
210
+ private static TrustManager [] configureTrustManagers (RabbitPropertiesBase .Ssl ssl )
211
+ throws KeyStoreException , IOException , NoSuchAlgorithmException , CertificateException {
212
+ KeyStore tks = null ;
213
+ if (ssl .getTrustStore () != null ) {
214
+ tks = KeyStore .getInstance (ssl .getTrustStoreType ());
215
+ char [] trustPassphrase = null ;
216
+ if (ssl .getTrustStorePassword () != null ) {
217
+ trustPassphrase = ssl .getTrustStorePassword ().toCharArray ();
218
+ }
219
+ try (InputStream inputStream = new FileInputStream (ssl .getTrustStore ())) {
220
+ tks .load (inputStream , trustPassphrase );
221
+ }
222
+ }
223
+
224
+ var tmf = TrustManagerFactory .getInstance (TrustManagerFactory .getDefaultAlgorithm ());
225
+ tmf .init (tks );
226
+ return tmf .getTrustManagers ();
227
+ }
228
+
229
+ private static SSLContext createSSLContext (RabbitPropertiesBase .Ssl ssl ) throws NoSuchAlgorithmException {
230
+ return SSLContext .getInstance (ssl .getAlgorithm () != null ? ssl .getAlgorithm () : DEFAULT_PROTOCOL );
231
+ }
232
+
233
+ private static void logDetails (TrustManager [] managers ) {
234
+ var found = false ;
235
+ for (var trustManager : managers ) {
236
+ if (trustManager instanceof X509TrustManager ) {
237
+ found = true ;
238
+ var x509TrustManager = (X509TrustManager ) trustManager ;
239
+ log .info ("Loaded " + x509TrustManager .getAcceptedIssuers ().length + " accepted issuers for rabbitmq" );
240
+ }
241
+ }
242
+ if (!found ) {
243
+ log .warning ("No X509TrustManager found in the truststore." );
244
+ }
245
+ }
246
+
126
247
}
0 commit comments