Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit c2004f7

Browse files
ann3431lucaswin-amzn
authored andcommitted
Adding new type of input for Monitors - HttpInput (#82)
Closes #82
1 parent c0d0e8f commit c2004f7

File tree

12 files changed

+601
-13
lines changed

12 files changed

+601
-13
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ Please see our [documentation](https://opendistro.github.io/for-elasticsearch-do
2727
1. Check out this package from version control.
2828
1. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
2929
1. To build from the command line, set `JAVA_HOME` to point to a JDK >= 12 before running `./gradlew`.
30+
- Unix System
31+
1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` by the JAVA_HOME directory of your system.
32+
1. `export PATH=$JAVA_HOME/bin:$PATH`
33+
34+
- Windows System
35+
1. Find **My Computers** from file directory, right click and select **properties**.
36+
1. Select the **Advanced** tab, select **Environment variables**.
37+
1. Edit **JAVA_HOME** to path of where JDK software is installed.
3038

3139

3240
## Build

alerting/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ configurations.all {
4949
force "commons-logging:commons-logging:${versions.commonslogging}"
5050
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
5151
force "commons-codec:commons-codec:${versions.commonscodec}"
52-
52+
force "commons-collections:commons-collections:3.2.2"
53+
force "org.apache.httpcomponents:httpcore-nio:4.4.11"
54+
force "org.apache.httpcomponents:httpclient:4.5.7"
55+
5356
// This is required because kotlin coroutines core-1.1.1 still requires kotin stdlib 1.3.20 and we're using 1.3.21
5457
force "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
5558
force "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
@@ -58,6 +61,7 @@ configurations.all {
5861

5962
dependencies {
6063
compileOnly "org.elasticsearch.plugin:elasticsearch-scripting-painless-spi:${versions.elasticsearch}"
64+
compile "org.apache.httpcomponents:httpasyncclient:4.1.4"
6165

6266
// Elasticsearch Nanny state
6367
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"

alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/AlertingPlugin.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.alerting.core.JobSweeper
1919
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
2020
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsAction
2121
import com.amazon.opendistroforelasticsearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
22+
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
2223
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
2324
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
2425
import com.amazon.opendistroforelasticsearch.alerting.core.resthandler.RestScheduledJobStatsHandler
@@ -118,7 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, P
118119
}
119120

120121
override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
121-
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
122+
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, HttpInput.XCONTENT_REGISTRY)
122123
}
123124

124125
override fun createComponents(

alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/MonitorRunner.kt

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ package com.amazon.opendistroforelasticsearch.alerting
1818
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
1919
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertIndices
2020
import com.amazon.opendistroforelasticsearch.alerting.alerts.moveAlerts
21+
import com.amazon.opendistroforelasticsearch.alerting.client.HttpInputClient
2122
import com.amazon.opendistroforelasticsearch.alerting.core.JobRunner
23+
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.suspendUntil
24+
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toGetRequest
25+
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toMap
26+
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
2227
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
2328
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
2429
import com.amazon.opendistroforelasticsearch.alerting.core.model.SearchInput
@@ -51,13 +56,14 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
5156
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
5257
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
5358
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
54-
import org.apache.logging.log4j.LogManager
5559
import kotlinx.coroutines.CoroutineScope
5660
import kotlinx.coroutines.Dispatchers
5761
import kotlinx.coroutines.Job
5862
import kotlinx.coroutines.SupervisorJob
5963
import kotlinx.coroutines.launch
6064
import kotlinx.coroutines.withContext
65+
import org.apache.http.HttpResponse
66+
import org.apache.logging.log4j.LogManager
6167
import org.elasticsearch.ExceptionsHelper
6268
import org.elasticsearch.action.DocWriteRequest
6369
import org.elasticsearch.action.bulk.BackoffPolicy
@@ -105,7 +111,7 @@ class MonitorRunner(
105111
) : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
106112

107113
private val logger = LogManager.getLogger(MonitorRunner::class.java)
108-
114+
private var httpClient: HttpInputClient
109115
private lateinit var runnerSupervisor: Job
110116
override val coroutineContext: CoroutineContext
111117
get() = Dispatchers.Default + runnerSupervisor
@@ -122,14 +128,17 @@ class MonitorRunner(
122128
clusterService.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) {
123129
millis, count -> moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
124130
}
131+
httpClient = HttpInputClient()
125132
}
126133

127134
override fun doStart() {
128135
runnerSupervisor = SupervisorJob()
136+
httpClient.client.start()
129137
}
130138

131139
override fun doStop() {
132140
runnerSupervisor.cancel()
141+
httpClient.client.close()
133142
}
134143

135144
override fun doClose() { }
@@ -292,6 +301,29 @@ class MonitorRunner(
292301
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
293302
results += searchResponse.convertToMap()
294303
}
304+
is HttpInput -> {
305+
val response: HttpResponse = httpClient.client.suspendUntil {
306+
httpClient.client.execute(input.toGetRequest(), it)
307+
}
308+
// Make sure response content length is not larger than 100MB
309+
val contentLengthHeader = response.getFirstHeader("Content-Length").value
310+
311+
// Use content-length header to check size. If content-length header does not exist, set Alert in Error state.
312+
if (contentLengthHeader != null) {
313+
logger.debug("Content length is $contentLengthHeader")
314+
val contentLength = contentLengthHeader.toInt()
315+
if (contentLength > httpClient.MAX_CONTENT_LENGTH) {
316+
throw Exception("Response content size: $contentLength, is larger than ${httpClient.MAX_CONTENT_LENGTH}.")
317+
}
318+
} else {
319+
logger.debug("Content-length header does not exist, set alert to error state.")
320+
throw IllegalArgumentException("Response does not contain content-length header.")
321+
}
322+
323+
results += withContext(Dispatchers.IO) {
324+
response.toMap()
325+
}
326+
}
295327
else -> {
296328
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
297329
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.amazon.opendistroforelasticsearch.alerting.client
17+
18+
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
19+
import org.apache.http.client.config.RequestConfig
20+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient
21+
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
22+
import org.elasticsearch.common.unit.ByteSizeUnit
23+
import org.elasticsearch.common.unit.TimeValue
24+
import java.security.AccessController
25+
import java.security.PrivilegedAction
26+
27+
/**
28+
* This class takes [HttpInput] and performs GET requests to given URIs.
29+
*/
30+
class HttpInputClient {
31+
32+
// TODO: If possible, these settings should be implemented as changeable via the "_cluster/settings" API.
33+
private val CONNECTION_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(5).millis().toInt()
34+
private val REQUEST_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
35+
private val SOCKET_TIMEOUT_MILLISECONDS = TimeValue.timeValueSeconds(10).millis().toInt()
36+
val MAX_CONTENT_LENGTH = ByteSizeUnit.MB.toBytes(100)
37+
38+
val client = createHttpClient()
39+
40+
/**
41+
* Create [CloseableHttpAsyncClient] as a [PrivilegedAction] in order to avoid [java.net.NetPermission] error.
42+
*/
43+
private fun createHttpClient(): CloseableHttpAsyncClient {
44+
val config = RequestConfig.custom()
45+
.setConnectTimeout(CONNECTION_TIMEOUT_MILLISECONDS)
46+
.setConnectionRequestTimeout(REQUEST_TIMEOUT_MILLISECONDS)
47+
.setSocketTimeout(SOCKET_TIMEOUT_MILLISECONDS)
48+
.build()
49+
50+
return AccessController.doPrivileged(PrivilegedAction<CloseableHttpAsyncClient>({
51+
HttpAsyncClientBuilder.create()
52+
.setDefaultRequestConfig(config)
53+
.useSystemProperties()
54+
.build()
55+
} as () -> CloseableHttpAsyncClient))
56+
}
57+
}

alerting/src/main/kotlin/com/amazon/opendistroforelasticsearch/alerting/resthandler/RestIndexMonitorAction.kt

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,25 @@
1414
*/
1515
package com.amazon.opendistroforelasticsearch.alerting.resthandler
1616

17+
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
1718
import com.amazon.opendistroforelasticsearch.alerting.core.ScheduledJobIndices
19+
import com.amazon.opendistroforelasticsearch.alerting.core.httpapi.toConstructedUrl
20+
import com.amazon.opendistroforelasticsearch.alerting.core.model.HttpInput
1821
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob
1922
import com.amazon.opendistroforelasticsearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
2023
import com.amazon.opendistroforelasticsearch.alerting.model.Monitor
2124
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS
2225
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
23-
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
24-
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
25-
import com.amazon.opendistroforelasticsearch.alerting.util._ID
26-
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
27-
import com.amazon.opendistroforelasticsearch.alerting.AlertingPlugin
2826
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE
27+
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
2928
import com.amazon.opendistroforelasticsearch.alerting.util.IF_PRIMARY_TERM
3029
import com.amazon.opendistroforelasticsearch.alerting.util.IF_SEQ_NO
3130
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
31+
import com.amazon.opendistroforelasticsearch.alerting.util.REFRESH
32+
import com.amazon.opendistroforelasticsearch.alerting.util._ID
3233
import com.amazon.opendistroforelasticsearch.alerting.util._PRIMARY_TERM
3334
import com.amazon.opendistroforelasticsearch.alerting.util._SEQ_NO
35+
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
3436
import org.apache.logging.log4j.LogManager
3537
import org.elasticsearch.action.ActionListener
3638
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
@@ -76,7 +78,7 @@ private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)
7678
* Rest handlers to create and update monitors.
7779
*/
7880
class RestIndexMonitorAction(
79-
settings: Settings,
81+
val settings: Settings,
8082
controller: RestController,
8183
jobIndices: ScheduledJobIndices,
8284
clusterService: ClusterService
@@ -159,6 +161,7 @@ class RestIndexMonitorAction(
159161
*/
160162
private fun prepareMonitorIndexing() {
161163
validateActionThrottle(newMonitor, maxActionThrottle, TimeValue.timeValueMinutes(1))
164+
validateLocalPort(newMonitor, settings.get("http.port").toInt())
162165
if (channel.request().method() == PUT) return updateMonitor()
163166
val query = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("${Monitor.MONITOR_TYPE}.type", Monitor.MONITOR_TYPE))
164167
val searchSource = SearchSourceBuilder().query(query).timeout(requestTimeout)
@@ -180,6 +183,23 @@ class RestIndexMonitorAction(
180183
}
181184
}
182185

186+
/**
187+
* This function checks whether the [Monitor] has an [HttpInput] with localhost. If so, make sure the port is same as specified in settings.
188+
*/
189+
private fun validateLocalPort(monitor: Monitor, settingsPort: Int) {
190+
for (input in monitor.inputs) {
191+
if (input is HttpInput) {
192+
val constructedUrl = input.toConstructedUrl()
193+
// Make sure that when host is "localhost", only port number specified in settings is allowed.
194+
if (constructedUrl.host == "localhost") {
195+
require(constructedUrl.port == settingsPort) {
196+
"Host: ${constructedUrl.host} is restricted to port $settingsPort."
197+
}
198+
}
199+
}
200+
}
201+
}
202+
183203
/**
184204
* After searching for all existing monitors we validate the system can support another monitor to be created.
185205
*/

0 commit comments

Comments
 (0)