20 from topologylistener
import flow_utils
21 from topologylistener
import message_utils
22 from topologylistener
import messageclasses
23 from topologylistener
import model
38 'output_port':
'port_out',
39 'input_port':
'port_in',
41 fields = model.grab_fields(channel_data, channel_to_fields)
61 fields = model.grab_fields(channel_data, {
62 'transit_vlan_id':
'vlan_transit'})
65 channel_data, **fields)
68 return super(AbstractSpeakerFlowInstallCommand, self).
as_tuple() + (
77 fields = model.grab_fields(
78 channel_data, {
'input_vlan_id':
'vlan_input'})
81 channel_data, **fields)
84 super(AbstractSpeakerFlowCommand, self).
__init__(
85 clazz=
'org.openkilda.messaging.command.flow.InstallIngressFlow',
89 return super(SpeakerFlowIngressCommand, self).
as_tuple() + (
95 super(AbstractSpeakerFlowCommand, self).
__init__(
96 clazz=
'org.openkilda.messaging.command.flow.InstallTransitFlow',
105 fields = model.grab_fields(
106 channel_data, {
'output_vlan_id':
'vlan_output'})
109 channel_data, **fields)
112 super(AbstractSpeakerFlowCommand, self).
__init__(
113 clazz=
'org.openkilda.messaging.command.flow.InstallEgressFlow',
117 return super(SpeakerFlowEgressCommand, self).
as_tuple() + (
126 fields = model.grab_fields(channel_data[
'criteria'], {
128 'in_port':
'port_in',
129 'out_port':
'port_out',
130 'in_vlan':
'vlan_input'})
133 channel_data, **fields)
138 fields[
'port_out'] =
None 140 super(AbstractSpeakerFlowCommand, self).
__init__(
141 clazz=
'org.openkilda.messaging.command.flow.RemoveFlow',
145 return super(SpeakerFlowRemoveCommand, self).
as_tuple() + (
150 'org.openkilda.messaging.command.flow.InstallIngressFlow':
151 SpeakerFlowIngressCommand,
152 'org.openkilda.messaging.command.flow.InstallTransitFlow':
153 SpeakerFlowTransitCommand,
154 'org.openkilda.messaging.command.flow.InstallEgressFlow':
155 SpeakerFlowEgressCommand,
156 'org.openkilda.messaging.command.flow.RemoveFlow': SpeakerFlowRemoveCommand}
160 correlation_id_counter = itertools.count()
163 super(TestFlow, self).
setUp()
165 share.env.reset_kafka_producer()
170 request = self.
load_data(
'flow-create-request.json')
171 flow_info_data = request[
'payload']
177 create_request = copy.deepcopy(request)
182 request[
'payload'][
'operation'] =
'UPDATE' 185 update_request = copy.deepcopy(request)
190 request[
'payload'][
'operation'] =
'DELETE' 191 delete_request = copy.deepcopy(request)
195 flow_pair = create_request[
'payload'][
'payload']
196 self.assertFalse(self.
is_flow_exist(flow_pair[
'forward'][
'flowid']))
197 self.assertTrue(share.feed_message(create_request))
202 expected_messages, create_request[
'correlation_id'])
205 flow_pair = update_request[
'payload'][
'payload']
206 self.assertTrue(self.
is_flow_exist(flow_pair[
'forward'][
'flowid']))
207 self.assertTrue(share.feed_message(update_request))
215 expected_commands, update_request[
'correlation_id'])
218 flow_pair = delete_request[
'payload'][
'payload']
219 flow_id = flow_pair[
'forward'][
'flowid']
222 self.assertTrue(share.feed_message(delete_request))
227 expect_commands, delete_request[
'correlation_id'])
230 flow_pair = request[
'payload'][
'payload']
231 flow_forward = flow_pair[
'forward']
233 flow_id = flow_forward[
'flowid']
238 self.assertEqual(expected_endpoints, actual_endpoints)
243 kafka_stream,
'correlation_id', correlation_id)
245 kafka_stream,
'clazz', message_utils.MT_COMMAND)
246 kafka_stream = self.
stream_map(kafka_stream,
lambda x: x[
'payload'])
249 for payload
in kafka_stream:
251 klass = clazz_to_command[payload[
'clazz']]
253 extra_commands.append(payload)
256 command = klass.of_kafka_channel(payload).as_tuple()
257 if command
in expected_commands:
258 expected_commands.discard(command)
260 extra_commands.append(payload)
263 message =
'Extra command messages have been produced' 265 for idx, payload
in enumerate(extra_commands):
266 print(
'#{}: {}'.
format(idx, pprint.pformat(payload)))
267 raise AssertionError(message)
269 if expected_commands:
270 message =
'Expected command messages have not been produced' 272 for idx, payload
in enumerate(sorted(expected_commands)):
273 print(
'#{}: {}'.
format(idx, pprint.pformat(payload)))
274 raise AssertionError(message)
283 for x
in messageclasses.features_status_transport_to_app_map}
284 features_request = share.feature_toggle_request(**features)
285 self.assertTrue(share.feed_message(share.command(features_request)))
293 'MATCH (:switch)-[f:flow]->(:switch)\n' 294 'WHERE f.flowid=$flow_id\n' 296 p = {
'flow_id': flow_id}
298 cursor = tx.run(q, p)
299 return (dict(x[
'f'])
for x
in cursor)
303 flow_pair = request[
'payload'][
'payload']
304 forward = flow_pair[
'forward']
305 reverse = flow_pair[
'reverse']
307 path_forward = forward[
'flowpath'][
'path']
308 path_reverse = reverse[
'flowpath'][
'path']
309 common_args_forward = {
310 'cookie': forward[
'cookie'],
311 'vlan_transit': forward[
'transit_vlan']
313 common_args_reverse = {
314 'cookie': reverse[
'cookie'],
315 'vlan_transit': reverse[
'transit_vlan']
321 dpid=forward[
'src_switch'],
322 port_in=forward[
'src_port'],
323 port_out=path_forward[0][
'port_no'],
324 vlan_input=forward[
'src_vlan'],
325 **common_args_forward).as_tuple(),
327 dpid=path_forward[1][
'switch_id'],
328 port_in=path_forward[1][
'port_no'],
329 port_out=path_forward[2][
'port_no'],
330 **common_args_forward).as_tuple(),
332 dpid=forward[
'dst_switch'],
333 port_in=path_forward[3][
'port_no'],
334 port_out=forward[
'dst_port'],
335 vlan_output=forward[
'dst_vlan'],
336 **common_args_forward).as_tuple(),
339 dpid=reverse[
'src_switch'],
340 port_in=reverse[
'src_port'],
341 port_out=path_reverse[0][
'port_no'],
342 vlan_input=reverse[
'src_vlan'],
343 **common_args_reverse).as_tuple(),
345 dpid=path_reverse[1][
'switch_id'],
346 port_in=path_reverse[1][
'port_no'],
347 port_out=path_reverse[2][
'port_no'],
348 **common_args_reverse).as_tuple(),
350 dpid=reverse[
'dst_switch'],
351 port_in=path_reverse[3][
'port_no'],
352 port_out=reverse[
'dst_port'],
353 vlan_output=reverse[
'dst_vlan'],
354 **common_args_reverse).as_tuple()}
358 flow_pairs = request[
'payload'][
'payload']
359 forward = flow_pairs[
'forward']
360 reverse = flow_pairs[
'reverse']
362 path_forward = forward[
'flowpath'][
'path']
363 path_reverse = reverse[
'flowpath'][
'path']
365 common_args_forward = {
'cookie': forward[
'cookie']}
366 common_args_reverse = {
'cookie': reverse[
'cookie']}
370 dpid=forward[
'src_switch'],
371 port_in=forward[
'src_port'],
372 port_out=path_forward[0][
'port_no'],
373 vlan_input=forward[
'src_vlan'],
374 **common_args_forward).as_tuple(),
376 dpid=path_forward[1][
'switch_id'],
377 port_in=path_forward[1][
'port_no'],
378 port_out=path_forward[2][
'port_no'],
379 vlan_input=forward[
'transit_vlan'],
380 **common_args_forward).as_tuple(),
382 dpid=forward[
'dst_switch'],
383 port_in=path_forward[3][
'port_no'],
384 port_out=forward[
'dst_port'],
385 vlan_input=forward[
'transit_vlan'],
386 **common_args_forward).as_tuple(),
389 dpid=reverse[
'src_switch'],
390 port_in=reverse[
'src_port'],
391 port_out=path_reverse[0][
'port_no'],
392 vlan_input=reverse[
'src_vlan'],
393 **common_args_reverse).as_tuple(),
395 dpid=path_reverse[1][
'switch_id'],
396 port_in=path_reverse[1][
'port_no'],
397 port_out=path_reverse[2][
'port_no'],
398 vlan_input=reverse[
'transit_vlan'],
399 **common_args_reverse).as_tuple(),
401 dpid=reverse[
'dst_switch'],
402 port_in=path_reverse[3][
'port_no'],
403 port_out=reverse[
'dst_port'],
404 vlan_input=reverse[
'transit_vlan'],
405 **common_args_reverse).as_tuple()}
409 flow_pair = request[
'payload'][
'payload']
410 for thread
in flow_pair[
'forward'], flow_pair[
'reverse']:
411 thread[
'cookie'] = mangle(thread[
'cookie'])
415 flow_pair = request[
'payload'][
'payload']
416 for thread
in flow_pair[
'forward'], flow_pair[
'reverse']:
417 thread[
'dst_port'] = mangle(thread[
'dst_port'])
427 flow_threads = flow_info_data[
'payload']
428 for thread
in flow_threads.values():
429 thread[
'cookie'] |= share.cookie_test_data_flag
433 flow_threads = flow_info_data[
'payload']
434 direction_to_flag = {
435 'forward': flow_utils.cookie_flag_forward,
436 'reverse': flow_utils.cookie_flag_reverse}
437 for direction
in flow_threads:
438 flag = direction_to_flag[direction]
439 flow_threads[direction][
'cookie'] |= flag
443 return (x
for x
in stream
if x.get(key) == value)
447 return (action(x)
for x
in stream)
452 json.loads(x.payload)
for x
in share.env.kafka_producer_backlog())
def __init__(self, fields)
def of_kafka_channel(cls, channel_data, extra)
def allow_all_features(self)
def stream_map(stream, action)
def of_kafka_channel(cls, channel_data, extra)
def of_kafka_channel(cls, channel_data, extra)
def make_correlation_id(self)
def flow_delete_expected_commands(request)
def open_neo4j_session(self)
def flow_create_expected_commands(request)
def load_data(self, name)
def flow_update(self, update_request, create_request)
def fix_direction_markers(flow_info_data)
def extract_flow_endpoints(flow)
def put_test_flow_marker(flow_info_data)
def validate_produced_commands(self, expected_commands, correlation_id)
def __init__(self, fields)
def kafka_backlog_stream()
def is_flow_exist(self, flow_id)
def __init__(self, fields)
def __init__(self, fields)
def validate_db_flow(self, request)
def mangle_dst_port(request, mangle)
def mangle_cookie(request, mangle)
def fetch_db_flow(tx, flow_id)
def of_kafka_channel(cls, channel_data, extra)
def flow_create(self, create_request)
def of_kafka_channel(cls, channel_data, extra)
def flow_delete(self, delete_request)
def stream_filter_by_key(stream, key, value)