Open Kilda Java Documentation
test_flow.py
Go to the documentation of this file.
1 # Copyright 2018 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 import copy
16 import itertools
17 import json
18 import pprint
19 
20 from topologylistener import flow_utils
21 from topologylistener import message_utils
22 from topologylistener import messageclasses
23 from topologylistener import model
24 from topologylistener.tests import share
25 
26 
28  clazz = model.Default(None)
29  dpid = model.Default(None)
30  cookie = model.Default(None)
31  port_in = model.Default(None)
32  port_out = model.Default(None)
33 
34  @classmethod
35  def of_kafka_channel(cls, channel_data, **extra):
36  channel_to_fields = {
37  'switch_id': 'dpid',
38  'output_port': 'port_out',
39  'input_port': 'port_in',
40  'cookie': 'cookie'}
41  fields = model.grab_fields(channel_data, channel_to_fields)
42  fields.update(extra)
43  return cls(**fields)
44 
45  def as_tuple(self):
46  return (
47  self.clazz,
48  self.dpid,
49  self.cookie,
50  self.port_in, self.port_out)
51 
52  def _sort_key(self):
53  return self.as_tuple()
54 
55 
57  vlan_transit = model.Default(None)
58 
59  @classmethod
60  def of_kafka_channel(cls, channel_data, **extra):
61  fields = model.grab_fields(channel_data, {
62  'transit_vlan_id': 'vlan_transit'})
63  fields.update(extra)
64  return super(AbstractSpeakerFlowInstallCommand, cls).of_kafka_channel(
65  channel_data, **fields)
66 
67  def as_tuple(self):
68  return super(AbstractSpeakerFlowInstallCommand, self).as_tuple() + (
69  self.vlan_transit,)
70 
71 
73  vlan_input = model.Default(None)
74 
75  @classmethod
76  def of_kafka_channel(cls, channel_data, **extra):
77  fields = model.grab_fields(
78  channel_data, {'input_vlan_id': 'vlan_input'})
79  fields.update(extra)
80  return super(SpeakerFlowIngressCommand, cls).of_kafka_channel(
81  channel_data, **fields)
82 
83  def __init__(self, **fields):
84  super(AbstractSpeakerFlowCommand, self).__init__(
85  clazz='org.openkilda.messaging.command.flow.InstallIngressFlow',
86  **fields)
87 
88  def as_tuple(self):
89  return super(SpeakerFlowIngressCommand, self).as_tuple() + (
90  self.vlan_input, )
91 
92 
94  def __init__(self, **fields):
95  super(AbstractSpeakerFlowCommand, self).__init__(
96  clazz='org.openkilda.messaging.command.flow.InstallTransitFlow',
97  **fields)
98 
99 
101  vlan_output = model.Default(None)
102 
103  @classmethod
104  def of_kafka_channel(cls, channel_data, **extra):
105  fields = model.grab_fields(
106  channel_data, {'output_vlan_id': 'vlan_output'})
107  fields.update(extra)
108  return super(SpeakerFlowEgressCommand, cls).of_kafka_channel(
109  channel_data, **fields)
110 
111  def __init__(self, **fields):
112  super(AbstractSpeakerFlowCommand, self).__init__(
113  clazz='org.openkilda.messaging.command.flow.InstallEgressFlow',
114  **fields)
115 
116  def as_tuple(self):
117  return super(SpeakerFlowEgressCommand, self).as_tuple() + (
118  self.vlan_output, )
119 
120 
122  vlan_input = model.Default(None)
123 
124  @classmethod
125  def of_kafka_channel(cls, channel_data, **extra):
126  fields = model.grab_fields(channel_data['criteria'], {
127  'cookie': 'cookie',
128  'in_port': 'port_in',
129  'out_port': 'port_out',
130  'in_vlan': 'vlan_input'})
131  fields.update(extra)
132  return super(SpeakerFlowRemoveCommand, cls).of_kafka_channel(
133  channel_data, **fields)
134 
135  def __init__(self, **fields):
136  # FIXME(surabujin): TE fill incorrect values in `port_out`
137  # into FlowRemove command
138  fields['port_out'] = None
139 
140  super(AbstractSpeakerFlowCommand, self).__init__(
141  clazz='org.openkilda.messaging.command.flow.RemoveFlow',
142  **fields)
143 
144  def as_tuple(self):
145  return super(SpeakerFlowRemoveCommand, self).as_tuple() + (
146  self.vlan_input, )
147 
148 
149 clazz_to_command = {
150  'org.openkilda.messaging.command.flow.InstallIngressFlow':
151  SpeakerFlowIngressCommand,
152  'org.openkilda.messaging.command.flow.InstallTransitFlow':
153  SpeakerFlowTransitCommand,
154  'org.openkilda.messaging.command.flow.InstallEgressFlow':
155  SpeakerFlowEgressCommand,
156  'org.openkilda.messaging.command.flow.RemoveFlow': SpeakerFlowRemoveCommand}
157 
158 
160  correlation_id_counter = itertools.count()
161 
162  def setUp(self):
163  super(TestFlow, self).setUp()
164 
165  share.env.reset_kafka_producer()
166 
167  self.allow_all_features()
168 
169  def test_crud(self):
170  request = self.load_data('flow-create-request.json')
171  flow_info_data = request['payload']
172  self.put_test_flow_marker(flow_info_data)
173  self.fix_direction_markers(flow_info_data)
174 
175  # create
176  request['correlation_id'] = self.make_correlation_id()
177  create_request = copy.deepcopy(request)
178  self.flow_create(create_request)
179 
180  # update
181  request['correlation_id'] = self.make_correlation_id()
182  request['payload']['operation'] = 'UPDATE'
183  self.mangle_cookie(request, lambda cookie: cookie + 1)
184  self.mangle_dst_port(request, lambda port: port + 1)
185  update_request = copy.deepcopy(request)
186  self.flow_update(update_request, create_request)
187 
188  # delete
189  request['correlation_id'] = self.make_correlation_id()
190  request['payload']['operation'] = 'DELETE'
191  delete_request = copy.deepcopy(request)
192  self.flow_delete(delete_request)
193 
194  def flow_create(self, create_request):
195  flow_pair = create_request['payload']['payload']
196  self.assertFalse(self.is_flow_exist(flow_pair['forward']['flowid']))
197  self.assertTrue(share.feed_message(create_request))
198 
199  self.validate_db_flow(create_request)
200  expected_messages = self.flow_create_expected_commands(create_request)
202  expected_messages, create_request['correlation_id'])
203 
204  def flow_update(self, update_request, create_request):
205  flow_pair = update_request['payload']['payload']
206  self.assertTrue(self.is_flow_exist(flow_pair['forward']['flowid']))
207  self.assertTrue(share.feed_message(update_request))
208 
209  self.validate_db_flow(update_request)
210  expected_commands = self.flow_create_expected_commands(update_request)
211 
212  expected_commands.update(self.flow_delete_expected_commands(
213  create_request))
215  expected_commands, update_request['correlation_id'])
216 
217  def flow_delete(self, delete_request):
218  flow_pair = delete_request['payload']['payload']
219  flow_id = flow_pair['forward']['flowid']
220 
221  self.assertTrue(self.is_flow_exist(flow_id))
222  self.assertTrue(share.feed_message(delete_request))
223  self.assertFalse(self.is_flow_exist(flow_id))
224 
225  expect_commands = self.flow_delete_expected_commands(delete_request)
227  expect_commands, delete_request['correlation_id'])
228 
229  def validate_db_flow(self, request):
230  flow_pair = request['payload']['payload']
231  flow_forward = flow_pair['forward']
232 
233  flow_id = flow_forward['flowid']
234  expected_endpoints = set(self.extract_flow_endpoints(flow_forward))
235  with self.open_neo4j_session() as tx:
236  for db_flow in self.fetch_db_flow(tx, flow_id):
237  actual_endpoints = set(self.extract_flow_endpoints(db_flow))
238  self.assertEqual(expected_endpoints, actual_endpoints)
239 
240  def validate_produced_commands(self, expected_commands, correlation_id):
241  kafka_stream = self.kafka_backlog_stream()
242  kafka_stream = self.stream_filter_by_key(
243  kafka_stream, 'correlation_id', correlation_id)
244  kafka_stream = self.stream_filter_by_key(
245  kafka_stream, 'clazz', message_utils.MT_COMMAND)
246  kafka_stream = self.stream_map(kafka_stream, lambda x: x['payload'])
247 
248  extra_commands = []
249  for payload in kafka_stream:
250  try:
251  klass = clazz_to_command[payload['clazz']]
252  except KeyError:
253  extra_commands.append(payload)
254  continue
255 
256  command = klass.of_kafka_channel(payload).as_tuple()
257  if command in expected_commands:
258  expected_commands.discard(command)
259  else:
260  extra_commands.append(payload)
261 
262  if extra_commands:
263  message = 'Extra command messages have been produced'
264  print(message)
265  for idx, payload in enumerate(extra_commands):
266  print('#{}: {}'.format(idx, pprint.pformat(payload)))
267  raise AssertionError(message)
268 
269  if expected_commands:
270  message = 'Expected command messages have not been produced'
271  print(message)
272  for idx, payload in enumerate(sorted(expected_commands)):
273  print('#{}: {}'.format(idx, pprint.pformat(payload)))
274  raise AssertionError(message)
275 
276  def is_flow_exist(self, flow_id):
277  with self.open_neo4j_session() as tx:
278  return bool(list(self.fetch_db_flow(tx, flow_id)))
279 
281  features = {
282  x: True
283  for x in messageclasses.features_status_transport_to_app_map}
284  features_request = share.feature_toggle_request(**features)
285  self.assertTrue(share.feed_message(share.command(features_request)))
286 
288  return '{}-{}'.format(self.id(), next(self.correlation_id_counter))
289 
290  @staticmethod
291  def fetch_db_flow(tx, flow_id):
292  q = (
293  'MATCH (:switch)-[f:flow]->(:switch)\n'
294  'WHERE f.flowid=$flow_id\n'
295  'RETURN f')
296  p = {'flow_id': flow_id}
297 
298  cursor = tx.run(q, p)
299  return (dict(x['f']) for x in cursor)
300 
301  @staticmethod
303  flow_pair = request['payload']['payload']
304  forward = flow_pair['forward']
305  reverse = flow_pair['reverse']
306 
307  path_forward = forward['flowpath']['path']
308  path_reverse = reverse['flowpath']['path']
309  common_args_forward = {
310  'cookie': forward['cookie'],
311  'vlan_transit': forward['transit_vlan']
312  }
313  common_args_reverse = {
314  'cookie': reverse['cookie'],
315  'vlan_transit': reverse['transit_vlan']
316  }
317 
318  return {
319  # forward
321  dpid=forward['src_switch'],
322  port_in=forward['src_port'],
323  port_out=path_forward[0]['port_no'],
324  vlan_input=forward['src_vlan'],
325  **common_args_forward).as_tuple(),
327  dpid=path_forward[1]['switch_id'],
328  port_in=path_forward[1]['port_no'],
329  port_out=path_forward[2]['port_no'],
330  **common_args_forward).as_tuple(),
332  dpid=forward['dst_switch'],
333  port_in=path_forward[3]['port_no'],
334  port_out=forward['dst_port'],
335  vlan_output=forward['dst_vlan'],
336  **common_args_forward).as_tuple(),
337  # reverse
339  dpid=reverse['src_switch'],
340  port_in=reverse['src_port'],
341  port_out=path_reverse[0]['port_no'],
342  vlan_input=reverse['src_vlan'],
343  **common_args_reverse).as_tuple(),
345  dpid=path_reverse[1]['switch_id'],
346  port_in=path_reverse[1]['port_no'],
347  port_out=path_reverse[2]['port_no'],
348  **common_args_reverse).as_tuple(),
350  dpid=reverse['dst_switch'],
351  port_in=path_reverse[3]['port_no'],
352  port_out=reverse['dst_port'],
353  vlan_output=reverse['dst_vlan'],
354  **common_args_reverse).as_tuple()}
355 
356  @staticmethod
358  flow_pairs = request['payload']['payload']
359  forward = flow_pairs['forward']
360  reverse = flow_pairs['reverse']
361 
362  path_forward = forward['flowpath']['path']
363  path_reverse = reverse['flowpath']['path']
364 
365  common_args_forward = {'cookie': forward['cookie']}
366  common_args_reverse = {'cookie': reverse['cookie']}
367  return {
368  # forward
370  dpid=forward['src_switch'],
371  port_in=forward['src_port'],
372  port_out=path_forward[0]['port_no'],
373  vlan_input=forward['src_vlan'],
374  **common_args_forward).as_tuple(),
376  dpid=path_forward[1]['switch_id'],
377  port_in=path_forward[1]['port_no'],
378  port_out=path_forward[2]['port_no'],
379  vlan_input=forward['transit_vlan'],
380  **common_args_forward).as_tuple(),
382  dpid=forward['dst_switch'],
383  port_in=path_forward[3]['port_no'],
384  port_out=forward['dst_port'],
385  vlan_input=forward['transit_vlan'],
386  **common_args_forward).as_tuple(),
387  # reverse
389  dpid=reverse['src_switch'],
390  port_in=reverse['src_port'],
391  port_out=path_reverse[0]['port_no'],
392  vlan_input=reverse['src_vlan'],
393  **common_args_reverse).as_tuple(),
395  dpid=path_reverse[1]['switch_id'],
396  port_in=path_reverse[1]['port_no'],
397  port_out=path_reverse[2]['port_no'],
398  vlan_input=reverse['transit_vlan'],
399  **common_args_reverse).as_tuple(),
401  dpid=reverse['dst_switch'],
402  port_in=path_reverse[3]['port_no'],
403  port_out=reverse['dst_port'],
404  vlan_input=reverse['transit_vlan'],
405  **common_args_reverse).as_tuple()}
406 
407  @staticmethod
408  def mangle_cookie(request, mangle):
409  flow_pair = request['payload']['payload']
410  for thread in flow_pair['forward'], flow_pair['reverse']:
411  thread['cookie'] = mangle(thread['cookie'])
412 
413  @staticmethod
414  def mangle_dst_port(request, mangle):
415  flow_pair = request['payload']['payload']
416  for thread in flow_pair['forward'], flow_pair['reverse']:
417  thread['dst_port'] = mangle(thread['dst_port'])
418 
419  @staticmethod
421  return [
422  flow['src_switch'],
423  flow['dst_switch']]
424 
425  @staticmethod
426  def put_test_flow_marker(flow_info_data):
427  flow_threads = flow_info_data['payload']
428  for thread in flow_threads.values():
429  thread['cookie'] |= share.cookie_test_data_flag
430 
431  @staticmethod
432  def fix_direction_markers(flow_info_data):
433  flow_threads = flow_info_data['payload']
434  direction_to_flag = {
435  'forward': flow_utils.cookie_flag_forward,
436  'reverse': flow_utils.cookie_flag_reverse}
437  for direction in flow_threads:
438  flag = direction_to_flag[direction]
439  flow_threads[direction]['cookie'] |= flag
440 
441  @staticmethod
442  def stream_filter_by_key(stream, key, value):
443  return (x for x in stream if x.get(key) == value)
444 
445  @staticmethod
446  def stream_map(stream, action):
447  return (action(x) for x in stream)
448 
449  @staticmethod
451  return (
452  json.loads(x.payload) for x in share.env.kafka_producer_backlog())
def flow_update(self, update_request, create_request)
Definition: test_flow.py:204
def fix_direction_markers(flow_info_data)
Definition: test_flow.py:432
def put_test_flow_marker(flow_info_data)
Definition: test_flow.py:426
def validate_produced_commands(self, expected_commands, correlation_id)
Definition: test_flow.py:240
def flow_create(self, create_request)
Definition: test_flow.py:194
def flow_delete(self, delete_request)
Definition: test_flow.py:217
def stream_filter_by_key(stream, key, value)
Definition: test_flow.py:442