Skip to content

Commit 4779f0e

Browse files
committed
Add task metadata to /apps endpoint (#118)
* Export TaskInfo from the mesos agent collector We will reference TaskInfo in a map of container IDs to task info in a later commit. * Add a struct for container->task relationships ContainerTaskRels has a single member, a map of container IDs to TaskInfo references. It has a getter and a setter for easy, thread-safe access to the relationship map, and an update method which populates it from agentState. * Return task_id and task_name in /app output The new ContainerTaskRels argument provides access to the task metadata for each container. * Add tests for extracting a record with task data We add a new mock object with appropriate metadata and supply it to avroRecord.extract(), checking that the task IDs in the ContainerTaskRels are the same ones that correspond to the container ID. * Add tests for executor ID and framework ID This is orthogonal to the intent of this PR; however, it's a small change and seemed like a quick win. * Refer to ContainerTaskRels in Framework collector This reference can be provided to the extract method from the framework collector. * Pass ref to ContainerTaskRels into transform * Update task mapping from agent collector We update the task mapping after every new fetch from agent state. * Pass reference to mapping into framework collector Now the framework collector has access to the ContainerTaskRels object which belongs to the mesos agent collector. * Clarify arg name * Defer unlock for consistency * Temporarily skip golint in build In scripts/test.sh we naively run go get to update golint. A new rule has just been added to golint (golang/lint#319) and I don't want to complicate my current PR (#118). I will revert this commit, fix any outstanding nil-returns and consider pinning golint in a future PR. 
* Create new TaskInfo object to avoid memory issues * Create ContainerTaskRels in dcos-metrics.go Rather than initiating ContainerTaskRels in the framework collector, we now initiate it in dcos-metrics.go and pass a reference into each collector instead. * Initiate ContainerTaskRels with a utility method * Remove unnecessary nil check
1 parent a3c0a1b commit 4779f0e

File tree

9 files changed

+192
-39
lines changed

9 files changed

+192
-39
lines changed

collectors/framework/framework.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
log "github.com/Sirupsen/logrus"
2727

2828
"github.com/dcos/dcos-metrics/collectors"
29+
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
2930
"github.com/dcos/dcos-metrics/producers"
3031
"github.com/linkedin/goavro"
3132
)
@@ -54,8 +55,9 @@ type Collector struct {
5455
InputLimitAmountKBytesFlag int
5556
InputLimitPeriodFlag int
5657

57-
metricsChan chan producers.MetricsMessage
58-
nodeInfo collectors.NodeInfo
58+
metricsChan chan producers.MetricsMessage
59+
nodeInfo collectors.NodeInfo
60+
containerTaskRels *mesosAgent.ContainerTaskRels
5961
}
6062

6163
// countingReader is an io.Reader that provides counts of the number of bytes
@@ -66,10 +68,11 @@ type countingReader struct {
6668
}
6769

6870
// New returns a new instance of the framework collector.
69-
func New(cfg Collector, nodeInfo collectors.NodeInfo) (Collector, chan producers.MetricsMessage) {
71+
func New(cfg Collector, nodeInfo collectors.NodeInfo, ctr *mesosAgent.ContainerTaskRels) (Collector, chan producers.MetricsMessage) {
7072
c := cfg
7173
c.nodeInfo = nodeInfo
7274
c.metricsChan = make(chan producers.MetricsMessage)
75+
c.containerTaskRels = ctr
7376
return c, c.metricsChan
7477
}
7578

@@ -170,7 +173,7 @@ func (c *Collector) handleConnection(conn net.Conn) {
170173
}
171174

172175
ad := &AvroDatum{datum, topic, approxBytesRead}
173-
pmm, err := ad.transform(c.nodeInfo)
176+
pmm, err := ad.transform(c.nodeInfo, c.containerTaskRels)
174177
if err != nil {
175178
fwColLog.Error(err)
176179
}
@@ -179,7 +182,7 @@ func (c *Collector) handleConnection(conn net.Conn) {
179182
}
180183

181184
// transform creates a MetricsMessage from the Avro data coming in on our TCP channel.
182-
func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMessage, error) {
185+
func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo, ctr *mesosAgent.ContainerTaskRels) (producers.MetricsMessage, error) {
183186
var (
184187
tagData = avroRecord{}
185188
datapointData = avroRecord{}
@@ -211,7 +214,7 @@ func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMe
211214
return pmm, err
212215
}
213216

214-
if err := tagData.extract(&pmm); err != nil {
217+
if err := tagData.extract(&pmm, ctr); err != nil {
215218
return pmm, err
216219
}
217220

@@ -252,7 +255,7 @@ func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMe
252255
return pmm, err
253256
}
254257

255-
if err := datapointData.extract(&pmm); err != nil {
258+
if err := datapointData.extract(&pmm, ctr); err != nil {
256259
return pmm, err
257260
}
258261

collectors/framework/framework_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"time"
2626

2727
"github.com/dcos/dcos-metrics/collectors"
28+
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
2829
"github.com/dcos/dcos-metrics/producers"
2930
"github.com/dcos/dcos-metrics/schema/metrics_schema"
3031
"github.com/linkedin/goavro"
@@ -50,7 +51,7 @@ var (
5051
func TestNew(t *testing.T) {
5152
Convey("When creating a new instance of the framework collector", t, func() {
5253
Convey("Should return a new Collector with the default config", func() {
53-
f, fc := New(mockCollectorConfig, mockNodeInfo)
54+
f, fc := New(mockCollectorConfig, mockNodeInfo, &mesosAgent.ContainerTaskRels{})
5455
So(f, ShouldHaveSameTypeAs, Collector{})
5556
So(fc, ShouldHaveSameTypeAs, make(chan producers.MetricsMessage))
5657
So(f.InputLimitAmountKBytesFlag, ShouldEqual, mockCollectorConfig.InputLimitAmountKBytesFlag)
@@ -91,7 +92,7 @@ func TestTransform(t *testing.T) {
9192
rec.Set("datapoints", []interface{}{recDps})
9293

9394
a := AvroDatum{Record: rec, Topic: "some-topic"}
94-
pmm, err := a.transform(mockNodeInfo)
95+
pmm, err := a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
9596
So(pmm, ShouldHaveSameTypeAs, producers.MetricsMessage{})
9697

9798
// If we could mock the time here, we could do a single assertion
@@ -115,7 +116,7 @@ func TestTransform(t *testing.T) {
115116

116117
Convey("Should return an error if AvroDatum didn't contain a goavro.Record", func() {
117118
a := AvroDatum{Record: make(map[string]string), Topic: "some-topic"}
118-
_, err = a.transform(mockNodeInfo)
119+
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
119120
So(err, ShouldNotBeNil)
120121
})
121122

@@ -128,7 +129,7 @@ func TestTransform(t *testing.T) {
128129
rec.Set("datapoints", []interface{}{recDps})
129130

130131
a := AvroDatum{Record: rec, Topic: "some-topic"}
131-
_, err = a.transform(mockNodeInfo)
132+
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
132133
So(err, ShouldNotBeNil)
133134
})
134135

@@ -141,7 +142,7 @@ func TestTransform(t *testing.T) {
141142
rec.Set("tags", []interface{}{recTags})
142143

143144
a := AvroDatum{Record: rec, Topic: "some-topic"}
144-
_, err = a.transform(mockNodeInfo)
145+
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
145146
So(err, ShouldNotBeNil)
146147
})
147148

@@ -163,7 +164,7 @@ func TestTransform(t *testing.T) {
163164
rec.Set("datapoints", []interface{}{recNan})
164165

165166
a := AvroDatum{Record: rec, Topic: "some-topic"}
166-
pmm, err := a.transform(mockNodeInfo)
167+
pmm, err := a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
167168
So(err, ShouldBeNil)
168169

169170
So(pmm.Datapoints[0].Name, ShouldEqual, "nan-name")
@@ -259,7 +260,7 @@ func TestHandleConnection(t *testing.T) {
259260
defer ln.Close()
260261
time.Sleep(1 * time.Second)
261262

262-
c, cc := New(mockCollectorConfig, mockNodeInfo)
263+
c, cc := New(mockCollectorConfig, mockNodeInfo, &mesosAgent.ContainerTaskRels{})
263264

264265
// This goroutine runs in the background waiting for a TCP connection
265266
// from the test below. Once the connection has been accepted,

collectors/framework/record.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"time"
2121

22+
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
2223
"github.com/dcos/dcos-metrics/producers"
2324
)
2425

@@ -38,7 +39,7 @@ type avroRecord []record
3839

3940
// avroRecord.extract() gets tags and datapoints from avro formatted data
4041
// and creates a MetricsMessage{}
41-
func (ar avroRecord) extract(pmm *producers.MetricsMessage) error {
42+
func (ar avroRecord) extract(pmm *producers.MetricsMessage, ctr *mesosAgent.ContainerTaskRels) error {
4243
var fieldType string
4344
if len(ar) > 0 {
4445
fieldType = ar[0].Name
@@ -58,6 +59,15 @@ func (ar avroRecord) extract(pmm *producers.MetricsMessage) error {
5859

5960
if tagName == "container_id" {
6061
pmm.Dimensions.ContainerID = tagValue
62+
63+
info := ctr.Get(tagValue)
64+
if info != nil {
65+
pmm.Dimensions.TaskID = info.ID
66+
pmm.Dimensions.TaskName = info.Name
67+
} else {
68+
fwColLog.Debugf("Container ID %s had no associated task", tagValue)
69+
}
70+
6171
} else if tagName == "framework_id" {
6272
pmm.Dimensions.FrameworkID = tagValue
6373
} else if tagName == "executor_id" {

collectors/framework/record_test.go

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package framework
1919
import (
2020
"testing"
2121

22+
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
2223
"github.com/dcos/dcos-metrics/producers"
2324
"github.com/dcos/dcos-metrics/schema/metrics_schema"
2425
"github.com/linkedin/goavro"
@@ -65,21 +66,51 @@ var (
6566
},
6667
},
6768
}
69+
avroRecordWithDimensions = avroRecord{
70+
record{
71+
Name: "dcos.metrics.Tag",
72+
Fields: []field{
73+
{Name: "k", Datum: "framework_id"},
74+
{Name: "v", Datum: "marathon"},
75+
},
76+
},
77+
record{
78+
Name: "dcos.metrics.Tag",
79+
Fields: []field{
80+
{Name: "k", Datum: "executor_id"},
81+
{Name: "v", Datum: "pierrepoint"},
82+
},
83+
},
84+
record{
85+
Name: "dcos.metrics.Tag",
86+
Fields: []field{
87+
{Name: "k", Datum: "container_id"},
88+
{Name: "v", Datum: "foo-container-id"},
89+
},
90+
},
91+
record{
92+
Name: "dcos.metrics.Tag",
93+
Fields: []field{
94+
{Name: "k", Datum: "some-label"},
95+
{Name: "v", Datum: "some-label-value"},
96+
},
97+
},
98+
}
6899
)
69100

70101
func TestExtract(t *testing.T) {
71102
Convey("When calling extract() on an avroRecord", t, func() {
72103
Convey("Should return an error if length of ar is 0", func() {
73104
ar := avroRecord{}
74-
err := ar.extract(&producers.MetricsMessage{})
105+
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
75106
So(err, ShouldNotBeNil)
76107
})
77108
})
78109

79110
Convey("When extracting a datapoint from an Avro record", t, func() {
80111
avroDatapoint := avroRecord{testDatapoint}
81112
pmmTest := producers.MetricsMessage{}
82-
err := avroDatapoint.extract(&pmmTest)
113+
err := avroDatapoint.extract(&pmmTest, &mesosAgent.ContainerTaskRels{})
83114

84115
Convey("Should extract the datapoint without errors", func() {
85116
So(err, ShouldBeNil)
@@ -97,33 +128,51 @@ func TestExtract(t *testing.T) {
97128
})
98129

99130
Convey("When extracting tags from an Avro record", t, func() {
100-
avroDatapoint := avroRecord{testTag}
131+
avroDatapoint := avroRecordWithDimensions
101132
pmmTest := producers.MetricsMessage{
102133
Dimensions: producers.Dimensions{
103134
Labels: make(map[string]string),
104135
},
105136
}
137+
testRels := mesosAgent.NewContainerTaskRels()
138+
testRels.Set("foo-container-id", &mesosAgent.TaskInfo{
139+
ID: "foo.1234567890",
140+
Name: "foo",
141+
})
106142

143+
err := avroDatapoint.extract(&pmmTest, testRels)
107144
Convey("Should extract the tag without errors", func() {
108-
err := avroDatapoint.extract(&pmmTest)
109-
value, ok := pmmTest.Dimensions.Labels["tag-name-field-test"]
110-
111145
So(err, ShouldBeNil)
146+
})
147+
148+
Convey("Should derive specific metadata from known tags", func() {
149+
So(pmmTest.Dimensions.FrameworkID, ShouldEqual, "marathon")
150+
So(pmmTest.Dimensions.ExecutorID, ShouldEqual, "pierrepoint")
151+
So(pmmTest.Dimensions.ContainerID, ShouldEqual, "foo-container-id")
152+
})
153+
154+
Convey("Should derive task ID and task name with the container ID", func() {
155+
So(pmmTest.Dimensions.TaskID, ShouldEqual, "foo.1234567890")
156+
So(pmmTest.Dimensions.TaskName, ShouldEqual, "foo")
157+
})
158+
159+
Convey("Should add other tags as labels", func() {
160+
label, ok := pmmTest.Dimensions.Labels["some-label"]
112161
So(ok, ShouldBeTrue)
113-
So(value, ShouldEqual, "tag-value-field-test")
162+
So(label, ShouldEqual, "some-label-value")
114163
})
115164
})
116165

117166
Convey("When analyzing the field types in a record", t, func() {
118167
Convey("Should return an error if the field type was empty", func() {
119168
ar := avroRecord{record{Name: ""}}
120-
err := ar.extract(&producers.MetricsMessage{})
169+
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
121170
So(err, ShouldNotBeNil)
122171
})
123172

124173
Convey("Should return an error for an unknown field type", func() {
125174
ar := avroRecord{record{Name: "not-dcos.not-metrics.not-Type"}}
126-
err := ar.extract(&producers.MetricsMessage{})
175+
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
127176
So(err, ShouldNotBeNil)
128177
})
129178
})

collectors/mesos/agent/agent.go

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"net"
1919
"net/http"
2020
"strconv"
21+
"sync"
2122
"time"
2223

2324
"github.com/Sirupsen/logrus"
@@ -50,17 +51,68 @@ type Collector struct {
5051
// https://godoc.org/github.com/sirupsen/logrus#Entry
5152
log *logrus.Entry
5253

53-
metricsChan chan producers.MetricsMessage
54-
nodeInfo collectors.NodeInfo
55-
timestamp int64
54+
metricsChan chan producers.MetricsMessage
55+
nodeInfo collectors.NodeInfo
56+
timestamp int64
57+
ContainerTaskRels *ContainerTaskRels
58+
}
59+
60+
// ContainerTaskRels defines the relationship between containers and tasks.
61+
type ContainerTaskRels struct {
62+
sync.Mutex
63+
rels map[string]*TaskInfo
64+
}
65+
66+
// NewContainerTaskRels creates a new empty ContainerTaskRels
67+
func NewContainerTaskRels() *ContainerTaskRels {
68+
return &ContainerTaskRels{rels: make(map[string]*TaskInfo)}
69+
}
70+
71+
// Get is a utility method which handles the mutex lock and abstracts the inner
72+
// map in ContainerTaskRels away. If no task info is available for the supplied
73+
// containerID, returns nil.
74+
func (ctr *ContainerTaskRels) Get(containerID string) *TaskInfo {
75+
ctr.Lock()
76+
defer ctr.Unlock()
77+
return ctr.rels[containerID]
78+
}
79+
80+
// Set adds or updates an entry to ContainerTaskRels and, if necessary,
81+
// initiates the inner map. It is only currently used in tests.
82+
func (ctr *ContainerTaskRels) Set(containerID string, info *TaskInfo) {
83+
ctr.Lock()
84+
defer ctr.Unlock()
85+
ctr.rels[containerID] = info
86+
}
87+
88+
// update denormalizes the (deeply nested) /state map from the local mesos
89+
// agent to a list of tasks mapped to container IDs.
90+
func (ctr *ContainerTaskRels) update(as agentState) {
91+
rels := map[string]*TaskInfo{}
92+
for _, f := range as.Frameworks {
93+
for _, e := range f.Executors {
94+
for _, t := range e.Tasks {
95+
for _, s := range t.Statuses {
96+
rels[s.ContainerStatusInfo.ID.Value] = &TaskInfo{
97+
ID: t.ID,
98+
Name: t.Name,
99+
}
100+
}
101+
}
102+
}
103+
}
104+
ctr.Lock()
105+
ctr.rels = rels
106+
ctr.Unlock()
56107
}
57108

58109
// New creates a new instance of the Mesos agent collector (poller).
59-
func New(cfg Collector, nodeInfo collectors.NodeInfo) (Collector, chan producers.MetricsMessage) {
110+
func New(cfg Collector, nodeInfo collectors.NodeInfo, rels *ContainerTaskRels) (Collector, chan producers.MetricsMessage) {
60111
c := cfg
61112
c.log = logrus.WithFields(logrus.Fields{"collector": "mesos-agent"})
62113
c.nodeInfo = nodeInfo
63114
c.metricsChan = make(chan producers.MetricsMessage)
115+
c.ContainerTaskRels = rels
64116
return c, c.metricsChan
65117
}
66118

@@ -88,6 +140,9 @@ func (c *Collector) pollMesosAgent() {
88140
c.log.Errorf("Failed to get agent state from %s. Error: %s", host, err)
89141
}
90142

143+
c.log.Debug("Mapping containers to tasks")
144+
c.ContainerTaskRels.update(c.agentState)
145+
91146
c.log.Debugf("Fetching container metrics from host %s", host)
92147
if err := c.getContainerMetrics(); err != nil {
93148
c.log.Errorf("Failed to get container metrics from %s. Error: %s", host, err)
@@ -179,7 +234,7 @@ func getExecutorInfoByExecutorID(executorID string, executors []executorInfo) *e
179234
}
180235

181236
// getTaskInfoByContainerID returns the TaskInfo struct matching the given cID.
182-
func getTaskInfoByContainerID(containerID string, tasks []taskInfo) *taskInfo {
237+
func getTaskInfoByContainerID(containerID string, tasks []TaskInfo) *TaskInfo {
183238
for _, task := range tasks {
184239
if len(task.Statuses) > 0 && task.Statuses[0].ContainerStatusInfo.ID.Value == containerID {
185240
return &task

0 commit comments

Comments
 (0)