forked from netboxlabs/pktvisor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNetStreamHandler.h
136 lines (112 loc) · 4.63 KB
/
NetStreamHandler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
#pragma once
#include "AbstractMetricsManager.h"
#include "MockInputStream.h"
#include "PcapInputStream.h"
#include "StreamHandler.h"
#include <Corrade/Utility/Debug.h>
#include <string>
namespace visor::handler::net {
using namespace visor::input::pcap;
using namespace visor::input::mock;
class NetworkMetricsBucket final : public visor::AbstractMetricsBucket
{
protected:
mutable std::shared_mutex _mutex;
Cardinality _srcIPCard;
Cardinality _dstIPCard;
TopN<std::string> _topGeoLoc;
TopN<std::string> _topASN;
TopN<uint32_t> _topIPv4;
TopN<std::string> _topIPv6;
// total numPackets is tracked in base class num_events
struct counters {
Counter UDP;
Counter TCP;
Counter OtherL4;
Counter IPv4;
Counter IPv6;
Counter total_in;
Counter total_out;
counters()
: UDP("packets", {"udp"}, "Count of UDP packets")
, TCP("packets", {"tcp"}, "Count of TCP packets")
, OtherL4("packets", {"other_l4"}, "Count of packets which are not UDP or TCP")
, IPv4("packets", {"ipv4"}, "Count of IPv4 packets")
, IPv6("packets", {"ipv6"}, "Count of IPv6 packets")
, total_in("packets", {"in"}, "Count of total ingress packets")
, total_out("packets", {"out"}, "Count of total egress packets")
{
}
};
counters _counters;
Rate _rate_in;
Rate _rate_out;
public:
NetworkMetricsBucket()
: _srcIPCard("packets", {"cardinality", "src_ips_in"}, "Source IP cardinality")
, _dstIPCard("packets", {"cardinality", "dst_ips_out"}, "Destination IP cardinality")
, _topGeoLoc("packets", {"top_geoLoc"}, "Top GeoIP locations")
, _topASN("packets", {"top_ASN"}, "Top ASNs by IP")
, _topIPv4("packets", {"top_ipv4"}, "Top IPv4 IP addresses")
, _topIPv6("packets", {"top_ipv6"}, "Top IPv6 IP addresses")
, _rate_in("packets", {"rates", "pps_in"}, "Rate of ingress in packets per second")
, _rate_out("packets", {"rates", "pps_out"}, "Rate of egress in packets per second")
{
set_event_rate_info("packets", {"rates", "pps_total"}, "Rate of all packets (combined ingress and egress) in packets per second");
set_num_events_info("packets", {"total"}, "Total packets processed");
set_num_sample_info("packets", {"deep_samples"}, "Total packets that were sampled for deep inspection");
}
// get a copy of the counters
counters counters() const
{
std::shared_lock lock(_mutex);
return _counters;
}
// visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, Metric::LabelMap add_labels = {}) const override;
// must be thread safe as it is called from time window maintenance thread
void on_set_read_only() override
{
// stop rate collection
_rate_in.cancel();
_rate_out.cancel();
}
void process_packet(bool deep, pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4);
};
class NetworkMetricsManager final : public visor::AbstractMetricsManager<NetworkMetricsBucket>
{
public:
NetworkMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<NetworkMetricsBucket>(window_config)
{
}
void process_packet(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
};
class NetStreamHandler final : public visor::StreamMetricsHandler<NetworkMetricsManager>
{
// the input stream sources we support (only one will be in use at a time)
PcapInputStream *_pcap_stream{nullptr};
MockInputStream *_mock_stream{nullptr};
sigslot::connection _pkt_connection;
sigslot::connection _start_tstamp_connection;
sigslot::connection _end_tstamp_connection;
void process_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, timespec stamp);
void set_start_tstamp(timespec stamp);
void set_end_tstamp(timespec stamp);
public:
NetStreamHandler(const std::string &name, InputStream *stream, const Configurable *window_config);
~NetStreamHandler() override;
// visor::AbstractModule
std::string schema_key() const override
{
return "packets";
}
void start() override;
void stop() override;
};
}