Open Kilda Java Documentation
eventhandler.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import json
17 import logging
18 import time
19 
20 import gevent
21 import gevent.pool
22 import gevent.queue
23 
24 import config
25 import kafkareader
26 from messageclasses import MessageItem
27 from topologylistener import model
28 
logger = logging.getLogger(__name__)

# Fully-qualified message class names that main_loop() hands to the
# topology event handler when they appear as the message type.
known_messages = [
    'org.openkilda.messaging.info.event.SwitchInfoData',
    'org.openkilda.messaging.info.event.SwitchInfoExtendedData',
    'org.openkilda.messaging.info.event.IslInfoData',
    'org.openkilda.messaging.info.event.PortInfoData',
    'org.openkilda.messaging.info.flow.FlowInfoData',
    'org.openkilda.messaging.info.rule.SwitchFlowEntries',
]

# Fully-qualified command class names that main_loop() hands to the
# topology event handler when they appear as the message command.
known_commands = [
    # Flow commands.
    'org.openkilda.messaging.command.flow.FlowCreateRequest',
    'org.openkilda.messaging.command.flow.FlowDeleteRequest',
    'org.openkilda.messaging.command.flow.FlowUpdateRequest',
    'org.openkilda.messaging.command.flow.FlowPathRequest',
    'org.openkilda.messaging.command.flow.FlowGetRequest',
    'org.openkilda.messaging.command.flow.FlowsGetRequest',
    'org.openkilda.messaging.command.flow.FlowRerouteRequest',
    # System commands.
    'org.openkilda.messaging.command.system.FeatureToggleRequest',
    'org.openkilda.messaging.command.system.FeatureToggleStateRequest',
    # Switch commands.
    'org.openkilda.messaging.command.switches.SwitchRulesSyncRequest',
    'org.openkilda.messaging.command.switches.SwitchRulesValidateRequest',
    # Discovery and synchronisation commands.
    'org.openkilda.messaging.command.discovery.NetworkCommandData',
    'org.openkilda.messaging.command.FlowsSyncRequest',
    # Link property requests.
    'org.openkilda.messaging.te.request.LinkPropsDrop',
    'org.openkilda.messaging.te.request.LinkPropsPut',
]
52 
53 
def main_loop():
    """Read events from the Kafka consumer forever and dispatch known ones.

    Each raw message returned by ``kafkareader.read_message`` is parsed as
    JSON and wrapped in a ``MessageItem``. If its message type is listed in
    ``known_messages`` or its command is listed in ``known_commands``, the
    event is handed to ``topology_event_handler`` through a gevent pool;
    anything else is only logged at debug level.

    This function never returns. Any exception raised while reading,
    parsing or dispatching a single message is logged with its traceback
    and the loop moves on to the next message.
    """
    # pool_size = config.getint('gevent', 'worker.pool.size')
    # (crimi) - The pool size is pinned to 1 to avoid deadlocks, until we can
    # demonstrate that the deadlocks are avoidable.
    # An improvement would be to do the DB updates on a single worker and let
    # everything else happen concurrently. The expected load for 1.0 isn't
    # great, though .. more than manageable with 1 worker.
    pool_size = 1
    pool = gevent.pool.Pool(pool_size)
    logger.info('Started gevent pool with size %d', pool_size)

    consumer = kafkareader.create_consumer(config)

    while True:
        try:
            raw_event = kafkareader.read_message(consumer)
            logger.debug('READ MESSAGE %s', raw_event)
            event = MessageItem(json.loads(raw_event))

            if (event.get_message_type() in known_messages
                    or event.get_command() in known_commands):
                pool.spawn(topology_event_handler, event)
            else:
                logger.debug('Received unknown type or command %s', raw_event)

        except Exception as e:
            # Do not read ``e.message``: the attribute was deprecated by
            # PEP 352 and is absent on Python 3, where touching it would
            # raise AttributeError from inside this handler and end the loop.
            logger.exception('Failed to read or dispatch event: %s', e)
81 
82 
def topology_event_handler(event):
    """Run ``event.handle()`` until it succeeds, trying at most five times.

    After every failed attempt the event type and a JSON dump of the event
    are logged as errors, and the handler pauses for 0.1 seconds before the
    next attempt. The pause is skipped after the final attempt because
    nothing follows it.

    :param event: the object spawned by ``main_loop``; it must provide
        ``handle()``, ``get_type()`` and a ``correlation_id`` attribute.
    :return: None. The outcome is reported through the log only.
    """
    max_attempts = 5
    retry_delay = .1

    for attempt in range(1, max_attempts + 1):
        if event.handle():
            logger.debug('Event processed for: %s, correlation_id: %s',
                         event.get_type(), event.correlation_id)
            return

        logger.error('Unable to process event: %s', event.get_type())
        logger.error('Message body: %s', dump_object(event))
        if attempt < max_attempts:
            time.sleep(retry_delay)

    # Every attempt failed: report it instead of claiming the event was
    # processed.
    logger.error('Giving up on event: %s, correlation_id: %s after %d attempts',
                 event.get_type(), event.correlation_id, max_attempts)
97 
98 
def dump_object(o):
    """Return *o* rendered as a JSON string for logging.

    Serialisation goes through ``model.JSONEncoder``; keys are sorted and
    the output is indented by four spaces.
    """
    dump_options = {
        'cls': model.JSONEncoder,
        'sort_keys': True,
        'indent': 4,
    }
    return json.dumps(o, **dump_options)