Open Kilda Java Documentation
share.py
# Copyright 2017 Telstra Open Source
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import functools
import itertools
import json
import logging
import os
import sys
import unittest
import uuid

from topologylistener import db
from topologylistener import flow_utils
from topologylistener import message_utils
from topologylistener import messageclasses

log = logging.getLogger(__name__)

dpid_test_marker = 0xfffe000000000000
dpid_protected_bits = 0xffffff0000000000

cookie_test_data_flag = 0x0010000000000000

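# These constants tag everything the test suite creates: dpid_test_marker is
# stamped into the protected high bits of every generated datapath id (see
# make_datapath_id() below), and cookie_test_data_flag is expected in the
# cookie of test flows, so the drop_db_* helpers can remove test records
# without touching anything else in a shared database.
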
def feed_isl_discovery(isl, **fields):
    payload = isl_info_payload(isl, **fields)
    return feed_message(command(payload))


def feed_message(message):
    return messageclasses.MessageItem(message).handle()


def link_props_request(link_props):
    return {
        'source': {
            'switch-id': link_props.source.dpid,
            'port-id': link_props.source.port},
        'dest': {
            'switch-id': link_props.dest.dpid,
            'port-id': link_props.dest.port},
        'props': link_props.props,
        'time_create': link_props.time_create.as_java_timestamp(),
        'time_modify': link_props.time_modify.as_java_timestamp()}


def link_props_put_payload(request):
    return {
        'link_props': request,
        'clazz': messageclasses.CD_LINK_PROPS_PUT}


def link_props_drop_payload(request):
    return {
        'lookup_mask': request,
        'clazz': messageclasses.CD_LINK_PROPS_DROP}

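# A composition sketch (not from the original module): a complete link_props
# service message can be built by chaining the helpers above, e.g.
#
#   request = link_props_request(link_props)
#   feed_message(command(link_props_put_payload(request)))
#   feed_message(command(link_props_drop_payload(request)))
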
def feature_toggle_request(**fields):
    payload = dict(fields)
    payload['clazz'] = (
        'org.openkilda.messaging.command.system.FeatureToggleRequest')
    return payload

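# Usage sketch (the flag name below is hypothetical; the accepted fields are
# defined by org.openkilda.messaging.command.system.FeatureToggleRequest):
#
#   feed_message(command(feature_toggle_request(unpush_flow=False)))
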
def isl_info_payload(isl, **fields):
    payload = {
        'state': 'DISCOVERED',
        'latency_ns': 20,
        'speed': 1000,
        'available_bandwidth': 1000}
    payload.update(fields)
    payload.update({
        'clazz': messageclasses.MT_ISL,
        'path': [
            {
                'switch_id': isl.source.dpid,
                'port_no': isl.source.port},
            {
                'switch_id': isl.dest.dpid,
                'port_no': isl.dest.port}]})

    return payload


def command(payload, **fields):
    message = {
        'timestamp': 0,
        'correlation_id': make_correlation_id('test')}
    message.update(fields)
    message.update({
        'clazz': message_utils.MT_INFO,
        'payload': payload})
    return message

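# For reference, command(isl_info_payload(isl)) yields a message of roughly
# this shape (abridged; the correlation id is regenerated on every call):
#
#   {'clazz': message_utils.MT_INFO,
#    'timestamp': 0,
#    'correlation_id': 'test.<uuid1>',
#    'payload': {'clazz': messageclasses.MT_ISL, 'state': 'DISCOVERED', ...}}
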
def make_correlation_id(prefix=''):
    if prefix and prefix[-1] != '.':
        prefix += '.'
    return '{}{}'.format(prefix, uuid.uuid1())


def make_datapath_id(number):
    if number & dpid_protected_bits:
        raise ValueError(
            'Invalid switch id {}: use protected bits'.format(number))
    return long_as_dpid(number | dpid_test_marker)


def clean_neo4j_test_data(tx):
    drop_db_flows(tx)
    drop_db_isls(tx)
    drop_db_switches(tx)
    drop_db_link_props(tx)


def drop_db_isls(tx):
    q = 'MATCH (:switch)-[self:isl|:link_props]->() RETURN self'
    for data_set in tx.run(q):
        rel = data_set['self']
        if not is_test_dpid(rel['src_switch']):
            continue
        if not is_test_dpid(rel['dst_switch']):
            continue

        tx.separate(rel)


def drop_db_switches(tx):
    q = 'MATCH (a:switch) RETURN a'
    batch = (x['a'] for x in tx.run(q))
    for node in batch:
        if not is_test_dpid(node['name']):
            continue
        tx.delete(node)


def drop_db_link_props(tx):
    q = 'MATCH (a:link_props) RETURN a'
    batch = (x['a'] for x in tx.run(q))
    for node in batch:
        match = [
            is_test_dpid(node[rel]) for rel in ('src_switch', 'dst_switch')]
        if not all(match):
            continue
        tx.delete(node)


def drop_db_flows(tx):
    q_lookup = 'MATCH (:switch)-[a:flow]->(:switch) RETURN a'
    q_delete = 'MATCH (:switch)-[a:flow]->(:switch) WHERE id(a)=$id DELETE a'
    batch = (x['a'] for x in tx.run(q_lookup))
    for relation in batch:
        cookie = relation['cookie']
        if cookie is None:
            continue
        try:
            cookie = int(cookie)
            if not (cookie & cookie_test_data_flag):
                continue
        except ValueError:
            continue

        drop_db_flow_segments(tx, relation['flowid'])
        tx.run(q_delete, {'id': db.neo_id(relation)})


def drop_db_flow_segments(tx, flow_id):
    q = (
        'MATCH (:switch)-[fs:flow_segment]->(:switch)\n'
        'WHERE fs.flowid=$flow_id\n'
        'DELETE fs')
    tx.run(q, {'flow_id': flow_id})

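# Note that the drop_db_* helpers above only delete records carrying the test
# dpid marker or the test cookie flag, so non-test data in a shared Neo4j
# instance survives the per-test cleanup.
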
def is_test_dpid(dpid):
    dpid = dpid_as_long(dpid)
    return dpid & dpid_protected_bits == dpid_test_marker


def dpid_as_long(dpid_str):
    value = dpid_str.replace(':', '')
    return int(value, 16)


def long_as_dpid(dpid):
    value = hex(dpid)
    i = iter(value)
    # pair the characters two by two; the first pair is the '0x' prefix and is
    # dropped below (under Python 2, an unpaired trailing 'L' from long values
    # simply falls out of zip())
    chunked = [a + b for a, b in zip(i, i)]
    chunked.pop(0)
    return ':'.join(chunked)

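# A worked example of the dpid helpers (illustrative):
#
#   make_datapath_id(5)                      -> 'ff:fe:00:00:00:00:00:05'
#   dpid_as_long('ff:fe:00:00:00:00:00:05')  -> 0xfffe000000000005
#   is_test_dpid('ff:fe:00:00:00:00:00:05')  -> True
#   make_datapath_id(0x10000000000)          -> ValueError (protected bits)
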
class Environment(object):
    def __init__(self):
        self._monkey_patch_recovery = {}
        self.kafka_producer_stub = KafkaProducerStub()

        self.init_logging()
        self.neo4j_connect = self.init_neo4j()

        self.monkey_patch()

    def kafka_backlog(self):
        return tuple(self.kafka_producer_stub.backlog)

    def reset_kafka_backlog(self):
        self.kafka_producer_stub.backlog[:] = []

    def init_logging(self):
        logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)

    def init_neo4j(self):
        return flow_utils.graph

    def monkey_patch(self):
        for module, attr, replace in (
                (message_utils, 'producer', self.kafka_producer_stub),):
            current = getattr(module, attr)
            if current is replace:
                continue

            module_data = self._monkey_patch_recovery.setdefault(module, {})
            module_data[attr] = current
            setattr(module, attr, replace)

        message_utils.producer = self.kafka_producer_stub


class AbstractTest(unittest.TestCase):
    path = functools.partial(os.path.join, os.path.dirname(__file__))

    def setUp(self):
        self.log_separator()
        self.drop_persistent_data()

    def log_separator(self):
        separator = '*-' * 29 + '*'
        prefix = '*' * 3
        message = '\n'.join((
            '', separator,
            '{} Run test {}'.format(prefix, self.id()),
            separator))
        sys.stdout.flush()
        logging.info(message)

    def drop_persistent_data(self):
        with env.neo4j_connect.begin() as tx:
            clean_neo4j_test_data(tx)

    def feed_service(self, message, can_fail=False):
        result = messageclasses.MessageItem(message).handle()
        if not can_fail:
            self.assertTrue(result)

    def open_neo4j_session(self):
        return env.neo4j_connect.begin()

    def load_data(self, name):
        with open(self.path('data', name), 'rt') as stream:
            return json.load(stream)


class KafkaProducerStub(object):
    def __init__(self, backlog_size=32):
        self.backlog_size = backlog_size
        self.backlog = []

    def send(self, topic, payload=None):
        record = KafkaSendRecord(topic, payload)
        self.backlog.insert(0, record)
        self.backlog[self.backlog_size:] = []  # trim to backlog_size entries
        return KafkaSendFutureStub(record)

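# Sketch of how a test can inspect outgoing kafka traffic through the stub
# (assuming the Environment accessors named above; the backlog keeps newest
# records first):
#
#   env.reset_kafka_backlog()
#   feed_isl_discovery(isl)
#   topics = [record.topic for record in env.kafka_backlog()]
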
class KafkaSendFutureStub(object):
    def __init__(self, record):
        self.record = record

    def get(self, timeout=None):
        log.debug('Send kafka record: %s', self.record)


class KafkaSendRecord(object):
    payload_visibility_limit = 60
    _counter = itertools.count()

    def __init__(self, topic, payload):
        self.topic = topic
        self.payload = payload
        self.index = next(self._counter)

    def __str__(self):
        payload = self.payload
        if not isinstance(payload, basestring):  # Python 2: str or unicode
            payload = str(payload)

        chunks = [
            'index={}'.format(self.index),
            'topic={!r}'.format(self.topic)]
        if len(payload) < self.payload_visibility_limit:
            chunks.append('payload={!r}'.format(payload))
        else:
            chunks.append('payload="""{!r}""" ... more {} chars'.format(
                payload[:self.payload_visibility_limit],
                len(payload) - self.payload_visibility_limit))
        return 'KafkaSend{{{}}}'.format(', '.join(chunks))


# must be at the end of module
env = Environment()