Open Kilda Java Documentation
service.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import errno
17 import json
18 import subprocess
19 import threading
20 import time
21 
22 from kilda.traffexam import context as context_module
23 from kilda.traffexam import exc
24 from kilda.traffexam import model
25 from kilda.traffexam import system
26 
27 
class Abstract(system.NSIPDBMixin, context_module.ContextConsumer):
    """Base class for services that keep created items in a keyed pool.

    Concrete services provide ``_create``, ``_delete`` and ``key``. This
    class stores every successfully created item in ``self._pool`` under
    ``self.key(item)`` and runs ``create``/``delete`` while holding
    ``self._lock``. ``list`` and ``lookup`` read the pool without taking
    the lock.
    """

    def __init__(self, context):
        super().__init__(context)
        self._pool = {}
        self._lock = threading.Lock()

    def create(self, subject):
        """Create ``subject`` through ``_create`` and register the result.

        Returns the item produced by ``_create``.

        Raises:
            exc.ServiceError: re-raised untouched when ``_create`` (or
                ``key``) raises it.
            exc.ServiceCreateError: wraps any other exception raised by
                ``_create``; the original is kept as ``__cause__``.
        """
        with self._lock:
            try:
                created = self._create(subject)
            except Exception as cause:
                # Service-level errors are already meaningful to callers.
                if isinstance(cause, exc.ServiceError):
                    raise
                raise exc.ServiceCreateError(self, subject) from cause
            self._pool[self.key(created)] = created

        return created

    def list(self):
        """Return a tuple snapshot of every item currently in the pool."""
        pooled = self._pool.values()
        return tuple(pooled)

    def lookup(self, key):
        """Return the pooled item stored under ``key``.

        Raises:
            exc.ServiceLookupError: when ``key`` is not in the pool (the
                underlying ``KeyError`` context is suppressed).
        """
        try:
            return self._pool[key]
        except KeyError:
            raise exc.ServiceLookupError(self, key) from None

    def delete(self, key, ignore_missing=False):
        """Remove the item stored under ``key`` and release it via ``_delete``.

        Args:
            key: pool key of the item to remove.
            ignore_missing: when true, a missing ``key`` is silently
                ignored instead of raising.

        Raises:
            exc.ServiceLookupError: ``key`` is absent and
                ``ignore_missing`` is false.
            exc.ServiceError: re-raised untouched from ``_delete``.
            exc.ServiceDeleteError: wraps any other exception raised by
                ``_delete``; in that case the item is put back in the pool.
        """
        with self._lock:
            try:
                removed = self._pool.pop(key)
            except KeyError:
                if ignore_missing:
                    return
                raise exc.ServiceLookupError(self, key) from None

            try:
                self._delete(removed)
            except Exception as cause:
                # NOTE(review): a ServiceError leaves the item out of the
                # pool, while any other failure restores it - confirm this
                # asymmetry is intended.
                if isinstance(cause, exc.ServiceError):
                    raise
                self._pool[key] = removed
                raise exc.ServiceDeleteError(self, key, removed) from cause

    def _create(self, subject):
        """Subclass hook: materialise ``subject`` and return the item."""
        raise NotImplementedError

    def _delete(self, subject):
        """Subclass hook: release whatever ``_create`` set up."""
        raise NotImplementedError

    def key(self, subject):
        """Subclass hook: return the pool key for ``subject``."""
        raise NotImplementedError

    def get_gw_iface(self):
        """Return the ``ns`` attribute of the shared ``system.VEthPair``."""
        veth_pair = self.context.shared_registry.fetch(system.VEthPair)
        return veth_pair.ns
84 
85 
def key(self, subject):
    """Return the pool key for ``subject``, which is its ``tag``."""
    tag = subject.tag
    return tag
89 
def _create(self, subject):
    """Create and bring up a VLAN interface for ``subject``.

    The interface is named by ``make_iface_name`` from the subject's key,
    uses that key as ``vlan_id`` and is linked to ``get_gw_iface()``.
    Afterwards a ``model.NetworkIface`` describing it is attached to the
    subject through ``subject.set_iface``.

    Returns ``subject``.
    """
    vlan_tag = self.key(subject)
    name = self.make_iface_name(vlan_tag)
    ipdb = self.get_ipdb()

    options = dict(
        kind='vlan', ifname=name, vlan_id=vlan_tag,
        link=self.get_gw_iface())
    with ipdb.create(**options) as new_iface:
        new_iface.up()

    # Re-read the interface from IPDB to pick up its index.
    snapshot = ipdb.interfaces[name].ro
    descriptor = model.NetworkIface(
        name, index=snapshot.index, vlan_tag=vlan_tag)
    subject.set_iface(descriptor)

    return subject
104 
def _delete(self, subject):
    """Remove the interface named after ``subject``'s key from IPDB."""
    name = self.make_iface_name(self.key(subject))
    interfaces = self.get_ipdb().interfaces
    with interfaces[name] as doomed:
        doomed.remove()
110 
111  @staticmethod
112  def make_iface_name(tag):
113  return 'vlan.{}'.format(tag)
114 
115 
def key(self, subject):
    """Return the pool key for ``subject``, which is its ``idnr``."""
    idnr = subject.idnr
    return idnr
119 
def _create(self, subject):
    """Assign ``subject.address``/``subject.prefix`` to its interface.

    ``_check_collision`` runs first. When the subject has no interface
    yet, it is given a ``model.NetworkIface`` built from
    ``get_gw_iface()``.

    Returns ``subject``.
    """
    self._check_collision(subject)

    if subject.iface is None:
        subject.iface = model.NetworkIface(self.get_gw_iface())

    ipdb_key = subject.iface.get_ipdb_key()
    interfaces = self.get_ipdb().interfaces
    with interfaces[ipdb_key] as target:
        target.add_ip(subject.address, mask=subject.prefix)

    return subject
129 
def _delete(self, subject):
    """Remove ``subject.address``/``subject.prefix`` from its interface."""
    ipdb_key = subject.iface.get_ipdb_key()
    interfaces = self.get_ipdb().interfaces
    with interfaces[ipdb_key] as target:
        target.del_ip(subject.address, mask=subject.prefix)
134 
135  def _check_collision(self, subject):
136  network = subject.network
137  for address in self._pool.values():
138  if not address.network.overlaps(network):
139  continue
140 
141  raise exc.ServiceCreateCollisionError(self, subject, address)
142 
143 
def key(self, subject):
    """Return the pool key for ``subject``, which is its ``idnr``."""
    idnr = subject.idnr
    return idnr
147 
def get_report(self, key):
    """Return the ``(report, error)`` pair for a finished endpoint.

    Args:
        key: pool key of the endpoint, resolved through ``self.lookup``.

    Returns:
        ``None`` while the endpoint's process is still running
        (``proc.poll()`` is ``None``) or when both the report file and
        the error file are empty; otherwise the tuple produced by
        ``self.unpack_output`` from the two files' contents.
    """
    entity = self.lookup(key)

    if entity.proc.poll() is None:
        return None

    out = []
    for path in (
            self.make_report_file_name(entity),
            self.make_error_file_name(entity)):
        with open(str(path), 'rt') as stream:
            out.append(stream.read())

    # ``filter()`` returns an iterator on Python 3, which is always truthy,
    # so ``not filter(bool, out)`` could never detect "nothing was written".
    if not any(out):
        return None

    report, error = self.unpack_output(out)
    return report, error
167 
def _create(self, subject):
    """Start the endpoint described by ``subject`` and return it.

    ``model.ConsumerEndpoint`` instances go to ``_create_consumer`` and
    ``model.ProducerEndpoint`` instances to ``_create_producer``.

    Raises:
        ValueError: ``subject`` is neither kind of endpoint.
    """
    if isinstance(subject, model.ConsumerEndpoint):
        self._create_consumer(subject)
        return subject

    if isinstance(subject, model.ProducerEndpoint):
        self._create_producer(subject)
        return subject

    raise ValueError('Unsupported payload {!r}'.format(subject))
176 
def _delete(self, subject):
    """Remove the endpoint's output files and stop its process.

    Steps, in order:
      1. unlink the report and error files, ignoring missing ones;
      2. up to three times: stop if the process has exited, otherwise
         ``terminate()`` it and sleep one second; if it never reported
         an exit during those checks, ``kill()`` it;
      3. ``wait()`` for the process;
      4. for a ``model.ConsumerEndpoint``, free its bound port.

    Raises:
        OSError: from terminate/kill, unless its errno is ``ESRCH``.
    """
    outputs = (
        self.make_report_file_name(subject),
        self.make_error_file_name(subject))
    for path in outputs:
        try:
            path.unlink()
        except FileNotFoundError:
            pass

    try:
        for _ in range(3):
            if subject.proc.poll() is not None:
                break
            subject.proc.terminate()
            time.sleep(1)
        else:
            # The loop never hit ``break``: escalate to kill.
            subject.proc.kill()
    except OSError as error:
        # ESRCH is tolerated; anything else propagates.
        if error.errno != errno.ESRCH:
            raise

    subject.proc.wait()

    if isinstance(subject, model.ConsumerEndpoint):
        subject.bind_address.free_port(subject.bind_port)
202 
203  def _create_consumer(self, subject):
204  subject.bind_port = subject.bind_address.alloc_port()
205  cmd = self.make_cmd_common_part(subject)
206  cmd += [
207  '--server',
208  '--one-off',
209  '--bind={}'.format(subject.bind_address.address),
210  '--port={}'.format(subject.bind_port)]
211  self.run_iperf(subject, cmd)
212 
213  def _create_producer(self, subject):
214  bandwidth = subject.bandwidth * 1024
215  if subject.burst_pkt:
216  bandwidth = '{}/{}'.format(bandwidth, subject.burst_pkt)
217 
218  cmd = self.make_cmd_common_part(subject)
219  cmd += [
220  '--client={}'.format(subject.remote_address.address),
221  '--port={}'.format(subject.remote_address.port),
222  '--bandwidth={}'.format(bandwidth),
223  '--time={}'.format(subject.time),
224  '--interval=1']
225  if subject.use_udp:
226  cmd.append('--udp')
227  self.run_iperf(subject, cmd)
228 
def make_cmd_common_part(self, subject):
    """Return the argv prefix shared by every iperf3 invocation.

    The command runs ``iperf3 --json --interval=1`` through
    ``ip netns exec`` in the namespace named by
    ``self.context.make_network_namespace_name()``. ``--bind`` is added
    when ``subject.bind_address`` is not ``None``.
    """
    namespace = self.context.make_network_namespace_name()
    cmd = [
        'ip', 'netns', 'exec', namespace,
        'iperf3', '--json', '--interval=1']

    if subject.bind_address is None:
        return cmd

    cmd.append('--bind={}'.format(subject.bind_address.address))
    return cmd
236 
def run_iperf(self, subject, cmd):
    """Spawn ``cmd`` with stdout/stderr redirected to the endpoint's files.

    Args:
        subject: endpoint whose report/error file names are used and
            which receives the process through ``subject.set_proc``.
        cmd: argv list handed to ``subprocess.Popen``.

    The started process is also added to ``self.context.children``.
    """
    report_path = str(self.make_report_file_name(subject))
    error_path = str(self.make_error_file_name(subject))

    # The child gets its own copies of the descriptors, so the parent's
    # file objects are closed as soon as Popen returns (or raises) rather
    # than being left for the garbage collector.
    with open(report_path, 'wb') as report, open(error_path, 'wb') as err:
        proc = subprocess.Popen(cmd, stdout=report, stderr=err)

    subject.set_proc(proc)
    self.context.children.add(proc)
244 
def make_report_file_name(self, subject):
    """Return ``self.context.path('<idnr>.json')`` for ``subject``."""
    file_name = '{}.json'.format(subject.idnr)
    return self.context.path(file_name)
247 
def make_error_file_name(self, subject):
    """Return ``self.context.path('<idnr>.err')`` for ``subject``."""
    file_name = '{}.err'.format(subject.idnr)
    return self.context.path(file_name)
250 
251  @staticmethod
252  def unpack_output(out):
253  stdout, stderr = out
254  if not stdout:
255  return {}, stderr
256 
257  try:
258  report = json.loads(stdout)
259  except (ValueError, TypeError) as e:
260  report = {}
261  if stderr:
262  stderr += '-+' * 30 + '-\n'
263  stderr += 'Can\'t decode iperf3 output: {}\n'.format(e)
264  stderr += 'Raw iperf3 output stats on next line\n'
265  stderr += stdout
266  return report, stderr
267 
268 
class Adapter:
    """Bundle one instance of each service built from the same context.

    Attributes are created in this order: ``address``
    (``IpAddressService``), ``vlan`` (``VLANService``) and ``endpoint``
    (``EndpointService``).
    """

    def __init__(self, context):
        address_service = IpAddressService(context)
        self.address = address_service
        vlan_service = VLANService(context)
        self.vlan = vlan_service
        endpoint_service = EndpointService(context)
        self.endpoint = endpoint_service
def run_iperf(self, subject, cmd)
Definition: service.py:237
def __init__(self, context)
Definition: service.py:270
def __init__(self, context)
Definition: service.py:29
def delete(self, key, ignore_missing=False)
Definition: service.py:56
def create(self, subject)
Definition: service.py:34
def _delete(self, subject)
Definition: service.py:76
def make_error_file_name(self, subject)
Definition: service.py:248
def make_cmd_common_part(self, subject)
Definition: service.py:229
def _create(self, subject)
Definition: service.py:73
def make_report_file_name(self, subject)
Definition: service.py:245
def _create_producer(self, subject)
Definition: service.py:213
def _create_consumer(self, subject)
Definition: service.py:203
def key(self, subject)
Definition: service.py:79