17 from kafka
import KafkaConsumer, KafkaProducer
18 import json, time, requests
20 MT_INFO =
"org.openkilda.messaging.info.InfoMessage" 21 MT_SWITCH =
"org.openkilda.messaging.info.event.SwitchInfoData" 22 MT_ISL =
"org.openkilda.messaging.info.event.IslInfoData" 24 bootstrapServer =
'kafka.pendev:9092' 27 producer = KafkaProducer(bootstrap_servers=bootstrapServer)
34 for n
in range(1, loopSize):
40 linked_id_next = switch_id + 1
41 linked_id_prev = switch_id - 1
43 if linked_id_next == loopSize:
46 if linked_id_prev < 1:
47 linked_id_prev = loopSize - 1
49 switch_id = str(switch_id).zfill(2)
50 linked_id_next = str(linked_id_next).zfill(2)
51 linked_id_prev = str(linked_id_prev).zfill(2)
54 node[
'name'] =
"00:00:00:00:00:00:00:{}".
format(switch_id)
55 outgoing_relationships = []
56 outgoing_relationships.append(
"00:00:00:00:00:00:00:{}".
format(linked_id_prev))
57 outgoing_relationships.append(
"00:00:00:00:00:00:00:{}".
format(linked_id_next))
58 outgoing_relationships.sort()
59 node[
'outgoing_relationships'] = outgoing_relationships
62 producer.send(topic, b
'{{"clazz": "{}", "timestamp": 23478952134, "payload": {{"clazz": "{}", "switch_id": "00:00:00:00:00:00:00:{}", "state": "ADDED"}}}}'.
format(MT_INFO, MT_SWITCH, switch_id))
63 producer.send(topic, b
'{{"clazz": "{}", "timestamp": 23478952136, "payload": {{' 64 b
'"clazz": "{}", "latency_ns": 1123, "path": [{{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 1, "seq_id": "0", "segment_latency": 1123}}, {{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 2, "seq_id": "1"}}]}}}}'.
format(MT_INFO, MT_ISL, switch_id, linked_id_next))
65 producer.send(topic, b
'{{"clazz": "{}", "timestamp": 23478952136, "payload": {{' 66 b
'"clazz": "{}", "latency_ns": 1123, "path": [{{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 2, "seq_id": "0", "segment_latency": 1123}}, {{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 1, "seq_id": "1"}}]}}}}'.
format(MT_INFO, MT_ISL, switch_id, linked_id_prev))
68 producer.send(topic, b
'{{"clazz": "{}", "timestamp": 23478952136, "payload": {{' 69 b
'"clazz": "{}", "latency_ns": 1123, "path": [{{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 3, "seq_id": "0", "segment_latency": 1123}}, {{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 4, "seq_id": "1"}}]}}}}'.
format(MT_INFO, MT_ISL, switch_id, linked_id_next))
70 producer.send(topic, b
'{{"clazz": "{}", "timestamp": 23478952136, "payload": {{' 71 b
'"clazz": "{}", "latency_ns": 1123, "path": [{{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 4, "seq_id": "0", "segment_latency": 1123}}, {{"switch_id": "00:00:00:00:00:00:00:{}", "port_no": 3, "seq_id": "1"}}]}}}}'.
format(MT_INFO, MT_ISL, switch_id, linked_id_prev))
74 headers = {
'Content-Type':
'application/json'}
76 result_recv = requests.get(
'http://localhost', headers=headers)
78 recv_topo = result_recv.text
79 sent_topo = json.dumps(topology, default=
lambda o: o.__dict__, sort_keys=
True)
81 if recv_topo == sent_topo:
82 print "Topology created and validated" 84 print "Error in test please check."