16 from flask
import Flask, flash, redirect, render_template, request, session, abort, url_for, Response, jsonify
17 from flask_login
import LoginManager, UserMixin, login_required, login_user, logout_user, current_user
19 from app
import application
31 from kafka
import KafkaConsumer, KafkaProducer
33 from .
import neo4j_tools
# ---------------------------------------------------------------------------
# Module-level configuration.
#
# Everything below is read once, at import time, from
# 'topology_engine_rest.ini' (resolved relative to the process working
# directory, as with any relative path handed to RawConfigParser.read).
# ---------------------------------------------------------------------------
config = ConfigParser.RawConfigParser()
config.read('topology_engine_rest.ini')

# Kafka consumer group and the topic flow commands are published to.
group = config.get('kafka', 'consumer.group')
topic = config.get('kafka', 'kafka.topic.flow')

# When 'environment.naming.prefix' is present and non-blank, both the group
# and the topic are prefixed with it, joined by an underscore.
try:
    environment_naming_prefix = config.get('kafka',
                                           'environment.naming.prefix')
    if environment_naming_prefix.strip():
        group = '_'.join([environment_naming_prefix, group])
        topic = '_'.join([environment_naming_prefix, topic])
except ConfigParser.NoOptionError:
    # NOTE(review): the handler body was not visible in the reviewed
    # excerpt; it is assumed to be a no-op, i.e. the prefix is optional and
    # group/topic stay unprefixed when the option is absent. Confirm.
    pass

# 'bootstrap.servers' is a comma-separated list; surrounding whitespace on
# each entry is stripped.
bootstrap_servers_property = config.get('kafka', 'bootstrap.servers')
bootstrap_servers = [x.strip() for x in bootstrap_servers_property.split(',')]

# Shared Neo4j handle used by the query helpers and route handlers.
neo4j_connect = neo4j_tools.connect(config)
59 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
False, indent=4)
63 return json.dumps(self, default=
lambda o: o.__dict__, sort_keys=
False, indent=4)
66 def build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction):
68 for relationship
in expandedRelationships:
69 if relationship[
'data'][
'src_switch'] == src_switch:
70 action = relationship[
'data'][
'src_port']
72 flow.command =
"install_ingress_flow" 73 flow.destination =
"CONTROLLER" 75 flow.switch_id = src_switch
76 flow.input_port= int(src_port)
77 flow.output_port = action
78 flow.input_vlan_id = int(src_vlan)
79 flow.transit_vlan_id = int(transit_vlan)
80 flow.output_vlan_type = outputAction
81 flow.bandwidth = bandwidth
86 def build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction):
88 for relationship
in expandedRelationships:
89 if relationship[
'data'][
'dst_switch'] == dst_switch:
90 match = relationship[
'data'][
'dst_port']
92 flow.command =
"install_egress_flow" 93 flow.destination =
"CONTROLLER" 95 flow.switch_id = dst_switch
96 flow.input_port = int(match)
97 flow.output_port = int(dst_port)
98 flow.transit_vlan_id = int(transit_vlan)
99 flow.output_vlan_id = int(dst_vlan)
100 flow.output_vlan_type = outputAction
106 match = expandedRelationships[i][
'data'][
'dst_port']
107 action = expandedRelationships[i+1][
'data'][
'src_port']
108 switch = expandedRelationships[i][
'data'][
'dst_switch']
110 flow.command =
"install_transit_flow" 111 flow.destination =
"CONTROLLER" 112 flow.cookie = flow_id
113 flow.switch_id = switch
114 flow.input_port = int(match)
115 flow.output_port = int(action)
116 flow.transit_vlan_id = int(transit_vlan)
122 flow.command =
"install_one_switch_flow" 123 flow.destination =
"CONTROLLER" 124 flow.cookie = flow_id
125 flow.switch_id = switch
126 flow.input_port = int(src_port)
127 flow.output_port = int(dst_port)
128 flow.input_vlan_id = int(src_vlan)
129 flow.output_vlan_id = int(dst_vlan)
130 flow.bandwidth = bandwidth
133 flow.output_vlan_type = outputAction
138 flow.command =
"delete_flow" 139 flow.destination =
"CONTROLLER" 140 flow.cookie = flow_id
141 flow.switch_id = switch
def expand_relationships(relationships):
    """Fetch and JSON-decode the document behind every relationship reference.

    Each item of *relationships* is passed unchanged as the URL argument of
    ``requests.get``; the response text is parsed with ``json.loads``.

    :param relationships: iterable of relationship URLs.
    :returns: list of decoded JSON documents, in the same order as the
        input. An empty input yields an empty list and performs no requests.
    :raises: whatever ``requests.get`` or ``json.loads`` raise; nothing is
        caught here.
    """
    # NOTE(review): the Neo4j credentials are hard-coded in this call. They
    # are kept byte-for-byte so behaviour is unchanged, but they should come
    # from configuration rather than live in source.
    return [
        json.loads(requests.get(relationship, auth=('neo4j', 'temppass')).text)
        for relationship in relationships
    ]
152 "MATCH (a:switch{{name:'{}'}}),(b:switch{{name:'{}'}}), p = shortestPath((a)-[:isl*..100]->(b)) where ALL(x in nodes(p) " 153 "WHERE x.state = 'active') " 154 "RETURN p").
format(src_switch, dst_switch)
155 match = neo4j_connect.run(query)
157 return match[0][
'relationships']
def assign_transit_vlan():
    """Return a pseudo-random transit VLAN id.

    The value is drawn with ``random.randrange(99, 4000, 1)``, so it is an
    integer ``v`` with ``99 <= v < 4000``.

    No bookkeeping is done in this function: nothing here checks whether the
    returned id is already in use by another flow.
    """
    return random.randrange(99, 4000, 1)
def choose_output_action(input_vlan_id, output_vlan_id):
    """Pick the VLAN action name for a flow from its two VLAN ids.

    A VLAN id counts as "absent" when it is falsy (``None``, ``0``, an empty
    string, ...). The explicit ``== 0`` comparison is subsumed by the
    truthiness test, since ``0`` is already falsy.

    ===========  ============  ==========
    input VLAN   output VLAN   result
    ===========  ============  ==========
    absent       absent        "NONE"
    absent       present       "PUSH"
    present      absent        "POP"
    present      present       "REPLACE"
    ===========  ============  ==========

    :param input_vlan_id: VLAN id on the incoming side, or a falsy value.
    :param output_vlan_id: VLAN id on the outgoing side, or a falsy value.
    :returns: one of the strings "NONE", "PUSH", "POP", "REPLACE".
    """
    has_input_vlan = bool(input_vlan_id)
    has_output_vlan = bool(output_vlan_id)

    if not has_input_vlan:
        # Nothing tagged on the way in: either leave it untagged or add a tag.
        return "PUSH" if has_output_vlan else "NONE"

    # Tagged on the way in: either strip the tag or swap it for another one.
    return "REPLACE" if has_output_vlan else "POP"
189 return [[
build_one_switch_flow(src_switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, forwardOutputAction)],
190 [
build_one_switch_flow(dst_switch, dst_port, dst_vlan, src_port, src_vlan, bandwidth, flow_id, reverseOutputAction)]]
193 bandwidth, transit_vlan, flow_id):
199 flows.append(
build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction))
200 intermediateFlowCount = len(expandedRelationships) - 1
202 while i < intermediateFlowCount:
205 flows.append(
build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction))
211 @application.route(
'/api/v1/health-check', methods=[
"GET"])
213 return '{"status": "ok"}' 215 @application.route(
'/api/v1/flow/<flowid>', methods=[
"GET",
"DELETE"])
218 query =
"MATCH (a:switch)-[r:flow {{flowid: '{}'}}]->(b:switch) {} r" 220 if request.method ==
'GET':
221 result = neo4j_connect.run(query.format(flowid,
"return")).
data()
222 if request.method ==
'DELETE':
223 producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
224 switches = neo4j_connect.run(
"MATCH (a:switch)-[r:flow {{flowid: '{}'}}]->(b:switch) return r.flowpath limit 1".
format(flowid)).evaluate()
225 for switch
in switches:
228 message.type =
"COMMAND" 229 message.timestamp = 42
230 kafkamessage = b
'{}'.
format(message.toJSON())
231 print 'topic: {}, message: {}'.
format(topic, kafkamessage)
232 messageresult = producer.send(topic, kafkamessage)
233 messageresult.get(timeout=5)
234 result = neo4j_connect.run(query.format(flowid,
"delete")).
data()
235 return json.dumps(result)
238 @application.route(
'/api/v1/flow', methods=[
"POST"])
241 if request.method ==
'POST':
242 producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
243 content = json.loads(
'{}'.
format(request.data))
246 if content[
'src_switch'] == content[
'dst_switch']:
248 content[
'src_switch'], content[
'src_port'], content[
'src_vlan'],
249 content[
'dst_switch'], content[
'dst_port'], content[
'dst_vlan'],
250 content[
'bandwidth'], flowID)
254 content[
'src_switch'], content[
'src_port'], content[
'src_vlan'],
255 content[
'dst_switch'], content[
'dst_port'], content[
'dst_vlan'],
256 content[
'bandwidth'], transitVlanForward, flowID)
260 content[
'dst_switch'], content[
'dst_port'], content[
'dst_vlan'],
261 content[
'src_switch'], content[
'src_port'], content[
'src_vlan'],
262 content[
'bandwidth'], transitVlanReturn, flowID)
264 allflows = [forwardFlows, reverseFlows]
266 if not forwardFlows
or not reverseFlows:
267 response = {
"result":
"failed",
"message":
"unable to find valid path in the network"}
268 return json.dumps(response)
270 forwardFlowSwitches = [str(f.switch_id)
for f
in forwardFlows]
271 reverseFlowSwitches = [str(f.switch_id)
for f
in reverseFlows]
274 for flows
in allflows:
278 message.type =
"COMMAND" 279 message.timestamp = 42
280 kafkamessage = b
'{}'.
format(message.toJSON())
281 print 'topic: {}, message: {}'.
format(topic, kafkamessage)
282 messageresult = producer.send(topic, kafkamessage)
283 result = messageresult.get(timeout=5)
285 a_switchNode = neo4j_connect.find_one(
'switch', property_key=
'name', property_value=
'{}'.
format(content[
'src_switch']))
286 b_switchNode = neo4j_connect.find_one(
'switch', property_key=
'name', property_value=
'{}'.
format(content[
'dst_switch']))
288 if not a_switchNode
or not b_switchNode:
289 return '{"result": "failed"}' 291 pathQuery =
"MATCH (u:switch {{name:'{}'}}), (r:switch {{name:'{}'}}) MERGE (u)-[:flow {{flowid:'{}', src_port: '{}', dst_port: '{}', src_switch: '{}', dst_switch: '{}', flowpath: {}}}]->(r)" 293 pathForwardQuery = pathQuery.format(a_switchNode[
'name'], b_switchNode[
'name'], flowID, content[
'src_port'], content[
'dst_port'], content[
'src_switch'], content[
'dst_switch'], str(forwardFlowSwitches))
294 pathReverseQuery = pathQuery.format(b_switchNode[
'name'], a_switchNode[
'name'], flowID, content[
'dst_port'], content[
'src_port'], content[
'dst_switch'], content[
'src_switch'], str(reverseFlowSwitches))
296 neo4j_connect.run(pathForwardQuery)
297 neo4j_connect.run(pathReverseQuery)
299 response = {
"result":
"sucessful",
"flowID": flowID}
300 return json.dumps(response)
303 @application.route(
'/api/v1/push/flows', methods=[
"PUT"])
306 return jsonify(successes=0, failures=0, messages=[
"come back later"])
def build_intermediate_flows(expandedRelationships, transit_vlan, i, flow_id)
def build_delete_flow(switch, flow_id)
def api_v1_topology_get_path(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, transit_vlan, flow_id)
def api_v1_topology_get_one_switch_flows(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, flow_id)
def get_relationships(src_switch, src_port, dst_switch, dst_port)
def assign_transit_vlan()
def choose_output_action(input_vlan_id, output_vlan_id)
def api_v1_health_check()
def build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction)
def build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction)
def expand_relationships(relationships)
def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, outputAction)