Open Kilda Java Documentation
flows.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 from flask import Flask, flash, redirect, render_template, request, session, abort, url_for, Response, jsonify
17 from flask_login import LoginManager, UserMixin, login_required, login_user, logout_user, current_user
18 
19 from app import application
20 from app import db
21 from app import utils
22 
23 import sys, os
24 import requests
25 import json
26 import random
27 import time
28 import uuid
29 import ConfigParser
30 
31 from kafka import KafkaConsumer, KafkaProducer
32 
33 from . import neo4j_tools
34 
35 
36 config = ConfigParser.RawConfigParser()
37 config.read('topology_engine_rest.ini')
38 
39 group = config.get('kafka', 'consumer.group')
40 topic = config.get('kafka', 'kafka.topic.flow')
41 
42 try:
43  environment_naming_prefix = config.get('kafka',
44  'environment.naming.prefix')
45  if environment_naming_prefix.strip():
46  group = '_'.join([environment_naming_prefix, group])
47  topic = '_'.join([environment_naming_prefix, topic])
48 except ConfigParser.NoOptionError:
49  pass
50 
51 bootstrap_servers_property = config.get('kafka', 'bootstrap.servers')
52 bootstrap_servers = [x.strip() for x in bootstrap_servers_property.split(',')]
53 
54 neo4j_connect = neo4j_tools.connect(config)
55 
56 
57 class Flow(object):
58  def toJSON(self):
59  return json.dumps(self, default=lambda o: o.__dict__, sort_keys=False, indent=4)
60 
61 class Message(object):
62  def toJSON(self):
63  return json.dumps(self, default=lambda o: o.__dict__, sort_keys=False, indent=4)
64 
65 
66 def build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction):
67  match = src_port
68  for relationship in expandedRelationships:
69  if relationship['data']['src_switch'] == src_switch:
70  action = relationship['data']['src_port']
71  flow = Flow()
72  flow.command = "install_ingress_flow"
73  flow.destination = "CONTROLLER"
74  flow.cookie = flow_id
75  flow.switch_id = src_switch
76  flow.input_port= int(src_port)
77  flow.output_port = action
78  flow.input_vlan_id = int(src_vlan)
79  flow.transit_vlan_id = int(transit_vlan)
80  flow.output_vlan_type = outputAction
81  flow.bandwidth = bandwidth
82  flow.meter_id = assign_meter_id()
83  return flow
84 
85 
86 def build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction):
87  action = dst_port
88  for relationship in expandedRelationships:
89  if relationship['data']['dst_switch'] == dst_switch:
90  match = relationship['data']['dst_port']
91  flow = Flow()
92  flow.command = "install_egress_flow"
93  flow.destination = "CONTROLLER"
94  flow.cookie = flow_id
95  flow.switch_id = dst_switch
96  flow.input_port = int(match)
97  flow.output_port = int(dst_port)
98  flow.transit_vlan_id = int(transit_vlan)
99  flow.output_vlan_id = int(dst_vlan)
100  flow.output_vlan_type = outputAction
101  return flow
102 
103 
104 def build_intermediate_flows(expandedRelationships, transit_vlan, i, flow_id):
105  # output action is always NONE for transit vlan id
106  match = expandedRelationships[i]['data']['dst_port']
107  action = expandedRelationships[i+1]['data']['src_port']
108  switch = expandedRelationships[i]['data']['dst_switch']
109  flow = Flow()
110  flow.command = "install_transit_flow"
111  flow.destination = "CONTROLLER"
112  flow.cookie = flow_id
113  flow.switch_id = switch
114  flow.input_port = int(match)
115  flow.output_port = int(action)
116  flow.transit_vlan_id = int(transit_vlan)
117  return flow
118 
119 
120 def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, outputAction):
121  flow = Flow()
122  flow.command = "install_one_switch_flow"
123  flow.destination = "CONTROLLER"
124  flow.cookie = flow_id
125  flow.switch_id = switch
126  flow.input_port = int(src_port)
127  flow.output_port = int(dst_port)
128  flow.input_vlan_id = int(src_vlan)
129  flow.output_vlan_id = int(dst_vlan)
130  flow.bandwidth = bandwidth
131  flow.input_meter_id = assign_meter_id()
132  flow.output_meter_id = assign_meter_id()
133  flow.output_vlan_type = outputAction
134  return flow
135 
136 def build_delete_flow(switch, flow_id):
137  flow = Flow()
138  flow.command = "delete_flow"
139  flow.destination = "CONTROLLER"
140  flow.cookie = flow_id
141  flow.switch_id = switch
142  return flow
143 
144 def expand_relationships(relationships):
145  fullRelationships = []
146  for relationship in relationships:
147  fullRelationships.append(json.loads((requests.get(relationship, auth=('neo4j', 'temppass'))).text))
148  return fullRelationships
149 
150 def get_relationships(src_switch, src_port, dst_switch, dst_port):
151  query = (
152  "MATCH (a:switch{{name:'{}'}}),(b:switch{{name:'{}'}}), p = shortestPath((a)-[:isl*..100]->(b)) where ALL(x in nodes(p) "
153  "WHERE x.state = 'active') "
154  "RETURN p").format(src_switch, dst_switch)
155  match = neo4j_connect.run(query)
156  if match:
157  return match[0]['relationships']
158  return []
159 
160 
162  return random.randrange(99, 4000,1)
163 
165  return "123"
166 
168  # zero means meter should not be actually installed on switch
169  # zero value should be used only for software switch based testing
170  return 0
171 
172 def choose_output_action(input_vlan_id, output_vlan_id):
173  # TODO: move to Storm Flow Topology
174  if not input_vlan_id or input_vlan_id == 0:
175  if not output_vlan_id or output_vlan_id == 0:
176  output_action_type = "NONE"
177  else:
178  output_action_type = "PUSH"
179  else:
180  if not output_vlan_id or output_vlan_id == 0:
181  output_action_type = "POP"
182  else:
183  output_action_type = "REPLACE"
184  return output_action_type
185 
186 def api_v1_topology_get_one_switch_flows(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, flow_id):
187  forwardOutputAction = choose_output_action(int(src_vlan), int(dst_vlan))
188  reverseOutputAction = choose_output_action(int(dst_vlan), int(src_vlan))
189  return [[build_one_switch_flow(src_switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, forwardOutputAction)],
190  [build_one_switch_flow(dst_switch, dst_port, dst_vlan, src_port, src_vlan, bandwidth, flow_id, reverseOutputAction)]]
191 
192 def api_v1_topology_get_path(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan,
193  bandwidth, transit_vlan, flow_id):
194  relationships = get_relationships(src_switch, src_port, dst_switch, dst_port)
195  outputAction = choose_output_action(int(src_vlan), int(dst_vlan))
196  if relationships:
197  expandedRelationships = expand_relationships(relationships)
198  flows = []
199  flows.append(build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction))
200  intermediateFlowCount = len(expandedRelationships) - 1
201  i = 0
202  while i < intermediateFlowCount:
203  flows.append(build_intermediate_flows(expandedRelationships, transit_vlan, i, flow_id))
204  i += 1
205  flows.append(build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction))
206  return flows
207  else:
208  return False
209 
210 
211 @application.route('/api/v1/health-check', methods=["GET"])
213  return '{"status": "ok"}'
214 
215 @application.route('/api/v1/flow/<flowid>', methods=["GET", "DELETE"])
216 #@login_required
217 def api_v1_flow(flowid):
218  query = "MATCH (a:switch)-[r:flow {{flowid: '{}'}}]->(b:switch) {} r"
219 
220  if request.method == 'GET':
221  result = neo4j_connect.run(query.format(flowid, "return")).data()
222  if request.method == 'DELETE':
223  producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
224  switches = neo4j_connect.run("MATCH (a:switch)-[r:flow {{flowid: '{}'}}]->(b:switch) return r.flowpath limit 1".format(flowid)).evaluate()
225  for switch in switches:
226  message = Message()
227  message.data = build_delete_flow(switch, str(flowid))
228  message.type = "COMMAND"
229  message.timestamp = 42
230  kafkamessage = b'{}'.format(message.toJSON())
231  print 'topic: {}, message: {}'.format(topic, kafkamessage)
232  messageresult = producer.send(topic, kafkamessage)
233  messageresult.get(timeout=5)
234  result = neo4j_connect.run(query.format(flowid, "delete")).data()
235  return json.dumps(result)
236 
237 
238 @application.route('/api/v1/flow', methods=["POST"])
239 #@login_required
241  if request.method == 'POST':
242  producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
243  content = json.loads('{}'.format(request.data))
244  flowID = assign_flow_id()
245 
246  if content['src_switch'] == content['dst_switch']:
248  content['src_switch'], content['src_port'], content['src_vlan'],
249  content['dst_switch'], content['dst_port'], content['dst_vlan'],
250  content['bandwidth'], flowID)
251  else:
252  transitVlanForward = assign_transit_vlan()
253  forwardFlows = api_v1_topology_get_path(
254  content['src_switch'], content['src_port'], content['src_vlan'],
255  content['dst_switch'], content['dst_port'], content['dst_vlan'],
256  content['bandwidth'], transitVlanForward, flowID)
257 
258  transitVlanReturn = assign_transit_vlan()
259  reverseFlows = api_v1_topology_get_path(
260  content['dst_switch'], content['dst_port'], content['dst_vlan'],
261  content['src_switch'], content['src_port'], content['src_vlan'],
262  content['bandwidth'], transitVlanReturn, flowID)
263 
264  allflows = [forwardFlows, reverseFlows]
265 
266  if not forwardFlows or not reverseFlows:
267  response = {"result": "failed", "message": "unable to find valid path in the network"}
268  return json.dumps(response)
269 
270  forwardFlowSwitches = [str(f.switch_id) for f in forwardFlows]
271  reverseFlowSwitches = [str(f.switch_id) for f in reverseFlows]
272 
273 
274  for flows in allflows:
275  for flow in flows:
276  message = Message()
277  message.data = flow
278  message.type = "COMMAND"
279  message.timestamp = 42
280  kafkamessage = b'{}'.format(message.toJSON())
281  print 'topic: {}, message: {}'.format(topic, kafkamessage)
282  messageresult = producer.send(topic, kafkamessage)
283  result = messageresult.get(timeout=5)
284 
285  a_switchNode = neo4j_connect.find_one('switch', property_key='name', property_value='{}'.format(content['src_switch']))
286  b_switchNode = neo4j_connect.find_one('switch', property_key='name', property_value='{}'.format(content['dst_switch']))
287 
288  if not a_switchNode or not b_switchNode:
289  return '{"result": "failed"}'
290 
291  pathQuery = "MATCH (u:switch {{name:'{}'}}), (r:switch {{name:'{}'}}) MERGE (u)-[:flow {{flowid:'{}', src_port: '{}', dst_port: '{}', src_switch: '{}', dst_switch: '{}', flowpath: {}}}]->(r)"
292 
293  pathForwardQuery = pathQuery.format(a_switchNode['name'], b_switchNode['name'], flowID, content['src_port'], content['dst_port'], content['src_switch'], content['dst_switch'], str(forwardFlowSwitches))
294  pathReverseQuery = pathQuery.format(b_switchNode['name'], a_switchNode['name'], flowID, content['dst_port'], content['src_port'], content['dst_switch'], content['src_switch'], str(reverseFlowSwitches))
295 
296  neo4j_connect.run(pathForwardQuery)
297  neo4j_connect.run(pathReverseQuery)
298 
299  response = {"result": "sucessful", "flowID": flowID}
300  return json.dumps(response)
301 
302 
303 @application.route('/api/v1/push/flows', methods=["PUT"])
304 #@login_required
306  return jsonify(successes=0, failures=0, messages=["come back later"])
307 
308 
def build_intermediate_flows(expandedRelationships, transit_vlan, i, flow_id)
Definition: flows.py:104
def toJSON(self)
Definition: flows.py:58
def build_delete_flow(switch, flow_id)
Definition: flows.py:136
def api_v1_topology_get_path(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, transit_vlan, flow_id)
Definition: flows.py:193
def assign_meter_id()
Definition: flows.py:167
def api_v1_topology_get_one_switch_flows(src_switch, src_port, src_vlan, dst_switch, dst_port, dst_vlan, bandwidth, flow_id)
Definition: flows.py:186
def api_v1_push_flows()
Definition: flows.py:305
def get_relationships(src_switch, src_port, dst_switch, dst_port)
Definition: flows.py:150
def assign_transit_vlan()
Definition: flows.py:161
def toJSON(self)
Definition: flows.py:62
def choose_output_action(input_vlan_id, output_vlan_id)
Definition: flows.py:172
def api_v1_flow(flowid)
Definition: flows.py:217
def api_v1_health_check()
Definition: flows.py:212
def build_egress_flow(expandedRelationships, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, outputAction)
Definition: flows.py:86
def build_ingress_flow(expandedRelationships, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, outputAction)
Definition: flows.py:66
def api_v1_create_flow()
Definition: flows.py:240
def expand_relationships(relationships)
Definition: flows.py:144
def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, outputAction)
Definition: flows.py:120
def assign_flow_id()
Definition: flows.py:164