Open Kilda Java Documentation
messaging.py
Go to the documentation of this file.
1 # Copyright 2017 Telstra Open Source
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 #
15 
16 import kafka
17 import json
18 import pprint
19 import logging
20 import gevent
21 import sys
22 import time
23 from contextlib import contextmanager
24 
25 LOG = logging.getLogger(__name__)
26 
27 
28 def send_with_context(context, message):
29  send(context.kafka_bootstrap_servers, context.kafka_topic, message)
30 
31 
32 def send(bootstrap_servers, topic, message):
33  producer = kafka.KafkaProducer(bootstrap_servers=bootstrap_servers)
34  future = producer.send(topic, message)
35  future.get(timeout=60)
36 
37 
38 class ExitFromLoop(Exception):
39  pass
40 
41 
42 @contextmanager
43 def receive_with_context_async(context, expected_count=None):
44  records = []
45 
46  def collector(record):
47  try:
48  data = json.loads(record.value)
49  if (data['correlation_id'] == context.correlation_id and
50  data['destination'] == 'CTRL_CLIENT'):
51  LOG.debug('New message in topic:\n%s', pprint.pformat(data))
52  records.append(record)
53  if expected_count is not None and len(records) >= expected_count:
54  raise ExitFromLoop()
55  except ExitFromLoop as ex:
56  raise ex
57  except Exception:
58  LOG.exception('error on %s', record)
59 
60  progress_green_thread = gevent.spawn(progress)
61 
62  offset = get_last_offset_with_context(context)
63  green_thread = gevent.spawn(receive_with_context, context, collector,
64  offset)
65 
66  yield records
67 
68  green_thread.join(context.timeout)
69  green_thread.kill()
70  progress_green_thread.kill()
71  sys.stdout.write("\r")
72  sys.stdout.flush()
73 
74 
75 def receive_with_context(context, callback, offset=None):
76  receive(context.kafka_bootstrap_servers, context.kafka_topic, callback,
77  offset)
78 
79 
80 def receive(bootstrap_servers, topic, callback, offset):
81  consumer = kafka.KafkaConsumer(bootstrap_servers=bootstrap_servers,
82  enable_auto_commit=False)
83 
84  partition = kafka.TopicPartition(topic, 0)
85  consumer.assign([partition])
86  if offset is not None:
87  consumer.seek(partition, offset)
88  for msg in consumer:
89  try:
90  callback(msg)
91  except ExitFromLoop as ex:
92  return
93 
94 
96  consumer = kafka.KafkaConsumer(
97  bootstrap_servers=context.kafka_bootstrap_servers,
98  enable_auto_commit=False)
99 
100  partition = kafka.TopicPartition(context.kafka_topic, 0)
101  consumer.assign([partition])
102  pos = consumer.position(partition)
103  consumer.close(autocommit=False)
104  return pos
105 
106 
107 def progress():
108  while True:
109  sys.stderr.write('.')
110  sys.stderr.flush()
111  time.sleep(0.5)
def receive_with_context(context, callback, offset=None)
Definition: messaging.py:75
def receive(bootstrap_servers, topic, callback, offset)
Definition: messaging.py:80
def send_with_context(context, message)
Definition: messaging.py:28
def receive_with_context_async(context, expected_count=None)
Definition: messaging.py:43
def send(bootstrap_servers, topic, message)
Definition: messaging.py:32
def get_last_offset_with_context(context)
Definition: messaging.py:95