Open Kilda Java Documentation
messageclasses.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 # Copyright 2017 Telstra Open Source
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 
17 import json
18 import logging
19 import threading
20 
21 from topologylistener import model
22 
23 from topologylistener import config
24 from topologylistener import db
25 from topologylistener import exc
26 from topologylistener import isl_utils
27 from topologylistener import link_props_utils
28 import flow_utils
29 import message_utils
30 
logger = logging.getLogger(__name__)
# Shared graph handle — single point of DB access for this module.
graph = flow_utils.graph

# Internal switch state name -> state value used in events.
switch_states = {
    'active': 'ACTIVATED',
    'inactive': 'DEACTIVATED',
    'removed': 'REMOVED'
}

# Java "clazz" discriminators of the messages consumed by handle().
MT_SWITCH = "org.openkilda.messaging.info.event.SwitchInfoData"
MT_SWITCH_EXTENDED = "org.openkilda.messaging.info.event.SwitchInfoExtendedData"
MT_ISL = "org.openkilda.messaging.info.event.IslInfoData"
MT_PORT = "org.openkilda.messaging.info.event.PortInfoData"
MT_FLOW_INFODATA = "org.openkilda.messaging.info.flow.FlowInfoData"
MT_FLOW_RESPONSE = "org.openkilda.messaging.info.flow.FlowResponse"
MT_VALID_REQUEST = "org.openkilda.messaging.command.switches.SwitchRulesValidateRequest"
MT_SYNC_REQUEST = "org.openkilda.messaging.command.switches.SwitchRulesSyncRequest"
MT_NETWORK = "org.openkilda.messaging.info.discovery.NetworkInfoData"
MT_SWITCH_RULES = "org.openkilda.messaging.info.rule.SwitchFlowEntries"
# feature toggle is the functionality to turn off/on specific features
MT_STATE_TOGGLE = "org.openkilda.messaging.command.system.FeatureToggleStateRequest"
MT_TOGGLE = "org.openkilda.messaging.command.system.FeatureToggleRequest"
MT_NETWORK_TOPOLOGY_CHANGE = (
    "org.openkilda.messaging.info.event.NetworkTopologyChange")
CD_NETWORK = "org.openkilda.messaging.command.discovery.NetworkCommandData"
CD_FLOWS_SYNC_REQUEST = 'org.openkilda.messaging.command.FlowsSyncRequest'
CD_LINK_PROPS_PUT = 'org.openkilda.messaging.te.request.LinkPropsPut'
CD_LINK_PROPS_DROP = 'org.openkilda.messaging.te.request.LinkPropsDrop'

# Application-side feature-toggle keys.
FEATURE_SYNC_OFRULES = 'sync_rules_on_activation'
FEATURE_REROUTE_ON_ISL_DISCOVERY = 'flows_reroute_on_isl_discovery'
FEATURE_CREATE_FLOW = 'create_flow'
FEATURE_UPDATE_FLOW = 'update_flow'
FEATURE_DELETE_FLOW = 'delete_flow'
FEATURE_PUSH_FLOW = 'push_flow'
FEATURE_UNPUSH_FLOW = 'unpush_flow'

# Current (mutable) toggle state; built-in defaults, overwritten by
# read_config() at import time and by update_feature_toggles() at runtime.
features_status = {
    FEATURE_SYNC_OFRULES: False,
    FEATURE_REROUTE_ON_ISL_DISCOVERY: True,
    FEATURE_CREATE_FLOW: False,
    FEATURE_UPDATE_FLOW: False,
    FEATURE_DELETE_FLOW: False,
    FEATURE_PUSH_FLOW: False,
    FEATURE_UNPUSH_FLOW: False,
}

# Application toggle name -> field name used on the wire / in the DB.
features_status_app_to_transport_map = {
    FEATURE_SYNC_OFRULES: 'sync_rules',
    FEATURE_REROUTE_ON_ISL_DISCOVERY: 'reflow_on_switch_activation',
    FEATURE_CREATE_FLOW: 'create_flow',
    FEATURE_UPDATE_FLOW: 'update_flow',
    FEATURE_DELETE_FLOW: 'delete_flow',
    FEATURE_PUSH_FLOW: 'push_flow',
    FEATURE_UNPUSH_FLOW: 'unpush_flow'
}

# Reverse mapping: wire field name -> application toggle name.
features_status_transport_to_app_map = {
    transport: app
    for app, transport in features_status_app_to_transport_map.items()}


# This is used for blocking on flow changes.
# flow_sem = multiprocessing.Semaphore()
neo4j_update_lock = threading.RLock()
96 
def update_config():
    """Persist the current feature-toggle state into the DB `config` node.

    MERGEs a single `config` node and SETs one property per toggle, using
    the transport (wire) names from features_status_app_to_transport_map.

    NOTE(review): the original "def update_config():" line was lost in
    extraction; the name is restored from the call in update_feature_toggles().
    """
    q = 'MERGE (target:config {name: "config"})\n'
    q += db.format_set_fields(db.escape_fields(
        {x: '$' + x for x in features_status_app_to_transport_map.values()},
        raw_values=True), field_prefix='target.')
    # Query parameters: transport name -> current toggle value.
    p = {
        y: features_status[x]
        for x, y in features_status_app_to_transport_map.items()}

    with graph.begin() as tx:
        db.log_query('CONFIG update', q, p)
        tx.run(q, p)
110 
def read_config():
    """Load persisted feature toggles from the DB into features_status.

    Falls back to the builtin defaults (leaves features_status untouched)
    when no `config` node exists.

    NOTE(review): the original "def read_config():" line was lost in
    extraction; the name is restored from the module-level call below.
    """
    q = 'MATCH (target:config {name: "config"}) RETURN target LIMIT 2'
    db.log_query('CONFIG read', q, None)
    with graph.begin() as tx:
        cursor = tx.run(q)
        try:
            config_node = db.fetch_one(cursor)['target']
            for feature, name in features_status_app_to_transport_map.items():
                features_status[feature] = config_node[name]
        except exc.DBEmptyResponse:
            logger.info(
                'There is no persistent config in DB, fallback to'
                ' builtin defaults')


# Populate features_status from the DB once, at import time.
read_config()
127 
128 
    def __init__(self, message):
        """Wrap a decoded kafka message (dict) for dispatching via handle()."""
        # Raw message kept intact for error reporting (see _unpack_link_props).
        self._raw_message = message

        self.type = message.get("clazz")
        self.payload = message.get("payload", {})
        self.destination = message.get("destination", "")
        self.correlation_id = message.get("correlation_id", "admin-request")
        self.reply_to = message.get("reply_to", "")

        # Use the message's own timestamp when present, otherwise "now".
        try:
            timestamp = message['timestamp']
            timestamp = model.TimeProperty.new_from_java_timestamp(timestamp)
        except KeyError:
            timestamp = model.TimeProperty.now()
        self.timestamp = timestamp
145 
146  def to_json(self):
147  return json.dumps(
148  self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
149 
150  def get_type(self):
151  message_type = self.get_message_type()
152  command = self.get_command()
153  return command if message_type == 'unknown' else message_type
154 
155  def get_command(self):
156  return self.payload.get('clazz', 'unknown')
157 
158  def get_message_type(self):
159  return self.payload.get('clazz', 'unknown')
160 
161  def handle(self):
162  try:
163  event_handled = False
164 
165  if self.get_message_type() == MT_SWITCH:
166  if self.payload['state'] == "ADDED":
167  self.create_switch()
168  elif self.payload['state'] == "ACTIVATED":
169  self.activate_switch()
170  elif self.payload['state'] in ("DEACTIVATED", "REMOVED"):
171  self.switch_unplug()
172  event_handled = True
173 
174  elif self.get_message_type() == MT_ISL:
175  if self.payload['state'] == "DISCOVERED":
176  self.create_isl()
177  elif self.payload['state'] in ("FAILED", "MOVED"):
178  self.isl_discovery_failed()
179  event_handled = True
180 
181  elif self.get_message_type() == MT_PORT:
182  if self.payload['state'] == "DOWN":
183  self.port_down()
184  event_handled = True
185 
186  # Cache topology expects to receive OFE events
187  if event_handled:
188  message_utils.send_cache_message(self.payload,
189  self.correlation_id)
191 
192  elif self.get_message_type() == MT_FLOW_INFODATA:
193  event_handled = self.flow_operation()
194 
195  elif self.get_command() == CD_FLOWS_SYNC_REQUEST:
197  event_handled = True
198 
199  elif self.get_message_type() == MT_STATE_TOGGLE:
200  event_handled = self.get_feature_toggle_state()
201  elif self.get_message_type() == MT_TOGGLE:
202  event_handled = self.update_feature_toggles()
203 
204  elif self.get_message_type() == MT_SWITCH_EXTENDED:
205  if features_status[FEATURE_SYNC_OFRULES]:
206  event_handled = self.validate_and_sync_switch_rules()
207  else:
208  event_handled = True
209 
210  elif self.get_message_type() == MT_VALID_REQUEST:
211  event_handled = self.send_dump_rules_request()
212 
213  elif self.get_message_type() == MT_SWITCH_RULES:
214  event_handled = self.validate_switch_rules()
215 
216  elif self.get_message_type() == MT_SYNC_REQUEST:
217  event_handled = self.sync_switch_rules()
218 
219  elif self.get_message_type() in (
220  CD_LINK_PROPS_PUT, CD_LINK_PROPS_DROP):
221  self.handle_link_props()
222  event_handled = True
223 
224  if not event_handled:
225  logger.error('Message was not handled correctly: message=%s',
226  self.payload)
227 
228  return event_handled
229  except Exception as e:
230  logger.exception("Exception during handling message")
231  return False
232 
233  def handle_link_props(self):
234  try:
235  if self.get_message_type() == CD_LINK_PROPS_PUT:
236  self.link_props_put()
237  elif self.get_message_type() == CD_LINK_PROPS_DROP:
238  self.link_props_drop()
239  else:
241  'link props request {}'.format(self.get_message_type()))
242  except exc.Error as e:
243  payload = message_utils.make_link_props_response(
244  self.payload, None, error=str(e))
245  message_utils.send_link_props_response(
246  payload, self.correlation_id,
247  self.get_message_type() == CD_LINK_PROPS_DROP)
248 
    def activate_switch(self):
        """Mark the switch as active in the DB, creating the node if missing."""
        switch_id = self.payload['switch_id']

        logger.info('Switch %s activation request: timestamp=%s',
                    switch_id, self.timestamp)

        with graph.begin() as tx:
            # Ensure the node exists before mutating its state.
            flow_utils.precreate_switches(tx, switch_id)

            q = 'MATCH (target:switch {name: $dpid}) SET target.state="active"'
            p = {'dpid': switch_id}
            db.log_query('SWITCH activate', q, p)
            tx.run(q, p)
262 
    def create_switch(self):
        """Create/refresh the switch node with data from the event payload."""
        switch_id = self.payload['switch_id']

        logger.info('Switch %s creation request: timestamp=%s',
                    switch_id, self.timestamp)

        with graph.begin() as tx:
            # Ensure the node exists; the SET below only updates properties.
            flow_utils.precreate_switches(tx, switch_id)

            # Properties are copied verbatim from the event payload.
            p = {
                'address': self.payload['address'],
                'hostname': self.payload['hostname'],
                'description': self.payload['description'],
                'controller': self.payload['controller'],
                'state': 'active'}
            q = 'MATCH (target:switch {name: $dpid})\n' + db.format_set_fields(
                db.escape_fields(
                    {x: '$' + x for x in p}, raw_values=True),
                field_prefix='target.')
            # dpid is a match parameter only — added after the SET clause is
            # built so it does not become a SET field.
            p['dpid'] = switch_id

            db.log_query('SWITCH create', q, p)
            tx.run(q, p)
286 
    def switch_unplug(self):
        """Mark the switch inactive and deactivate all of its ISLs."""
        switch_id = self.payload['switch_id']
        logger.info('Switch %s deactivation request', switch_id)

        with graph.begin() as tx:
            # The node must exist even if we only ever saw it go away.
            flow_utils.precreate_switches(tx, switch_id)

            q = ('MATCH (target:switch {name: $dpid}) '
                 'SET target.state="inactive"')
            tx.run(q, {'dpid': switch_id})

            # Cascade the state change to attached ISLs.
            isl_utils.switch_unplug(tx, switch_id)
299 
301  """
302  :return: Ideally, this should return true IFF discovery is deleted or deactivated.
303  """
304  path = self.payload['path']
305  switch_id = path[0]['switch_id']
306  port = int(path[0]['port_no'])
307 
308  effective_policy = config.get("isl_failover_policy", "effective_policy")
309  logger.info('Isl failure: %s_%d -- apply policy %s: timestamp=%s',
310  switch_id, port, effective_policy, self.timestamp)
311 
312  is_moved = self.payload['state'] == 'MOVED'
313  try:
314  with graph.begin() as tx:
315  updated = isl_utils.disable_by_endpoint(
316  tx, model.IslPathNode(switch_id, port), is_moved)
317  updated.sort(key=lambda x: (x.source, x.dest))
318  for isl in updated:
319  # we can get multiple records for one port
320  # but will use lifecycle data from first one
321  life_cycle = isl_utils.get_life_cycle_fields(tx, isl)
322  self.update_payload_lifecycle(life_cycle)
323  break
324 
325  except exc.DBRecordNotFound:
326  logger.error('There is no ISL on %s_%s', switch_id, port)
327 
    def port_down(self):
        """Deactivate ISLs attached to the port and raise their path cost."""
        switch_id = self.payload['switch_id']
        port_id = int(self.payload['port_no'])

        logger.info('Port %s_%d deletion request: timestamp=%s',
                    switch_id, port_id, self.timestamp)

        try:
            with graph.begin() as tx:
                for isl in isl_utils.disable_by_endpoint(
                        tx, model.IslPathNode(switch_id, port_id)):
                    # TODO(crimi): should be policy / toggle based
                    # Penalize both directions of the link.
                    isl_utils.increase_cost(
                        tx, isl,
                        config.ISL_COST_WHEN_PORT_DOWN,
                        config.ISL_COST_WHEN_PORT_DOWN)
                    isl_utils.increase_cost(
                        tx, isl.reversed(),
                        config.ISL_COST_WHEN_PORT_DOWN,
                        config.ISL_COST_WHEN_PORT_DOWN)
        except exc.DBRecordNotFound:
            # A port with no ISL is not an error for this handler.
            logger.info("There is no ISL on %s_%s", switch_id, port_id)
350 
    def create_isl(self):
        """
        Two parts to creating an ISL:
        (1) create the relationship itself
        (2) add any link properties, if they exist.

        NB: The query used for (2) is the same as in the TER
        TODO: either share the same query as library in python, or handle in java

        :return: success or failure (boolean)
        """
        path = self.payload['path']
        latency = int(self.payload['latency_ns'])
        a_switch = path[0]['switch_id']
        a_port = int(path[0]['port_no'])
        b_switch = path[1]['switch_id']
        b_port = int(path[1]['port_no'])
        speed = int(self.payload['speed'])
        available_bandwidth = int(self.payload['available_bandwidth'])

        # Model object validates that both endpoints are present.
        isl = model.InterSwitchLink.new_from_java(self.payload)
        isl.ensure_path_complete()

        logger.info('ISL %s create request', isl)
        with graph.begin() as tx:
            flow_utils.precreate_switches(
                tx, isl.source.dpid, isl.dest.dpid)
            isl_utils.create_if_missing(tx, self.timestamp, isl)
            isl_utils.set_props(tx, isl, {
                'latency': latency,
                'speed': speed,
                'max_bandwidth': available_bandwidth,
                'actual': 'active'})

            isl_utils.update_status(tx, isl, mtime=self.timestamp)
            isl_utils.resolve_conflicts(tx, isl)

            # Propagate create/modify times back into the outgoing payload.
            life_cycle = isl_utils.get_life_cycle_fields(tx, isl)
            self.update_payload_lifecycle(life_cycle)

        #
        # Now handle the second part .. pull properties from link_props if they exist
        #

        src_sw, src_pt, dst_sw, dst_pt = a_switch, a_port, b_switch, b_port  # use same names as TER code
        # NOTE(review): the query is built via %-interpolation with values
        # taken from the kafka payload; parameterized queries would be safer.
        query = 'MATCH (src:switch)-[i:isl]->(dst:switch) '
        query += ' WHERE i.src_switch = "%s" ' \
                 ' AND i.src_port = %s ' \
                 ' AND i.dst_switch = "%s" ' \
                 ' AND i.dst_port = %s ' % (src_sw, src_pt, dst_sw, dst_pt)
        query += ' MATCH (lp:link_props) '
        query += ' WHERE lp.src_switch = "%s" ' \
                 ' AND lp.src_port = %s ' \
                 ' AND lp.dst_switch = "%s" ' \
                 ' AND lp.dst_port = %s ' % (src_sw, src_pt, dst_sw, dst_pt)
        query += ' SET i += lp '
        graph.run(query)

        #
        # Finally, update the available_bandwidth..
        #
        flow_utils.update_isl_bandwidth(src_sw, src_pt, dst_sw, dst_pt)

        logger.info('ISL %s have been created/updated', isl)

        return True
417 
419  if self.get_message_type() != MT_ISL:
420  return
421  if self.payload['state'] != "DISCOVERED":
422  return
423  if not features_status[FEATURE_REROUTE_ON_ISL_DISCOVERY]:
424  return
425 
426  path = self.payload['path']
427  node = path[0]
428 
429  payload = {
430  'clazz': MT_NETWORK_TOPOLOGY_CHANGE,
431  'type': 'ENDPOINT_ADD',
432  'switch_id': node['switch_id'],
433  'port_number': node['port_no']}
434 
435  message_utils.send_cache_message(
436  payload, self.correlation_id)
437 
    @staticmethod
    def create_flow(flow_id, flow, correlation_id, tx, propagate=True, from_nb=False):
        """
        Build rules for one direction of a flow, store it, and (optionally)
        install the rules and notify the requester.

        :param propagate: If true, send to switch
        :param from_nb: If true, send response to NORTHBOUND API; otherwise to FLOW_TOPOLOGY
        :return:
        """

        try:
            rules = flow_utils.build_rules(flow)

            logger.info('Flow rules were built: correlation_id=%s, flow_id=%s',
                        correlation_id, flow_id)

            flow_utils.store_flow(flow, tx)

            logger.info('Flow was stored: correlation_id=%s, flow_id=%s',
                        correlation_id, flow_id)

            if propagate:
                message_utils.send_install_commands(rules, correlation_id)
                logger.info('Flow rules INSTALLED: correlation_id=%s, flow_id=%s', correlation_id, flow_id)

            if not from_nb:
                message_utils.send_info_message({'payload': flow, 'clazz': MT_FLOW_RESPONSE}, correlation_id)
            else:
                # The request is sent from Northbound .. send response back
                logger.info('Flow rules NOT PROPAGATED: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
                data = {"payload":{"flowid": flow_id,"status": "UP"},
                        "clazz": message_utils.MT_INFO_FLOW_STATUS}
                message_utils.send_to_topic(
                    payload=data,
                    correlation_id=correlation_id,
                    message_type=message_utils.MT_INFO,
                    destination="NORTHBOUND",
                    topic=config.KAFKA_NORTHBOUND_TOPIC
                )

        except Exception as e:
            # NOTE: e.message is Python 2 only.
            logger.exception('Can not create flow: %s', flow_id)
            if not from_nb:
                # Propagate is the normal scenario, so send response back to FLOW
                message_utils.send_error_message(correlation_id, "CREATION_FAILURE", e.message, flow_id)
            else:
                # This means we tried a PUSH, send response back to NORTHBOUND
                message_utils.send_error_message(correlation_id, "PUSH_FAILURE", e.message, flow_id,
                                                 destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)
            raise

        return True
488 
    @staticmethod
    def delete_flow(flow_id, flow, correlation_id, parent_tx=None, propagate=True, from_nb=False):
        """
        Simple algorithm - delete the stuff in the DB, send delete commands, send a response.
        Complexity - each segment in the path may have a separate cookie, so that information needs to be gathered.
        NB: Each switch in the flow should get a delete command.

        # TODO: eliminate flowpath as part of delete_flow request; rely on flow_id only
        # TODO: Add state to flow .. ie "DELETING", as part of refactoring project to add state
        - eg: flow_utils.update_state(flow, DELETING, parent_tx)

        :param parent_tx: If there is a larger transaction to use, then use it.
        :return: True, unless an exception is raised.
        """
        try:
            # All flows .. single switch or multi .. will start with deleting based on the src and flow cookie; then
            # we'll have a delete per segment based on the destination. Consequently, the "single switch flow" is
            # automatically addressed using this algorithm.
            flow_cookie = int(flow['cookie'])
            transit_vlan = int(flow['transit_vlan'])

            current_node = {'switch_id': flow['src_switch'], 'flow_id': flow_id, 'cookie': flow_cookie,
                            'meter_id': flow['meter_id'], 'in_port': flow['src_port'], 'in_vlan': flow['src_vlan']}
            nodes = [current_node]

            segments = flow_utils.fetch_flow_segments(flow_id, flow_cookie)
            for segment in segments:
                current_node['out_port'] = segment['src_port']

                # every segment should have a cookie field, based on merge_segment; but just in case..
                segment_cookie = segment.get('cookie', flow_cookie)
                current_node = {'switch_id': segment['dst_switch'], 'flow_id': flow_id, 'cookie': segment_cookie,
                                'meter_id': None, 'in_port': segment['dst_port'], 'in_vlan': transit_vlan,
                                'out_port': segment['dst_port']}
                nodes.append(current_node)

            # The last node's out_port is the flow's egress port.
            current_node['out_port'] = flow['dst_port']

            if propagate:
                logger.info('Flow rules remove start: correlation_id=%s, flow_id=%s, path=%s', correlation_id, flow_id,
                            nodes)
                message_utils.send_delete_commands(nodes, correlation_id)
                logger.info('Flow rules removed end : correlation_id=%s, flow_id=%s', correlation_id, flow_id)

            if from_nb:
                # The request is sent from Northbound .. send response back
                logger.info('Flow rules from NB: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
                data = {"payload":{"flowid": flow_id,"status": "DOWN"},
                        "clazz": message_utils.MT_INFO_FLOW_STATUS}
                message_utils.send_to_topic(
                    payload=data,
                    correlation_id=correlation_id,
                    message_type=message_utils.MT_INFO,
                    destination="NORTHBOUND",
                    topic=config.KAFKA_NORTHBOUND_TOPIC
                )

            flow_utils.remove_flow(flow, parent_tx)

            logger.info('Flow was removed: correlation_id=%s, flow_id=%s', correlation_id, flow_id)

        except Exception as e:
            # NOTE: e.message is Python 2 only.
            logger.exception('Can not delete flow: %s', e.message)
            if not from_nb:
                # Propagate is the normal scenario, so send response back to FLOW
                message_utils.send_error_message(correlation_id, "DELETION_FAILURE", e.message, flow_id)
            else:
                # This means we tried a UNPUSH, send response back to NORTHBOUND
                message_utils.send_error_message( correlation_id, "UNPUSH_FAILURE", e.message, flow_id,
                                                  destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)
            raise

        return True
562 
    @staticmethod
    def update_flow(flow_id, flow, correlation_id, tx):
        """Replace an existing flow: store + install the new one, then delete
        the old one inside the same transaction.

        :return: True, unless an exception is raised.
        """
        try:
            old_flow = flow_utils.get_old_flow(flow)

            #
            # Start the transaction to govern the create/delete
            #
            logger.info('Flow rules were built: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
            rules = flow_utils.build_rules(flow)
            # TODO: add tx to store_flow
            flow_utils.store_flow(flow, tx)
            logger.info('Flow was stored: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
            message_utils.send_install_commands(rules, correlation_id)

            MessageItem.delete_flow(old_flow['flowid'], old_flow, correlation_id, tx)

            payload = {'payload': flow, 'clazz': MT_FLOW_RESPONSE}
            message_utils.send_info_message(payload, correlation_id)

        except Exception as e:
            # NOTE: e.message is Python 2 only.
            logger.exception('Can not update flow: %s', e.message)
            message_utils.send_error_message(
                correlation_id, "UPDATE_FAILURE", e.message, flow_id)
            raise

        return True
590 
592  op = self.payload['operation'].upper()
593 
594  # (nmarchenko) I do that for readability and to avoid double binary
595  # negation
596  allow = False
597 
598  if op == "CREATE" and features_status[FEATURE_CREATE_FLOW]:
599  return allow
600  if op == "PUSH" and features_status[FEATURE_PUSH_FLOW]:
601  return allow
602  if op == "PUSH_PROPAGATE" and features_status[FEATURE_CREATE_FLOW]:
603  return allow
604  if op == "DELETE" and features_status[FEATURE_DELETE_FLOW]:
605  return allow
606  if op == "UNPUSH" and features_status[FEATURE_UNPUSH_FLOW]:
607  return allow
608  if op == "UNPUSH_PROPAGATE" and features_status[FEATURE_DELETE_FLOW]:
609  return allow
610  if op == "UPDATE" and features_status[FEATURE_UPDATE_FLOW]:
611  return allow
612 
613  return not allow
614 
    def flow_operation(self):
        """Apply a flow CRUD/push operation for both flow directions under the
        module-wide DB lock.

        :return: True when processed (including the "operation not allowed"
            path); the raised exception propagates on DB failure.
        """
        correlation_id = self.correlation_id
        timestamp = self.timestamp
        payload = self.payload

        operation = payload['operation']
        flows = payload['payload']
        forward = flows['forward']
        reverse = flows['reverse']
        flow_id = forward['flowid']

        if self.not_allow_flow_operation():
            logger.info('Flow %s request is not allow: '
                        'timestamp=%s, correlation_id=%s, payload=%s',
                        operation, timestamp, correlation_id, payload)

            # TODO: We really should use the reply-to field, at least in NB, so that we know to send the response.
            op = payload['operation'].upper()
            if op == "PUSH" or op == "PUSH_PROPAGATE" or op == "UNPUSH" or op == "UNPUSH_PROPAGATE":
                message_utils.send_error_message(
                    correlation_id, 'REQUEST_INVALID', op+"-FAILURE - NOT ALLOWED RIGHT NOW - Toggle the feature to allow this behavior", "",
                    destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)

            return True

        logger.info('Flow %s request processing: '
                    'timestamp=%s, correlation_id=%s, payload=%s',
                    operation, timestamp, correlation_id, payload)

        tx = None
        # flow_sem.acquire(timeout=10) # wait 10 seconds .. then proceed .. possibly causing some issue.
        neo4j_update_lock.acquire()
        try:
            OP = operation.upper()
            if OP == "CREATE" or OP == "PUSH" or OP == "PUSH_PROPAGATE":
                # PUSH stores in DB only; CREATE/PUSH_PROPAGATE also install.
                propagate = (OP == "CREATE" or OP == "PUSH_PROPAGATE")
                from_nb = (OP == "PUSH" or OP == "PUSH_PROPAGATE")
                tx = graph.begin()
                self.create_flow(flow_id, forward, correlation_id, tx, propagate, from_nb)
                self.create_flow(flow_id, reverse, correlation_id, tx, propagate, from_nb)
                tx.commit()
                tx = None

            elif OP == "DELETE" or OP == "UNPUSH" or OP == "UNPUSH_PROPAGATE":
                tx = graph.begin()
                propagate = (OP == "DELETE" or OP == "UNPUSH_PROPAGATE")
                from_nb = (OP == "UNPUSH" or OP == "UNPUSH_PROPAGATE")
                MessageItem.delete_flow(flow_id, forward, correlation_id, tx, propagate, from_nb)
                MessageItem.delete_flow(flow_id, reverse, correlation_id, tx, propagate, from_nb)
                if not from_nb:
                    message_utils.send_info_message({'payload': forward, 'clazz': MT_FLOW_RESPONSE}, correlation_id)
                    message_utils.send_info_message({'payload': reverse, 'clazz': MT_FLOW_RESPONSE}, correlation_id)
                tx.commit()
                tx = None

            elif OP == "UPDATE":
                tx = graph.begin()
                MessageItem.update_flow(flow_id, forward, correlation_id, tx)
                MessageItem.update_flow(flow_id, reverse, correlation_id, tx)
                tx.commit()
                tx = None

            else:
                logger.warn('Flow operation is not supported: '
                            'operation=%s, timestamp=%s, correlation_id=%s,',
                            operation, timestamp, correlation_id)
        except Exception:
            # Roll back any transaction that did not reach commit.
            if tx is not None:
                tx.rollback()
            # flow_sem.release()
            raise

        finally:
            #flow_sem.release()
            neo4j_update_lock.release()

        logger.info('Flow %s request processed: '
                    'timestamp=%s, correlation_id=%s, payload=%s',
                    operation, timestamp, correlation_id, payload)

        return True
696 
697  @staticmethod
698  def fetch_isls(pull=True,sort_key='src_switch'):
699  """
700  :return: an unsorted list of ISL relationships with all properties pulled from the db if pull=True
701  """
702  try:
703  # query = "MATCH (a:switch)-[r:isl]->(b:switch) RETURN r ORDER BY r.src_switch, r.src_port"
704  isls=[]
705  rels = graph.match(rel_type="isl")
706  for rel in rels:
707  if pull:
708  graph.pull(rel)
709  isls.append(rel)
710 
711  if sort_key:
712  isls = sorted(isls, key=lambda x: x[sort_key])
713 
714  return isls
715  except Exception as e:
716  logger.exception('FAILED to get ISLs from the DB ', e.message)
717  raise
718 
720  payload = {
721  'switches': [],
722  'isls': [],
723  'flows': flow_utils.get_flows(),
724  'clazz': MT_NETWORK}
725  message_utils.send_to_topic(
726  payload, self.correlation_id, message_utils.MT_INFO,
727  destination="WFM_FLOW_LCM", topic=config.KAFKA_FLOW_TOPIC)
728 
730  payload = message_utils.make_features_status_response()
731  for feature, status in features_status.items():
732  transport_key = features_status_app_to_transport_map[feature]
733  setattr(payload, transport_key, status)
734 
735  message_utils.send_to_topic(payload, self.correlation_id,
736  message_type=message_utils.MT_INFO,
737  destination="NORTHBOUND",
738  topic=config.KAFKA_NORTHBOUND_TOPIC)
739  return True
740 
742  for transport_key in features_status_transport_to_app_map:
743  app_key = features_status_transport_to_app_map[transport_key]
744  try:
745  status = self.payload[transport_key]
746  except KeyError:
747  continue
748 
749  current = features_status[app_key]
750  logger.info(
751  'Set feature %s status to %s, previous value %s',
752  app_key, status, current)
753  features_status[app_key] = status
754 
755  update_config()
756 
757  return True
758 
760  diff = flow_utils.validate_switch_rules(self.payload['switch_id'],
761  self.payload['flows'])
762  message_utils.send_validation_rules_response(diff["missing_rules"],
763  diff["excess_rules"],
764  diff["proper_rules"],
765  self.correlation_id)
766  return True
767 
769  switch_id = self.payload['switch_id']
770 
771  diff = flow_utils.validate_switch_rules(switch_id,
772  self.payload['flows'])
773  sync_actions = flow_utils.build_commands_to_sync_rules(switch_id,
774  diff["missing_rules"])
775  commands = sync_actions["commands"]
776  if commands:
777  logger.info('Install commands for switch %s are to be sent: %s',
778  switch_id, commands)
779  message_utils.send_force_install_commands(switch_id, commands,
780  self.correlation_id)
781 
782  return True
783 
    def sync_switch_rules(self):
        """Install the requested rules on a switch and report what was synced."""
        switch_id = self.payload['switch_id']
        rules_to_sync = self.payload['rules']

        logger.debug('Switch rules synchronization for rules: %s', rules_to_sync)

        sync_actions = flow_utils.build_commands_to_sync_rules(switch_id,
                                                               rules_to_sync)
        commands = sync_actions["commands"]
        if commands:
            logger.info('Install commands for switch %s are to be sent: %s',
                        switch_id, commands)
            message_utils.send_force_install_commands(switch_id, commands,
                                                      self.correlation_id)

        # Always respond, even when nothing needed installing.
        message_utils.send_sync_rules_response(sync_actions["installed_rules"],
                                               self.correlation_id)
        return True
802 
804  message_utils.send_dump_rules_request(self.payload['switch_id'],
805  self.correlation_id)
806  return True
807 
808  def update_payload_lifecycle(self, life_cycle):
809  for key, value in (
810  ('time_create', life_cycle.ctime),
811  ('time_modify', life_cycle.mtime)):
812  if not value:
813  continue
814  self.payload[key] = value.as_java_timestamp()
815 
816  def link_props_put(self):
817  link_props = self._unpack_link_props()
818  protected = link_props.extract_protected_props()
819  if protected:
821  link_props, 'property(es) %s is can\'t be changed'.format(
822  ', '.join(repr(x) for x in sorted(protected))))
823 
824  with graph.begin() as tx:
825  link_props_utils.create_if_missing(tx, link_props)
826  link_props_utils.set_props_and_propagate_to_isl(tx, link_props)
827 
828  actual_link_props = link_props_utils.read(tx, link_props)
829 
830  payload = message_utils.make_link_props_response(
831  self.payload, actual_link_props)
832  message_utils.send_link_props_response(payload, self.correlation_id)
833 
    def link_props_drop(self):
        """Drop all link_props matching the lookup mask and strip the same
        properties from the corresponding ISLs; reply with removed records."""
        lookup_mask = self._unpack_link_props(key='lookup_mask')
        with graph.begin() as tx:
            removed_records = link_props_utils.drop_by_mask(tx, lookup_mask)
            for link_props in removed_records:
                # Remove the propagated properties from the matching ISL too.
                isl = model.InterSwitchLink.new_from_link_props(link_props)
                isl_utils.del_props(tx, isl, link_props.props)

        response_batch = [
            message_utils.make_link_props_response(self.payload, x)
            for x in removed_records]
        message_utils.send_link_props_chunked_response(
            response_batch, self.correlation_id)
847 
848  def _unpack_link_props(self, key='link_props'):
849  try:
850  link_props = model.LinkProps.new_from_java(
851  self.payload[key])
852  except (KeyError, ValueError, TypeError) as e:
854  return link_props
def _unpack_link_props(self, key='link_props')
def get(section, option)
Definition: config.py:43
def fetch_isls(pull=True, sort_key='src_switch')
def delete_flow(flow_id, flow, correlation_id, parent_tx=None, propagate=True, from_nb=False)
def create_flow(flow_id, flow, correlation_id, tx, propagate=True, from_nb=False)
def update_flow(flow_id, flow, correlation_id, tx)