1
- use vector_config:: configurable_component;
1
+ use http:: { Request , StatusCode , Uri } ;
2
+ use hyper:: Body ;
2
3
4
+ use super :: {
5
+ service:: { ClickhouseRetryLogic , ClickhouseService } ,
6
+ sink:: ClickhouseSink ,
7
+ } ;
3
8
use crate :: {
4
- codecs:: Transformer ,
5
- config:: { AcknowledgementsConfig , Input , SinkConfig , SinkContext } ,
6
- http:: Auth ,
9
+ http:: { get_http_scheme_from_uri, Auth , HttpClient , MaybeAuth } ,
7
10
sinks:: {
8
- util:: {
9
- BatchConfig , Compression , RealtimeSizeBasedDefaultBatchSettings , TowerRequestConfig ,
10
- UriSerde ,
11
- } ,
12
- Healthcheck , VectorSink ,
11
+ prelude:: * ,
12
+ util:: { RealtimeSizeBasedDefaultBatchSettings , UriSerde } ,
13
13
} ,
14
- tls:: TlsConfig ,
15
14
} ;
16
15
17
- use super :: http_sink:: build_http_sink;
18
-
19
16
/// Configuration for the `clickhouse` sink.
20
17
#[ configurable_component( sink( "clickhouse" , "Deliver log data to a ClickHouse database." ) ) ]
21
18
#[ derive( Clone , Debug , Default ) ]
@@ -82,9 +79,41 @@ impl_generate_config_from_default!(ClickhouseConfig);
82
79
#[ typetag:: serde( name = "clickhouse" ) ]
83
80
impl SinkConfig for ClickhouseConfig {
84
81
async fn build ( & self , cx : SinkContext ) -> crate :: Result < ( VectorSink , Healthcheck ) > {
85
- // later we can build different sink(http, native) here
86
- // according to the clickhouseConfig
87
- build_http_sink ( self , cx) . await
82
+ let endpoint = self . endpoint . with_default_parts ( ) . uri ;
83
+ let protocol = get_http_scheme_from_uri ( & endpoint) ;
84
+
85
+ let auth = self . auth . choose_one ( & self . endpoint . auth ) ?;
86
+
87
+ let tls_settings = TlsSettings :: from_options ( & self . tls ) ?;
88
+ let client = HttpClient :: new ( tls_settings, & cx. proxy ) ?;
89
+
90
+ let service = ClickhouseService :: new (
91
+ client. clone ( ) ,
92
+ auth. clone ( ) ,
93
+ & endpoint,
94
+ self . database . as_deref ( ) ,
95
+ self . table . as_str ( ) ,
96
+ self . skip_unknown_fields ,
97
+ self . date_time_best_effort ,
98
+ ) ?;
99
+
100
+ let request_limits = self . request . unwrap_with ( & Default :: default ( ) ) ;
101
+ let service = ServiceBuilder :: new ( )
102
+ . settings ( request_limits, ClickhouseRetryLogic :: default ( ) )
103
+ . service ( service) ;
104
+
105
+ let batch_settings = self . batch . into_batcher_settings ( ) ?;
106
+ let sink = ClickhouseSink :: new (
107
+ batch_settings,
108
+ self . compression ,
109
+ self . encoding . clone ( ) ,
110
+ service,
111
+ protocol,
112
+ ) ;
113
+
114
+ let healthcheck = Box :: pin ( healthcheck ( client, endpoint, auth) ) ;
115
+
116
+ Ok ( ( VectorSink :: from_event_streamsink ( sink) , healthcheck) )
88
117
}
89
118
90
119
fn input ( & self ) -> Input {
@@ -95,3 +124,30 @@ impl SinkConfig for ClickhouseConfig {
95
124
& self . acknowledgements
96
125
}
97
126
}
127
+
128
+ async fn healthcheck ( client : HttpClient , endpoint : Uri , auth : Option < Auth > ) -> crate :: Result < ( ) > {
129
+ // TODO: check if table exists?
130
+ let uri = format ! ( "{}/?query=SELECT%201" , endpoint) ;
131
+ let mut request = Request :: get ( uri) . body ( Body :: empty ( ) ) . unwrap ( ) ;
132
+
133
+ if let Some ( auth) = auth {
134
+ auth. apply ( & mut request) ;
135
+ }
136
+
137
+ let response = client. send ( request) . await ?;
138
+
139
+ match response. status ( ) {
140
+ StatusCode :: OK => Ok ( ( ) ) ,
141
+ status => Err ( HealthcheckError :: UnexpectedStatus { status } . into ( ) ) ,
142
+ }
143
+ }
144
+
145
+ #[ cfg( test) ]
146
+ mod tests {
147
+ use super :: * ;
148
+
149
+ #[ test]
150
+ fn generate_config ( ) {
151
+ crate :: test_util:: test_generate_config :: < ClickhouseConfig > ( ) ;
152
+ }
153
+ }
0 commit comments