Skip to content

Commit 3f4a024

Browse files
out_azure_kusto: added buffering support (fluent#9797)
--------- Signed-off-by: Tanmaya Panda <[email protected]>
1 parent e330e45 commit 3f4a024

9 files changed

+2319
-202
lines changed

plugins/out_azure_kusto/CMakeLists.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ set(src
33
azure_kusto_conf.c
44
azure_kusto_ingest.c
55
azure_msiauth.c
6+
azure_kusto_store.c
67
)
78

8-
FLB_PLUGIN(out_azure_kusto "${src}" "")
9+
FLB_PLUGIN(out_azure_kusto "${src}" "")

plugins/out_azure_kusto/azure_kusto.c

+1,112-90
Large diffs are not rendered by default.

plugins/out_azure_kusto/azure_kusto.h

+50-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
#include <fluent-bit/flb_sds.h>
2727
#include <fluent-bit/flb_upstream_ha.h>
2828

29+
#include <fluent-bit/flb_scheduler.h>
30+
#include <fluent-bit/flb_utils.h>
31+
#include <fluent-bit/flb_time.h>
32+
#include <sys/stat.h>
33+
#include <fcntl.h>
34+
2935
/* refresh token every 50 minutes */
3036
#define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000
3137

@@ -53,14 +59,23 @@
5359

5460
#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"
5561

62+
#define FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE "8G" /* 8GB buffer directory size */
63+
#define UPLOAD_TIMER_MAX_WAIT 180000
64+
#define UPLOAD_TIMER_MIN_WAIT 18000
65+
#define MAX_FILE_SIZE 4000000000 /* 4GB */
66+
67+
#define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token"
68+
#define FLB_AZURE_IMDS_API_VERSION "2018-02-01"
69+
#define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/"
70+
5671

5772
struct flb_azure_kusto_resources {
5873
struct flb_upstream_ha *blob_ha;
5974
struct flb_upstream_ha *queue_ha;
6075
flb_sds_t identity_token;
6176

6277
/* used to reload resouces after some time */
63-
time_t load_time;
78+
uint64_t load_time;
6479
};
6580

6681
struct flb_azure_kusto {
@@ -75,6 +90,7 @@ struct flb_azure_kusto {
7590
flb_sds_t ingestion_mapping_reference;
7691

7792
int ingestion_endpoint_connect_timeout;
93+
int io_timeout;
7894

7995
/* compress payload */
8096
int compression_enabled;
@@ -88,14 +104,17 @@ struct flb_azure_kusto {
88104
int include_time_key;
89105
flb_sds_t time_key;
90106

91-
/* --- internal data --- */
107+
flb_sds_t azure_kusto_buffer_key;
92108

93-
flb_sds_t ingestion_mgmt_endpoint;
109+
/* --- internal data --- */
94110

95111
/* oauth2 context */
96112
flb_sds_t oauth_url;
97113
struct flb_oauth2 *o;
98114

115+
int timer_created;
116+
int timer_ms;
117+
99118
/* mutex for acquiring oauth tokens */
100119
pthread_mutex_t token_mutex;
101120

@@ -107,9 +126,36 @@ struct flb_azure_kusto {
107126

108127
pthread_mutex_t blob_mutex;
109128

129+
pthread_mutex_t buffer_mutex;
130+
131+
int buffering_enabled;
132+
133+
size_t file_size;
134+
time_t upload_timeout;
135+
time_t retry_time;
136+
137+
int buffer_file_delete_early;
138+
int unify_tag;
139+
int blob_uri_length;
140+
int scheduler_max_retries;
141+
int delete_on_max_upload_error;
142+
143+
int has_old_buffers;
144+
size_t store_dir_limit_size;
145+
/* track the total amount of buffered data */
146+
size_t current_buffer_size;
147+
flb_sds_t buffer_dir;
148+
char *store_dir;
149+
struct flb_fstore *fs;
150+
struct flb_fstore_stream *stream_active; /* default active stream */
151+
struct flb_fstore_stream *stream_upload;
152+
153+
110154
/* Upstream connection to the backend server */
111155
struct flb_upstream *u;
112156

157+
struct flb_upstream *imds_upstream;
158+
113159
/* Fluent Bit context */
114160
struct flb_config *config;
115161

@@ -120,4 +166,4 @@ struct flb_azure_kusto {
120166
flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx);
121167
flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *csl);
122168

123-
#endif
169+
#endif

0 commit comments

Comments
 (0)