Open Kilda Java Documentation
switch_replace_master.py
Go to the documentation of this file.
1 import json
2 
3 from py2neo import Graph, Path, remote
4 
5 
class KildaDBActions(object):
    """Neo4j operations used to move flows from one switch to its replacement.

    The class finds every ``flow_segment`` and ``flow`` relationship that
    references the old switch, rewrites the switch id / port numbers in
    memory, and finally recreates the relationships in the database inside
    a single transaction.
    """

    def __init__(self, db_addr, user, passwd, port, secure, scheme):
        """Store the connection settings and open the database connection.

        :param db_addr: Neo4j host name or IP address.
        :param user: Neo4j user name.
        :param passwd: Neo4j password.
        :param port: Neo4j HTTP port.
        :param secure: truthy to request a secure connection.
        :param scheme: URI scheme, e.g. ``'http'``.
        """
        self._db_addr = db_addr
        self._user = user
        self._passwd = passwd
        self._port = port
        self._secure = secure
        self._scheme = scheme
        self.graph = None
        # Transaction handle; opened by apply_migrated_data() and used by
        # update_isl_bandwidth().
        self.tx = None

        self._get_db_connection()

    def _build_uri(self):
        """Return the Neo4j REST endpoint URI: ``scheme://host:port/db/data``."""
        return "{}://{}:{}/db/data".format(
            self._scheme, self._db_addr, self._port)

    def _get_db_connection(self):
        """Create the py2neo ``Graph`` handle and store it in ``self.graph``."""
        self.graph = Graph(
            self._build_uri(),
            user=self._user,
            password=self._passwd,
            # Honour the constructor argument instead of a hard-coded False.
            secure=bool(self._secure),
            http_port=self._port
        )

    def _find_all_affected_flow_segments(self, old_switch_id):
        """Return every flow_segment relationship that touches the switch.

        Runs::

            MATCH() -[rel:flow_segment]-> ()
            WHERE rel.dst_switch = "<id>" OR rel.src_switch = "<id>"
            RETURN distinct rel;

        :param old_switch_id: switch id to look for on either end.
        :return: result of ``self.graph.data(query)``; the callers in this
            file read each record's ``'rel'`` entry.
        """
        query = """
            MATCH() -[rel:flow_segment]-> ()
            WHERE rel.dst_switch = "{sw_id}" OR rel.src_switch = "{sw_id}"
            RETURN distinct rel;
        """.format(sw_id=old_switch_id)
        # Single-argument call form prints the same text on Python 2 and 3.
        print(query)
        return self.graph.data(query)

    def _find_all_affected_flows(self, old_switch_id, links):
        """Return every flow relationship affected by the switch replacement.

        A flow is selected when it starts or ends on the switch, or when its
        ``flowid`` is listed in ``links``. Runs::

            MATCH() -[rel:flow]-> ()
            WHERE rel.dst_switch = "<id>" OR rel.src_switch = "<id>"
               OR rel.flowid IN [...]
            RETURN distinct rel;

        :param old_switch_id: switch id to look for on either end.
        :param links: list of flow ids; it is JSON-encoded and embedded in
            the query as a list literal.
        :return: result of ``self.graph.data(query)``.
        """
        query = """
            MATCH() -[rel:flow]-> ()
            WHERE rel.dst_switch = "{sw_id}" OR rel.src_switch = "{sw_id}" OR rel.flowid IN {links}
            RETURN distinct rel;
        """.format(sw_id=old_switch_id, links=json.dumps(links))
        print(query)
        return self.graph.data(query)

    @staticmethod
    def _replug_endpoints(rel, old_switch_id, new_switch_id, replug_port_map):
        """Point the endpoints of ``rel`` that sit on the old switch at the new one.

        ``rel`` is modified in place. A port missing from ``replug_port_map``
        raises ``KeyError``.
        """
        for side in ('dst', 'src'):
            switch_key = side + '_switch'
            port_key = side + '_port'
            if rel.get(switch_key) == old_switch_id:
                rel[switch_key] = new_switch_id
                # The port map is keyed by the old port number as a string.
                rel[port_key] = replug_port_map[str(rel.get(port_key))]

    def _migrate_segments(self, affected_segments, old_switch_id, new_switch_id,
                          replug_port_map):
        """Rewrite switch id and ports of the given segments in memory.

        :param affected_segments: records whose ``'rel'`` entry is the
            segment to update.
        :param old_switch_id: id of the switch being replaced.
        :param new_switch_id: id of the replacement switch.
        :param replug_port_map: mapping ``str(old port) -> new port``.
        :return: the same ``affected_segments`` object, updated in place.
        :raises KeyError: when a port on the old switch is not in the map.
        """
        for segment in affected_segments:
            self._replug_endpoints(
                segment['rel'], old_switch_id, new_switch_id, replug_port_map)

        return affected_segments

    def _migrate_flows(self, affected_flows, old_switch_id, new_switch_id,
                       replug_port_map):
        """Rewrite switch id and ports of the given flows in memory.

        Besides the flow endpoints, the JSON document stored in the
        ``flowpath`` property is decoded, every entry of its ``'path'`` list
        that references the old switch is updated, and the document is
        written back as JSON.

        :param affected_flows: records whose ``'rel'`` entry is the flow to
            update.
        :param old_switch_id: id of the switch being replaced.
        :param new_switch_id: id of the replacement switch.
        :param replug_port_map: mapping ``str(old port) -> new port``.
        :return: the same ``affected_flows`` object, updated in place.
        :raises KeyError: when a port on the old switch is not in the map.
        """
        for flow in affected_flows:
            rel = flow['rel']
            self._replug_endpoints(
                rel, old_switch_id, new_switch_id, replug_port_map)

            flow_path = json.loads(rel.get("flowpath"))
            for path_segment in flow_path['path']:
                if path_segment['switch_id'] == old_switch_id:
                    path_segment['switch_id'] = new_switch_id
                    path_segment['port_no'] = replug_port_map[str(
                        path_segment['port_no'])]

            rel["flowpath"] = json.dumps(flow_path)

        return affected_flows

    def update_isl_bandwidth(self, src_switch, src_port, dst_switch, dst_port):
        """Recompute ``available_bandwidth`` of one ISL inside ``self.tx``.

        WARNING: cloned from the Kilda topology classes.

        The query sums ``bandwidth`` over the flow segments on the same
        switch/port pair that have ``ignore_bandwidth: false`` and stores
        ``max_bandwidth - used`` on the ISL. It must be called while
        ``self.tx`` holds an open transaction (see apply_migrated_data).

        :param src_switch: name of the source switch node.
        :param src_port: source port, embedded unquoted in the query.
        :param dst_switch: name of the destination switch node.
        :param dst_port: destination port, embedded unquoted in the query.
        """
        available_bw_query = (
            "MATCH (src:switch {{name:'{src_switch}'}}), (dst:switch {{name:'{dst_switch}'}}) WITH src,dst "
            " MATCH (src)-[i:isl {{ src_port:{src_port}, dst_port: {dst_port}}}]->(dst) WITH src,dst,i "
            " OPTIONAL MATCH (src)-[fs:flow_segment {{ src_port:{src_port}, dst_port: {dst_port}, ignore_bandwidth: false }}]->(dst) "
            " WITH sum(fs.bandwidth) AS used_bandwidth, i as i "
            " SET i.available_bandwidth = i.max_bandwidth - used_bandwidth "
        )
        params = {
            'src_switch': src_switch,
            'src_port': src_port,
            'dst_switch': dst_switch,
            'dst_port': dst_port,
        }
        self.tx.run(available_bw_query.format(**params))

    def _recreate_relationship(self, rel, label):
        """Create a copy of ``rel`` between its current endpoints, then delete the original.

        The property map of the new relationship is cut out of the textual
        representation of ``rel``, i.e. the text between ``-[:<label> `` and
        ``]->(``.
        NOTE(review): this depends on the repr format of the py2neo
        relationship object - confirm when upgrading py2neo.
        """
        neo_repr = rel.__repr__()
        properties = neo_repr.split('-[:{} '.format(label))[1].split(']->(')[0]
        self.tx.run(
            """
            MATCH (a),(b)
            WHERE a.name = "{src_sw}" AND b.name = "{dst_sw}"
            CREATE (a)-[r:{label} {params}]->(b)
            RETURN r
            """.format(
                src_sw=rel['src_switch'],
                dst_sw=rel['dst_switch'],
                label=label,
                params=properties
            ))
        # Remove the relationship that was read from the database.
        self.tx.run(
            """
            MATCH ()-[ rel:{label} ]->()
            WHERE id(rel) = {rel_id}
            DELETE rel;
            """.format(
                label=label,
                rel_id=remote(rel)._id,
            ))

    def apply_migrated_data(self, affected_segments, affected_flows):
        """Write the migrated segments and flows to Neo4j in one transaction.

        For every segment: recreate it between the (possibly new) endpoint
        switches, delete the old relationship and refresh the bandwidth of
        the ISL it runs over. For every flow: recreate it and delete the old
        relationship. The transaction is committed at the end.

        :param affected_segments: records returned by ``_migrate_segments``.
        :param affected_flows: records returned by ``_migrate_flows``.
        """
        self.tx = self.graph.begin(autocommit=False)

        # Recreate flow segments, clean the old ones, update affected ISLs.
        for segment in affected_segments:
            rel = segment['rel']
            self._recreate_relationship(rel, 'flow_segment')
            self.update_isl_bandwidth(
                src_switch=rel['src_switch'],
                src_port=rel['src_port'],
                dst_switch=rel['dst_switch'],
                dst_port=rel['dst_port'])

        # Recreate flows and clean the old ones.
        for flow in affected_flows:
            self._recreate_relationship(flow['rel'], 'flow')

        self.tx.commit()
170 
171 
172 def _macify_switch_id(dpid):
173  dpid = dpid.lower().replace(":", '').replace("sw", '')
174  c = list(dpid[::-2])
175  c.reverse()
176  return ":".join(
177  map(
178  lambda a: ''.join(a),
179  zip(dpid[::2], c)
180  )
181  )
182 
183 
def _read_replug_ports_map_from_file(old_switch_id, new_swithc_id):
    """Load the port replug map for a switch pair from a JSON file.

    The file is looked up in the current working directory under the name
    ``<old>-to-<new>-ports.json``, where both ids have their colons removed.

    :param old_switch_id: id of the switch being replaced.
    :param new_swithc_id: id of the replacement switch (the misspelled
        parameter name is kept for compatibility with existing callers).
    :return: the decoded JSON content of the file.
    :raises IOError: when the file cannot be opened.
    :raises ValueError: when the file does not contain valid JSON.
    """
    file_name = "{}-to-{}-ports.json".format(
        old_switch_id.replace(':', ''),
        new_swithc_id.replace(':', '')
    )
    with open(file_name, 'r') as ports_map:
        return json.load(ports_map)
193 
def start_migration(old_switch_id, new_switch_id, db_connection_config,
                    replug_port_map=None):
    """Move every flow and flow segment from the old switch to the new one.

    :param old_switch_id: id of the switch being replaced; normalised with
        ``_macify_switch_id`` before use.
    :param new_switch_id: id of the replacement switch; normalised the same
        way.
    :param db_connection_config: keyword arguments for ``KildaDBActions``.
    :param replug_port_map: mapping ``"old port" -> new port``. When it is
        missing or empty, the map is read from the
        ``<old>-to-<new>-ports.json`` file instead.
    """
    old_switch_id = _macify_switch_id(old_switch_id)
    new_switch_id = _macify_switch_id(new_switch_id)

    # Get replug_port_map in the format "old port" -> "new port".
    if not replug_port_map:
        replug_port_map = _read_replug_ports_map_from_file(
            old_switch_id, new_switch_id)

    kilda_db = KildaDBActions(**db_connection_config)

    # STEP 0
    # Get all affected flow segments from Neo4j.
    affected_segments = kilda_db._find_all_affected_flow_segments(
        old_switch_id)
    # Flows that only transit the switch are found through their segments.
    parent_flows = list({
        segment['rel'].get('flowid')
        for segment in affected_segments
    })
    # Get all affected flows from Neo4j.
    affected_flows = kilda_db._find_all_affected_flows(
        old_switch_id, links=parent_flows)

    # STEP 1
    # Perform the in-memory update of all affected segments.
    affected_segments = kilda_db._migrate_segments(
        affected_segments, old_switch_id, new_switch_id, replug_port_map)
    # Perform the in-memory update of all affected flows.
    affected_flows = kilda_db._migrate_flows(
        affected_flows, old_switch_id, new_switch_id, replug_port_map)

    # Prepare the transaction and commit the changes to the DB.
    kilda_db.apply_migrated_data(affected_segments, affected_flows)
226 
227 
def do_test():
    """Run a migration against a local Neo4j with semi-random data.

    Uses an identity port map for ports 0..99, so only the switch id of the
    affected flows and segments changes.
    """
    db_connection_config = {
        "db_addr": "localhost",
        "user": 'neo4j',
        "passwd": '....',
        "port": 7474,
        "secure": False,
        "scheme": 'http'
    }
    old_switch_id = "01:00:00:22:3d:5a:04:87"
    new_switch_id = "00:00:00:22:3d:5a:04:87"
    # range() instead of xrange() so the helper runs on Python 2 and 3.
    replug_port_map = {str(i): i for i in range(100)}

    start_migration(
        old_switch_id=old_switch_id,
        new_switch_id=new_switch_id,
        db_connection_config=db_connection_config,
        replug_port_map=replug_port_map)
247 
if __name__ == "__main__":
    # Command-line entry point: parse the options, then either run the
    # built-in test or migrate the switch given on the command line.
    import argparse

    def _parse_bool(value):
        """Convert a command-line word to a bool.

        argparse's ``type=bool`` treats every non-empty string, including
        "False", as True; this converter reads the word instead.
        """
        lowered = value.strip().lower()
        if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
            return True
        if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
            return False
        raise argparse.ArgumentTypeError(
            'expected a boolean value, got {!r}'.format(value))

    parser = argparse.ArgumentParser(description='Switch Migration tool')

    parser.add_argument('--old_switch_id', type=str,
                        help='Old Switch dpid')
    parser.add_argument('--new_switch_id', type=str,
                        help='new Switch dpid')

    parser.add_argument('--test', type=_parse_bool,
                        help='test', default=False)

    parser.add_argument('--neo_scheme', type=str,
                        help='Neo4j scheme name', default='http')
    parser.add_argument('--neo_host', type=str,
                        help='Neo4j host name or IP', default='localhost')
    parser.add_argument('--neo_port', type=int,
                        help='Neo4j port', default=7474)
    parser.add_argument('--neo_secure', type=_parse_bool,
                        help='Neo4j secure', default=False)

    parser.add_argument('--neo_user', type=str,
                        help='Neo4j user name', default='neo4j')
    parser.add_argument('--neo_passwd', type=str,
                        help='Neo4j password')

    args = parser.parse_args()

    if args.test:
        do_test()
    else:
        # Fail with a usage message instead of an AttributeError deep inside
        # the migration when a switch id is missing.
        if not (args.old_switch_id and args.new_switch_id):
            parser.error(
                '--old_switch_id and --new_switch_id are required '
                'unless --test is set')

        old_switch_id = args.old_switch_id
        new_switch_id = args.new_switch_id

        db_connection_config = {
            "db_addr": args.neo_host,
            "user": args.neo_user,
            "passwd": args.neo_passwd,
            "port": args.neo_port,
            "secure": args.neo_secure,
            "scheme": args.neo_scheme
        }
        start_migration(
            old_switch_id=old_switch_id,
            new_switch_id=new_switch_id,
            db_connection_config=db_connection_config)
295 
def __init__(self, db_addr, user, passwd, port, secure, scheme)
def get(section, option)
Definition: config.py:43
def apply_migrated_data(self, affected_segments, affected_flows)
def start_migration(old_switch_id, new_switch_id, db_connection_config, replug_port_map=None)
def update_isl_bandwidth(self, src_switch, src_port, dst_switch, dst_port)