19 from kafka
import KafkaConsumer
21 logger = logging.getLogger(__name__)
25 group = config.KAFKA_CONSUMER_GROUP
26 topic = config.KAFKA_TOPO_ENG_TOPIC
28 bootstrap_servers = config.KAFKA_BOOTSTRAP_SERVERS
30 logger.info(
'Connecting to kafka: group=%s, topic=%s, ' 31 'bootstrap_servers=%s', group, topic, str(bootstrap_servers))
37 auto_offset_reset=
'earliest')
38 consumer.subscribe([
'{}'.
format(topic)])
39 logger.info(
'Connected to kafka')
42 except Exception
as e:
43 logger.exception(
'Can not connect to Kafka: %s', e.message)
49 message = consumer.next()
50 if message.value
is not "":
53 logger.debug(
'sleeping')
56 except Exception
as e:
57 logger.exception(
'Can not read message: %s', e.message)
def create_consumer(config)
def read_message(consumer)