29
29
#include < mqbblp_clustercatalog.h>
30
30
#include < mqbnet_authenticationcontext.h>
31
31
#include < mqbnet_initialconnectioncontext.h>
32
+ #include < mqbplug_authenticator.h>
32
33
33
34
// BMQ
34
35
#include < bmqio_status.h>
35
36
#include < bmqp_ctrlmsg_messages.h>
36
37
#include < bmqp_protocol.h>
37
38
#include < bmqp_schemaeventbuilder.h>
39
+ #include < bmqsys_threadutil.h>
38
40
#include < bmqu_memoutstream.h>
39
41
40
42
// BDE
41
43
#include < ball_log.h>
42
44
#include < bdlma_sequentialallocator.h>
45
+ #include < bdlmt_threadpool.h>
43
46
#include < bsl_memory.h>
44
47
#include < bsl_ostream.h>
48
+ #include < bsls_nullptr.h>
49
+ #include < bsls_timeinterval.h>
45
50
46
51
namespace BloombergLP {
47
52
namespace mqba {
@@ -62,6 +67,13 @@ int Authenticator::onAuthenticationRequest(
62
67
const bmqp_ctrlmsg::AuthenticationMessage& authenticationMsg,
63
68
const InitialConnectionContextSp& context)
64
69
{
70
+ enum RcEnum {
71
+ // Value for the various RC error categories
72
+ rc_SUCCESS = 0 ,
73
+ rc_AUTHENTICATING_IN_PROGRESS = -1 ,
74
+ rc_FAIL_TO_ENQUEUE_JOB = -2 ,
75
+ };
76
+
65
77
// PRECONDITIONS
66
78
BSLS_ASSERT_SAFE (authenticationMsg.isAuthenticateRequestValue ());
67
79
BSLS_ASSERT_SAFE (context->isIncoming ());
@@ -70,10 +82,6 @@ int Authenticator::onAuthenticationRequest(
70
82
<< context->channel ()->peerUri ()
71
83
<< " ': " << authenticationMsg;
72
84
73
- bmqp_ctrlmsg::AuthenticationMessage authenticationResponse;
74
- bmqp_ctrlmsg::AuthenticateResponse& response =
75
- authenticationResponse.makeAuthenticateResponse ();
76
-
77
85
// Create an AuthenticationContext for that connection
78
86
bsl::shared_ptr<mqbnet::AuthenticationContext> authenticationContext =
79
87
bsl::allocate_shared<mqbnet::AuthenticationContext>(
@@ -85,22 +93,18 @@ int Authenticator::onAuthenticationRequest(
85
93
);
86
94
87
95
context->setAuthenticationContext (authenticationContext);
96
+ authenticationContext->setInitialConnectionContext (context.get ());
88
97
89
- // Always succeeds for now
90
- // TODO: For later implementation, plugins will perform authentication,
91
- // taking the `AuthenticationContext` and updates it with the
92
- // authentication result.
93
- response.status ().category () = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
94
- response.status ().code () = 0 ;
95
- response.lifetimeMs () = 10 * 60 * 1000 ;
96
-
97
- authenticationContext->state ().testAndSwap (
98
- mqbnet::AuthenticationContext::State::e_AUTHENTICATING,
99
- mqbnet::AuthenticationContext::State::e_AUTHENTICATED);
98
+ // Authenticate
99
+ int rc = d_threadPool.enqueueJob (
100
+ bdlf::BindUtil::bind (&Authenticator::authenticate, this , context));
100
101
101
- int rc = sendAuthenticationMessage (errorDescription,
102
- authenticationResponse,
103
- authenticationContext);
102
+ if (rc != 0 ) {
103
+ errorDescription << " Failed to enqueue authentication job for '"
104
+ << context->channel ()->peerUri () << " ' [rc: " << rc
105
+ << " , message: " << authenticationMsg << " ]" ;
106
+ return rc_FAIL_TO_ENQUEUE_JOB; // RETURN
107
+ }
104
108
105
109
return rc;
106
110
}
@@ -118,7 +122,7 @@ int Authenticator::onAuthenticationResponse(
118
122
int Authenticator::sendAuthenticationMessage (
119
123
bsl::ostream& errorDescription,
120
124
const bmqp_ctrlmsg::AuthenticationMessage& message,
121
- const AuthenticationContextSp& context)
125
+ const InitialConnectionContextSp& context)
122
126
{
123
127
enum RcEnum {
124
128
// Value for the various RC error categories
@@ -127,12 +131,10 @@ int Authenticator::sendAuthenticationMessage(
127
131
rc_WRITE_FAILURE = -2
128
132
};
129
133
130
- bmqp::EncodingType::Enum encodingType = bmqp::EncodingType::e_BER;
131
-
132
134
bdlma::LocalSequentialAllocator<2048 > localAllocator (d_allocator_p);
133
135
134
136
bmqp::SchemaEventBuilder builder (d_blobSpPool_p,
135
- encodingType ,
137
+ context-> authenticationEncodingType () ,
136
138
&localAllocator);
137
139
138
140
int rc = builder.setMessage (message, bmqp::EventType::e_AUTHENTICATION);
@@ -144,8 +146,9 @@ int Authenticator::sendAuthenticationMessage(
144
146
145
147
// Send response event
146
148
bmqio::Status status;
147
- context->initialConnectionContext ()->channel ()->write (&status,
148
- *builder.blob ());
149
+
150
+ context->channel ()->write (&status, *builder.blob ());
151
+
149
152
if (!status) {
150
153
errorDescription << " Failed sending AuthenticationMessage "
151
154
<< " [status: " << status << " , message: " << message
@@ -162,11 +165,104 @@ void Authenticator::initiateOutboundAuthentication(
162
165
BALL_LOG_ERROR << " Not Implemented" ;
163
166
}
164
167
168
+ void Authenticator::authenticate (const InitialConnectionContextSp& context)
169
+ {
170
+ // PRECONDITIONS
171
+ BSLS_ASSERT (context->authenticationContext ());
172
+ BSLS_ASSERT (context->authenticationContext ()->initialConnectionContext ());
173
+
174
+ const bmqp_ctrlmsg::AuthenticateRequest& authenticateRequest =
175
+ context->authenticationContext ()
176
+ ->authenticationMessage ()
177
+ .authenticateRequest ();
178
+
179
+ bsl::shared_ptr<mqbplug::AuthenticationResult> result;
180
+ mqbplug::AuthenticationData authenticationData (
181
+ authenticateRequest.data ().value (),
182
+ context->channel ()->peerUri ());
183
+
184
+ bmqp_ctrlmsg::AuthenticationMessage authenticationResponse;
185
+ bmqp_ctrlmsg::AuthenticateResponse& response =
186
+ authenticationResponse.makeAuthenticateResponse ();
187
+
188
+ bmqu::MemOutStream authenticationErrorStream;
189
+
190
+ BALL_LOG_INFO << " Authenticating connection '"
191
+ << context->channel ()->peerUri () << " ' with mechanism '"
192
+ << authenticateRequest.mechanism () << " '" ;
193
+
194
+ const int authn_rc = d_authnController_p->authenticate (
195
+ authenticationErrorStream,
196
+ &result,
197
+ authenticateRequest.mechanism (),
198
+ authenticationData);
199
+ if (authn_rc != 0 ) {
200
+ BALL_LOG_ERROR << " Authentication failed for connection '"
201
+ << context->channel ()->peerUri () << " ' with mechanism '"
202
+ << authenticateRequest.mechanism ()
203
+ << " ' [rc: " << authn_rc
204
+ << " , error: " << authenticationErrorStream.str ()
205
+ << " ]" ;
206
+
207
+ response.status ().code () = authn_rc;
208
+ response.status ().category () = bmqp_ctrlmsg::StatusCategory::E_REFUSED;
209
+ response.status ().message () = authenticationErrorStream.str ();
210
+
211
+ context->setAuthenticationContext (bsl::nullptr_t ());
212
+ }
213
+ else {
214
+ response.status ().code () = 0 ;
215
+ response.status ().category () = bmqp_ctrlmsg::StatusCategory::E_SUCCESS;
216
+ response.lifetimeMs () = result->lifetimeMs ();
217
+
218
+ context->authenticationContext ()->setAuthenticationResult (result);
219
+
220
+ context->authenticationContext ()->state ().testAndSwap (
221
+ State::e_AUTHENTICATING,
222
+ State::e_AUTHENTICATED);
223
+ }
224
+
225
+ bmqu::MemOutStream sendResponseErrorStream;
226
+
227
+ const int send_rc = sendAuthenticationMessage (sendResponseErrorStream,
228
+ authenticationResponse,
229
+ context);
230
+
231
+ // TODO: if we close channel here, for initial connection, we won't be able
232
+ // to logOpenSessionTime
233
+ // Maybe this shoule be a callback. So can be triggered under different
234
+ // senarios
235
+ if (authn_rc != 0 ) {
236
+ bmqio::Status status (bmqio::StatusCategory::e_GENERIC_ERROR,
237
+ " AuthenticationError" ,
238
+ authn_rc,
239
+ d_allocator_p);
240
+ context->channel ()->close (status);
241
+ }
242
+ else if (send_rc != 0 ) {
243
+ BALL_LOG_ERROR << sendResponseErrorStream.str ();
244
+
245
+ bmqio::Status status (bmqio::StatusCategory::e_GENERIC_ERROR,
246
+ " AuthenticationError" ,
247
+ send_rc,
248
+ d_allocator_p);
249
+ context->channel ()->close (status);
250
+ }
251
+ }
252
+
165
253
// CREATORS
166
- Authenticator::Authenticator (BlobSpPool* blobSpPool,
167
- bslma::Allocator* allocator)
168
- : d_allocator_p(allocator)
254
+ Authenticator::Authenticator (
255
+ mqbauthn::AuthenticationController* authnController,
256
+ BlobSpPool* blobSpPool,
257
+ bslma::Allocator* allocator)
258
+ : d_authnController_p(authnController)
259
+ , d_threadPool(bmqsys::ThreadUtil::defaultAttributes(),
260
+ 0 , // min threads
261
+ 100 , // max threads
262
+ bsls::TimeInterval (120 ).totalMilliseconds(), // idle time
263
+ allocator)
169
264
, d_blobSpPool_p(blobSpPool)
265
+ , d_allocator_p(allocator)
170
266
{
171
267
// NOTHING
172
268
}
@@ -177,6 +273,23 @@ Authenticator::~Authenticator()
177
273
// NOTHING: (required because of inheritance)
178
274
}
179
275
276
+ int Authenticator::start (bsl::ostream& errorDescription)
277
+ {
278
+ int rc = d_threadPool.start ();
279
+ if (rc != 0 ) {
280
+ errorDescription << " Failed to start thread pool for Authenticator"
281
+ << " [rc: " << rc << " ]" ;
282
+ return rc; // RETURN
283
+ }
284
+
285
+ return 0 ;
286
+ }
287
+
288
+ void Authenticator::stop ()
289
+ {
290
+ d_threadPool.stop ();
291
+ }
292
+
180
293
int Authenticator::handleAuthentication (
181
294
bsl::ostream& errorDescription,
182
295
bool * isContinueRead,
0 commit comments