7
7
from dash_api .route_type_pb2 import *
8
8
from dash_api .types_pb2 import *
9
9
10
- from dash_utils import DashDB
10
+ from dash_utils import dash_db
11
11
12
12
import time
13
13
import uuid
18
18
NUM_PORTS = 2
19
19
20
20
class TestDash (object ):
21
- def test_appliance (self , dvs ):
22
- dashobj = DashDB (dvs )
21
+ def test_appliance (self , dash_db ):
23
22
self .appliance_id = "100"
24
23
self .sip = "10.0.0.1"
25
24
self .vm_vni = "4321"
26
25
pb = Appliance ()
27
26
pb .sip .ipv4 = socket .htonl (int (ipaddress .ip_address (self .sip )))
28
27
pb .vm_vni = int (self .vm_vni )
29
- dashobj .create_appliance (self .appliance_id , {"pb" : pb .SerializeToString ()})
28
+ dash_db .create_appliance (self .appliance_id , {"pb" : pb .SerializeToString ()})
30
29
time .sleep (3 )
31
30
32
- direction_entries = dashobj .asic_direction_lookup_table .get_keys ()
31
+ direction_entries = dash_db .asic_direction_lookup_table .get_keys ()
33
32
assert direction_entries
34
- fvs = dashobj .asic_direction_lookup_table [direction_entries [0 ]]
33
+ fvs = dash_db .asic_direction_lookup_table [direction_entries [0 ]]
35
34
for fv in fvs .items ():
36
35
if fv [0 ] == "SAI_DIRECTION_LOOKUP_ENTRY_ATTR_ACTION" :
37
36
assert fv [1 ] == "SAI_DIRECTION_LOOKUP_ENTRY_ACTION_SET_OUTBOUND_DIRECTION"
38
- vip_entries = dashobj .asic_vip_table .get_keys ()
37
+ vip_entries = dash_db .asic_vip_table .get_keys ()
39
38
assert vip_entries
40
- fvs = dashobj .asic_vip_table [vip_entries [0 ]]
39
+ fvs = dash_db .asic_vip_table [vip_entries [0 ]]
41
40
for fv in fvs .items ():
42
41
if fv [0 ] == "SAI_VIP_ENTRY_ATTR_ACTION" :
43
42
assert fv [1 ] == "SAI_VIP_ENTRY_ACTION_ACCEPT"
44
43
45
- def test_vnet (self , dvs ):
46
- dashobj = DashDB (dvs )
44
+ def test_vnet (self , dash_db ):
47
45
self .vnet = "Vnet1"
48
46
self .vni = "45654"
49
47
self .guid = "559c6ce8-26ab-4193-b946-ccc6e8f930b2"
50
48
pb = Vnet ()
51
49
pb .vni = int (self .vni )
52
50
pb .guid .value = bytes .fromhex (uuid .UUID (self .guid ).hex )
53
- dashobj .create_vnet (self .vnet , {"pb" : pb .SerializeToString ()})
51
+ dash_db .create_vnet (self .vnet , {"pb" : pb .SerializeToString ()})
54
52
time .sleep (3 )
55
- vnets = dashobj .asic_dash_vnet_table .get_keys ()
53
+ vnets = dash_db .asic_dash_vnet_table .get_keys ()
56
54
assert vnets
57
55
self .vnet_oid = vnets [0 ]
58
- vnet_attr = dashobj .asic_dash_vnet_table [self .vnet_oid ]
56
+ vnet_attr = dash_db .asic_dash_vnet_table [self .vnet_oid ]
59
57
assert vnet_attr ["SAI_VNET_ATTR_VNI" ] == "45654"
60
58
61
- def test_eni (self , dvs ):
62
- dashobj = DashDB (dvs )
59
+ def test_eni (self , dash_db ):
63
60
self .vnet = "Vnet1"
64
61
self .mac_string = "F4939FEFC47E"
65
62
self .mac_address = "F4:93:9F:EF:C4:7E"
@@ -72,15 +69,15 @@ def test_eni(self, dvs):
72
69
pb .underlay_ip .ipv4 = socket .htonl (int (ipaddress .ip_address (self .underlay_ip )))
73
70
pb .admin_state = State .STATE_ENABLED
74
71
pb .vnet = self .vnet
75
- dashobj .create_eni (self .mac_string , {"pb" : pb .SerializeToString ()})
72
+ dash_db .create_eni (self .mac_string , {"pb" : pb .SerializeToString ()})
76
73
time .sleep (3 )
77
- vnets = dashobj .asic_dash_vnet_table .get_keys ()
74
+ vnets = dash_db .asic_dash_vnet_table .get_keys ()
78
75
assert vnets
79
76
self .vnet_oid = vnets [0 ]
80
- enis = dashobj .asic_eni_table .get_keys ()
77
+ enis = dash_db .asic_eni_table .get_keys ()
81
78
assert enis
82
79
self .eni_oid = enis [0 ];
83
- fvs = dashobj .asic_eni_table [enis [0 ]]
80
+ fvs = dash_db .asic_eni_table [enis [0 ]]
84
81
for fv in fvs .items ():
85
82
if fv [0 ] == "SAI_ENI_ATTR_VNET_ID" :
86
83
assert fv [1 ] == str (self .vnet_oid )
@@ -94,15 +91,14 @@ def test_eni(self, dvs):
94
91
assert fv [1 ] == "true"
95
92
96
93
time .sleep (3 )
97
- eni_addr_maps = dashobj .asic_eni_ether_addr_map_table .get_keys ()
94
+ eni_addr_maps = dash_db .asic_eni_ether_addr_map_table .get_keys ()
98
95
assert eni_addr_maps
99
- fvs = dashobj .asic_eni_ether_addr_map_table [eni_addr_maps [0 ]]
96
+ fvs = dash_db .asic_eni_ether_addr_map_table [eni_addr_maps [0 ]]
100
97
for fv in fvs .items ():
101
98
if fv [0 ] == "SAI_ENI_ETHER_ADDRESS_MAP_ENTRY_ATTR_ENI_ID" :
102
99
assert fv [1 ] == str (self .eni_oid )
103
100
104
- def test_vnet_map (self , dvs ):
105
- dashobj = DashDB (dvs )
101
+ def test_vnet_map (self , dash_db ):
106
102
self .vnet = "Vnet1"
107
103
self .ip1 = "10.1.1.1"
108
104
self .ip2 = "10.1.1.2"
@@ -114,34 +110,33 @@ def test_vnet_map(self, dvs):
114
110
route_action .action_name = "action1"
115
111
route_action .action_type = ACTION_TYPE_MAPROUTING
116
112
route_type_msg .items .append (route_action )
117
- dashobj .create_routing_type (self .routing_type , {"pb" : route_type_msg .SerializeToString ()})
113
+ dash_db .create_routing_type (self .routing_type , {"pb" : route_type_msg .SerializeToString ()})
118
114
pb = VnetMapping ()
119
115
pb .mac_address = bytes .fromhex (self .mac_address .replace (":" , "" ))
120
116
pb .action_type = RoutingType .ROUTING_TYPE_VNET_ENCAP
121
117
pb .underlay_ip .ipv4 = socket .htonl (int (ipaddress .ip_address (self .underlay_ip )))
122
118
123
- dashobj .create_vnet_map (self .vnet , self .ip1 , {"pb" : pb .SerializeToString ()})
124
- dashobj .create_vnet_map (self .vnet , self .ip2 , {"pb" : pb .SerializeToString ()})
119
+ dash_db .create_vnet_map (self .vnet , self .ip1 , {"pb" : pb .SerializeToString ()})
120
+ dash_db .create_vnet_map (self .vnet , self .ip2 , {"pb" : pb .SerializeToString ()})
125
121
time .sleep (3 )
126
122
127
- vnet_ca_to_pa_maps = dashobj .asic_dash_outbound_ca_to_pa_table .get_keys ()
123
+ vnet_ca_to_pa_maps = dash_db .asic_dash_outbound_ca_to_pa_table .get_keys ()
128
124
assert len (vnet_ca_to_pa_maps ) >= 2
129
- fvs = dashobj .asic_dash_outbound_ca_to_pa_table [vnet_ca_to_pa_maps [0 ]]
125
+ fvs = dash_db .asic_dash_outbound_ca_to_pa_table [vnet_ca_to_pa_maps [0 ]]
130
126
for fv in fvs .items ():
131
127
if fv [0 ] == "SAI_OUTBOUND_CA_TO_PA_ENTRY_ATTR_UNDERLAY_DIP" :
132
128
assert fv [1 ] == "101.1.2.3"
133
129
if fv [0 ] == "SAI_OUTBOUND_CA_TO_PA_ENTRY_ATTR_OVERLAY_DMAC" :
134
130
assert fv [1 ] == "F4:93:9F:EF:C4:7E"
135
131
136
- vnet_pa_validation_maps = dashobj .asic_pa_validation_table .get_keys ()
132
+ vnet_pa_validation_maps = dash_db .asic_pa_validation_table .get_keys ()
137
133
assert vnet_pa_validation_maps
138
- fvs = dashobj .asic_pa_validation_table [vnet_pa_validation_maps [0 ]]
134
+ fvs = dash_db .asic_pa_validation_table [vnet_pa_validation_maps [0 ]]
139
135
for fv in fvs .items ():
140
136
if fv [0 ] == "SAI_PA_VALIDATION_ENTRY_ATTR_ACTION" :
141
137
assert fv [1 ] == "SAI_PA_VALIDATION_ENTRY_ACTION_PERMIT"
142
138
143
- def test_outbound_routing (self , dvs ):
144
- dashobj = DashDB (dvs )
139
+ def test_outbound_routing (self , dash_db ):
145
140
self .vnet = "Vnet1"
146
141
self .mac_string = "F4939FEFC47E"
147
142
self .ip = "10.1.0.0/24"
@@ -151,21 +146,20 @@ def test_outbound_routing(self, dvs):
151
146
pb .action_type = RoutingType .ROUTING_TYPE_VNET_DIRECT
152
147
pb .vnet_direct .vnet = self .vnet
153
148
pb .vnet_direct .overlay_ip .ipv4 = socket .htonl (int (ipaddress .ip_address (self .overlay_ip )))
154
- dashobj .create_outbound_routing (self .mac_string , self .ip , {"pb" : pb .SerializeToString ()})
149
+ dash_db .create_outbound_routing (self .mac_string , self .ip , {"pb" : pb .SerializeToString ()})
155
150
time .sleep (3 )
156
151
157
- outbound_routing_entries = dashobj .asic_outbound_routing_table .get_keys ()
152
+ outbound_routing_entries = dash_db .asic_outbound_routing_table .get_keys ()
158
153
assert outbound_routing_entries
159
- fvs = dashobj .asic_outbound_routing_table [outbound_routing_entries [0 ]]
154
+ fvs = dash_db .asic_outbound_routing_table [outbound_routing_entries [0 ]]
160
155
for fv in fvs .items ():
161
156
if fv [0 ] == "SAI_OUTBOUND_ROUTING_ENTRY_ATTR_ACTION" :
162
157
assert fv [1 ] == "SAI_OUTBOUND_ROUTING_ENTRY_ACTION_ROUTE_VNET_DIRECT"
163
158
if fv [0 ] == "SAI_OUTBOUND_ROUTING_ENTRY_ATTR_OVERLAY_IP" :
164
159
assert fv [1 ] == "10.0.0.6"
165
160
assert "SAI_OUTBOUND_ROUTING_ENTRY_ATTR_DST_VNET_ID" in fvs
166
161
167
- def test_inbound_routing (self , dvs ):
168
- dashobj = DashDB (dvs )
162
+ def test_inbound_routing (self , dash_db ):
169
163
self .mac_string = "F4939FEFC47E"
170
164
self .vnet = "Vnet1"
171
165
self .vni = "3251"
@@ -180,30 +174,29 @@ def test_inbound_routing(self, dvs):
180
174
pb .protocol = int (self .protocol )
181
175
pb .vnet = self .vnet
182
176
183
- dashobj .create_inbound_routing (self .mac_string , self .vni , self .ip , {"pb" : pb .SerializeToString ()})
177
+ dash_db .create_inbound_routing (self .mac_string , self .vni , self .ip , {"pb" : pb .SerializeToString ()})
184
178
time .sleep (3 )
185
179
186
- inbound_routing_entries = dashobj .asic_inbound_routing_rule_table .get_keys ()
180
+ inbound_routing_entries = dash_db .asic_inbound_routing_rule_table .get_keys ()
187
181
assert inbound_routing_entries
188
- fvs = dashobj .asic_inbound_routing_rule_table [inbound_routing_entries [0 ]]
182
+ fvs = dash_db .asic_inbound_routing_rule_table [inbound_routing_entries [0 ]]
189
183
for fv in fvs .items ():
190
184
if fv [0 ] == "SAI_INBOUND_ROUTING_ENTRY_ATTR_ACTION" :
191
185
assert fv [1 ] == "SAI_INBOUND_ROUTING_ENTRY_ACTION_VXLAN_DECAP_PA_VALIDATE"
192
186
193
- def test_cleanup (self , dvs ):
194
- dashobj = DashDB (dvs )
187
+ def test_cleanup (self , dash_db ):
195
188
self .vnet = "Vnet1"
196
189
self .mac_string = "F4939FEFC47E"
197
190
self .vni = "3251"
198
191
self .sip = "10.1.1.1"
199
192
self .dip = "10.1.0.0/24"
200
193
self .appliance_id = "100"
201
- dashobj .remove_inbound_routing (self .mac_string , self .vni , self .sip )
202
- dashobj .remove_outbound_routing (self .mac_string , self .dip )
203
- dashobj .remove_eni (self .mac_string )
204
- dashobj .remove_vnet_map (self .vnet , self .sip )
205
- dashobj .remove_vnet (self .vnet )
206
- dashobj .remove_appliance (self .appliance_id )
194
+ dash_db .remove_inbound_routing (self .mac_string , self .vni , self .sip )
195
+ dash_db .remove_outbound_routing (self .mac_string , self .dip )
196
+ dash_db .remove_eni (self .mac_string )
197
+ dash_db .remove_vnet_map (self .vnet , self .sip )
198
+ dash_db .remove_vnet (self .vnet )
199
+ dash_db .remove_appliance (self .appliance_id )
207
200
208
201
# Add Dummy always-pass test at end as workaroud
209
202
# for issue when Flaky fail on final test it invokes module tear-down
0 commit comments