26 from messageclasses
import MessageItem
27 from topologylistener
import model
# Module-level logger, named after this module so log records can be
# filtered per module.
logger = logging.getLogger(__name__)
# Fully-qualified message class names that are compared against
# ``event.get_message_type()``; an event whose type is listed here is
# dispatched to the handler pool.
# Kept as a list (not a set) so code that imports, indexes or extends it
# keeps working.
known_messages = [
    'org.openkilda.messaging.info.event.SwitchInfoData',
    'org.openkilda.messaging.info.event.SwitchInfoExtendedData',
    'org.openkilda.messaging.info.event.IslInfoData',
    'org.openkilda.messaging.info.event.PortInfoData',
    'org.openkilda.messaging.info.flow.FlowInfoData',
    'org.openkilda.messaging.info.rule.SwitchFlowEntries',
]
# Fully-qualified command class names that are compared against
# ``event.get_command()``; an event whose command is listed here is
# dispatched to the handler pool.
# Kept as a list (not a set) so code that imports, indexes or extends it
# keeps working.
known_commands = [
    'org.openkilda.messaging.command.flow.FlowCreateRequest',
    'org.openkilda.messaging.command.flow.FlowDeleteRequest',
    'org.openkilda.messaging.command.flow.FlowUpdateRequest',
    'org.openkilda.messaging.command.flow.FlowPathRequest',
    'org.openkilda.messaging.command.flow.FlowGetRequest',
    'org.openkilda.messaging.command.flow.FlowsGetRequest',
    'org.openkilda.messaging.command.flow.FlowRerouteRequest',
    'org.openkilda.messaging.command.system.FeatureToggleRequest',
    'org.openkilda.messaging.command.system.FeatureToggleStateRequest',
    'org.openkilda.messaging.command.switches.SwitchRulesSyncRequest',
    'org.openkilda.messaging.command.switches.SwitchRulesValidateRequest',
    'org.openkilda.messaging.command.discovery.NetworkCommandData',
    'org.openkilda.messaging.command.FlowsSyncRequest',
    'org.openkilda.messaging.te.request.LinkPropsDrop',
    'org.openkilda.messaging.te.request.LinkPropsPut',
]
62 pool = gevent.pool.Pool(pool_size)
63 logger.info(
'Started gevent pool with size %d', pool_size)
65 consumer = kafkareader.create_consumer(config)
69 raw_event = kafkareader.read_message(consumer)
70 logger.debug(
'READ MESSAGE %s', raw_event)
73 if event.get_message_type()
in known_messages\
74 or event.get_command()
in known_commands:
75 pool.spawn(topology_event_handler, event)
77 logger.debug(
'Received unknown type or command %s', raw_event)
79 except Exception
as e:
80 logger.exception(e.message)
87 while not event_handled
and attempts < 5:
88 event_handled = event.handle()
91 logger.error(
'Unable to process event: %s', event.get_type())
92 logger.error(
'Message body: %s',
dump_object(event))
95 logger.debug(
'Event processed for: %s, correlation_id: %s',
96 event.get_type(), event.correlation_id)
def topology_event_handler(event)