Open Kilda Java Documentation
kafkareader.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import time
17 import logging
18 
19 from kafka import KafkaConsumer
20 
21 logger = logging.getLogger(__name__)
22 
23 
24 def create_consumer(config):
25  group = config.KAFKA_CONSUMER_GROUP
26  topic = config.KAFKA_TOPO_ENG_TOPIC
27 
28  bootstrap_servers = config.KAFKA_BOOTSTRAP_SERVERS
29 
30  logger.info('Connecting to kafka: group=%s, topic=%s, '
31  'bootstrap_servers=%s', group, topic, str(bootstrap_servers))
32 
33  while True:
34  try:
35  consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
36  group_id=group,
37  auto_offset_reset='earliest')
38  consumer.subscribe(['{}'.format(topic)])
39  logger.info('Connected to kafka')
40  return consumer
41 
42  except Exception as e:
43  logger.exception('Can not connect to Kafka: %s', e.message)
44  time.sleep(5)
45 
46 
47 def read_message(consumer):
48  try:
49  message = consumer.next()
50  if message.value is not "":
51  return message.value
52  else:
53  logger.debug('sleeping')
54  time.sleep(1)
55 
56  except Exception as e:
57  logger.exception('Can not read message: %s', e.message)