Skip to content

Commit 3ddf4ac

Browse files
committed
upstream: fix handling of the busy_flag for keepalive connections
There are situations where a race condition can happen with keepalive connections with a mix of failed states of intermediary connections, an example can be HTTP output + AWS auth, where the AWS auth failed and could not be obtained in the DNS resolution state). This commit ensures the busy_flag is set as soon as the connection is created and only removed when released or an exception happens. When destroying connections, we now not only validate the busy flag but as well check if the connection is not assigned to a priority queue. I also added a new check for the event structure clearing up the states before its used again with the event loop interface. The fixes address the issue described in: #10458 Signed-off-by: Eduardo Silva <[email protected]>
1 parent 3588a8d commit 3ddf4ac

File tree

1 file changed

+35
-3
lines changed

1 file changed

+35
-3
lines changed

src/flb_upstream.c

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ static int prepare_destroy_conn(struct flb_connection *u_conn)
530530
/* Add node to destroy queue */
531531
mk_list_add(&u_conn->_head, &uq->destroy_queue);
532532

533+
533534
flb_upstream_decrement_total_connections_count(u);
534535

535536
/*
@@ -555,8 +556,13 @@ static inline int prepare_destroy_conn_safe(struct flb_connection *u_conn)
555556

556557
static int destroy_conn(struct flb_connection *u_conn)
557558
{
558-
/* Delay the destruction of busy connections */
559-
if (u_conn->busy_flag) {
559+
/*
560+
* Delay the destruction if the connection is still marked busy or
561+
* if there are pending events referencing it (e.g. queued in the
562+
* priority bucket queue).
563+
*/
564+
if (u_conn->busy_flag ||
565+
u_conn->event._priority_head.prev != NULL) {
560566
return 0;
561567
}
562568

@@ -614,8 +620,9 @@ static struct flb_connection *create_conn(struct flb_upstream *u)
614620
flb_debug("[upstream] connection #%i failed to %s:%i",
615621
conn->fd, u->tcp_host, u->tcp_port);
616622

617-
prepare_destroy_conn_safe(conn);
623+
/* allow the connection to be destroyed immediately */
618624
conn->busy_flag = FLB_FALSE;
625+
prepare_destroy_conn_safe(conn);
619626

620627
return NULL;
621628
}
@@ -647,16 +654,19 @@ int flb_upstream_destroy(struct flb_upstream *u)
647654

648655
mk_list_foreach_safe(head, tmp, &uq->av_queue) {
649656
u_conn = mk_list_entry(head, struct flb_connection, _head);
657+
u_conn->busy_flag = FLB_FALSE;
650658
prepare_destroy_conn(u_conn);
651659
}
652660

653661
mk_list_foreach_safe(head, tmp, &uq->busy_queue) {
654662
u_conn = mk_list_entry(head, struct flb_connection, _head);
663+
u_conn->busy_flag = FLB_FALSE;
655664
prepare_destroy_conn(u_conn);
656665
}
657666

658667
mk_list_foreach_safe(head, tmp, &uq->destroy_queue) {
659668
u_conn = mk_list_entry(head, struct flb_connection, _head);
669+
u_conn->busy_flag = FLB_FALSE;
660670
destroy_conn(u_conn);
661671
}
662672

@@ -758,9 +768,14 @@ struct flb_connection *flb_upstream_conn_get(struct flb_upstream *u)
758768

759769
flb_stream_release_lock(&u->base);
760770

771+
/* mark connection as busy so it won't be freed until released */
772+
conn->busy_flag = FLB_TRUE;
773+
761774
err = flb_socket_error(conn->fd);
762775

763776
if (!FLB_EINPROGRESS(err) && err != 0) {
777+
/* allow immediate cleanup on failure */
778+
conn->busy_flag = FLB_FALSE;
764779
flb_debug("[upstream] KA connection #%i is in a failed state "
765780
"to: %s:%i, cleaning up",
766781
conn->fd, u->tcp_host, u->tcp_port);
@@ -799,6 +814,8 @@ struct flb_connection *flb_upstream_conn_get(struct flb_upstream *u)
799814
}
800815

801816
if (conn != NULL) {
817+
/* newly created connections return in an idle state; mark busy */
818+
conn->busy_flag = FLB_TRUE;
802819
flb_connection_reset_io_timeout(conn);
803820
flb_upstream_increment_busy_connections_count(u);
804821
}
@@ -828,6 +845,9 @@ int flb_upstream_conn_release(struct flb_connection *conn)
828845
struct flb_upstream *u = conn->upstream;
829846
struct flb_upstream_queue *uq;
830847

848+
/* allow pending destroy to free this connection once we're done */
849+
conn->busy_flag = FLB_FALSE;
850+
831851
flb_upstream_decrement_busy_connections_count(u);
832852

833853
uq = flb_upstream_queue_get(u);
@@ -855,6 +875,18 @@ int flb_upstream_conn_release(struct flb_connection *conn)
855875
* notified if the 'available keepalive connection' gets disconnected by
856876
* the remote endpoint we need to add it again.
857877
*/
878+
/*
879+
* Ensure the event is not registered before we add it back. In some
880+
* error paths the previous handler might not have removed the event
881+
* from the loop which would cause epoll_ctl(ADD) to fail with
882+
* EEXIST. Remove it here if still registered and then reset the
883+
* structure state.
884+
*/
885+
if (MK_EVENT_IS_REGISTERED((&conn->event))) {
886+
mk_event_del(conn->evl, &conn->event);
887+
}
888+
889+
MK_EVENT_NEW(&conn->event);
858890
conn->event.handler = cb_upstream_conn_ka_dropped;
859891

860892
ret = mk_event_add(conn->evl,

0 commit comments

Comments
 (0)