Skip to content

Commit 82d9680

Browse files
committed
Integrated Codel into DRR
1 parent c3235e2 commit 82d9680

File tree

4 files changed

+135
-118
lines changed

4 files changed

+135
-118
lines changed

bessctl/conf/testing/module_tests/drr.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
from port import *
55

66
#ensures that given infinite input that the module does not crash.
7-
def crash_test():
8-
return [DRR(), 1, 1]
7+
def crash_test(queue_type, queue_dict):
8+
if queue_type == 1:
9+
return [DRR(type=queue_type, codel=queue_dict), 1, 1]
10+
else:
11+
return [DRR(type=queue_type, ring=queue_dict), 1, 1]
912

10-
# tests to make that inividual packets gets through the module
11-
def basic_output_test():
13+
# tests to make that individual packets gets through the module
14+
def basic_output_test(queue_type, queue_dict):
1215

1316
# produces the number of duplicate packets specified by num_pkts and uses the provided
1417
# the specifications in the packet along with dummy ports.
@@ -19,16 +22,28 @@ def gen_packet_list(protocol, input_ip, output_ip, num_pkts):
1922
packet_list.append({'input_port': 0, 'input_packet': cur_pkt,
2023
'output_port': 0, "output_packet": cur_pkt})
2124
return packet_list
22-
23-
single_basic = DRR(num_flows=2, max_flow_queue_size= 100)
25+
26+
if queue_type== 1:
27+
single_basic = DRR(num_flows=2, max_flow_queue_size= 100, type=queue_type,
28+
codel=queue_dict);
29+
else:
30+
single_basic = DRR(num_flows=2, max_flow_queue_size= 100, type=queue_type,
31+
ring=queue_dict);
32+
2433
monitor_task(single_basic, 0)
2534

2635
out = []
2736
single_packet= gen_packet_list(scapy.TCP, '22.22.22.22', '22.22.22.22', 1)
2837
out.append([single_basic, # test this module
2938
1, 1, # it has one input port and one output port
3039
single_packet])
31-
batch_basic = DRR(num_flows=4, max_flow_queue_size= 100)
40+
if queue_type== 1:
41+
batch_basic = DRR(num_flows=4, max_flow_queue_size= 100, type=queue_type,
42+
codel=queue_dict);
43+
else:
44+
batch_basic = DRR(num_flows=4, max_flow_queue_size= 100, type=queue_type,
45+
ring=queue_dict);
46+
3247
monitor_task(batch_basic, 0)
3348
packet_list = gen_packet_list(scapy.TCP, '22.22.22.1', '22.22.22.1', 2)
3449
packet_list += gen_packet_list(scapy.TCP, '22.22.11.1', '22.22.11.1', 2)
@@ -44,7 +59,7 @@ def fairness_test():
4459
# Takes the number of flows n, the quantum to give drr, the list packet rates for each flow
4560
# and the packet rate for the module. runs this setup for five seconds and tests that
4661
# throughput for each flow had a jaine fairness of atleast .95.
47-
def fairness_n_flow_test(n, quantum, rates, module_rate):
62+
def fairness_n_flow_test(n, quantum, rates, module_rate, queue_type, queue_dict):
4863
err = bess.reset_all()
4964

5065
packets = []
@@ -65,7 +80,11 @@ def fairness_n_flow_test(n, quantum, rates, module_rate):
6580

6681
me_out = Measure()
6782
snk = Sink()
68-
q = DRR(num_flows= n+1, quantum=quantum)
83+
if queue_type == 1:
84+
q = DRR(num_flows= n+1, quantum=quantum, type=queue_type, codel=queue_dict)
85+
else:
86+
q = DRR(num_flows= n+1, quantum=quantum, type=queue_type, ring=queue_dict)
87+
6988
me_in -> q -> me_out -> exm
7089

7190
measure_out = []
@@ -96,12 +115,16 @@ def fairness_n_flow_test(n, quantum, rates, module_rate):
96115
else:
97116
fair = f(me_out)/square_sum
98117
assert abs(.99 - fair) <=.05
99-
100-
fairness_n_flow_test(2, 1000, [80000, 20000], 30000)
101-
fairness_n_flow_test(5, 1000, [110000, 200000, 70000, 60000, 40000], 150000)
118+
llqueue_dict = {} # all default values
119+
codel_dict = {} # all default values
120+
fairness_n_flow_test(2, 1000, [80000, 20000], 30000, 0, llqueue_dict)
121+
fairness_n_flow_test(2, 1000, [80000, 20000], 30000, 1, codel_dict)
122+
fairness_n_flow_test(5, 1000, [110000, 200000, 70000, 60000, 40000], 150000, 0, llqueue_dict)
123+
fairness_n_flow_test(5, 1000, [110000, 200000, 70000, 60000, 40000], 150000, 1, codel_dict)
102124

103125
ten_flows = [210000, 120000, 130000, 160000, 100000, 105000, 90000, 70000, 60000, 40000]
104-
fairness_n_flow_test(10, 1000, ten_flows , 300000)
126+
fairness_n_flow_test(10, 1000, ten_flows , 300000, 0, llqueue_dict)
127+
fairness_n_flow_test(10, 1000, ten_flows , 300000, 1, codel_dict)
105128

106129
# hund_flows= []
107130
# cur = 200000
@@ -110,6 +133,10 @@ def fairness_n_flow_test(n, quantum, rates, module_rate):
110133
# cur -= 1600
111134
# fairness_n_flow_test(100, 1000, hund_flows, 300000)
112135

113-
OUTPUT_TEST_INPUTS += basic_output_test()
136+
llqueue_dict = {} # all default values llqueue queue type: 0
137+
codel_dict = {} # all default values codel queue type: 1
138+
OUTPUT_TEST_INPUTS += basic_output_test(0, llqueue_dict)
139+
OUTPUT_TEST_INPUTS += basic_output_test(1, codel_dict)
114140
CUSTOM_TEST_FUNCTIONS.append(fairness_test)
115-
CRASH_TEST_INPUTS.append(crash_test())
141+
CRASH_TEST_INPUTS.append(crash_test(0, llqueue_dict))
142+
CRASH_TEST_INPUTS.append(crash_test(1, codel_dict))

core/modules/drr.cc

Lines changed: 63 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@
88
#include "../utils/ether.h"
99
#include "../utils/ip.h"
1010
#include "../utils/udp.h"
11+
#include "../utils/common.h"
12+
#include "../utils/codel.h"
13+
14+
using bess::utils::Codel;
15+
using bess::pb::DRRArg;
1116

1217
uint32_t RoundToPowerTwo(uint32_t v) {
1318
v--;
@@ -31,7 +36,8 @@ DRR::DRR()
3136
max_queue_size_(kFlowQueueMax),
3237
max_number_flows_(kDefaultNumFlows),
3338
flow_ring_(nullptr),
34-
current_flow_(nullptr) {}
39+
current_flow_(nullptr),
40+
generator_(NULL) {}
3541

3642
DRR::~DRR() {
3743
for (auto it = flows_.begin(); it != flows_.end();) {
@@ -41,7 +47,7 @@ DRR::~DRR() {
4147
std::free(flow_ring_);
4248
}
4349

44-
CommandResponse DRR::Init(const bess::pb::DRRArg& arg) {
50+
CommandResponse DRR::Init(const DRRArg& arg) {
4551
CommandResponse err;
4652
task_id_t tid;
4753

@@ -70,11 +76,37 @@ CommandResponse DRR::Init(const bess::pb::DRRArg& arg) {
7076
}
7177

7278
int err_num = 0;
73-
flow_ring_ = AddQueue(max_number_flows_, &err_num);
79+
flow_ring_ = new LockLessQueue<Flow*>(max_number_flows_, true, true);
7480
if (err_num != 0) {
7581
return CommandFailure(-err_num);
7682
}
7783

84+
/* get flow queue generator */
85+
if (arg.type() == DRRArg::CODEL) {
86+
const DRRArg::Codel& codel = arg.codel();
87+
uint64_t target;
88+
uint64_t window;
89+
if (!(target = codel.target())) {
90+
target = Codel<bess::Packet*>::kDefaultTarget;
91+
}
92+
93+
if (!(window = codel.window())) {
94+
window = Codel<bess::Packet*>::kDefaultWindow;
95+
}
96+
97+
generator_ =
98+
Codel<bess::Packet*>::Factory(bess::Packet::Free, max_queue_size_, target, window);
99+
} else {
100+
const DRRArg::Llring& ring = arg.ring();
101+
size_t size;
102+
if (!(size = ring.size())) {
103+
size = LockLessQueue<bess::Packet*>::kDefaultRingSize;
104+
}
105+
106+
generator_ = LockLessQueue<bess::Packet*>::Factory(
107+
size, !ring.multiple_producer(), !ring.multiple_consumer());
108+
}
109+
78110
return CommandSuccess();
79111
}
80112

@@ -101,7 +133,7 @@ void DRR::ProcessBatch(bess::PacketBatch* batch) {
101133
// if the Flow doesn't exist create one
102134
// and add the packet to the new Flow
103135
if (it == nullptr) {
104-
if (llring_full(flow_ring_)) {
136+
if (flow_ring_->Full()) {
105137
bess::Packet::Free(pkt);
106138
} else {
107139
AddNewFlow(pkt, id, &err);
@@ -140,7 +172,7 @@ struct task_result DRR::RunTask(void*) {
140172
uint32_t DRR::GetNextBatch(bess::PacketBatch* batch, int* err) {
141173
Flow* f;
142174
uint32_t total_bytes = 0;
143-
uint32_t count = llring_count(flow_ring_);
175+
uint32_t count = flow_ring_->Size();
144176
if (current_flow_) {
145177
count++;
146178
}
@@ -155,33 +187,30 @@ uint32_t DRR::GetNextBatch(bess::PacketBatch* batch, int* err) {
155187
if (batch_size == batch->cnt()) {
156188
break;
157189
} else {
158-
count = llring_count(flow_ring_);
190+
count = flow_ring_->Size();
159191
batch_size = batch->cnt();
160192
}
161193
}
162194
count--;
163195

164196
f = GetNextFlow(err);
165-
if (*err != 0) {
197+
if (*err) {
166198
return total_bytes;
167199
} else if (f == nullptr) {
168200
continue;
169201
}
170202

171-
uint32_t bytes = GetNextPackets(batch, f, err);
203+
uint32_t bytes = GetNextPackets(batch, f);
172204
total_bytes += bytes;
173-
if (*err != 0) {
174-
return total_bytes;
175-
}
176205

177-
if (llring_empty(f->queue) && !f->next_packet) {
206+
if (f->queue->Empty() && !f->next_packet) {
178207
f->deficit = 0;
179208
}
180209

181210
// if the flow doesn't have any more packets to give, reenqueue it
182211
if (!f->next_packet || f->next_packet->total_len() > f->deficit) {
183-
*err = llring_enqueue(flow_ring_, f);
184-
if (*err != 0) {
212+
*err = flow_ring_->Push(f);
213+
if (*err) {
185214
return total_bytes;
186215
}
187216
} else {
@@ -198,18 +227,18 @@ DRR::Flow* DRR::GetNextFlow(int* err) {
198227
double now = get_epoch_time();
199228

200229
if (!current_flow_) {
201-
*err = llring_dequeue(flow_ring_, reinterpret_cast<void**>(&f));
202-
if (*err < 0) {
230+
*err = flow_ring_->Pop(f);
231+
if (*err) {
203232
return nullptr;
204233
}
205234

206-
if (llring_empty(f->queue) && !f->next_packet) {
235+
if (f->queue->Empty() && !f->next_packet) {
207236
// if the flow expired, remove it
208237
if (now - f->timer > kTtl) {
209238
RemoveFlow(f);
210239
} else {
211-
*err = llring_enqueue(flow_ring_, f);
212-
if (*err < 0) {
240+
*err = flow_ring_->Push(f);
241+
if (*err) {
213242
return nullptr;
214243
}
215244
}
@@ -224,15 +253,15 @@ DRR::Flow* DRR::GetNextFlow(int* err) {
224253
return f;
225254
}
226255

227-
uint32_t DRR::GetNextPackets(bess::PacketBatch* batch, Flow* f, int* err) {
256+
uint32_t DRR::GetNextPackets(bess::PacketBatch* batch, Flow* f) {
228257
uint32_t total_bytes = 0;
229258
bess::Packet* pkt;
230259

231-
while (!batch->full() && (!llring_empty(f->queue) || f->next_packet)) {
260+
while (!batch->full() && (!f->queue->Empty() || f->next_packet)) {
232261
// makes sure there isn't already a packet at the front
233262
if (!f->next_packet) {
234-
*err = llring_dequeue(f->queue, reinterpret_cast<void**>(&pkt));
235-
if (*err < 0) {
263+
int err = f->queue->Pop(pkt);
264+
if (err) {
236265
return total_bytes;
237266
}
238267
} else {
@@ -274,21 +303,21 @@ void DRR::AddNewFlow(bess::Packet* pkt, FlowId id, int* err) {
274303
Flow* f = new Flow(id);
275304

276305
// TODO(joshua) do proper error checking
277-
f->queue = AddQueue(static_cast<int>(kFlowQueueSize), err);
306+
f->queue = generator_();
278307

279-
if (*err != 0) {
308+
if (*err) {
280309
return;
281310
}
282311

283312
flows_.Insert(id, f);
284313

285314
Enqueue(f, pkt, err);
286-
if (*err != 0) {
315+
if (*err) {
287316
return;
288317
}
289318

290319
// puts flow in round robin
291-
*err = llring_enqueue(flow_ring_, f);
320+
*err = flow_ring_->Push(f);
292321
}
293322

294323
void DRR::RemoveFlow(Flow* f) {
@@ -299,77 +328,34 @@ void DRR::RemoveFlow(Flow* f) {
299328
delete f;
300329
}
301330

302-
llring* DRR::AddQueue(uint32_t slots, int* err) {
303-
int bytes = llring_bytes_with_slots(slots);
304-
int ret;
305-
306-
llring* queue = static_cast<llring*>(aligned_alloc(alignof(llring), bytes));
307-
if (!queue) {
308-
*err = -ENOMEM;
309-
return nullptr;
310-
}
311-
312-
ret = llring_init(queue, slots, 1, 1);
313-
if (ret) {
314-
std::free(queue);
315-
*err = -EINVAL;
316-
return nullptr;
317-
}
318-
return queue;
319-
}
320331

321332
void DRR::Enqueue(Flow* f, bess::Packet* newpkt, int* err) {
322333
// if the queue is full. drop the packet.
323-
if (llring_count(f->queue) >= max_queue_size_) {
334+
if (f->queue->Size() >= max_queue_size_) {
324335
bess::Packet::Free(newpkt);
325336
return;
326337
}
327338

328339
// creates a new queue if there is not enough space for the new packet
329340
// in the old queue
330-
if (llring_full(f->queue)) {
341+
if (f->queue->Full()) {
331342
uint32_t slots =
332-
RoundToPowerTwo(llring_count(f->queue) * kQueueGrowthFactor);
333-
f->queue = ResizeQueue(f->queue, slots, err);
334-
if (*err != 0) {
343+
RoundToPowerTwo(f->queue->Size() * kQueueGrowthFactor);
344+
*err = f->queue->Resize(slots);
345+
if (*err) {
335346
bess::Packet::Free(newpkt);
336347
return;
337348
}
338349
}
339350

340-
*err = llring_enqueue(f->queue, reinterpret_cast<void*>(newpkt));
341-
if (*err == 0) {
351+
*err = f->queue->Push(newpkt);
352+
if (!*err) {
342353
f->timer = get_epoch_time();
343354
} else {
344355
bess::Packet::Free(newpkt);
345356
}
346357
}
347358

348-
llring* DRR::ResizeQueue(llring* old_queue, uint32_t new_size, int* err) {
349-
llring* new_queue = AddQueue(new_size, err);
350-
if (*err != 0) {
351-
return nullptr;
352-
}
353-
354-
// migrates packets from the old queue
355-
if (old_queue) {
356-
bess::Packet* pkt;
357-
358-
while (llring_dequeue(old_queue, reinterpret_cast<void**>(&pkt)) == 0) {
359-
*err = llring_enqueue(new_queue, pkt);
360-
if (*err == -LLRING_ERR_NOBUF) {
361-
bess::Packet::Free(pkt);
362-
*err = 0;
363-
} else if (*err != 0) {
364-
std::free(new_queue);
365-
return nullptr;
366-
}
367-
}
368-
369-
std::free(old_queue);
370-
}
371-
return new_queue;
372-
}
373359

374360
CommandResponse DRR::SetQuantumSize(uint32_t size) {
375361
if (size == 0) {

0 commit comments

Comments
 (0)