graph = db.create_p2n_driver()
logger = logging.getLogger(__name__)

default_rules = ['0x8000000000000001',
                 '0x8000000000000002']

cookie_flag_forward = 0x4000000000000000
cookie_flag_reverse = 0x2000000000000000

def is_forward_cookie(cookie):
    # Kilda cookies carry explicit direction flags in the top bits; legacy cookies use bit 0x0080000000000000.
    if cookie & 0xE000000000000000:
        is_match = cookie & cookie_flag_forward
    else:
        is_match = (cookie & 0x0080000000000000) == 0
    return bool(is_match)

def is_reverse_cookie(cookie):
    if cookie & 0xE000000000000000:
        is_match = cookie & cookie_flag_reverse
    else:
        is_match = (cookie & 0x0080000000000000) != 0
    return bool(is_match)

def cookie_to_hex(cookie):
    # Convert a negative (signed 64-bit) cookie to its unsigned form; strip Python 2's trailing 'L'.
    value = hex(
        ((cookie ^ 0xffffffffffffffff) + 1) * -1
        if cookie < 0
        else cookie)
    if value.endswith("L"):
        value = value[:-1]
    return value

def choose_output_action(input_vlan_id, output_vlan_id):
    if not int(input_vlan_id):
        return "PUSH" if int(output_vlan_id) else "NONE"
    return "REPLACE" if int(output_vlan_id) else "POP"


def get_one_switch_rules(src_switch, src_port, src_vlan, dst_port, dst_vlan,
                         bandwidth, flowid, cookie, meter_id, output_action,
                         **k):
    return [
        message_utils.build_one_switch_flow(
            src_switch, src_port, src_vlan, dst_port, dst_vlan,
            bandwidth, flowid, output_action, cookie, meter_id)]
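

# Hedged sketch (illustrative VLAN IDs): the four translation cases produced by
# choose_output_action() above -- 0 means "untagged" on either side.
def _example_vlan_actions():
    assert choose_output_action(0, 0) == "NONE"          # untagged -> untagged
    assert choose_output_action(0, 100) == "PUSH"        # untagged -> tagged
    assert choose_output_action(100, 0) == "POP"         # tagged -> untagged
    assert choose_output_action(100, 200) == "REPLACE"   # tagged -> retagged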

def get_rules(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan,
              bandwidth, transit_vlan, flowid, cookie, flowpath, meter_id,
              output_action, **k):
    flows = []
    nodes = flowpath.get("path")

    flows.append(message_utils.build_ingress_flow(
        nodes, src_switch, src_port, src_vlan, bandwidth,
        transit_vlan, flowid, output_action, cookie, meter_id))
    for i in range(1, len(nodes) - 1, 2):
        src = nodes[i]
        dst = nodes[i + 1]

        if src['switch_id'] != dst['switch_id']:
            msg = 'Found non-paired node in the flowpath: {}'.format(flowpath)
            raise ValueError(msg)

        segment_cookie = src.get('cookie', cookie)

        flows.append(message_utils.build_intermediate_flows(
            src['switch_id'], src['port_no'], dst['port_no'],
            transit_vlan, flowid, segment_cookie))
    egress_flow_cookie = cookie
    if nodes:
        egress_flow_cookie = nodes[-1].get('cookie', cookie)

    flows.append(message_utils.build_egress_flow(
        nodes, dst_switch, dst_port, dst_vlan,
        transit_vlan, flowid, output_action, egress_flow_cookie))

    return flows
    if flow['src_switch'] == flow['dst_switch']:
        return get_one_switch_rules(output_action=output_action, **flow)
    return get_rules(output_action=output_action, **flow)

def remove_flow(flow, parent_tx=None):
    """
    Deletes the flow and its flow segments. Starts with flow segments (the symmetrical mirror of
    store_flow). Leverages a parent transaction if one exists, otherwise creates and closes the
    transaction within this function.

    - flowid **AND** cookie are *the* primary keys for a flow:
      - both the forward and the reverse flow use the same flowid

    NB: store_flow is used for a single direction, whereas the flowid covers both directions,
        so the cookie is needed to differentiate them.
    """
    logger.info('Remove flow: %s', flow['flowid'])
    tx = parent_tx if parent_tx else graph.begin()
    delete_flow_segments(flow, tx)
    query = "MATCH (:switch)-[f:flow {{ flowid: '{}', cookie: {} }}]->(:switch) DELETE f".format(
        flow['flowid'], flow['cookie'])
    result = tx.run(query).data()
    if not parent_tx:
        tx.commit()
    return result
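

# Hedged usage sketch (flow dicts are assumed inputs): the forward and reverse
# flows share one flowid, so each direction is removed by its own
# (flowid, cookie) pair, optionally under a caller-owned transaction.
def _example_remove_flow_pair(forward_flow, reverse_flow):
    tx = graph.begin()
    remove_flow(forward_flow, parent_tx=tx)
    remove_flow(reverse_flow, parent_tx=tx)
    tx.commit()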

def merge_flow_relationship(flow_data, tx=None):
    """
    This function focuses on just creating the starting/ending switch relationship for a flow.
    """
    query = (
        "MERGE "
        " (src:switch {{name:'{src_switch}'}}) "
        " ON CREATE SET src.state = 'inactive' "
        "MERGE "
        " (dst:switch {{name:'{dst_switch}'}}) "
        " ON CREATE SET dst.state = 'inactive' "
        "MERGE (src)-[f:flow {{"
        " flowid:'{flowid}', "
        " cookie: {cookie} }} ]->(dst) "
        "SET "
        " f.meter_id = {meter_id}, "
        " f.bandwidth = {bandwidth}, "
        " f.ignore_bandwidth = {ignore_bandwidth}, "
        " f.src_port = {src_port}, "
        " f.dst_port = {dst_port}, "
        " f.src_switch = '{src_switch}', "
        " f.dst_switch = '{dst_switch}', "
        " f.src_vlan = {src_vlan}, "
        " f.dst_vlan = {dst_vlan}, "
        " f.transit_vlan = {transit_vlan}, "
        " f.description = '{description}', "
        " f.last_updated = '{last_updated}', "
        " f.flowpath = '{flowpath}' "
    )
    flow_data['flowpath'].pop('clazz', None)   # the clazz is re-attached on hydration
    flow_data['last_updated'] = calendar.timegm(time.gmtime())
    flow_data['flowpath'] = json.dumps(flow_data['flowpath'])
    if tx:
        tx.run(query.format(**flow_data))
    else:
        graph.run(query.format(**flow_data))
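

# Hedged sketch of the keys merge_flow_relationship() interpolates into its
# Cypher query; every value below is an illustrative assumption, not real data.
def _example_flow_data():
    return {
        'flowid': 'example-flow', 'cookie': 0x4000000000000001, 'meter_id': 1,
        'bandwidth': 10000, 'ignore_bandwidth': False,
        'src_switch': '00:00:00:00:00:00:00:01', 'src_port': 1, 'src_vlan': 100,
        'dst_switch': '00:00:00:00:00:00:00:02', 'dst_port': 2, 'dst_vlan': 200,
        'transit_vlan': 300, 'description': 'example',
        'flowpath': {'path': []},  # serialised to JSON before being stored on f.flowpath
    }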

def merge_flow_segments(_flow, tx=None):
    """
    This function creates each segment relationship in a flow, and then it calls the function to
    update bandwidth. This should always be done when creating/merging flow segments.

    To create segments, we leverage the flow path: the flow path is a series of nodes, where
    each 2 nodes are the endpoints of an ISL.
    """
    flow = copy.deepcopy(_flow)
    create_segment_query = (
        "MERGE "
        "(src:switch {{name:'{src_switch}'}}) "
        "ON CREATE SET src.state = 'inactive' "
        "MERGE "
        "(dst:switch {{name:'{dst_switch}'}}) "
        "ON CREATE SET dst.state = 'inactive' "
        "MERGE "
        "(src)-[fs:flow_segment {{flowid: '{flowid}', parent_cookie: {parent_cookie} }}]->(dst) "
        "SET "
        "fs.cookie = {cookie}, "
        "fs.src_switch = '{src_switch}', "
        "fs.src_port = {src_port}, "
        "fs.dst_switch = '{dst_switch}', "
        "fs.dst_port = {dst_port}, "
        "fs.seq_id = {seq_id}, "
        "fs.segment_latency = {segment_latency}, "
        "fs.bandwidth = {bandwidth}, "
        "fs.ignore_bandwidth = {ignore_bandwidth} "
    )

    flow_path = get_flow_path(flow)
    flow_cookie = flow['cookie']
    flow['parent_cookie'] = flow_cookie
    logger.debug('MERGE Flow Segments : %s [path: %s]', flow['flowid'], flow_path)

    for i in range(0, len(flow_path), 2):
        src = flow_path[i]
        dst = flow_path[i + 1]
        # the path nodes come in (src, dst) pairs, one pair per ISL
        flow['src_switch'] = src['switch_id']
        flow['src_port'] = src['port_no']
        flow['seq_id'] = src['seq_id']
        # segment_latency falls back to the Cypher literal NULL when not provided
        flow['segment_latency'] = src.get('segment_latency', 'NULL')
        flow['dst_switch'] = dst['switch_id']
        flow['dst_port'] = dst['port_no']
        # a segment may carry its own cookie; otherwise it inherits the parent flow cookie
        flow['cookie'] = dst.get('cookie', flow_cookie)

        if tx:
            tx.run(create_segment_query.format(**flow))
        else:
            graph.run(create_segment_query.format(**flow))

    update_flow_segment_available_bw(flow, tx)
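

# Hedged usage sketch (illustrative only): segments are typically merged inside
# the same transaction that stores the :flow relationship, so a failure rolls
# everything back together.
def _example_merge_segments_in_tx(flow):
    tx = graph.begin()
    merge_flow_segments(flow, tx)
    tx.commit()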

def get_flow_path(flow):
    """
    As commented elsewhere, the current algorithm for the flow path is to use both endpoints of a
    segment, each as their own node. So, make sure we have an even number of them.
    """
    flow_path = flow['flowpath']['path']
    if len(flow_path) % 2 != 0:
        msg = 'Found un-even number of nodes in the flowpath: {}'.format(flow_path)
        raise ValueError(msg)
    return flow_path
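

# Hedged sketch (switch IDs and ports are assumptions): a two-ISL path is four
# nodes -- two per ISL, source endpoint then destination endpoint -- which is
# exactly the even-length shape the check above enforces.
_EXAMPLE_FLOW_PATH = [
    {'switch_id': '00:00:00:00:00:00:00:01', 'port_no': 2, 'seq_id': 0},
    {'switch_id': '00:00:00:00:00:00:00:02', 'port_no': 1, 'seq_id': 1},
    {'switch_id': '00:00:00:00:00:00:00:02', 'port_no': 3, 'seq_id': 2},
    {'switch_id': '00:00:00:00:00:00:00:03', 'port_no': 1, 'seq_id': 3},
]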

def delete_flow_segments(flow, tx=None):
    """
    Whenever adjusting flow segments, always update available bandwidth. Even when creating a flow,
    where we might remove anything old and then create the new, it isn't guaranteed that the
    old segments are the same as the new segments, so update bandwidth to be safe.
    """
    flow_path = get_flow_path(flow)
    flowid = flow['flowid']
    parent_cookie = flow['cookie']
    logger.debug('DELETE Flow Segments : flowid: %s parent_cookie: 0x%x [path: %s]',
                 flowid, parent_cookie, flow_path)
    delete_segment_query = (
        "MATCH (:switch)-[fs:flow_segment {{ flowid: '{}', parent_cookie: {} }}]->(:switch) DELETE fs"
    )
    if tx:
        tx.run(delete_segment_query.format(flowid, parent_cookie))
    else:
        graph.run(delete_segment_query.format(flowid, parent_cookie))

    update_flow_segment_available_bw(flow, tx)
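

# Hedged usage sketch (illustrative): when a flow's path changes, the old
# segments are deleted and the new ones merged under one transaction; both
# helpers refresh the affected ISLs' available_bandwidth as described above.
def _example_replace_segments(old_flow, new_flow):
    tx = graph.begin()
    delete_flow_segments(old_flow, tx)
    merge_flow_segments(new_flow, tx)
    tx.commit()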

def fetch_flow_segments(flowid, parent_cookie):
    """
    :param flowid: the ID for the entire flow, typically consistent across updates, whereas the cookie may change
    :param parent_cookie: the cookie for the flow as a whole; individual segments may vary
    :return: array of segments
    """
    fetch_query = (
        "MATCH (:switch)-[fs:flow_segment {{ flowid: '{}', parent_cookie: {} }}]->(:switch) "
        "RETURN fs ORDER BY fs.seq_id")
    result = graph.run(fetch_query.format(flowid, parent_cookie)).data()
    return [dict(x['fs']) for x in result]
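

# Hedged sketch: segments come back ordered by seq_id, so iterating them walks
# the flow hop by hop; the logged fields are the ones stored by
# merge_flow_segments() above.
def _example_log_segments(flowid, parent_cookie):
    for segment in fetch_flow_segments(flowid, parent_cookie):
        logger.debug('%s:%s -> %s:%s', segment['src_switch'], segment['src_port'],
                     segment['dst_switch'], segment['dst_port'])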

def update_flow_segment_available_bw(flow, tx=None):
    flow_path = get_flow_path(flow)
    logger.debug('Update ISL Bandwidth from Flow Segments : %s [path: %s]',
                 flow['flowid'], flow_path)
    for i in range(0, len(flow_path), 2):
        src = flow_path[i]
        dst = flow_path[i + 1]
        update_isl_bandwidth(src['switch_id'], src['port_no'],
                             dst['switch_id'], dst['port_no'], tx)

def update_isl_bandwidth(src_switch, src_port, dst_switch, dst_port, tx=None):
    """
    This will update the available_bandwidth for the ISL that matches the src/dst information.
    It does this by looking for all flow segments over the ISL where ignore_bandwidth = false.
    Because there may not be any segments, we have to use "OPTIONAL MATCH".
    """
    available_bw_query = (
        "MATCH (src:switch {{name:'{src_switch}'}}), (dst:switch {{name:'{dst_switch}'}}) WITH src,dst "
        " MATCH (src)-[i:isl {{ src_port:{src_port}, dst_port: {dst_port}}}]->(dst) WITH src,dst,i "
        " OPTIONAL MATCH (src)-[fs:flow_segment {{ src_port:{src_port}, dst_port: {dst_port}, "
        "ignore_bandwidth: false }}]->(dst) "
        " WITH sum(fs.bandwidth) AS used_bandwidth, i as i "
        " SET i.available_bandwidth = i.max_bandwidth - used_bandwidth "
    )

    logger.debug('Update ISL Bandwidth from %s:%d --> %s:%d',
                 src_switch, src_port, dst_switch, dst_port)
    params = {
        'src_switch': src_switch,
        'src_port': src_port,
        'dst_switch': dst_switch,
        'dst_port': dst_port,
    }
    query = available_bw_query.format(**params)
    if tx:
        tx.run(query)
    else:
        graph.run(query)
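

# Hedged arithmetic sketch (numbers are assumptions): the Cypher above computes
# i.available_bandwidth = i.max_bandwidth - sum(non-ignored segment bandwidth),
# e.g. a 10 Gbps ISL carrying two 1 Gbps segments keeps 8 Gbps available.
def _example_available_bandwidth(max_bandwidth=10 ** 10,
                                 segment_bandwidths=(10 ** 9, 10 ** 9)):
    return max_bandwidth - sum(segment_bandwidths)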

def store_flow(flow, tx=None):
    """
    Create a :flow relationship between the starting and ending switch, as well as
    create :flow_segment relationships between every switch in the path.

    NB: store_flow is used for a single direction, whereas the flowid covers both
        directions, so the cookie is needed to differentiate them.

    :param tx: the transaction to use, or no transaction.
    """
    logger.debug('STORE Flow : %s', flow['flowid'])
    delete_flow_segments(flow, tx)
    merge_flow_relationship(copy.deepcopy(flow), tx)
    merge_flow_segments(flow, tx)
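

# Hedged usage sketch (flow dicts are assumed inputs): each direction is stored
# on its own, sharing the flowid but carrying its own cookie, ideally within a
# single transaction.
def _example_store_flow_pair(forward_flow, reverse_flow):
    tx = graph.begin()
    store_flow(forward_flow, tx)
    store_flow(reverse_flow, tx)
    tx.commit()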

def hydrate_flow(one_row):
    """
    :param one_row: the typical result from a query - i.e. MATCH (a:switch)-[r:flow]->(b:switch) RETURN r
    :return: a fully dict'd object
    """
    path = json.loads(one_row['r']['flowpath'])
    flow = json.loads(json.dumps(
        one_row['r'],
        default=lambda o: o.__dict__,
    ))
    path.setdefault('clazz', 'org.openkilda.messaging.info.event.PathInfoData')
    flow['flowpath'] = path
    return flow
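

# Hedged sketch: hydration turns the raw r:flow relationship into a plain dict
# and re-attaches the messaging 'clazz' that merge_flow_relationship() strips
# before storage.
def _example_hydrate(rows):
    flows = [hydrate_flow(row) for row in rows]
    # each flow['flowpath'] is now a dict again, e.g. with
    # clazz == 'org.openkilda.messaging.info.event.PathInfoData'
    return flows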
387 "MATCH (a:switch)-[r:flow {{flowid: '{}'}}]->(b:switch) " 388 " WHERE r.cookie <> {} RETURN r " 390 old_flows = graph.run(query.format(
391 new_flow[
'flowid'], int(new_flow[
'cookie']))).
data()
394 message =
'Flow {} not found'.
format(new_flow[
'flowid'])
395 logger.error(message)
397 raise Exception(message)
399 logger.info(
'Flows were found: %s', old_flows)
401 for data
in old_flows:
403 logger.info(
'check cookies: %s ? %s',
404 new_flow[
'cookie'], old_flow[
'cookie'])
406 logger.info(
'Flow was found: flow=%s', old_flow)
407 return dict(old_flow)
411 'Requested flow {}(cookie={}) don\'t found corresponding flow (with ' 412 'matching direction in Neo4j)'.
format(
413 new_flow[
'flowid'], new_flow[
'cookie']))

def get_flows():
    flows = {}
    query = "MATCH (a:switch)-[r:flow]->(b:switch) RETURN r"
    try:
        result = graph.run(query).data()
        for data in result:
            flow = hydrate_flow(data)
            flow['state'] = 'CACHED'
            flow_pair = flows.get(flow['flowid'], {})
            if is_forward_cookie(flow['cookie']):
                flow_pair['forward'] = flow
            else:
                flow_pair['reverse'] = flow
            flows[flow['flowid']] = flow_pair
        logger.info('Got flows: %s', flows.values())
    except Exception as e:
        logger.exception('Can not get flows: %s', e.message)
    return flows.values()

def precreate_switches(tx, nodes):
    switches = [x.lower() for x in nodes]

    for dpid in switches:
        q = (
            "MERGE (sw:switch {{name:'{}'}}) "
            "ON CREATE SET sw.state = 'inactive' "
            "ON MATCH SET sw.tx_override_workaround = 'dummy'").format(dpid)
        logger.info('neo4j-query: %s', q)
        tx.run(q)

def get_flow_segments_by_dst_switch(switch_id):
    query = "MATCH p = (:switch)-[fs:flow_segment]->(sw:switch) " \
            "WHERE sw.name='{}' " \
            "RETURN fs"
    result = graph.run(query.format(switch_id)).data()

    segments = []
    for relationship in result:
        segments.append(relationship['fs'])

    logger.debug('Found segments for switch %s: %s', switch_id, segments)
    return segments

def get_flows_by_src_switch(switch_id):
    query = "MATCH (sw:switch)-[r:flow]->(:switch) " \
            "WHERE sw.name='{}' RETURN r"
    result = graph.run(query.format(switch_id)).data()

    flows = []
    for data in result:
        flows.append(hydrate_flow(data))

    logger.debug('Found flows for switch %s: %s', switch_id, flows)
    return flows

def validate_switch_rules(switch_id, switch_rules):
    """
    Perform validation of provided rules against the switch flows.
    """
    switch_cookies = [x['cookie'] for x in switch_rules]

    missing_rules = set()
    excess_rules = set()
    proper_rules = set()

    # check whether the switch carries a rule for every segment that terminates on it
    flow_segments = get_flow_segments_by_dst_switch(switch_id)
    for segment in flow_segments:
        cookie = segment.get('cookie', segment['parent_cookie'])
        if cookie not in switch_cookies:
            logger.warn('Rule %s is not found on switch %s', cookie, switch_id)
            missing_rules.add(cookie)
        else:
            proper_rules.add(cookie)

    # check ingress and one-switch flows that originate on this switch
    ingress_flows = get_flows_by_src_switch(switch_id)
    for flow in ingress_flows:
        cookie = flow['cookie']
        if cookie not in switch_cookies:
            logger.warn("Ingress or one-switch flow %s is missing on switch %s", cookie, switch_id)
            missing_rules.add(cookie)
        else:
            proper_rules.add(cookie)

    # any remaining non-default rule on the switch is excessive
    for cookie in switch_cookies:
        if cookie not in proper_rules and cookie_to_hex(cookie) not in default_rules:
            logger.warn('Rule %s is excessive on the switch %s', cookie, switch_id)
            excess_rules.add(cookie)

    level = logging.INFO
    if missing_rules or excess_rules:
        level = logging.ERROR

    diff = {
        "missing_rules": missing_rules,
        "excess_rules": excess_rules,
        "proper_rules": proper_rules}
    logger.log(level, 'Switch %s rules validation result: %s', switch_id, diff)

    return diff
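

# Hedged usage sketch (rule dicts are assumed inputs): the validator compares
# the cookies reported by the switch against what Neo4j says should be there.
def _example_validate(switch_id, reported_rules):
    diff = validate_switch_rules(switch_id, reported_rules)
    return not diff['missing_rules'] and not diff['excess_rules']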

def build_commands_to_sync_rules(switch_id, switch_rules):
    """
    Build install commands to sync provided rules with the switch flows.
    """
    installed_rules = set()
    commands = []

    flow_segments = get_flow_segments_by_dst_switch(switch_id)
    for segment in flow_segments:
        cookie = segment.get('cookie', segment['parent_cookie'])
        if cookie in switch_rules:
            logger.info('Rule %s is to be (re)installed on switch %s', cookie, switch_id)
            installed_rules.add(cookie)
            commands.extend(build_install_command_from_segment(segment))

    ingress_flows = get_flows_by_src_switch(switch_id)
    for flow in ingress_flows:
        cookie = flow['cookie']
        if cookie in switch_rules:
            installed_rules.add(cookie)
            output_action = choose_output_action(flow['src_vlan'], flow['dst_vlan'])
            if flow['src_switch'] == flow['dst_switch']:
                logger.info("One-switch flow %s is to be (re)installed on switch %s", cookie, switch_id)
                commands.append(message_utils.build_one_switch_flow_from_db(switch_id, flow, output_action))
            else:
                logger.info("Ingress flow %s is to be (re)installed on switch %s", cookie, switch_id)
                commands.append(message_utils.build_ingress_flow_from_db(flow, output_action))

    return {
        "commands": commands,
        "installed_rules": installed_rules}

def build_install_command_from_segment(segment):
    """
    Build a command to install required rules for the segment destination.
    """
    if segment['src_switch'] == segment['dst_switch']:
        msg = 'One-switch flow segment {} is provided.'.format(segment)
        raise ValueError(msg)

    parent_cookie = segment['parent_cookie']
    flow_id = segment['flowid']
    flow = get_flow_by_id_and_cookie(flow_id, parent_cookie)
    if flow is None:
        logger.error("Flow with id %s was not found, cookie %s",
                     flow_id, parent_cookie)
        return

    output_action = choose_output_action(flow['src_vlan'], flow['dst_vlan'])
    switch_id = segment['dst_switch']
    segment_cookie = segment['cookie']

    if switch_id == flow['dst_switch']:
        yield message_utils.build_egress_flow_from_db(flow, output_action, segment_cookie)
    else:
        in_port = segment['dst_port']

        paired_segment = get_flow_segment_by_src_switch_and_cookie(switch_id, parent_cookie)
        if paired_segment is None:
            msg = 'Paired segment for switch {} and cookie {} has not been found.'.format(
                switch_id, parent_cookie)
            raise ValueError(msg)

        out_port = paired_segment['src_port']

        yield message_utils.build_intermediate_flows(
            switch_id, in_port, out_port, flow['transit_vlan'],
            flow['flowid'], segment_cookie)
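

# Hedged usage sketch: the builder is a generator, so collecting its output
# yields either an egress command (segment ends on the flow's dst switch) or a
# transit command (any other segment destination), or nothing if the parent
# flow cannot be found.
def _example_segment_commands(segment):
    return list(build_install_command_from_segment(segment))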

def get_flow_by_id_and_cookie(flow_id, cookie):
    query = "MATCH ()-[r:flow]->() WHERE r.flowid='{}' " \
            "and r.cookie={} RETURN r"
    result = graph.run(query.format(flow_id, cookie)).data()
    if not result:
        return None

    flow = hydrate_flow(result[0])
    logger.debug('Found flow for id %s and cookie %s: %s', flow_id, cookie, flow)
    return flow

def get_flow_segment_by_src_switch_and_cookie(switch_id, parent_cookie):
    query = "MATCH p = (sw:switch)-[fs:flow_segment]->(:switch) " \
            "WHERE sw.name='{}' AND fs.parent_cookie={} " \
            "RETURN fs"
    result = graph.run(query.format(switch_id, parent_cookie)).data()
    if not result:
        return None

    segment = result[0]['fs']
    logger.debug('Found segment for switch %s and parent_cookie %s: %s',
                 switch_id, parent_cookie, segment)
    return segment
def get_rules(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, transit_vlan, flowid, cookie, flowpath, meter_id, output_action, k)
def cookie_to_hex(cookie)
def merge_flow_relationship(flow_data, tx=None)
def delete_flow_segments(flow, tx=None)
def get_one_switch_rules(src_switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flowid, cookie, meter_id, output_action, k)
def is_reverse_cookie(cookie)
def store_flow(flow, tx=None)
def update_isl_bandwidth(src_switch, src_port, dst_switch, dst_port, tx=None)
def is_same_direction(first, second)
def update_flow_segment_available_bw(flow, tx=None)
def precreate_switches(tx, nodes)
def choose_output_action(input_vlan_id, output_vlan_id)
def remove_flow(flow, parent_tx=None)
def is_forward_cookie(cookie)
def build_commands_to_sync_rules(switch_id, switch_rules)
def validate_switch_rules(switch_id, switch_rules)
def get_flow_by_id_and_cookie(flow_id, cookie)
def get_flows_by_src_switch(switch_id)
def merge_flow_segments(_flow, tx=None)
def fetch_flow_segments(flowid, parent_cookie)
def get_old_flow(new_flow)
def hydrate_flow(one_row)
def get_flow_segment_by_src_switch_and_cookie(switch_id, parent_cookie)
def build_install_command_from_segment(segment)
def get_flow_segments_by_dst_switch(switch_id)