-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathphy.cpp
120 lines (91 loc) · 2.89 KB
/
phy.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <mosquitto.h>
#include "phy.hpp"
void incoming_packet(struct mosquitto* mosq, void* obj, const struct mosquitto_message *message)
{
Phy* p = (Phy*) obj;
uint32_t payloadlen = message->payloadlen;
if (p->m_rx_queue->size() == 0)
{
TRACE_PRINT("Ignoring incoming packet w/size %d b/c firmware is not waiting for a packet", payloadlen);
return;
}
if (payloadlen <= 4)
{
TRACE_PRINT("Ignoring too-small packet w/size %d", payloadlen);
return;
}
// limit the message size to Ethernet maximum frame size of 1518
if (payloadlen > MAX_ETHERNET_SIZE)
{
TRACE_PRINT("Ignoring large packet w/size %d", payloadlen);
return;
}
// check if the message has our nick at the start, if it came from us
if (((uint32_t*)message->payload)[0] == p->m_nic)
{
TRACE_PRINT("Ignoring packet w/nic %d b/c it's our nic %d", ((uint32_t*)message->payload)[0], p->m_nic);
return;
}
// now, subtract the nic part that was added to the message
payloadlen -= 4;
char* payload = ((char*)message->payload) + 4;
fifo_job job = p->m_rx_queue->get();
TRACE_PRINT("RX: size:%p:%d start:%p",
job.size,
*job.size,
job.start);
*job.size = payloadlen;
memcpy(job.start, payload, payloadlen);
if (job.callback)
{
job.callback();
}
}
Phy::Phy(ThreadedQueue<fifo_job>* tx_queue, ThreadedQueue<fifo_job>* rx_queue, const char* host, int port, const char* vpc):
m_tx_queue(tx_queue),
m_rx_queue(rx_queue),
m_done(false),
m_vpc(vpc)
{
int fd = open("/dev/random", O_RDONLY);
read(fd, &m_nic, sizeof(m_nic));
close(fd);
m_tx_thread = std::thread(&Phy::handle_tx, this);
//m_rx_thread = std::thread(&Phy::handle_rx, this);
mosquitto_lib_init();
bool clean_session = true;
int keep_alive_seconds = 20;
m_mosq = mosquitto_new(NULL, clean_session, this);
mosquitto_message_callback_set(m_mosq, incoming_packet);
int rc = mosquitto_connect(m_mosq, host, port, keep_alive_seconds);
mosquitto_subscribe(m_mosq, NULL, vpc, 0);
m_rx_thread = std::thread([this]{ mosquitto_loop_forever(m_mosq, -1, 1); });
}
void Phy::handle_tx(void)
{
while(true)
{
// TODO
fifo_job job = m_tx_queue->get();
TRACE_PRINT("TX: size:%p:%d start:%p",
job.size,
*job.size,
job.start);
int payloadlen = *job.size + 4;
char* to_send = (char*)malloc(payloadlen);
((uint32_t*)to_send)[0] = m_nic;
memcpy(to_send+4, job.start, *job.size);
int rc = mosquitto_publish(m_mosq, NULL, m_vpc, payloadlen, to_send, 0, false);
TRACE_PRINT("TX: rc=0x%x content=%s", rc, job.start);
free(to_send);
if (job.callback)
{
job.callback();
}
}
}