from topologylistener import model
from topologylistener import config
from topologylistener import db
from topologylistener import exc
from topologylistener import isl_utils
from topologylistener import link_props_utils
logger = logging.getLogger(__name__)
graph = flow_utils.graph
    'active': 'ACTIVATED',
    'inactive': 'DEACTIVATED',
MT_SWITCH = "org.openkilda.messaging.info.event.SwitchInfoData"
MT_SWITCH_EXTENDED = "org.openkilda.messaging.info.event.SwitchInfoExtendedData"
MT_ISL = "org.openkilda.messaging.info.event.IslInfoData"
MT_PORT = "org.openkilda.messaging.info.event.PortInfoData"
MT_FLOW_INFODATA = "org.openkilda.messaging.info.flow.FlowInfoData"
MT_FLOW_RESPONSE = "org.openkilda.messaging.info.flow.FlowResponse"
MT_VALID_REQUEST = "org.openkilda.messaging.command.switches.SwitchRulesValidateRequest"
MT_SYNC_REQUEST = "org.openkilda.messaging.command.switches.SwitchRulesSyncRequest"
MT_NETWORK = "org.openkilda.messaging.info.discovery.NetworkInfoData"
MT_SWITCH_RULES = "org.openkilda.messaging.info.rule.SwitchFlowEntries"
MT_STATE_TOGGLE = "org.openkilda.messaging.command.system.FeatureToggleStateRequest"
MT_TOGGLE = "org.openkilda.messaging.command.system.FeatureToggleRequest"
MT_NETWORK_TOPOLOGY_CHANGE = (
    "org.openkilda.messaging.info.event.NetworkTopologyChange")
CD_NETWORK = "org.openkilda.messaging.command.discovery.NetworkCommandData"
CD_FLOWS_SYNC_REQUEST = 'org.openkilda.messaging.command.FlowsSyncRequest'
CD_LINK_PROPS_PUT = 'org.openkilda.messaging.te.request.LinkPropsPut'
CD_LINK_PROPS_DROP = 'org.openkilda.messaging.te.request.LinkPropsDrop'

FEATURE_SYNC_OFRULES = 'sync_rules_on_activation'
FEATURE_REROUTE_ON_ISL_DISCOVERY = 'flows_reroute_on_isl_discovery'
FEATURE_CREATE_FLOW = 'create_flow'
FEATURE_UPDATE_FLOW = 'update_flow'
FEATURE_DELETE_FLOW = 'delete_flow'
FEATURE_PUSH_FLOW = 'push_flow'
FEATURE_UNPUSH_FLOW = 'unpush_flow'

features_status = {
    FEATURE_SYNC_OFRULES: False,
    FEATURE_REROUTE_ON_ISL_DISCOVERY: True,
    FEATURE_CREATE_FLOW: False,
    FEATURE_UPDATE_FLOW: False,
    FEATURE_DELETE_FLOW: False,
    FEATURE_PUSH_FLOW: False,
    FEATURE_UNPUSH_FLOW: False,
}
features_status_app_to_transport_map = {
    FEATURE_SYNC_OFRULES: 'sync_rules',
    FEATURE_REROUTE_ON_ISL_DISCOVERY: 'reflow_on_switch_activation',
    FEATURE_CREATE_FLOW: 'create_flow',
    FEATURE_UPDATE_FLOW: 'update_flow',
    FEATURE_DELETE_FLOW: 'delete_flow',
    FEATURE_PUSH_FLOW: 'push_flow',
    FEATURE_UNPUSH_FLOW: 'unpush_flow'}

features_status_transport_to_app_map = {
    transport: app
    for app, transport in features_status_app_to_transport_map.items()}
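
# These two maps translate between the app-side feature keys (FEATURE_*) and
# the names used on the wire / in the persistent config node; the second map
# is simply the inverse of the first, e.g.
# features_status_transport_to_app_map['sync_rules'] == FEATURE_SYNC_OFRULES.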
neo4j_update_lock = threading.RLock()
q = 'MERGE (target:config {name: "config"})\n'
q += db.format_set_fields(db.escape_fields(
    {x: '$' + x for x in features_status_app_to_transport_map.values()},
    raw_values=True), field_prefix='target.')
p = {
    y: features_status[x]
    for x, y in features_status_app_to_transport_map.items()}

with graph.begin() as tx:
    db.log_query('CONFIG update', q, p)
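
# The MERGE above upserts the single (:config {name: "config"}) node; the
# generated SET clause uses one $-parameter per transport-level feature name,
# and `p` binds those parameters to the current values in features_status.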
q = 'MATCH (target:config {name: "config"}) RETURN target LIMIT 2'
db.log_query('CONFIG read', q, None)
with graph.begin() as tx:
    cursor = tx.run(q)
    config_node = db.fetch_one(cursor)['target']
    for feature, name in features_status_app_to_transport_map.items():
        features_status[feature] = config_node[name]
    # fallback branch, taken when there is no stored config node:
    'There is no persistent config in DB, fallback to'

self.type = message.get("clazz")

timestamp = message['timestamp']
timestamp = model.TimeProperty.new_from_java_timestamp(timestamp)
timestamp = model.TimeProperty.now()

return json.dumps(
    self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
return command if message_type == 'unknown' else message_type

event_handled = False

if self.payload['state'] == "ADDED":
elif self.payload['state'] == "ACTIVATED":
elif self.payload['state'] in ("DEACTIVATED", "REMOVED"):

if self.payload['state'] == "DISCOVERED":
elif self.payload['state'] in ("FAILED", "MOVED"):

if self.payload['state'] == "DOWN":

message_utils.send_cache_message(self.payload,
if features_status[FEATURE_SYNC_OFRULES]:

        CD_LINK_PROPS_PUT, CD_LINK_PROPS_DROP):

if not event_handled:
    logger.error('Message was not handled correctly: message=%s',

except Exception as e:
    logger.exception("Exception during handling message")

payload = message_utils.make_link_props_response(
    self.payload, None, error=str(e))
message_utils.send_link_props_response(
switch_id = self.payload['switch_id']

logger.info('Switch %s activation request: timestamp=%s',

with graph.begin() as tx:
    flow_utils.precreate_switches(tx, switch_id)

    q = 'MATCH (target:switch {name: $dpid}) SET target.state="active"'
    p = {'dpid': switch_id}
    db.log_query('SWITCH activate', q, p)
switch_id = self.payload['switch_id']

logger.info('Switch %s creation request: timestamp=%s',

with graph.begin() as tx:
    flow_utils.precreate_switches(tx, switch_id)

    p = {
        'address': self.payload['address'],
        'hostname': self.payload['hostname'],
        'description': self.payload['description'],
        'controller': self.payload['controller'],
    }
    q = 'MATCH (target:switch {name: $dpid})\n' + db.format_set_fields(
        db.escape_fields(
            {x: '$' + x for x in p}, raw_values=True),
        field_prefix='target.')
    p['dpid'] = switch_id

    db.log_query('SWITCH create', q, p)
switch_id = self.payload['switch_id']
logger.info('Switch %s deactivation request', switch_id)

with graph.begin() as tx:
    flow_utils.precreate_switches(tx, switch_id)

    q = ('MATCH (target:switch {name: $dpid}) '
         'SET target.state="inactive"')
    tx.run(q, {'dpid': switch_id})

    isl_utils.switch_unplug(tx, switch_id)
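
# Deactivation is handled in a single transaction: the switch node is flagged
# "inactive" and isl_utils.switch_unplug() presumably detaches/disables the
# ISLs that terminate on it.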
"""
:return: Ideally, this should return true IFF discovery is deleted or deactivated.
"""
switch_id = path[0]['switch_id']
port = int(path[0]['port_no'])

effective_policy = config.get("isl_failover_policy", "effective_policy")
logger.info('Isl failure: %s_%d -- apply policy %s: timestamp=%s',
            switch_id, port, effective_policy, self.timestamp)

is_moved = self.payload['state'] == 'MOVED'

with graph.begin() as tx:
    updated = isl_utils.disable_by_endpoint(
    updated.sort(key=lambda x: (x.source, x.dest))

    life_cycle = isl_utils.get_life_cycle_fields(tx, isl)

    logger.error('There is no ISL on %s_%s', switch_id, port)
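
# ISL failure handling: the ISLs attached to the reported endpoint are
# disabled inside one transaction, is_moved marks a "MOVED" notification as
# opposed to a plain failure, and the error branch fires when no ISL exists
# on that port.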
switch_id = self.payload['switch_id']
port_id = int(self.payload['port_no'])

logger.info('Port %s_%d deletion request: timestamp=%s',

with graph.begin() as tx:
    for isl in isl_utils.disable_by_endpoint(
        isl_utils.increase_cost(
            config.ISL_COST_WHEN_PORT_DOWN,
            config.ISL_COST_WHEN_PORT_DOWN)
        isl_utils.increase_cost(
            config.ISL_COST_WHEN_PORT_DOWN,
            config.ISL_COST_WHEN_PORT_DOWN)

    logger.info("There is no ISL on %s_%s", switch_id, port_id)
"""
Two parts to creating an ISL:
(1) create the relationship itself
(2) add any link properties, if they exist.

NB: The query used for (2) is the same as in the TER
TODO: either share the same query as library in python, or handle in java

:return: success or failure (boolean)
"""
latency = int(self.payload['latency_ns'])
a_switch = path[0]['switch_id']
a_port = int(path[0]['port_no'])
b_switch = path[1]['switch_id']
b_port = int(path[1]['port_no'])
speed = int(self.payload['speed'])
available_bandwidth = int(self.payload['available_bandwidth'])

isl = model.InterSwitchLink.new_from_java(self.payload)
isl.ensure_path_complete()

logger.info('ISL %s create request', isl)
with graph.begin() as tx:
    flow_utils.precreate_switches(
        tx, isl.source.dpid, isl.dest.dpid)
    isl_utils.create_if_missing(tx, self.timestamp, isl)
    isl_utils.set_props(tx, isl, {
        'max_bandwidth': available_bandwidth,

    isl_utils.update_status(tx, isl, mtime=self.timestamp)
    isl_utils.resolve_conflicts(tx, isl)

    life_cycle = isl_utils.get_life_cycle_fields(tx, isl)

src_sw, src_pt, dst_sw, dst_pt = a_switch, a_port, b_switch, b_port

query = 'MATCH (src:switch)-[i:isl]->(dst:switch) '
query += ' WHERE i.src_switch = "%s" ' \
         ' AND i.src_port = %s ' \
         ' AND i.dst_switch = "%s" ' \
         ' AND i.dst_port = %s ' % (src_sw, src_pt, dst_sw, dst_pt)
query += ' MATCH (lp:link_props) '
query += ' WHERE lp.src_switch = "%s" ' \
         ' AND lp.src_port = %s ' \
         ' AND lp.dst_switch = "%s" ' \
         ' AND lp.dst_port = %s ' % (src_sw, src_pt, dst_sw, dst_pt)
query += ' SET i += lp '

flow_utils.update_isl_bandwidth(src_sw, src_pt, dst_sw, dst_pt)

logger.info('ISL %s have been created/updated', isl)
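
# The raw Cypher above is step (2) from the docstring: it matches the isl
# relationship plus the link_props node for the same endpoints and copies the
# stored properties onto the relationship with "SET i += lp".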
if self.payload['state'] != "DISCOVERED":

if not features_status[FEATURE_REROUTE_ON_ISL_DISCOVERY]:

    'clazz': MT_NETWORK_TOPOLOGY_CHANGE,
    'type': 'ENDPOINT_ADD',
    'switch_id': node['switch_id'],
    'port_number': node['port_no']}

message_utils.send_cache_message(
def create_flow(flow_id, flow, correlation_id, tx, propagate=True, from_nb=False):
    """
    :param propagate: If true, send to switch
    :param from_nb: If true, send response to NORTHBOUND API; otherwise to FLOW_TOPOLOGY
    """
    rules = flow_utils.build_rules(flow)

    logger.info('Flow rules were built: correlation_id=%s, flow_id=%s',
                correlation_id, flow_id)

    flow_utils.store_flow(flow, tx)

    logger.info('Flow was stored: correlation_id=%s, flow_id=%s',
                correlation_id, flow_id)

    message_utils.send_install_commands(rules, correlation_id)
    logger.info('Flow rules INSTALLED: correlation_id=%s, flow_id=%s', correlation_id, flow_id)

    message_utils.send_info_message({'payload': flow, 'clazz': MT_FLOW_RESPONSE}, correlation_id)

    logger.info('Flow rules NOT PROPAGATED: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
    data = {"payload": {"flowid": flow_id, "status": "UP"},
            "clazz": message_utils.MT_INFO_FLOW_STATUS}
    message_utils.send_to_topic(
        correlation_id=correlation_id,
        message_type=message_utils.MT_INFO,
        destination="NORTHBOUND",
        topic=config.KAFKA_NORTHBOUND_TOPIC

    except Exception as e:
        logger.exception('Can not create flow: %s', flow_id)

        message_utils.send_error_message(correlation_id, "CREATION_FAILURE", e.message, flow_id)

        message_utils.send_error_message(correlation_id, "PUSH_FAILURE", e.message, flow_id,
                                         destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)
def delete_flow(flow_id, flow, correlation_id, parent_tx=None, propagate=True, from_nb=False):
    """
    Simple algorithm - delete the stuff in the DB, send delete commands, send a response.
    Complexity - each segment in the path may have a separate cookie, so that information needs to be gathered.

    NB: Each switch in the flow should get a delete command.

    # TODO: eliminate flowpath as part of delete_flow request; rely on flow_id only
    # TODO: Add state to flow .. ie "DELETING", as part of refactoring project to add state
        - eg: flow_utils.update_state(flow, DELETING, parent_tx)

    :param parent_tx: If there is a larger transaction to use, then use it.
    :return: True, unless an exception is raised.
    """
    flow_cookie = int(flow['cookie'])
    transit_vlan = int(flow['transit_vlan'])

    current_node = {'switch_id': flow['src_switch'], 'flow_id': flow_id, 'cookie': flow_cookie,
                    'meter_id': flow['meter_id'], 'in_port': flow['src_port'], 'in_vlan': flow['src_vlan']}
    nodes = [current_node]

    segments = flow_utils.fetch_flow_segments(flow_id, flow_cookie)
    for segment in segments:
        current_node['out_port'] = segment['src_port']

        segment_cookie = segment.get('cookie', flow_cookie)
        current_node = {'switch_id': segment['dst_switch'], 'flow_id': flow_id, 'cookie': segment_cookie,
                        'meter_id': None, 'in_port': segment['dst_port'], 'in_vlan': transit_vlan,
                        'out_port': segment['dst_port']}
        nodes.append(current_node)

    current_node['out_port'] = flow['dst_port']

    logger.info('Flow rules remove start: correlation_id=%s, flow_id=%s, path=%s', correlation_id, flow_id,

    message_utils.send_delete_commands(nodes, correlation_id)
    logger.info('Flow rules removed end : correlation_id=%s, flow_id=%s', correlation_id, flow_id)

    logger.info('Flow rules from NB: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
    data = {"payload": {"flowid": flow_id, "status": "DOWN"},
            "clazz": message_utils.MT_INFO_FLOW_STATUS}
    message_utils.send_to_topic(
        correlation_id=correlation_id,
        message_type=message_utils.MT_INFO,
        destination="NORTHBOUND",
        topic=config.KAFKA_NORTHBOUND_TOPIC

    flow_utils.remove_flow(flow, parent_tx)

    logger.info('Flow was removed: correlation_id=%s, flow_id=%s', correlation_id, flow_id)

    except Exception as e:
        logger.exception('Can not delete flow: %s', e.message)

        message_utils.send_error_message(correlation_id, "DELETION_FAILURE", e.message, flow_id)

        message_utils.send_error_message(correlation_id, "UNPUSH_FAILURE", e.message, flow_id,
                                         destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)
old_flow = flow_utils.get_old_flow(flow)

logger.info('Flow rules were built: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
rules = flow_utils.build_rules(flow)

flow_utils.store_flow(flow, tx)
logger.info('Flow was stored: correlation_id=%s, flow_id=%s', correlation_id, flow_id)
message_utils.send_install_commands(rules, correlation_id)

MessageItem.delete_flow(old_flow['flowid'], old_flow, correlation_id, tx)

payload = {'payload': flow, 'clazz': MT_FLOW_RESPONSE}
message_utils.send_info_message(payload, correlation_id)

except Exception as e:
    logger.exception('Can not update flow: %s', e.message)
    message_utils.send_error_message(
        correlation_id, "UPDATE_FAILURE", e.message, flow_id)
op = self.payload['operation'].upper()

if op == "CREATE" and features_status[FEATURE_CREATE_FLOW]:

if op == "PUSH" and features_status[FEATURE_PUSH_FLOW]:

if op == "PUSH_PROPAGATE" and features_status[FEATURE_CREATE_FLOW]:

if op == "DELETE" and features_status[FEATURE_DELETE_FLOW]:

if op == "UNPUSH" and features_status[FEATURE_UNPUSH_FLOW]:

if op == "UNPUSH_PROPAGATE" and features_status[FEATURE_DELETE_FLOW]:

if op == "UPDATE" and features_status[FEATURE_UPDATE_FLOW]:
operation = payload['operation']
flows = payload['payload']
forward = flows['forward']
reverse = flows['reverse']
flow_id = forward['flowid']

logger.info('Flow %s request is not allowed: '
            'timestamp=%s, correlation_id=%s, payload=%s',
            operation, timestamp, correlation_id, payload)

op = payload['operation'].upper()
if op == "PUSH" or op == "PUSH_PROPAGATE" or op == "UNPUSH" or op == "UNPUSH_PROPAGATE":
    message_utils.send_error_message(
        correlation_id, 'REQUEST_INVALID',
        op + "-FAILURE - NOT ALLOWED RIGHT NOW - Toggle the feature to allow this behavior", "",
        destination="NORTHBOUND", topic=config.KAFKA_NORTHBOUND_TOPIC)

logger.info('Flow %s request processing: '
            'timestamp=%s, correlation_id=%s, payload=%s',
            operation, timestamp, correlation_id, payload)

neo4j_update_lock.acquire()

OP = operation.upper()
if OP == "CREATE" or OP == "PUSH" or OP == "PUSH_PROPAGATE":
    propagate = (OP == "CREATE" or OP == "PUSH_PROPAGATE")
    from_nb = (OP == "PUSH" or OP == "PUSH_PROPAGATE")

    self.create_flow(flow_id, forward, correlation_id, tx, propagate, from_nb)
    self.create_flow(flow_id, reverse, correlation_id, tx, propagate, from_nb)

elif OP == "DELETE" or OP == "UNPUSH" or OP == "UNPUSH_PROPAGATE":
    propagate = (OP == "DELETE" or OP == "UNPUSH_PROPAGATE")
    from_nb = (OP == "UNPUSH" or OP == "UNPUSH_PROPAGATE")
    MessageItem.delete_flow(flow_id, forward, correlation_id, tx, propagate, from_nb)
    MessageItem.delete_flow(flow_id, reverse, correlation_id, tx, propagate, from_nb)

    message_utils.send_info_message({'payload': forward, 'clazz': MT_FLOW_RESPONSE}, correlation_id)
    message_utils.send_info_message({'payload': reverse, 'clazz': MT_FLOW_RESPONSE}, correlation_id)

MessageItem.update_flow(flow_id, forward, correlation_id, tx)
MessageItem.update_flow(flow_id, reverse, correlation_id, tx)

logger.warn('Flow operation is not supported: '
            'operation=%s, timestamp=%s, correlation_id=%s,',
            operation, timestamp, correlation_id)

neo4j_update_lock.release()

logger.info('Flow %s request processed: '
            'timestamp=%s, correlation_id=%s, payload=%s',
            operation, timestamp, correlation_id, payload)
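
# All flow mutations above are serialized through neo4j_update_lock; the
# operation name alone determines `propagate`/`from_nb` for both the forward
# and reverse halves of the flow, and unsupported operations only produce the
# warning above.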
"""
:return: an unsorted list of ISL relationships with all properties pulled from the db if pull=True
"""
rels = graph.match(rel_type="isl")

isls = sorted(isls, key=lambda x: x[sort_key])

except Exception as e:
    logger.exception('FAILED to get ISLs from the DB: %s', e.message)
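
# fetch_isls (signature listed below) pulls every :isl relationship from the
# graph and sorts the result by `sort_key` ('src_switch' by default).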
    'flows': flow_utils.get_flows(),

message_utils.send_to_topic(
    destination="WFM_FLOW_LCM", topic=config.KAFKA_FLOW_TOPIC)
payload = message_utils.make_features_status_response()
for feature, status in features_status.items():
    transport_key = features_status_app_to_transport_map[feature]
    setattr(payload, transport_key, status)

    message_type=message_utils.MT_INFO,
    destination="NORTHBOUND",
    topic=config.KAFKA_NORTHBOUND_TOPIC)
for transport_key in features_status_transport_to_app_map:
    app_key = features_status_transport_to_app_map[transport_key]

    status = self.payload[transport_key]

    current = features_status[app_key]
    logger.info(
        'Set feature %s status to %s, previous value %s',
        app_key, status, current)
    features_status[app_key] = status
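
# Feature toggle updates arrive keyed by transport names; each key is mapped
# back to its app-side name, logged together with the previous value, and the
# new status is written into features_status.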
diff = flow_utils.validate_switch_rules(self.payload['switch_id'],

message_utils.send_validation_rules_response(diff["missing_rules"],
                                             diff["excess_rules"],
                                             diff["proper_rules"],
switch_id = self.payload['switch_id']

diff = flow_utils.validate_switch_rules(switch_id,

sync_actions = flow_utils.build_commands_to_sync_rules(switch_id,
                                                       diff["missing_rules"])
commands = sync_actions["commands"]

logger.info('Install commands for switch %s are to be sent: %s',

message_utils.send_force_install_commands(switch_id, commands,
switch_id = self.payload['switch_id']
rules_to_sync = self.payload['rules']

logger.debug('Switch rules synchronization for rules: %s', rules_to_sync)

sync_actions = flow_utils.build_commands_to_sync_rules(switch_id,

commands = sync_actions["commands"]

logger.info('Install commands for switch %s are to be sent: %s',

message_utils.send_force_install_commands(switch_id, commands,

message_utils.send_sync_rules_response(sync_actions["installed_rules"],

message_utils.send_dump_rules_request(self.payload['switch_id'],
for key, value in (
        ('time_create', life_cycle.ctime),
        ('time_modify', life_cycle.mtime)):
    self.payload[key] = value.as_java_timestamp()
protected = link_props.extract_protected_props()

    link_props, 'property(es) %s is can\'t be changed'.format(
        ', '.join(repr(x) for x in sorted(protected))))

with graph.begin() as tx:
    link_props_utils.create_if_missing(tx, link_props)
    link_props_utils.set_props_and_propagate_to_isl(tx, link_props)

    actual_link_props = link_props_utils.read(tx, link_props)

payload = message_utils.make_link_props_response(
    self.payload, actual_link_props)
message_utils.send_link_props_response(payload, self.correlation_id)
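
# link_props put: properties flagged as protected trigger the "can't be
# changed" error above; accepted props are written to the link_props node,
# propagated onto the matching ISL, and the stored record is echoed back in
# the response.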
with graph.begin() as tx:
    removed_records = link_props_utils.drop_by_mask(tx, lookup_mask)
    for link_props in removed_records:
        isl = model.InterSwitchLink.new_from_link_props(link_props)
        isl_utils.del_props(tx, isl, link_props.props)

    message_utils.make_link_props_response(self.payload, x)
    for x in removed_records]
message_utils.send_link_props_chunked_response(
def _unpack_link_props(self, key='link_props'):
    link_props = model.LinkProps.new_from_java(
    except (KeyError, ValueError, TypeError) as e:
def link_props_drop(self)
def _unpack_link_props(self, key='link_props')
def update_feature_toggles(self)
def activate_switch(self)
def validate_and_sync_switch_rules(self)
def handle_flow_topology_sync(self)
def get_message_type(self)
def fetch_isls(pull=True, sort_key='src_switch')
def delete_flow(flow_id, flow, correlation_id, parent_tx=None, propagate=True, from_nb=False)
def handle_link_props(self)
def __init__(self, message)
def validate_switch_rules(self)
def sync_switch_rules(self)
def not_allow_flow_operation(self)
def create_flow(flow_id, flow, correlation_id, tx, propagate=True, from_nb=False)
def handle_topology_change(self)
def get_feature_toggle_state(self)
def update_payload_lifecycle(self, life_cycle)
def send_dump_rules_request(self)
def isl_discovery_failed(self)
def update_flow(flow_id, flow, correlation_id, tx)