ROX-28530: use the new collector iservice for process information #14652
base: master
Conversation
Skipping CI for Draft Pull Request.
Images are ready for the commit at 0bf1081. To use with deploy scripts, first
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@            Coverage Diff             @@
##           master   #14652      +/-   ##
==========================================
- Coverage    48.95%   48.94%   -0.01%
==========================================
  Files         2550     2551       +1
  Lines       187238   187338     +100
==========================================
+ Hits         91662    91698      +36
- Misses       88327    88392      +65
+ Partials      7249     7248       -1
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
/test all
Force-pushed from c4eaff1 to 627fe7b.
/test all
/retest
func newService(queue chan *sensor.ProcessSignal) Service {
	return &serviceImpl{
		queue: queue,
	}
}
Anti-pattern warning: this object writes to a queue that is provided from the outside, which makes it unclear who should close the channel, and when.
Solution: the queue should be created in this object's constructor and returned as read-only; this object should then be responsible for closing the channel.
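A minimal sketch of the suggested ownership model (the Stop method here is an assumed shutdown hook, not necessarily this service's real API):

func newService() Service {
	// The service creates and owns the queue; nobody else may write to or close it.
	return &serviceImpl{
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
	}
}

// GetReceiver exposes the queue to consumers as receive-only.
func (s *serviceImpl) GetReceiver() <-chan *sensor.ProcessSignal {
	return s.queue
}

// Stop closes the queue; this is safe because serviceImpl is the sole writer.
func (s *serviceImpl) Stop() {
	close(s.queue)
}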
type Component interface {
	common.SensorComponent

	GetReceiver() chan *sensor.ProcessSignal
}
Let's always make it read-only for external actors.
- 	GetReceiver() chan *sensor.ProcessSignal
+ 	GetReceiver() <-chan *sensor.ProcessSignal
	common.SensorComponent

	processPipeline Pipeline
	indicators      chan *message.ExpiringMessage
I can't see any writes to this channel here, so let's make it read-only.
- 	indicators chan *message.ExpiringMessage
+ 	indicators <-chan *message.ExpiringMessage
cmp := &componentImpl{
	processPipeline: pipeline,
	indicators:      indicators,
	receiver:        make(chan *sensor.ProcessSignal, maxBufferSize),
Why do we need the buffer here? It looks like we are only reading from this channel, so I'm wondering what its purpose is.
sensor/kubernetes/sensor/sensor.go (Outdated)
var signalSrv signalService.Service
if cfg.signalServiceAuthFuncOverride != nil && cfg.localSensor {
	signalSrv = signalService.NewService(signalCmp.GetReceiver(),
Okay, so you create a channel in signalCmp only to pass it to another object so that it can write to it? This is not a good idea; we must always make sure that:
- There is exactly one writer of the channel. The writer is responsible for creating and closing the channel, and always returns it to the outside as a read-only channel.
- There can be many readers of the channel, and none of them should attempt to write to it or close it.
- The writer and readers must be easy to identify.
For more details check https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html
As I explained offline, my initial approach was to have a single channel owned by the component and have it read messages coming from collector via both the old and new services. Thanks for taking the time to explain why this would be a dangerous practice in Go; I've made it so the services each own a channel, and those are passed in to the component for handling.
Overall looks good. I shared a few thoughts; let me know what you think.
sensor/common/collector/service.go (Outdated)

func NewService(opts ...Option) Service {
	return newService()
}
The options need to be passed here and handled in newService; otherwise WithAuthFuncOverride will be ignored and some integration tests will fail.
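For illustration, a sketch of threading the options through (the Option type, the authFuncOverride field, and the override signature are assumptions based on the snippet above, not the PR's actual definitions):

// Option mutates the service during construction.
type Option func(*serviceImpl)

// WithAuthFuncOverride is assumed to install a custom gRPC auth function.
func WithAuthFuncOverride(override func(ctx context.Context, fullMethodName string) (context.Context, error)) Option {
	return func(s *serviceImpl) { s.authFuncOverride = override }
}

func NewService(opts ...Option) Service {
	return newService(opts...)
}

func newService(opts ...Option) Service {
	s := &serviceImpl{
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
	}
	// Apply every option so none of them (e.g. WithAuthFuncOverride) is silently dropped.
	for _, opt := range opts {
		opt(s)
	}
	return s
}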
type componentImpl struct {
	common.SensorComponent

	processPipeline Pipeline
	indicators      <-chan *message.ExpiringMessage
	signalMessages  <-chan *storage.ProcessSignal
	processMessages <-chan *sensor.ProcessSignal
	writer          io.Writer

	stopper concurrency.Stopper
}
Would it make sense to merge this new component with the Pipeline? I feel like this new component only acts as a bridge between the service and the Pipeline component.
The whole code for processing processes (haha) is messy and confusing. From what I can tell, a long, long time ago in a galaxy far away, signals were (or were going to be) the generic way for stackrox to handle events, so there are all these abstractions for signals by themselves, but the only signal that ever got implemented was for processes. I mention this because we have the same situation in collector: there are a bunch of abstract classes (essentially interfaces) for signals, but they too were only ever implemented for processes. I'm not a huge fan of this "write the abstraction first" approach; IMO it leads to overcomplicated code that may well end up unused, or used just once, as in our case.
AFAICT, this is the only implementation of the Pipeline interface in sensor/common/signal/component/pipeline.go (and gopls agrees with me):
type Pipeline struct {
So, at this point, I'm not 100% sure how I would handle this. My instinct tells me to move the entire code under sensor/common/processsignal/ into sensor/common/signal and ditch the interface altogether, but that might be messy. Another possibility would be to move the service and component into sensor/common/processsignal/ and rename the directory to something like process, also dropping the interface. WDYT?
I'd go with the second one: move everything to something like sensor/common/process (maybe processindicator), remove the interface, and probably have only the SensorComponent as the handler for processes. So:
- collectorService: handles the connection with collector.
- processIndicatorComponent: handles the processIndicator enrichment.
return &storage.ProcessSignal{
	Id:           signal.Id,
	ContainerId:  signal.ContainerId,
	Time:         signal.CreationTime,
	Name:         signal.Name,
	Args:         signal.Args,
	ExecFilePath: signal.ExecFilePath,
	Pid:          signal.Pid,
	Uid:          signal.Uid,
	Gid:          signal.Gid,
	Scraped:      signal.Scraped,
	LineageInfo:  lineage,
}
Probably out of scope for this PR, and maybe for this entire effort, but as a general rule I would try to move away from using storage protos in the API.
I agree; I would argue this falls under this issue, which is outside the scope of the current epic: https://issues.redhat.com/browse/ROX-28532
We'll try to get that prioritized and implemented in the near future. I also have to do some cleanups soon, so I can probably get this done then.
func newService() Service {
	return &serviceImpl{
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
We should add metrics to this buffer. Also, consider using ScaleSizeOnNonDefault and an environment variable to make this max size configurable.
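A sketch of what that could look like, mirroring the queue.ScaleSizeOnNonDefault(env.ProcessIndicatorBufferSize) call that appears later in this PR; ProcessSignalBufferSize is a hypothetical new setting:

func newService() Service {
	return &serviceImpl{
		// Buffer size driven by a (hypothetical) env var instead of the
		// hard-coded maxBufferSize, scaled like other sensor queues.
		queue: make(chan *sensor.ProcessSignal,
			queue.ScaleSizeOnNonDefault(env.ProcessSignalBufferSize)),
	}
}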
sensor/common/signal/singleton.go (Outdated)

- 	queue:           make(chan *v1.Signal, maxBufferSize),
  	indicators:      indicators,
  	processPipeline: pipeline,
+ 	queue:           make(chan *storage.ProcessSignal, maxBufferSize),
Not 100% sure why we had queue: make(chan *v1.Signal, maxBufferSize) before, because I believe it was not used 🤷 but since queue: make(chan *storage.ProcessSignal, maxBufferSize) is now used, I'd add metrics and make it configurable with ScaleSizeOnNonDefault and an environment variable.
Asking for both this and the service_impl.go comment: could you provide an example of the metrics you'd like added? I'm not 100% sure how to look for those in the sensor code.
Sure! Here's how we do it for the BufferedStream: basically, we track how many items were added to, removed from, and dropped from the buffer.
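For illustration, a counter along those lines (all names here are placeholders, not the actual BufferedStream code):

var processSignalQueueOps = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: metrics.PrometheusNamespace,
	Subsystem: metrics.SensorSubsystem.String(),
	Name:      "process_signal_queue_operations_total",
	Help:      "Items added to, removed from, or dropped by the process signal queue",
}, []string{"operation"}) // "operation" would be one of add, remove, drop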
Force-pushed from 2c0619b to 291f3a2.
Force-pushed from ffa406e to 332ef39.
- Gracefully stop services.
- Remove unused indicators member.
- Rename environment variable.
Force-pushed from d461686 to ceae0a4.
@Molter73: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
	c.processPipeline.Process(signal)
}

// TODO(ROX-3281) this is a workaround for these collector issues
This ticket was closed years ago. I guess this was copied from signal/signal_service.go. You should delete this comment and its duplicate.
	return true
}

func sensorIntoStorageSignal(signal *sensor.ProcessSignal) *storage.ProcessSignal {
- func sensorIntoStorageSignal(signal *sensor.ProcessSignal) *storage.ProcessSignal {
+ func convertSensorProcessSignalToStorageProcessSignal(signal *sensor.ProcessSignal) *storage.ProcessSignal {
That naming is way too verbose; I will not make this change.
Posting my detailed comments from reading the code.
I still plan to look at the tests with local-sensor.
msg, err := server.Recv()
if err != nil {
	log.Error("error dequeueing collector message, event: ", err)
	return err
}
Why not place this block at the beginning of default?
case *sensor.MsgFromCollector_Register:
	log.Infof("got register: %+v", msg.GetRegister())
case *sensor.MsgFromCollector_Info:
	log.Infof("got network info: %+v", msg.GetInfo())
Looks like debugging output. Let's remove the log statements or change them to debug level.
Changed them to debug; a follow-up PR will implement the actual logic that needs to go here.
metrics.CollectorChannelInc(msg)
switch msg.GetMsg().(type) {
case *sensor.MsgFromCollector_ProcessSignal:
	s.queue <- msg.GetProcessSignal()
This may block, so we need another select here:

- 	s.queue <- msg.GetProcessSignal()
+ 	select {
+ 	case s.queue <- msg.GetProcessSignal():
+ 	case <-server.Context().Done():
+ 		return nil
+ 	}
select {
case <-server.Context().Done():
	return nil
default:
The body of this default block might look better in a separate function (given how many nested selects there are).
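For instance, the receive-and-dispatch logic could move into a helper so the loop in Communicate stays flat (handleMessage and the stream type name are hypothetical; the body is assembled from the snippets above):

// handleMessage performs one Recv plus dispatch; Communicate's select loop
// then only alternates between context cancellation and this call.
func (s *serviceImpl) handleMessage(server sensor.CollectorService_CommunicateServer) error {
	msg, err := server.Recv()
	if err != nil {
		log.Error("error dequeueing collector message, event: ", err)
		return err
	}
	metrics.CollectorChannelInc(msg)
	switch msg.GetMsg().(type) {
	case *sensor.MsgFromCollector_ProcessSignal:
		select {
		case s.queue <- msg.GetProcessSignal():
		case <-server.Context().Done():
		}
	}
	return nil
}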
sensor/common/metrics/metrics.go (Outdated)

collectorChannelMessagesCount = prometheus.NewCounterVec(prometheus.CounterOpts{
	Namespace: metrics.PrometheusNamespace,
	Subsystem: metrics.SensorSubsystem.String(),
	Name:      "num_messages_received_collector",
Please check the naming convention for Prometheus metrics: a counter should use total as a suffix rather than a num prefix.
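For example (the exact name is only a suggestion):

- 	Name: "num_messages_received_collector",
+ 	Name: "collector_messages_received_total",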
}

func (c *componentImpl) Notify(e common.SensorComponentEvent) {
	log.Info(common.LogSensorComponentEvent(e))
LogSensorComponentEvent accepts a component name as a second parameter; let's use it, as it may help in some exotic debugging sessions.
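For example (the component name string is illustrative):

log.Info(common.LogSensorComponentEvent(e, "CollectorProcessSignalComponent"))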
if c.writer != nil {
	if data, err := signal.MarshalVT(); err == nil {
		if _, err := c.writer.Write(data); err != nil {
			log.Warnf("Error writing msg: %v", err)
Let's make this a bit more informative:

- 	log.Warnf("Error writing msg: %v", err)
+ 	log.Warnf("Error writing ProcessSignal data: %v", err)

Also, we could return those errors from this func (instead of only logging) to make this code easier to test.
Do we want to return the errors? These seem to be problems with the writer, which AFAICT is only used for testing; returning the error would prevent the process signal from being processed properly.
// TODO(ROX-3281) this is a workaround for these collector issues
func isProcessSignalValid(signal *storage.ProcessSignal) bool {
	// Example: <NA> or sometimes a truncated variant
	if signal.GetExecFilePath() == "" || signal.GetExecFilePath()[0] == '<' {
Although accessing the 0-th element should be safe here, this looks like a case for strings.HasPrefix.
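A sketch of the equivalent check (assuming an invalid path makes the function return false, as the surrounding code suggests):

path := signal.GetExecFilePath()
// Rejects both empty paths and placeholders such as <NA> without manual indexing.
if path == "" || strings.HasPrefix(path, "<") {
	return false
}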
sensor/kubernetes/sensor/sensor.go (Outdated)

processPipeline := processsignal.NewProcessPipeline(indicators, storeProvider.Entities(), processfilter.Singleton(), policyDetector)
var processSignals signalService.Service
var signalSrv signalService.Service
if cfg.signalServiceAuthFuncOverride != nil && cfg.localSensor {
It looks like this if is duplicated in line 123; can we maybe merge those two?
sensor/kubernetes/sensor/sensor.go (Outdated)

}

// Create Process Pipeline
indicators := make(chan *message.ExpiringMessage, queue.ScaleSizeOnNonDefault(env.ProcessIndicatorBufferSize))
This channel is used only in line 131; let's create it inside NewProcessPipeline.
func (c *componentImpl) Stop(_ error) {
	c.processPipeline.Shutdown()
	c.stopper.Client().Stop()
Should we wait for the component to stop?
Not 100% sure how to go about this one; I think I just copy-pasted the code from the existing component. Could you elaborate a bit?
// Currently eat unhandled signals
continue
metrics.IncrementTotalProcessesReceivedCounter()
s.queue <- signal
This could block. It should be in a select statement.
continue
signal, err := unwrapSignal(signalStreamMsg)
if err != nil {
	metrics.IncrementTotalProcessesDroppedCounter()
🤔 I'm not sure about this metric. Normally these SomethingDroppedCounter metrics record when we drop a message not because it's invalid, but because the buffers are full.
I think this should be used when we are pushing messages to the queue:

select {
case s.queue <- signal:
	metrics.IncrementTotalProcessesReceivedCounter()
default:
	metrics.IncrementTotalProcessesDroppedCounter()
}
- Unify process metrics
- Refactor Communicate method
- Minor cleanups
- Prevent some locks when writing to channels
Description
This PR enables sensor to handle process signals coming from collector through the new internal service. In order to achieve this:
In addition to the previously described changes, a couple of functions for translating between signal types have been added so that the new type can be translated into the type central expects. To avoid this translation, we could update the internal service between sensor and central to also use the new type, moving the translation into central itself before dumping to the DB; however, this would require additional compatibility work between sensor and central that has not yet been discussed. Should this change be needed, it will be addressed in a follow-up PR.
Sibling PR stackrox/collector#2063
User-facing documentation
Testing and quality
Automated testing
How I validated my change
Ran collector and sensor with the new changes and checked that the process information is available in the Risk tab.