20 from kafka
import KafkaProducer
23 from topologylistener
import model
25 producer = KafkaProducer(bootstrap_servers=config.KAFKA_BOOTSTRAP_SERVERS)
26 logger = logging.getLogger(__name__)
28 MT_ERROR =
"org.openkilda.messaging.error.ErrorMessage" 29 MT_COMMAND =
"org.openkilda.messaging.command.CommandMessage" 30 MT_COMMAND_REPLY =
"org.openkilda.messaging.command.CommandWithReplyToMessage" 31 MT_INFO =
"org.openkilda.messaging.info.InfoMessage" 32 MT_INFO_CHUNKED =
'org.openkilda.messaging.info.ChunkedInfoMessage' 33 MT_INFO_FLOW_STATUS =
"org.openkilda.messaging.info.flow.FlowStatusResponse" 34 MT_ERROR_DATA =
"org.openkilda.messaging.error.ErrorData" 36 MI_LINK_PROPS_RESPONSE = (
37 'org.openkilda.messaging.te.response.LinkPropsResponse')
50 bandwidth, transit_vlan, flow_id, output_action,
54 for path_node
in path_nodes:
55 if path_node[
'switch_id'] == src_switch:
56 output_port = int(path_node[
'port_no'])
59 raise ValueError(
'Output port was not found for ingress flow rule',
60 "path={}".
format(path_nodes))
62 logger.debug(
'build_ingress_flow: flow_id=%s, cookie=%s, src_switch=%s, src_port=%s, src_vlan=%s, transit_vlan=%s, output_port=%s, output_action=%s, bandwidth=%s, meter_id=%s',
63 flow_id, cookie, src_switch, src_port, src_vlan, transit_vlan, output_port, output_action, bandwidth, meter_id)
66 flow.clazz =
"org.openkilda.messaging.command.flow.InstallIngressFlow" 67 flow.transaction_id = 0
70 flow.switch_id = src_switch
71 flow.input_port = src_port
72 flow.output_port = output_port
73 flow.input_vlan_id = src_vlan
74 flow.transit_vlan_id = transit_vlan
75 flow.output_vlan_type = output_action
76 flow.bandwidth = bandwidth
77 flow.meter_id = meter_id
84 stored_flow[
'src_switch'],
85 stored_flow[
'src_port'], stored_flow[
'src_vlan'],
86 stored_flow[
'bandwidth'],
87 stored_flow[
'transit_vlan'],
88 stored_flow[
'flowid'], output_action,
89 stored_flow[
'cookie'], stored_flow[
'meter_id'])
93 transit_vlan, flow_id, output_action, cookie):
96 for path_node
in path_nodes:
97 if path_node[
'switch_id'] == dst_switch:
98 input_port = int(path_node[
'port_no'])
101 raise ValueError(
'Input port was not found for egress flow rule',
102 "path={}".
format(path_nodes))
104 logger.debug(
'build_egress_flow: flow_id=%s, cookie=%s, dst_switch=%s, dst_port=%s, dst_vlan=%s, transit_vlan=%s, input_port=%s, output_action=%s',
105 flow_id, cookie, dst_switch, dst_port, dst_vlan, transit_vlan, input_port, output_action)
108 flow.clazz =
"org.openkilda.messaging.command.flow.InstallEgressFlow" 109 flow.transaction_id = 0
110 flow.flowid = flow_id
112 flow.switch_id = dst_switch
113 flow.input_port = input_port
114 flow.output_port = dst_port
115 flow.transit_vlan_id = transit_vlan
116 flow.output_vlan_id = dst_vlan
117 flow.output_vlan_type = output_action
125 stored_flow[
'dst_switch'], stored_flow[
'dst_port'],
126 stored_flow[
'dst_vlan'],
127 stored_flow[
'transit_vlan'],
128 stored_flow[
'flowid'], output_action,
135 logger.debug(
'build_intermediate_flows: flow_id=%s, cookie=%s, switch=%s, input_port=%s, output_port=%s, transit_vlan=%s',
136 flow_id, cookie, switch, match, action, vlan)
139 flow.clazz =
"org.openkilda.messaging.command.flow.InstallTransitFlow" 140 flow.transaction_id = 0
141 flow.flowid = flow_id
143 flow.switch_id = switch
144 flow.input_port = match
145 flow.output_port = action
146 flow.transit_vlan_id = vlan
152 bandwidth, flow_id, output_action, cookie,
154 logger.debug(
'build_one_switch_flow: flow_id=%s, cookie=%s, switch=%s, input_port=%s, output_port=%s, input_vlan_id=%s, output_vlan_id=%s, output_vlan_type=%s, bandwidth=%s, meter_id=%s',
155 flow_id, cookie, switch, src_port, dst_port, src_vlan, dst_vlan,
156 output_action, bandwidth, meter_id)
159 flow.clazz =
"org.openkilda.messaging.command.flow.InstallOneSwitchFlow" 160 flow.transaction_id = 0
161 flow.flowid = flow_id
163 flow.switch_id = switch
164 flow.input_port = src_port
165 flow.output_port = dst_port
166 flow.input_vlan_id = src_vlan
167 flow.output_vlan_id = dst_vlan
168 flow.output_vlan_type = output_action
169 flow.bandwidth = bandwidth
170 flow.meter_id = meter_id
177 flow.clazz =
"org.openkilda.messaging.command.flow.InstallOneSwitchFlow" 178 flow.transaction_id = 0
179 flow.flowid = stored_flow[
'flowid']
180 flow.cookie = stored_flow[
'cookie']
181 flow.switch_id = switch
182 flow.input_port = stored_flow[
'src_port']
183 flow.output_port = stored_flow[
'dst_port']
184 flow.input_vlan_id = stored_flow[
'src_vlan']
185 flow.output_vlan_id = stored_flow[
'dst_vlan']
186 flow.output_vlan_type = output_action
187 flow.bandwidth = stored_flow[
'bandwidth']
188 flow.meter_id = stored_flow[
'meter_id']
195 flow.clazz =
"org.openkilda.messaging.command.flow.RemoveFlow" 196 flow.transaction_id = 0
197 flow.flowid = flow_id
199 flow.switch_id = switch
200 flow.meter_id = meter_id
201 flow.criteria = {
'cookie': cookie,
'in_port': in_port,
'in_vlan': in_vlan,
'out_port': out_port}
208 message.clazz =
"org.openkilda.messaging.info.system.FeatureTogglesResponse" 215 message.clazz =
'org.openkilda.messaging.command.switches.DumpRulesRequest' 216 message.switch_id = switch_id
217 reply_to = {
"reply_to": config.KAFKA_TOPO_ENG_TOPIC }
219 topic=config.KAFKA_SPEAKER_TOPIC,
226 message.clazz =
'org.openkilda.messaging.info.switches.SyncRulesResponse' 227 message.missing_rules = list(missing_rules)
228 message.excess_rules = list(excess_rules)
229 message.proper_rules = list(proper_rules)
231 destination=
"NORTHBOUND",
232 topic=config.KAFKA_NORTHBOUND_TOPIC)
237 message.clazz =
'org.openkilda.messaging.info.switches.SyncRulesResponse' 238 message.installed_rules = list(installed_rules)
240 destination=
"NORTHBOUND",
241 topic=config.KAFKA_NORTHBOUND_TOPIC)
246 message.clazz =
'org.openkilda.messaging.command.flow.BatchInstallRequest' 247 message.switch_id = switch_id
248 message.flow_commands = flow_commands
250 topic=config.KAFKA_SPEAKER_TOPIC)
255 self.__dict__.update(vals)
261 topic=config.KAFKA_FLOW_TOPIC,
264 :param extra: a dict that will be added to the message. Useful for adding reply_to for Command With Reply. 267 message.payload = payload
268 message.clazz = message_type
269 message.destination = destination
270 message.timestamp = model.TimeProperty.now().as_java_timestamp()
271 message.correlation_id = correlation_id
274 kafka_message = b
'{}'.
format(message.to_json())
275 logger.debug(
'Send message: topic=%s, message=%s', topic, kafka_message)
276 message_result = producer.send(topic, kafka_message)
277 message_result.get(timeout=5)
285 send_to_topic(payload, correlation_id, MT_INFO,
"WFM_CACHE", config.KAFKA_CACHE_TOPIC)
289 error_description, destination="WFM",
290 topic=config.KAFKA_FLOW_TOPIC):
292 data = {
"error-type": error_type,
293 "error-message": error_message,
294 "error-description": error_description,
295 "clazz": MT_ERROR_DATA}
296 send_to_topic(data, correlation_id, MT_ERROR, destination, topic)
301 flow_utils.get_rules() creates the flow rules starting with ingress, then transit, then egress. For the install, 302 we would like to send the commands in opposite direction - egress, then transit, then ingress. Consequently, 303 the for logic should go in reverse 305 for flow_rule
in reversed(flow_rules):
316 destination=
"WFM", topic=config.KAFKA_FLOW_TOPIC)
321 Build the message for each switch node in the path and send the message to both the speaker and the flow topic 323 :param nodes: array of dicts: switch_id; flow_id; cookie 327 logger.debug(
'Send Delete Commands: node count=%d', len(nodes))
329 data =
build_delete_flow(str(node[
'switch_id']), str(node[
'flow_id']), node[
'cookie'],
330 node[
'meter_id'], node[
'in_port'], node[
'in_vlan'],
341 destination=
"WFM", topic=config.KAFKA_FLOW_TOPIC)
347 'link_props': link_props,
349 'clazz': MI_LINK_PROPS_RESPONSE}
358 payload, correlation_id, MT_INFO, destination=
'NORTHBOUND',
359 topic=config.KAFKA_NORTHBOUND_TOPIC)
363 next_correlation_id = uuid.uuid4()
364 for payload
in batch:
366 payload, correlation_id, MT_INFO_CHUNKED,
367 destination=
'NORTHBOUND', topic=config.KAFKA_NORTHBOUND_TOPIC,
368 extra={
'next_request_id': next_correlation_id})
369 correlation_id, next_correlation_id = next_correlation_id, uuid.uuid4()
373 None, correlation_id, MT_INFO_CHUNKED,
374 destination=
'NORTHBOUND', topic=config.KAFKA_NORTHBOUND_TOPIC,
375 extra={
'next_request_id':
None})
def send_delete_commands(nodes, correlation_id)
def build_egress_flow_from_db(stored_flow, output_action, cookie)
def build_ingress_flow(path_nodes, src_switch, src_port, src_vlan, bandwidth, transit_vlan, flow_id, output_action, cookie, meter_id)
def send_install_commands(flow_rules, correlation_id)
def make_features_status_response()
def build_delete_flow(switch, flow_id, cookie, meter_id, in_port, in_vlan, out_port)
def send_link_props_response(payload, correlation_id, chunked=False)
def send_sync_rules_response(installed_rules, correlation_id)
def build_one_switch_flow_from_db(switch, stored_flow, output_action)
def send_dump_rules_request(switch_id, correlation_id)
def send_info_message(payload, correlation_id)
def send_to_topic(payload, correlation_id, message_type, destination="WFM", topic=config.KAFKA_FLOW_TOPIC, extra=None)
def send_cache_message(payload, correlation_id)
def send_link_props_chunked_response(batch, correlation_id)
def send_force_install_commands(switch_id, flow_commands, correlation_id)
def build_ingress_flow_from_db(stored_flow, output_action)
def build_intermediate_flows(switch, match, action, vlan, flow_id, cookie)
def make_link_props_response(request, link_props, error=None)
def send_error_message(correlation_id, error_type, error_message, error_description, destination="WFM", topic=config.KAFKA_FLOW_TOPIC)
def build_egress_flow(path_nodes, dst_switch, dst_port, dst_vlan, transit_vlan, flow_id, output_action, cookie)
def send_validation_rules_response(missing_rules, excess_rules, proper_rules, correlation_id)
def build_one_switch_flow(switch, src_port, src_vlan, dst_port, dst_vlan, bandwidth, flow_id, output_action, cookie, meter_id)