Open Kilda Java Documentation
message_utils.py
# Copyright 2017 Telstra Open Source
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import uuid
import logging

from kafka import KafkaProducer

import config
from topologylistener import model

producer = KafkaProducer(bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVERS)
logger = logging.getLogger(__name__)

MT_ERROR = "org.openkilda.messaging.error.ErrorMessage"
MT_COMMAND = "org.openkilda.messaging.command.CommandMessage"
MT_COMMAND_REPLY = "org.openkilda.messaging.command.CommandWithReplyToMessage"
MT_INFO = "org.openkilda.messaging.info.InfoMessage"
MT_INFO_CHUNKED = 'org.openkilda.messaging.info.ChunkedInfoMessage'
MT_INFO_FLOW_STATUS = "org.openkilda.messaging.info.flow.FlowStatusResponse"
MT_ERROR_DATA = "org.openkilda.messaging.error.ErrorData"

MI_LINK_PROPS_RESPONSE = (
    'org.openkilda.messaging.te.response.LinkPropsResponse')


class Abstract(object):
    def to_json(self):
        return json.dumps(self, cls=model.JSONEncoder)


class Flow(Abstract):
    pass

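# Illustrative sketch (an assumption about model.JSONEncoder, which is expected
# to emit an object's attributes as JSON fields): a Flow with flowid='example'
# and cookie=42 would serialize via to_json() roughly as
#     {"flowid": "example", "cookie": 42, "clazz": "...InstallIngressFlow"}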

def build_ingress_flow(path_nodes, src_switch, src_port, src_vlan,
                       bandwidth, transit_vlan, flow_id, output_action,
                       cookie, meter_id):
    output_port = None

    for path_node in path_nodes:
        if path_node['switch_id'] == src_switch:
            output_port = int(path_node['port_no'])

    if not output_port:
        raise ValueError('Output port was not found for ingress flow rule',
                         "path={}".format(path_nodes))

    logger.debug('build_ingress_flow: flow_id=%s, cookie=%s, src_switch=%s, src_port=%s, src_vlan=%s, transit_vlan=%s, output_port=%s, output_action=%s, bandwidth=%s, meter_id=%s',
                 flow_id, cookie, src_switch, src_port, src_vlan, transit_vlan, output_port, output_action, bandwidth, meter_id)

    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.InstallIngressFlow"
    flow.transaction_id = 0
    flow.flowid = flow_id
    flow.cookie = cookie
    flow.switch_id = src_switch
    flow.input_port = src_port
    flow.output_port = output_port
    flow.input_vlan_id = src_vlan
    flow.transit_vlan_id = transit_vlan
    flow.output_vlan_type = output_action
    flow.bandwidth = bandwidth
    flow.meter_id = meter_id

    return flow


def build_ingress_flow_from_db(stored_flow, output_action):
    return build_ingress_flow(stored_flow['flowpath']['path'],
                              stored_flow['src_switch'],
                              stored_flow['src_port'], stored_flow['src_vlan'],
                              stored_flow['bandwidth'],
                              stored_flow['transit_vlan'],
                              stored_flow['flowid'], output_action,
                              stored_flow['cookie'], stored_flow['meter_id'])


def build_egress_flow(path_nodes, dst_switch, dst_port, dst_vlan,
                      transit_vlan, flow_id, output_action, cookie):
    input_port = None

    for path_node in path_nodes:
        if path_node['switch_id'] == dst_switch:
            input_port = int(path_node['port_no'])

    if not input_port:
        raise ValueError('Input port was not found for egress flow rule',
                         "path={}".format(path_nodes))

    logger.debug('build_egress_flow: flow_id=%s, cookie=%s, dst_switch=%s, dst_port=%s, dst_vlan=%s, transit_vlan=%s, input_port=%s, output_action=%s',
                 flow_id, cookie, dst_switch, dst_port, dst_vlan, transit_vlan, input_port, output_action)

    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.InstallEgressFlow"
    flow.transaction_id = 0
    flow.flowid = flow_id
    flow.cookie = cookie
    flow.switch_id = dst_switch
    flow.input_port = input_port
    flow.output_port = dst_port
    flow.transit_vlan_id = transit_vlan
    flow.output_vlan_id = dst_vlan
    flow.output_vlan_type = output_action

    return flow


def build_egress_flow_from_db(stored_flow, output_action, cookie):
    logger.debug('build_egress_flow_from_db: %s', stored_flow)
    return build_egress_flow(stored_flow['flowpath']['path'],
                             stored_flow['dst_switch'], stored_flow['dst_port'],
                             stored_flow['dst_vlan'],
                             stored_flow['transit_vlan'],
                             stored_flow['flowid'], output_action,
                             cookie)


def build_intermediate_flows(switch, match, action, vlan, flow_id, cookie):
    # output action is always NONE for transit vlan id

    logger.debug('build_intermediate_flows: flow_id=%s, cookie=%s, switch=%s, input_port=%s, output_port=%s, transit_vlan=%s',
                 flow_id, cookie, switch, match, action, vlan)

    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.InstallTransitFlow"
    flow.transaction_id = 0
    flow.flowid = flow_id
    flow.cookie = cookie
    flow.switch_id = switch
    flow.input_port = match
    flow.output_port = action
    flow.transit_vlan_id = vlan

    return flow

# TODO: A number of TODOs around why we have a special code path for one-switch flows
def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan,
                          bandwidth, flow_id, output_action, cookie,
                          meter_id):
    logger.debug('build_one_switch_flow: flow_id=%s, cookie=%s, switch=%s, input_port=%s, output_port=%s, input_vlan_id=%s, output_vlan_id=%s, output_vlan_type=%s, bandwidth=%s, meter_id=%s',
                 flow_id, cookie, switch, src_port, dst_port, src_vlan, dst_vlan,
                 output_action, bandwidth, meter_id)

    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.InstallOneSwitchFlow"
    flow.transaction_id = 0
    flow.flowid = flow_id
    flow.cookie = cookie
    flow.switch_id = switch
    flow.input_port = src_port
    flow.output_port = dst_port
    flow.input_vlan_id = src_vlan
    flow.output_vlan_id = dst_vlan
    flow.output_vlan_type = output_action
    flow.bandwidth = bandwidth
    flow.meter_id = meter_id

    return flow


def build_one_switch_flow_from_db(switch, stored_flow, output_action):
    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.InstallOneSwitchFlow"
    flow.transaction_id = 0
    flow.flowid = stored_flow['flowid']
    flow.cookie = stored_flow['cookie']
    flow.switch_id = switch
    flow.input_port = stored_flow['src_port']
    flow.output_port = stored_flow['dst_port']
    flow.input_vlan_id = stored_flow['src_vlan']
    flow.output_vlan_id = stored_flow['dst_vlan']
    flow.output_vlan_type = output_action
    flow.bandwidth = stored_flow['bandwidth']
    flow.meter_id = stored_flow['meter_id']

    return flow


def build_delete_flow(switch, flow_id, cookie, meter_id, in_port, in_vlan, out_port):
    flow = Flow()
    flow.clazz = "org.openkilda.messaging.command.flow.RemoveFlow"
    flow.transaction_id = 0
    flow.flowid = flow_id
    flow.cookie = cookie
    flow.switch_id = switch
    flow.meter_id = meter_id
    flow.criteria = {'cookie': cookie, 'in_port': in_port, 'in_vlan': in_vlan, 'out_port': out_port}

    return flow

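# Illustrative call (hypothetical values) showing the RemoveFlow criteria that
# build_delete_flow() produces:
#     build_delete_flow('00:00:00:00:00:01', 'example-flow', 0x8000000000000001,
#                       meter_id=5, in_port=1, in_vlan=100, out_port=2)
# yields flow.criteria == {'cookie': 0x8000000000000001, 'in_port': 1,
#                          'in_vlan': 100, 'out_port': 2}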

def make_features_status_response():
    message = Message()
    message.clazz = "org.openkilda.messaging.info.system.FeatureTogglesResponse"

    return message


def send_dump_rules_request(switch_id, correlation_id):
    message = Message()
    message.clazz = 'org.openkilda.messaging.command.switches.DumpRulesRequest'
    message.switch_id = switch_id
    reply_to = {"reply_to": config.KAFKA_TOPO_ENG_TOPIC}
    send_to_topic(message, correlation_id, MT_COMMAND_REPLY,
                  topic=config.KAFKA_SPEAKER_TOPIC,
                  extra=reply_to)


def send_validation_rules_response(missing_rules, excess_rules, proper_rules,
                                   correlation_id):
    message = Message()
    message.clazz = 'org.openkilda.messaging.info.switches.SyncRulesResponse'
    message.missing_rules = list(missing_rules)
    message.excess_rules = list(excess_rules)
    message.proper_rules = list(proper_rules)
    send_to_topic(message, correlation_id, MT_INFO,
                  destination="NORTHBOUND",
                  topic=config.KAFKA_NORTHBOUND_TOPIC)


def send_sync_rules_response(installed_rules, correlation_id):
    message = Message()
    message.clazz = 'org.openkilda.messaging.info.switches.SyncRulesResponse'
    message.installed_rules = list(installed_rules)
    send_to_topic(message, correlation_id, MT_INFO,
                  destination="NORTHBOUND",
                  topic=config.KAFKA_NORTHBOUND_TOPIC)


def send_force_install_commands(switch_id, flow_commands, correlation_id):
    message = Message()
    message.clazz = 'org.openkilda.messaging.command.flow.BatchInstallRequest'
    message.switch_id = switch_id
    message.flow_commands = flow_commands
    send_to_topic(message, correlation_id, MT_COMMAND,
                  topic=config.KAFKA_SPEAKER_TOPIC)


class Message(Abstract):
    def add(self, vals):
        self.__dict__.update(vals)


def send_to_topic(payload, correlation_id,
                  message_type,
                  destination="WFM",
                  topic=config.KAFKA_FLOW_TOPIC,
                  extra=None):
    """
    :param extra: a dict that will be added to the message. Useful for adding
        a reply_to field for CommandWithReplyToMessage.
    """
    message = Message()
    message.payload = payload
    message.clazz = message_type
    message.destination = destination
    message.timestamp = model.TimeProperty.now().as_java_timestamp()
    message.correlation_id = correlation_id
    if extra:
        message.add(extra)
    kafka_message = b'{}'.format(message.to_json())
    logger.debug('Send message: topic=%s, message=%s', topic, kafka_message)
    message_result = producer.send(topic, kafka_message)
    message_result.get(timeout=5)

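# Illustrative envelope (assuming model.JSONEncoder emits plain attributes and
# TimeProperty yields a java-style millisecond timestamp) as produced by
# send_to_topic(payload, 'example-correlation-id', MT_COMMAND):
#     {"payload": {...}, "clazz": "org.openkilda.messaging.command.CommandMessage",
#      "destination": "WFM", "timestamp": 1503429342000,
#      "correlation_id": "example-correlation-id"}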

def send_info_message(payload, correlation_id):
    send_to_topic(payload, correlation_id, MT_INFO)


def send_cache_message(payload, correlation_id):
    send_to_topic(payload, correlation_id, MT_INFO, "WFM_CACHE", config.KAFKA_CACHE_TOPIC)


def send_error_message(correlation_id, error_type, error_message,
                       error_description, destination="WFM",
                       topic=config.KAFKA_FLOW_TOPIC):
    # TODO: Who calls this .. need to pass in the right TOPIC
    data = {"error-type": error_type,
            "error-message": error_message,
            "error-description": error_description,
            "clazz": MT_ERROR_DATA}
    send_to_topic(data, correlation_id, MT_ERROR, destination, topic)


def send_install_commands(flow_rules, correlation_id):
    """
    flow_utils.get_rules() creates the flow rules starting with ingress, then
    transit, then egress. For the install we would like to send the commands
    in the opposite direction - egress, then transit, then ingress - so the
    loop below iterates over the rules in reverse.
    """
    for flow_rule in reversed(flow_rules):
        # TODO: (same as delete todo) Whereas this is part of the current workflow .. feels like we should have the workflow manager work
        # as a hub and spoke ... ie: send delete to FL, get confirmation. Then send delete to DB, get confirmation.
        # Then send a message to a FLOW_EVENT topic that says "FLOW CREATED"

        # TODO: sending commands directly to FL causes duplication of operations in FL, because WFM sends the commands too (for transaction tracking purposes)
        # send_to_topic(flow_rule, correlation_id, MT_COMMAND,
        #               destination="CONTROLLER", topic=config.KAFKA_SPEAKER_TOPIC)

        # FIXME(surabujin): WFM reroutes this message into CONTROLLER
        send_to_topic(flow_rule, correlation_id, MT_COMMAND,
                      destination="WFM", topic=config.KAFKA_FLOW_TOPIC)

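# For example, if flow_utils.get_rules() returned [ingress_rule, transit_rule,
# egress_rule], send_install_commands() would emit egress_rule, then
# transit_rule, then ingress_rule (names here are illustrative).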

def send_delete_commands(nodes, correlation_id):
    """
    Build a delete message for each switch node in the path and send it to the flow topic.

    :param nodes: array of dicts with switch_id, flow_id, cookie, meter_id, in_port, in_vlan and out_port
    :return:
    """

    logger.debug('Send Delete Commands: node count=%d', len(nodes))
    for node in nodes:
        data = build_delete_flow(str(node['switch_id']), str(node['flow_id']), node['cookie'],
                                 node['meter_id'], node['in_port'], node['in_vlan'],
                                 node['out_port'])
        # TODO: Whereas this is part of the current workflow .. feels like we should have the workflow manager work
        # as a hub and spoke ... ie: send delete to FL, get confirmation. Then send delete to DB, get confirmation.
        # Then send a message to a FLOW_EVENT topic that says "FLOW DELETED"

        # TODO: sending commands directly to FL causes duplication of operations in FL, because WFM sends the commands too (for transaction tracking purposes)
        # send_to_topic(data, correlation_id, MT_COMMAND,
        #               destination="CONTROLLER", topic=config.KAFKA_SPEAKER_TOPIC)

        send_to_topic(data, correlation_id, MT_COMMAND,
                      destination="WFM", topic=config.KAFKA_FLOW_TOPIC)

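# Illustrative shape (hypothetical values) of one entry in `nodes`:
#     {'switch_id': '00:00:00:00:00:01', 'flow_id': 'example-flow',
#      'cookie': 0x8000000000000001, 'meter_id': 5,
#      'in_port': 1, 'in_vlan': 100, 'out_port': 2}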

def make_link_props_response(request, link_props, error=None):
    return {
        'request': request,
        'link_props': link_props,
        'error': error,
        'clazz': MI_LINK_PROPS_RESPONSE}


def send_link_props_response(payload, correlation_id, chunked=False):
    if chunked:
        send_link_props_chunked_response([payload], correlation_id)
        return

    send_to_topic(
        payload, correlation_id, MT_INFO, destination='NORTHBOUND',
        topic=config.KAFKA_NORTHBOUND_TOPIC)


def send_link_props_chunked_response(batch, correlation_id):
    next_correlation_id = uuid.uuid4()
    for payload in batch:
        send_to_topic(
            payload, correlation_id, MT_INFO_CHUNKED,
            destination='NORTHBOUND', topic=config.KAFKA_NORTHBOUND_TOPIC,
            extra={'next_request_id': next_correlation_id})
        correlation_id, next_correlation_id = next_correlation_id, uuid.uuid4()

    # End chain marker
    send_to_topic(
        None, correlation_id, MT_INFO_CHUNKED,
        destination='NORTHBOUND', topic=config.KAFKA_NORTHBOUND_TOPIC,
        extra={'next_request_id': None})
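
# Illustrative chunk chain for a two-item batch (correlation ids are made up):
#     message 1: correlation_id=<request id>, next_request_id=<uuid A>
#     message 2: correlation_id=<uuid A>,     next_request_id=<uuid B>
#     message 3: correlation_id=<uuid B>,     payload=None, next_request_id=None  (end marker)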
Functions
def send_delete_commands(nodes, correlation_id)
def build_egress_flow_from_db(stored_flow, output_action, cookie)
def build_ingress_flow(path_nodes, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, output_action, cookie, meter_id)
def send_install_commands(flow_rules, correlation_id)
def build_delete_flow(switch, flow_id, cookie, meter_id, in_port, in_vlan, out_port)
def send_link_props_response(payload, correlation_id, chunked=False)
def send_sync_rules_response(installed_rules, correlation_id)
def build_one_switch_flow_from_db(switch, stored_flow, output_action)
def send_dump_rules_request(switch_id, correlation_id)
def send_info_message(payload, correlation_id)
def send_to_topic(payload, correlation_id, message_type, destination="WFM", topic=config.KAFKA_FLOW_TOPIC, extra=None)
def send_cache_message(payload, correlation_id)
def send_link_props_chunked_response(batch, correlation_id)
def send_force_install_commands(switch_id, flow_commands, correlation_id)
def build_ingress_flow_from_db(stored_flow, output_action)
def build_intermediate_flows(switch, match, action, vlan, flow_id, cookie)
def make_link_props_response(request, link_props, error=None)
def send_error_message(correlation_id, error_type, error_message, error_description, destination="WFM", topic=config.KAFKA_FLOW_TOPIC)
def build_egress_flow(path_nodes, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, output_action, cookie)
def send_validation_rules_response(missing_rules, excess_rules, proper_rules, correlation_id)
def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, output_action, cookie, meter_id)