Skip to content

Commit 6d12bd1

Browse files
authored
Merge pull request #3333 from mateka/more-prometheus-monitoring
Allow more control over Prometheus metrics collection
2 parents b2b3b58 + 05a4bc2 commit 6d12bd1

File tree

3 files changed

+88
-24
lines changed

3 files changed

+88
-24
lines changed

doc/configuration.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,18 @@ marker_table
637637
created if it doesn't already exist. Defaults to "table_updates".
638638

639639

640+
[prometheus]
641+
------------
642+
643+
use_task_family_in_labels
644+
Whether the task family should be used as a Prometheus bucket label.
645+
Default value is true.
646+
647+
task_parameters_to_use_in_labels
648+
List of task parameter names used as additional Prometheus bucket labels.
649+
Passed in the form of a JSON list, e.g. ["a", "b"].
650+
651+
640652
[redshift]
641653
----------
642654

@@ -1045,6 +1057,7 @@ metric_namespace
10451057
Optional prefix to add to the beginning of every metric sent to Datadog.
10461058
Default value is "luigi".
10471059

1060+
10481061
Per Task Retry-Policy
10491062
---------------------
10501063

luigi/contrib/prometheus_metric.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,63 +1,84 @@
11
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
2+
from luigi import parameter
23
from luigi.metrics import MetricsCollector
4+
from luigi.task import Config
5+
6+
7+
class prometheus(Config):
8+
use_task_family_in_labels = parameter.BoolParameter(
9+
default=True, parsing=parameter.BoolParameter.EXPLICIT_PARSING
10+
)
11+
task_parameters_to_use_in_labels = parameter.ListParameter(default=[])
312

413

514
class PrometheusMetricsCollector(MetricsCollector):
615

7-
def __init__(self):
16+
def _generate_task_labels(self, task):
17+
return {
18+
label: task.family if label == "family" else task.params.get(label)
19+
for label in self.labels
20+
}
21+
22+
def __init__(self, *args, **kwargs):
823
super(PrometheusMetricsCollector, self).__init__()
924
self.registry = CollectorRegistry()
25+
config = prometheus(**kwargs)
26+
self.labels = list(config.task_parameters_to_use_in_labels)
27+
if config.use_task_family_in_labels:
28+
self.labels += ["family"]
29+
if not self.labels:
30+
raise ValueError("Prometheus labels cannot be empty (see prometheus configuration)")
1031
self.task_started_counter = Counter(
1132
'luigi_task_started_total',
1233
'number of started luigi tasks',
13-
['family'],
34+
self.labels,
1435
registry=self.registry
1536
)
1637
self.task_failed_counter = Counter(
1738
'luigi_task_failed_total',
1839
'number of failed luigi tasks',
19-
['family'],
40+
self.labels,
2041
registry=self.registry
2142
)
2243
self.task_disabled_counter = Counter(
2344
'luigi_task_disabled_total',
2445
'number of disabled luigi tasks',
25-
['family'],
46+
self.labels,
2647
registry=self.registry
2748
)
2849
self.task_done_counter = Counter(
2950
'luigi_task_done_total',
3051
'number of done luigi tasks',
31-
['family'],
52+
self.labels,
3253
registry=self.registry
3354
)
3455
self.task_execution_time = Gauge(
3556
'luigi_task_execution_time_seconds',
3657
'luigi task execution time in seconds',
37-
['family'],
58+
self.labels,
3859
registry=self.registry
3960
)
4061

4162
def generate_latest(self):
4263
return generate_latest(self.registry)
4364

4465
def handle_task_started(self, task):
45-
self.task_started_counter.labels(family=task.family).inc()
46-
self.task_execution_time.labels(family=task.family)
66+
self.task_started_counter.labels(**self._generate_task_labels(task)).inc()
67+
self.task_execution_time.labels(**self._generate_task_labels(task))
4768

4869
def handle_task_failed(self, task):
49-
self.task_failed_counter.labels(family=task.family).inc()
50-
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
70+
self.task_failed_counter.labels(**self._generate_task_labels(task)).inc()
71+
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)
5172

5273
def handle_task_disabled(self, task, config):
53-
self.task_disabled_counter.labels(family=task.family).inc()
54-
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
74+
self.task_disabled_counter.labels(**self._generate_task_labels(task)).inc()
75+
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)
5576

5677
def handle_task_done(self, task):
57-
self.task_done_counter.labels(family=task.family).inc()
78+
self.task_done_counter.labels(**self._generate_task_labels(task)).inc()
5879
# time_running can be `None` if task was already complete
5980
if task.time_running is not None:
60-
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
81+
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)
6182

6283
def configure_http_handler(self, http_handler):
6384
http_handler.set_header('Content-Type', CONTENT_TYPE_LATEST)

test/contrib/prometheus_metric_test.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,28 @@
1515
WORKER = 'myworker'
1616
TASK_ID = 'TaskID'
1717
TASK_FAMILY = 'TaskFamily'
18+
A_PARAM_VALUE = "1"
19+
B_PARAM_VALUE = "2"
20+
C_PARAM_VALUE = "3"
1821

1922

2023
@pytest.mark.contrib
21-
class PrometheusMetricTest(unittest.TestCase):
24+
class PrometheusMetricBaseTest(unittest.TestCase):
25+
COLLECTOR_KWARGS = {}
26+
EXPECTED_LABELS = {"family": TASK_FAMILY}
27+
2228
def setUp(self):
23-
self.collector = PrometheusMetricsCollector()
29+
self.collector = PrometheusMetricsCollector(**self.COLLECTOR_KWARGS)
2430
self.s = Scheduler(metrics_collector=MetricsCollectors.prometheus)
25-
self.gauge_name = 'luigi_task_execution_time_seconds'
26-
self.labels = {'family': TASK_FAMILY}
31+
self.gauge_name = "luigi_task_execution_time_seconds"
2732

2833
def startTask(self):
29-
self.s.add_task(worker=WORKER, task_id=TASK_ID, family=TASK_FAMILY)
34+
self.s.add_task(
35+
worker=WORKER,
36+
task_id=TASK_ID,
37+
family=TASK_FAMILY,
38+
params={"a": A_PARAM_VALUE, "b": B_PARAM_VALUE, "c": C_PARAM_VALUE},
39+
)
3040
task = self.s._state.get_task(TASK_ID)
3141
task.time_running = 0
3242
task.updated = 5
@@ -38,9 +48,11 @@ def test_handle_task_started(self):
3848

3949
counter_name = 'luigi_task_started_total'
4050
gauge_name = self.gauge_name
41-
labels = self.labels
51+
labels = self.EXPECTED_LABELS
4252

43-
assert self.collector.registry.get_sample_value(counter_name, labels=self.labels) == 1
53+
assert (
54+
self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
55+
)
4456
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == 0
4557

4658
def test_handle_task_failed(self):
@@ -49,7 +61,7 @@ def test_handle_task_failed(self):
4961

5062
counter_name = 'luigi_task_failed_total'
5163
gauge_name = self.gauge_name
52-
labels = self.labels
64+
labels = self.EXPECTED_LABELS
5365

5466
assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
5567
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
@@ -60,7 +72,7 @@ def test_handle_task_disabled(self):
6072

6173
counter_name = 'luigi_task_disabled_total'
6274
gauge_name = self.gauge_name
63-
labels = self.labels
75+
labels = self.EXPECTED_LABELS
6476

6577
assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
6678
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
@@ -71,7 +83,7 @@ def test_handle_task_done(self):
7183

7284
counter_name = 'luigi_task_done_total'
7385
gauge_name = self.gauge_name
74-
labels = self.labels
86+
labels = self.EXPECTED_LABELS
7587

7688
assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
7789
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
@@ -80,3 +92,21 @@ def test_configure_http_handler(self):
8092
mock_http_handler = mock.MagicMock()
8193
self.collector.configure_http_handler(mock_http_handler)
8294
mock_http_handler.set_header.assert_called_once_with('Content-Type', CONTENT_TYPE_LATEST)
95+
96+
97+
@pytest.mark.contrib
98+
class PrometheusMetricTaskParamsOnlyTest(PrometheusMetricBaseTest):
99+
COLLECTOR_KWARGS = {
100+
"use_task_family_in_labels": False,
101+
"task_parameters_to_use_in_labels": ["a", "c"],
102+
}
103+
EXPECTED_LABELS = {"a": A_PARAM_VALUE, "c": C_PARAM_VALUE}
104+
105+
106+
@pytest.mark.contrib
107+
class PrometheusMetricTaskFamilyAndTaskParamsTest(PrometheusMetricBaseTest):
108+
COLLECTOR_KWARGS = {
109+
"use_task_family_in_labels": True,
110+
"task_parameters_to_use_in_labels": ["b"],
111+
}
112+
EXPECTED_LABELS = {"family": TASK_FAMILY, "b": B_PARAM_VALUE}

0 commit comments

Comments
 (0)