"""Kafka send/receive helpers driven by a test *context* object.

NOTE(review): reconstructed from a garbled extraction; module names below
are inferred from their use in this file (json/pprint/sys/gevent/kafka) —
confirm against history.
"""
import json
import logging
import pprint
import sys
from contextlib import contextmanager

import gevent
import kafka

LOG = logging.getLogger(__name__)
def send_with_context(context, message):
    """Publish *message* using the Kafka settings carried by *context*.

    Thin wrapper around :func:`send`; *context* must expose
    ``kafka_bootstrap_servers`` and ``kafka_topic``.
    """
    send(context.kafka_bootstrap_servers, context.kafka_topic, message)
def send(bootstrap_servers, topic, message):
    """Synchronously publish one *message* to *topic*.

    Blocks until the broker acknowledges the write or 60 seconds elapse
    (``kafka.errors.KafkaTimeoutError`` then propagates to the caller).
    """
    producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        future = producer.send(topic, message)
        # Force the async send to complete so delivery failures surface here.
        future.get(timeout=60)
    finally:
        # The original leaked the producer; close() flushes pending records
        # and releases the network connection.
        producer.close()
def receive_with_context_async(context, expected_count=None):
    """Collect messages addressed to this client from the context's topic.

    Spawns :func:`receive_with_context` in a greenlet and gathers every
    record whose JSON payload matches ``context.correlation_id`` and whose
    ``destination`` is ``'CTRL_CLIENT'``.  Stops once *expected_count*
    records have arrived (when given) or after ``context.timeout`` seconds.

    Returns the list of collected raw Kafka records.
    """
    records = []

    def collector(record):
        # Invoked by the consumer loop for every record: filter, log, store.
        try:
            data = json.loads(record.value)
            if (data['correlation_id'] == context.correlation_id
                    and data['destination'] == 'CTRL_CLIENT'):
                LOG.debug('New message in topic:\n%s', pprint.pformat(data))
                records.append(record)
                if expected_count is not None and len(records) >= expected_count:
                    # Enough records collected - signal the consumer loop to stop.
                    raise ExitFromLoop()
        except ExitFromLoop:
            raise
        except Exception:
            # Malformed/foreign payloads are logged and skipped, never fatal.
            LOG.exception('error on %s', record)

    # Show a progress indicator on stderr while we wait.
    progress_green_thread = gevent.spawn(progress)
    # NOTE(review): the third spawn argument was truncated in the source;
    # assuming consumption starts at the current tail offset - confirm.
    green_thread = gevent.spawn(receive_with_context, context, collector,
                                get_last_offset_with_context(context))
    green_thread.join(context.timeout)
    # NOTE(review): kill on timeout assumed (truncated in source) so the
    # consumer greenlet does not outlive this call - confirm.
    green_thread.kill()
    progress_green_thread.kill()
    sys.stdout.write("\r")
    return records
def receive_with_context(context, callback, offset=None):
    """Consume the context's topic, passing every record to *callback*.

    Thin wrapper around :func:`receive`; *context* must expose
    ``kafka_bootstrap_servers`` and ``kafka_topic``.
    """
    receive(context.kafka_bootstrap_servers, context.kafka_topic, callback,
            offset)
def receive(bootstrap_servers, topic, callback, offset):
    """Consume partition 0 of *topic*, invoking *callback* for each record.

    Starts from *offset* when it is not None, otherwise from the consumer's
    default position.  The loop runs until *callback* raises
    ``ExitFromLoop``; any other exception propagates.
    """
    consumer = kafka.KafkaConsumer(bootstrap_servers=bootstrap_servers,
                                   enable_auto_commit=False)
    # Pin to partition 0 explicitly - this helper assumes a single-partition
    # topic so record ordering is preserved.
    partition = kafka.TopicPartition(topic, 0)
    consumer.assign([partition])
    if offset is not None:
        consumer.seek(partition, offset)
    try:
        # NOTE(review): the loop body was truncated in the source; a plain
        # iteration over the consumer is assumed - confirm.
        for record in consumer:
            callback(record)
    except ExitFromLoop:
        LOG.debug('exiting the consumer loop')
    finally:
        # Never auto-commit: these reads must not advance the group offset.
        # Closing in finally also fixes the leak on exception paths.
        consumer.close(autocommit=False)
def get_last_offset_with_context(context):
    """Return the current position of partition 0 of the context's topic.

    Opens a short-lived consumer, reads its position, and closes it without
    committing so the consumer-group offset is left untouched.
    """
    consumer = kafka.KafkaConsumer(
        bootstrap_servers=context.kafka_bootstrap_servers,
        enable_auto_commit=False)
    partition = kafka.TopicPartition(context.kafka_topic, 0)
    consumer.assign([partition])
    try:
        pos = consumer.position(partition)
    finally:
        consumer.close(autocommit=False)
    # The original's return statement was truncated; the computed position
    # is the only useful result, so return it.
    return pos
def progress():
    """Emit a dot to stderr periodically; runs as a background greenlet.

    NOTE(review): the surrounding loop was truncated in the source; an
    infinite loop with a 1-second cooperative sleep is assumed so the
    greenlet yields to the consumer - confirm against history.
    """
    while True:
        gevent.sleep(1)
        sys.stderr.write('.')
# Public API summary (signature index recovered from the truncated source):
#   receive_with_context(context, callback, offset=None)
#   receive(bootstrap_servers, topic, callback, offset)
#   send_with_context(context, message)
#   receive_with_context_async(context, expected_count=None)
#   send(bootstrap_servers, topic, message)
#   get_last_offset_with_context(context)