Open Kilda Java Documentation
test_link_props.py
Go to the documentation of this file.
1 # Copyright 2018 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import json
17 import time
18 import uuid
19 
20 from topologylistener import exc
21 from topologylistener import config
22 from topologylistener import isl_utils
23 from topologylistener import link_props_utils
24 from topologylistener import message_utils
25 from topologylistener import messageclasses
26 from topologylistener import model
27 from topologylistener.tests import share
28 
29 
31  endpoint_alpha = model.NetworkEndpoint(share.make_datapath_id(1), 4)
32  endpoint_beta = model.NetworkEndpoint(share.make_datapath_id(2), 6)
33  endpoint_gamma = model.NetworkEndpoint(share.make_datapath_id(3), 8)
34 
35  isl_alpha_beta = model.InterSwitchLink(
36  model.IslPathNode(endpoint_alpha.dpid, endpoint_alpha.port),
37  model.IslPathNode(endpoint_beta.dpid, endpoint_beta.port))
38  isl_beta_alpha = isl_alpha_beta.reversed()
39 
def setUp(self):
    """Feed both directions of the alpha/beta ISL into the service.

    Runs the parent fixture first, then wraps an ISL-info payload for the
    forward and for the reversed link into a command, hands it to
    ``messageclasses.MessageItem`` and requires ``handle()`` to return a
    truthy result for each of them.
    """
    super(Abstract, self).setUp()

    both_directions = (self.isl_alpha_beta, self.isl_beta_alpha)
    for link in both_directions:
        payload = share.isl_info_payload(link)
        item = messageclasses.MessageItem(share.command(payload))
        handled = item.handle()
        self.assertTrue(handled)
47 
def _put(self, subject):
    """Send a link-props PUT command built from ``subject``.

    The subject is encoded with ``share.link_props_request``, wrapped into
    a PUT payload and delivered through ``self.feed_service``.

    Returns the PUT payload that was sent.
    """
    put_payload = share.link_props_put_payload(
        share.link_props_request(subject))
    message = share.command(put_payload)
    self.feed_service(message)
    return put_payload
53 
def _get_kafka_response(self, offset=0, expect_class=message_utils.MT_INFO):
    """Return the payload of a message captured by the kafka producer stub.

    ``offset`` is the index of the record inside the stub's backlog and
    ``expect_class`` is the value the decoded message must carry in its
    ``clazz`` field.

    The test fails when the backlog holds no record at ``offset``, when
    the record's topic is not ``config.KAFKA_NORTHBOUND_TOPIC`` or when
    the message class differs from ``expect_class``.

    Returns the ``payload`` member of the JSON-decoded record.
    """
    backlog = share.env.kafka_producer_stub.backlog

    # assertLess prints both operands on failure, which makes a missing
    # response much easier to diagnose than a bare "False is not true".
    self.assertLess(offset, len(backlog))

    record = backlog[offset]
    self.assertEqual(config.KAFKA_NORTHBOUND_TOPIC, record.topic)

    message = json.loads(record.payload)
    self.assertEqual(expect_class, message['clazz'])

    return message['payload']
66 
67 
def test_put(self):
    """PUT link props for beta -> gamma and verify storage and response.

    Checks that the record read back from the DB equals the object that
    was sent, and that the first northbound kafka message is a link-props
    response without an error that echoes the (JSON round-tripped)
    request.
    """
    link_props = model.LinkProps(
        self.endpoint_beta, self.endpoint_gamma,
        props={'cost': 5, 'custom': 'test'})

    # Move time_create 100 java-timestamp units into the past so that
    # time_create and time_modify are different.
    shifted = link_props.time_create.as_java_timestamp() - 100
    link_props.time_create = model.TimeProperty.new_from_java_timestamp(
        shifted)

    # Let the clock advance, so the time fields end up different if
    # incorrect values (not the ones from the passed object) are used.
    time.sleep(0.001)

    self._put(link_props)

    with self.open_neo4j_session() as tx:
        persistent = link_props_utils.read(tx, link_props)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12); this also matches the rest of the file.
        self.assertEqual(link_props, persistent)

    response = self._get_kafka_response()
    self.assertEqual(
        message_utils.MI_LINK_PROPS_RESPONSE, response['clazz'])
    self.assertIsNone(response['error'])

    # Round-trip through JSON so the expected value has the same plain
    # types as the decoded kafka payload.
    encoded_link_props = share.link_props_request(link_props)
    encoded_link_props = json.loads(json.dumps(encoded_link_props))
    self.assertEqual(encoded_link_props, response['link_props'])
96 
98  unique_value = str(uuid.uuid4())
99  link_props = model.LinkProps(
100  self.endpoint_alpha, self.endpoint_beta,
101  props={'test': unique_value})
102  self._put(link_props)
103 
104  with self.open_neo4j_session() as tx:
105  persistent = link_props_utils.read(tx, link_props)
106  self.assertEqual(link_props, persistent)
107 
108  isl_db = isl_utils.fetch(
109  tx, model.InterSwitchLink.new_from_link_props(link_props))
110  tx.graph.pull(isl_db)
111  self.assertEqual(unique_value, isl_db['test'])
112 
114  link_props = model.LinkProps(
115  self.endpoint_beta, self.endpoint_alpha,
116  props={'latency': -2})
117  self._put(link_props)
118 
119  response = self._get_kafka_response()
120  self.assertEqual(
121  message_utils.MI_LINK_PROPS_RESPONSE, response['clazz'])
122  self.assertIsNone(response['link_props'])
123  self.assertIsNotNone(response['error'])
124 
126  link_props = model.LinkProps(
127  self.endpoint_beta, self.endpoint_gamma)
128  link_props.source = model.NetworkEndpoint(
129  self.endpoint_alpha.dpid, None)
130  self._put(link_props)
131 
132  response = self._get_kafka_response()
133  self.assertEqual(
134  message_utils.MI_LINK_PROPS_RESPONSE, response['clazz'])
135  self.assertIsNotNone(response['error'])
136  self.assertIsNone(response['link_props'])
137 
138 
def setUp(self):
    """Pre-populate link props for three endpoint pairs.

    After the parent fixture has run, link props are built for
    alpha -> beta, alpha -> gamma and beta -> gamma (each with a string
    'cost' and a 'name') and then PUT in that order.
    """
    super(TestLinkProps02Occupied, self).setUp()

    alpha, beta, gamma = (
        self.endpoint_alpha, self.endpoint_beta, self.endpoint_gamma)
    seed = (
        (alpha, beta, {'cost': '32', 'name': 'alpha-beta'}),
        (alpha, gamma, {'cost': '96', 'name': 'alpha-gamma'}),
        (beta, gamma, {'cost': '64', 'name': 'beta-gamma'}))

    # Build every record first and only then send them, exactly one PUT
    # per record.
    records = [
        model.LinkProps(source, dest, props=props)
        for source, dest, props in seed]
    for entry in records:
        self._put(entry)
156 
def test_update(self):
    """PUT new props for alpha -> beta and check the stored property set.

    The stored record must carry the new 'cost' and 'extra' values
    together with a 'name' of 'alpha-beta' (presumably left over from the
    fixture's initial PUT -- the enclosing class header is not visible
    here).
    """
    update = model.LinkProps(
        self.endpoint_alpha, self.endpoint_beta,
        props={'cost': 1, 'extra': 'new'})
    self._put(update)

    expected_props = {'cost': 1, 'extra': 'new', 'name': 'alpha-beta'}
    with self.open_neo4j_session() as tx:
        stored = link_props_utils.read(tx, update)
        self.assertEqual(expected_props, stored.props)
168 
169  def test_drop(self):
170  lookup_mask = model.LinkProps(self.endpoint_alpha, self.endpoint_beta)
171 
172  with self.open_neo4j_session() as tx:
173  isl_db = isl_utils.fetch(tx, model.InterSwitchLink(
174  self.endpoint_alpha, self.endpoint_beta))
175  tx.graph.pull(isl_db)
176  self.assertEqual('alpha-beta', isl_db['name'])
177 
178  self._drop(lookup_mask)
179 
180  self._ensure_missing(lookup_mask)
181  self._ensure_exists(
184 
185  with self.open_neo4j_session() as tx:
186  isl_db = isl_utils.fetch(tx, model.InterSwitchLink(
187  self.endpoint_alpha, self.endpoint_beta))
188  tx.graph.pull(isl_db)
189  self.assertNotIn('name', isl_db)
190 
191  def test_drop_multi(self):
192  any_endpoint = model.NetworkEndpoint(None, None)
193  lookup_mask = model.LinkProps(self.endpoint_alpha, any_endpoint)
194  self._drop(lookup_mask)
195 
196  self._ensure_missing(
199  self._ensure_exists(
201 
202  def test_drop_reject(self):
203  any_endpoint = model.NetworkEndpoint(None, None)
204  lookup_mask = model.LinkProps(any_endpoint, any_endpoint)
205  self._drop(lookup_mask)
206 
207  self._ensure_exists(
211 
212  response = self._get_kafka_response(
213  expect_class=message_utils.MT_INFO_CHUNKED)
214  self.assertIsNone(response)
215 
216  response = self._get_kafka_response(
217  offset=1, expect_class=message_utils.MT_INFO_CHUNKED)
218  self.assertIsNotNone(response)
219  self.assertEqual(
220  message_utils.MI_LINK_PROPS_RESPONSE, response['clazz'])
221  self.assertIsNone(response['link_props'])
222  self.assertIsNotNone(response['error'])
223 
def _ensure_exists(self, *batch):
    """Read every record of ``batch`` from the DB inside one session.

    Nothing is asserted explicitly: any exception raised by
    ``link_props_utils.read`` simply propagates to the caller.
    """
    with self.open_neo4j_session() as session:
        for wanted in batch:
            link_props_utils.read(session, wanted)
228 
def _ensure_missing(self, *batch):
    """Require that none of the records in ``batch`` can be read.

    For each record, ``exc.DBRecordNotFound`` from
    ``link_props_utils.read`` is the expected outcome; a successful read
    raises ``AssertionError``.
    """
    with self.open_neo4j_session() as session:
        for candidate in batch:
            try:
                link_props_utils.read(session, candidate)
            except exc.DBRecordNotFound:
                # Expected: the record is gone, check the next one.
                continue
            raise AssertionError(
                'Record {} exist (must not exist)'.format(candidate))
238 
def _drop(self, subject):
    """Send a link-props DROP command built from ``subject``.

    The subject is encoded with ``share.link_props_request``, wrapped into
    a DROP payload and delivered through ``self.feed_service``.

    Returns the DROP payload that was sent.
    """
    drop_payload = share.link_props_drop_payload(
        share.link_props_request(subject))
    message = share.command(drop_payload)
    self.feed_service(message)
    return drop_payload
def feed_service(self, message, can_fail=False)
Definition: share.py:267