Open Kilda Java Documentation
isl_utils.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import logging
17 import textwrap
18 
19 from topologylistener import db
20 from topologylistener import exc
21 from topologylistener import flow_utils
22 from topologylistener import model
23 from topologylistener import link_props_utils
24 
25 logger = logging.getLogger(__name__)
26 
27 
def create_if_missing(tx, timestamp, *links):
    """Ensure both directions of every given ISL exist in the database.

    Links are processed in sorted order; for each link and for its
    reversed counterpart one ``MERGE`` query is executed. A relation that
    has to be created gets ``status``/``actual`` set to 'inactive',
    ``latency`` set to -1 and both time fields set to ``timestamp``. A
    relation that already exists only gets ``time_modify`` updated.

    :param tx: transaction object exposing ``run(query, params)``
    :param timestamp: value stored in the time fields (converted by ``str``)
    :param links: ISL objects; they must be sortable and offer ``reversed()``
    """
    query = textwrap.dedent("""
        MATCH (src:switch {name: $src_switch})
        MATCH (dst:switch {name: $dst_switch})
        MERGE (src) - [target:isl {
        src_switch: $src_switch,
        src_port: $src_port,
        dst_switch: $dst_switch,
        dst_port: $dst_port
        }] -> (dst)
        ON CREATE SET
        target.status=$status, target.actual=$status,
        target.latency=-1,
        target.time_create=$timestamp,
        target.time_modify=$timestamp
        ON MATCH SET target.time_modify=$timestamp""")

    for link in sorted(links):
        # forward direction first, then the opposite one
        for direction in (link, link.reversed()):
            params = _make_match(direction)
            params.update(status='inactive', timestamp=str(timestamp))

            logger.info('Ensure ISL %s exists', direction)
            db.log_query('create ISL', query, params)
            tx.run(query, params)
54 
55 
def fetch(tx, isl):
    """Load the DB relation matching ``isl`` (this exact direction only).

    :param tx: transaction object exposing ``run(query, params)``
    :param isl: ISL object accepted by ``_make_match``
    :returns: the ``target`` field of the record given by ``db.fetch_one``
    :raises exc.DBRecordNotFound: when ``db.fetch_one`` raises
        ``exc.DBEmptyResponse``
    """
    params = _make_match(isl)
    query = textwrap.dedent("""
        MATCH
        (:switch {name: $src_switch})
        -
        [target:isl {
        src_switch: $src_switch,
        src_port: $src_port,
        dst_switch: $dst_switch,
        dst_port: $dst_port
        }]
        ->
        (:switch {name: $dst_switch})
        RETURN target""")

    db.log_query('ISL fetch', query, params)
    response = tx.run(query, params)

    try:
        relation = db.fetch_one(response)['target']
    except exc.DBEmptyResponse:
        # translate "no rows" into the more specific "record not found"
        raise exc.DBRecordNotFound(query, params)
    else:
        return relation
81 
82 
def fetch_by_endpoint(tx, endpoint):
    """Find every ISL relation that starts on the given endpoint.

    :param tx: transaction object exposing ``run(query, params)``
    :param endpoint: object with ``dpid`` and ``port`` attributes
    :returns: ``db.ResponseIterator`` built over the ``target`` field of
        every returned record, together with the query and its parameters
    """
    query = textwrap.dedent("""
        MATCH (src:switch)-[target:isl]->(:switch)
        WHERE src.name=$src_switch AND target.src_port=$src_port
        RETURN target""")
    params = dict(src_switch=endpoint.dpid, src_port=endpoint.port)

    response = tx.run(query, params)
    relations = (record['target'] for record in response)
    return db.ResponseIterator(relations, query, params)
93 
94 
def fetch_by_datapath(tx, dpid):
    """Lazily yield every ISL relation that goes out of switch ``dpid``.

    Only relations whose ``src_switch`` equals ``dpid`` are matched.

    :param tx: transaction object exposing ``run(query, params)``
    :param dpid: switch name used as the relation source
    :returns: generator over the ``target`` field of every record
    """
    query = textwrap.dedent("""
        MATCH (sw:switch {name: $src_switch})
        - [target:isl {src_switch: $src_switch}] -> ()
        RETURN target""")
    params = dict(src_switch=dpid)

    response = tx.run(query, params)
    return (record['target'] for record in response)
103 
104 
def touch(tx, isl, mtime=None):
    """Update the ``time_modify`` property of the ISL.

    :param tx: transaction object passed through to ``set_props``
    :param isl: ISL to update
    :param mtime: value to store (converted by ``str``); when ``None`` the
        result of ``model.TimeProperty.now()`` is used
    """
    logger.debug("Touch ISL %s", isl)

    stamp = model.TimeProperty.now() if mtime is None else mtime
    set_props(tx, isl, {'time_modify': str(stamp)})
112 
113 
def resolve_conflicts(tx, isl):
    """Report other ISLs that share an endpoint with ``isl``.

    Both directions of ``isl`` are fetched and treated as legitimate.
    Every other relation starting on ``isl.source`` or ``isl.dest`` is a
    conflict candidate: it is logged as an error when its ``actual`` field
    is active and with a debug message otherwise.

    :param tx: transaction object
    :param isl: ISL object providing ``source``, ``dest`` and ``reversed()``
    :raises exc.DBRecordNotFound: propagated from ``fetch`` when one of the
        directions of ``isl`` is missing
    """
    logger.info('Check ISL %s for conflicts', isl)

    forward = fetch(tx, isl)
    backward = fetch(tx, isl.reversed())
    candidates = [forward, backward]
    legit_ids = {db.neo_id(x) for x in candidates}

    for endpoint in (isl.source, isl.dest):
        candidates.extend(fetch_by_endpoint(tx, endpoint))

    _lock_affected_switches(tx, candidates)

    for db_link in candidates:
        if db.neo_id(db_link) in legit_ids:
            continue

        conflict = model.InterSwitchLink.new_from_db(db_link)
        actual = db_link['actual']
        if not is_active_status(actual):
            logger.debug("Skip conflict ISL %s deactivation due to its current status - %s", conflict, actual)
            continue

        # NOTE: the conflict is only reported. The deactivation calls
        # (set_active_field(..., 'inactive') + update_status) are disabled.
        logger.error('Detected ISL %s conflict with %s. Please contact dev team', conflict, isl)
138 
139 
def switch_unplug(tx, dpid, mtime=True):
    """Mark every ISL going out of switch ``dpid`` as inactive.

    For each relation returned by ``fetch_by_datapath`` the ``actual``
    field is set to 'inactive' and the status of both directions is then
    re-synchronised by ``update_status``.

    :param tx: transaction object
    :param dpid: name of the unplugged switch
    :param mtime: passed unchanged to ``update_status``
    """
    # Use the module logger (not the root ``logging`` functions) so these
    # records carry this module's name like the rest of the file.
    logger.info("Deactivate all ISL to/from %s", dpid)

    # NOTE: fetch_by_datapath matches only relations whose src_switch is
    # ``dpid``; the opposite directions are reached via update_status.
    involved = list(fetch_by_datapath(tx, dpid))
    _lock_affected_switches(tx, involved, dpid)

    for db_link in involved:
        source = model.IslPathNode(
            db_link['src_switch'], db_link['src_port'])
        dest = model.IslPathNode(db_link['dst_switch'], db_link['dst_port'])
        isl = model.InterSwitchLink(source, dest, db_link['actual'])
        logger.debug("Found ISL: %s", isl)

        set_active_field(tx, db.neo_id(db_link), 'inactive')
        update_status(tx, isl, mtime=mtime)
155 
156 
def disable_by_endpoint(tx, endpoint, is_moved=False, mtime=True):
    """Deactivate every ISL that starts on ``endpoint``.

    The ``actual`` field of each matching relation is set to 'moved' (when
    ``is_moved`` is true) or 'inactive', then the status of both directions
    is re-synchronised by ``update_status``.

    :param tx: transaction object
    :param endpoint: object with ``dpid`` and ``port`` attributes
    :param is_moved: choose 'moved' instead of 'inactive' as the new value
    :param mtime: passed unchanged to ``update_status``
    :returns: list of ``model.InterSwitchLink`` objects that were processed
    """
    # Use the module logger (not the root ``logging`` functions) so this
    # record carries this module's name like the rest of the file.
    logger.debug('Locate all ISL starts on %s', endpoint)

    involved = list(fetch_by_endpoint(tx, endpoint))
    _lock_affected_switches(tx, involved, endpoint.dpid)

    # the new value does not depend on the link, so compute it once
    status = 'moved' if is_moved else 'inactive'

    updated = []
    for link in involved:
        isl = model.InterSwitchLink.new_from_db(link)
        logger.info('Deactivate ISL %s', isl)

        set_active_field(tx, db.neo_id(link), status)
        update_status(tx, isl, mtime=mtime)

        updated.append(isl)

    return updated
175 
176 
def update_status(tx, isl, mtime=True):
    """Recompute the common ``status`` of an ISL from both its directions.

    The forward relation (``self``) and the reverse one (``peer``) are
    matched in a single query and both receive the same ``status``:

    * 'active' when both ``actual`` fields are 'active';
    * 'moved' when at least one ``actual`` field is 'moved';
    * 'inactive' in every other case.

    When the number of properties reported as set differs from the
    expected one, an error is logged; no exception is raised.

    :param tx: transaction object exposing ``run(query, params)``
    :param isl: ISL object accepted by ``_make_match`` and offering
        ``reversed()``
    :param mtime: falsy - ``time_modify`` is left untouched; a
        ``model.TimeProperty`` - it is stored into ``time_modify`` of both
        relations; any other truthy value - ``model.TimeProperty.now()``
        is stored instead
    """
    # Use the module logger (not the root ``logging`` functions) so this
    # record carries this module's name like the rest of the file.
    logger.info("Sync status both sides of ISL %s to each other", isl)

    q = textwrap.dedent("""
        MATCH
        (:switch {name: $src_switch})
        -
        [self:isl {
        src_switch: $src_switch,
        src_port: $src_port,
        dst_switch: $dst_switch,
        dst_port: $dst_port
        }]
        ->
        (:switch {name: $dst_switch})
        MATCH
        (:switch {name: $peer_src_switch})
        -
        [peer:isl {
        src_switch: $peer_src_switch,
        src_port: $peer_src_port,
        dst_switch: $peer_dst_switch,
        dst_port: $peer_dst_port
        }]
        ->
        (:switch {name: $peer_dst_switch})

        WITH self, peer, CASE
        WHEN self.actual = $status_up AND peer.actual = $status_up
        THEN $status_up
        WHEN self.actual = $status_moved OR peer.actual = $status_moved
        THEN $status_moved
        ELSE $status_down
        END AS isl_status

        SET self.status=isl_status
        SET peer.status=isl_status""")
    p = {
        'status_up': 'active',
        'status_moved': 'moved',
        'status_down': 'inactive'}
    p.update(_make_match(isl))
    # same match keys for the opposite direction, prefixed with "peer_"
    p.update({'peer_' + k: v for k, v in _make_match(isl.reversed()).items()})

    # one ``status`` property on each of the two relations
    expected_update_properties_count = 2
    if mtime:
        if not isinstance(mtime, model.TimeProperty):
            mtime = model.TimeProperty.now()

        q += '\nSET self.time_modify=$mtime, peer.time_modify=$mtime'
        p['mtime'] = str(mtime)
        # plus one ``time_modify`` property on each relation
        expected_update_properties_count = 4

    db.log_query('ISL update status', q, p)
    cursor = tx.run(q, p)
    cursor.evaluate()  # to fetch first record

    stats = cursor.stats()
    if stats['properties_set'] != expected_update_properties_count:
        logger.error(
            'Failed to sync ISL\'s %s records statuses. Looks like it is '
            'unidirectional.', isl)
239 
240 
def get_life_cycle_fields(tx, isl):
    """Read the creation and modification times of an ISL.

    :param tx: transaction object passed to ``fetch``
    :param isl: ISL whose DB relation is loaded
    :returns: ``model.LifeCycleFields`` built from ``time_create`` and
        ``time_modify`` (in that order); non-empty values are converted
        with ``model.TimeProperty.new_from_db``, empty ones are passed
        through unchanged
    :raises exc.DBRecordNotFound: propagated from ``fetch``
    """
    db_isl = fetch(tx, isl)
    raw_values = (db_isl['time_create'], db_isl['time_modify'])
    values = [
        model.TimeProperty.new_from_db(item) if item else item
        for item in raw_values]
    return model.LifeCycleFields(*values)
249 
250 
def set_active_field(tx, neo_id, status):
    """Write ``status`` into the ``actual`` field of one ISL relation.

    :param tx: transaction object exposing ``run(query, params)``
    :param neo_id: database id of the relation (compared with ``id(target)``)
    :param status: new value of ``target.actual``
    """
    query = textwrap.dedent("""
        MATCH (:switch) - [target:isl] -> ()
        WHERE id(target) = $id
        SET target.actual = $status""")
    params = dict(status=status, id=neo_id)
    tx.run(query, params)
260 
261 
def increase_cost(tx, isl, amount, limit):
    """Add ``amount`` to the ISL cost unless the cost already reached ``limit``.

    A missing (or zero) stored cost is treated as 0. The limit is checked
    against the current cost only, before ``amount`` is added.

    :param tx: transaction object
    :param isl: ISL whose cost is changed
    :param amount: value added to the current cost
    :param limit: no update is done when ``limit <= current cost``
    """
    current = get_cost(tx, isl) or 0
    if limit <= current:
        return

    set_cost(tx, isl, current + amount)
270 
271 
def get_cost(tx, isl):
    """Return the ``cost`` stored on the ISL relation.

    :param tx: transaction object passed to ``fetch``
    :param isl: ISL whose DB relation is loaded
    :returns: ``None`` when the stored value is ``None``, otherwise the
        value converted with ``model.convert_integer``
    :raises exc.DBRecordNotFound: propagated from ``fetch``
    """
    raw_cost = fetch(tx, isl)['cost']
    if raw_cost is None:
        return None
    return model.convert_integer(raw_cost)
278 
279 
def set_cost(tx, isl, cost):
    """Store a new cost for the ISL.

    The value is first written through link props
    (``link_props_utils.set_props_and_propagate_to_isl``); when that raises
    ``exc.DBRecordNotFound`` it is written directly onto the ISL with
    ``set_props``. A warning is logged when the previous cost differs from
    the new one.

    :param tx: transaction object
    :param isl: ISL whose cost is changed
    :param cost: new cost value
    """
    props = model.LinkProps.new_from_isl(isl)
    props.props['cost'] = cost

    try:
        previous = link_props_utils.set_props_and_propagate_to_isl(tx, props)
    except exc.DBRecordNotFound:
        # fall back to updating the ISL relation itself
        previous = set_props(tx, isl, {'cost': cost})

    old_cost = previous.get('cost')
    if old_cost != cost:
        logger.warning(
            'ISL %s cost have been changed from %s to %s',
            isl, old_cost, cost)
294 
295 
def set_props(tx, isl, props):
    """Push changed properties onto the ISL relation.

    ``db.locate_changes`` splits ``props`` against the stored relation
    into ``(origin, update)``. A query is executed only when ``update`` is
    not empty; its SET part is produced by ``db.format_set_fields`` from
    the escaped update fields.

    :param tx: transaction object exposing ``run(query, params)``
    :param isl: ISL whose DB relation is updated
    :param props: mapping of property names to desired values
    :returns: the ``origin`` part of ``db.locate_changes`` (presumably the
        previous values of the requested properties - confirm in ``db``)
    :raises exc.DBRecordNotFound: propagated from ``fetch``
    """
    relation = fetch(tx, isl)
    origin, changes = db.locate_changes(relation, props)
    if not changes:
        return origin

    match_part = textwrap.dedent("""
        MATCH (:switch)-[target:isl]->(:switch)
        WHERE id(target)=$target_id
        """)
    set_part = db.format_set_fields(
        db.escape_fields(changes), field_prefix='target.')

    logger.debug('Push ISL properties: %r', changes)
    tx.run(match_part + set_part, {'target_id': db.neo_id(relation)})

    return origin
310 
311 
def del_props(tx, isl, props):
    """Remove the listed properties from the ISL relation.

    One ``REMOVE target.<name>`` clause is appended to the match query for
    every entry of ``props`` (names are escaped with ``db.escape``).

    :param tx: transaction object exposing ``run(query, params)``
    :param isl: ISL accepted by ``_make_match``
    :param props: iterable of property names to drop
    :returns: ``None`` when ``props`` is empty (no query is run), otherwise
        the ``contains_updates`` entry of the query statistics
    """
    logger.info(
        'ISL drop %s props request: %s', isl, ', '.join(repr(x) for x in props))

    fields = ['target.{}'.format(db.escape(name)) for name in props]
    if not fields:
        return

    # each field gets its own "\nREMOVE <field>" clause
    remove_clauses = ''.join('\nREMOVE ' + field for field in fields)

    params = _make_match(isl)
    query = textwrap.dedent("""
        MATCH
        (:switch {name: $src_switch})
        -
        [target:isl {
        src_switch: $src_switch,
        src_port: $src_port,
        dst_switch: $dst_switch,
        dst_port: $dst_port
        }]
        ->
        (:switch {name: $dst_switch})""") + remove_clauses
    db.log_query('ISL drop props', query, params)
    stats = tx.run(query, params).stats()
    return stats['contains_updates']
337 
338 
def _lock_affected_switches(tx, db_links, *extra):
    """Call ``flow_utils.precreate_switches`` for all involved switches.

    The switch set consists of ``extra`` plus the ``src_switch`` and
    ``dst_switch`` of every link in ``db_links``.

    NOTE(review): the name suggests that ``precreate_switches`` is what
    takes the lock on those switches - confirm in ``flow_utils``.

    :param tx: transaction object
    :param db_links: iterable of DB relations with ``src_switch`` and
        ``dst_switch`` fields
    :param extra: additional switch names to include
    """
    switches = set(extra)
    for db_link in db_links:
        switches.update((db_link['src_switch'], db_link['dst_switch']))

    flow_utils.precreate_switches(tx, *switches)
346 
347 
def _make_match(isl):
    """Build the query parameters that identify one ISL direction.

    :param isl: object whose ``source`` and ``dest`` have ``dpid`` and
        ``port`` attributes
    :returns: new dict with ``src_switch``, ``src_port``, ``dst_switch``
        and ``dst_port`` keys
    """
    src, dst = isl.source, isl.dest
    return dict(
        src_switch=src.dpid,
        src_port=src.port,
        dst_switch=dst.dpid,
        dst_port=dst.port)
354 
355 
def is_active_status(status):
    """Tell whether ``status`` is exactly the string 'active'."""
    is_active = status == 'active'
    return is_active
def set_active_field(tx, neo_id, status)
Definition: isl_utils.py:251
def set_cost(tx, isl, cost)
Definition: isl_utils.py:280
def update_status(tx, isl, mtime=True)
Definition: isl_utils.py:177
def fetch_by_datapath(tx, dpid)
Definition: isl_utils.py:95
def set_props(tx, isl, props)
Definition: isl_utils.py:296
def fetch_by_endpoint(tx, endpoint)
Definition: isl_utils.py:83
def switch_unplug(tx, dpid, mtime=True)
Definition: isl_utils.py:140
def del_props(tx, isl, props)
Definition: isl_utils.py:312
def is_active_status(status)
Definition: isl_utils.py:356
def touch(tx, isl, mtime=None)
Definition: isl_utils.py:105
def disable_by_endpoint(tx, endpoint, is_moved=False, mtime=True)
Definition: isl_utils.py:157
def increase_cost(tx, isl, amount, limit)
Definition: isl_utils.py:262
def resolve_conflicts(tx, isl)
Definition: isl_utils.py:114
def get_life_cycle_fields(tx, isl)
Definition: isl_utils.py:241
def create_if_missing(tx, timestamp, *links)
Definition: isl_utils.py:28