20 from kafka
import KafkaConsumer
22 KAFKA_BOOTSTRAP_SERVERS =
'hadoop01.staging.pen:6667' 26 parser = argparse.ArgumentParser()
27 parser.add_argument(
'server', action=
'store', help=
'Kafka server:port.')
28 parser.add_argument(
'topic', action=
'store', help=
'Kafka topic to listen on.')
29 return parser.parse_args()
33 group_uuid = uuid.uuid4()
35 auto_offset_reset=
'latest',
39 consumer.subscribe(topics)
41 for message
in consumer:
45 if __name__ ==
"__main__":
47 format=
'%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
50 logger = logging.getLogger(__name__)