17 from __future__
import absolute_import
19 from bottle
import get, post, request, run, Bottle, response, request, install
20 from mininet.net
import Mininet
21 from mininet.node
import RemoteController, OVSKernelSwitch, Host, OVSSwitch
22 from mininet.clean
import cleanup
23 from mininet.link
import TCLink
24 from mininet.util
import errRun
26 from jsonschema
import validate
28 from logging.config
import dictConfig
35 from functools
import wraps
44 "Add the OpenFlow13 Protocol" 46 params[
'protocols'] =
'OpenFlow13' 47 OVSSwitch.__init__(self, name, **params)
52 Mininet looks for this during stop(). It exists in OVSSwitch. Kilda, at the moment, 53 doesn't like this batch operation (and it shouldn't be done in batch) 55 logger.info (
"IGNORE batchStartup()")
56 for switch
in switches:
58 logger.warn (
" .... BATCH = TRUE !!!!!!")
64 Mininet looks for this during stop(). It exists in OVSSwitch. Kilda, at the moment, 65 doesn't like this batch operation (and it shouldn't be done in batch) 67 logger.info (
"IGNORE batchShutdown()")
68 for switch
in switches:
70 logger.warn (
" .... BATCH = TRUE !!!!!!")
76 Wrap a Bottle request so that a log line is emitted after it's handled. 77 (This decorator can be extended to take the desired logger as a param.) 80 def _log_to_logger(*args, **kwargs):
81 actual_response = fn(*args, **kwargs)
82 logger.info(
'%s %s %s %s' % (request.remote_addr,
86 return actual_response
94 if request.query.get(_)
is None:
96 return "%s: %s must be specified\n" % (request.path, _)
97 return __(dict([(_, request.query.get(_))
for _
in pars]))
102 install(log_to_logger)
105 "$schema":
"http://json-schema.org/draft-04/schema#",
175 "$schema":
"http://json-schema.org/draft-04/schema#",
203 "$schema":
"http://json-schema.org/draft-04/schema#",
230 controllers_schema = {
231 "$schema":
"http://json-schema.org/draft-04/schema#",
264 return {
"name": controller.name,
265 "host": controller.ip,
266 "port": controller.port}
275 if len(switch.intfs) > 0:
276 for i
in switch.intfs:
277 intf = switch.intfs[i]
278 intfs.append({
'name': intf.name,
280 'status': intf.status()})
281 return {
'name': switch.name,
283 'connected': switch.connected(),
296 name =
"{}:{}".
format(link.intf1.name, link.intf2.name)
297 if link.intf1.name < link.intf2.name:
298 name =
"{}:{}".
format(link.intf2.name, link.intf1.name)
303 return {
'name':
link_name(link),
'status': link.status()}
314 logger.info(
"*** Creating Topology" )
315 validate(request.json, topology_schema)
316 net = Mininet( controller=RemoteController, switch=KildaSwitch, build=
False )
319 logger.info(
"*** Creating (Remote) controllers" )
320 for controller
in request.json[
'controllers']:
321 name = controller[
'name']
322 host = controller[
'host']
323 port = controller[
'port']
324 logger.info(
"===> adding controller name={}, host={}, port={}".
format(name, host, port))
325 ip = socket.gethostbyname(host)
326 net.addController (name, ip=ip, port=port)
329 logger.info(
"*** Creating switches" )
330 for switch
in request.json[
'switches']:
331 name = switch[
'name']
332 dpid = switch[
'dpid']
333 if type(dpid)
is unicode:
334 dpid = dpid.encode(
'ascii',
'ignore')
335 logger.info(
"===> adding switch name={}, dpid={}".
format(name, dpid))
336 net.addSwitch( name=name, dpid=dpid )
339 logger.info(
"*** Creating Switch:Switch links" )
340 for link
in request.json[
'links']:
341 node1 = link[
'node1']
342 node2 = link[
'node2']
343 logger.info(
"===> adding link {} -> {}".
format(node1, node2))
344 net.addLink( node1, node2 )
347 logger.info(
"*** Creating hosts\n" )
348 for switch
in net.switches:
350 h = net.addHost(
'h%s' % switch.name)
351 net.addLink(h, switch)
354 logger.info(
"*** Starting network" )
358 response.content_type =
'application/json' 359 result = json.dumps({
'controllers': [
controller_info(x)
for x
in net.controllers],
360 'switches': [
switch_info(x)
for x
in net.switches],
361 'links': [
link_info(x)
for x
in net.links]})
363 logger.info (
"*** returning {}".
format(result))
373 @post(
'/create_random_linear_topology')
388 needs_to_be_refactored =
True 389 if needs_to_be_refactored:
390 return json.dumps({
'status':
'refactor me'})
395 switch_count = request.json[
'switches']
396 link_count = request.json[
'links']
397 num_worker_threads = request.json.get(
'threads', 10)
398 logger.debug(
"==> switch count={}; link count={}; num threads = {}".
399 format(switch_count,link_count, num_worker_threads))
401 add_controllers(request.json[
'controllers'])
406 names = Queue.Queue()
407 for i
in range(switch_count):
408 name =
"s" + str(i+1)
411 def add_switch_worker():
414 switch = OVSKernelSwitch(name, protocols=
'OpenFlow13', inNamespace=
False)
415 switches[name] = switch
416 switch.start(controllers)
418 logger.debug(
"==> added switch name={}".
format(name))
421 if num_worker_threads > len(_switch_threads):
422 logger.debug(
"==> Starting Switch Threads {} .. {}".
423 format(len(_switch_threads),num_worker_threads))
424 for i
in range(len(_switch_threads), num_worker_threads):
425 t = threading.Thread(target=add_switch_worker)
427 _switch_threads.append(t)
430 logger.debug(
"==> Num Switch Threads is >= num_worker_threads {},{}".
431 format(len(_switch_threads),num_worker_threads))
439 ep_tuples = Queue.Queue()
446 for i
in range(switch_count-1):
447 ep1 =
"s" + str(i+1);
448 ep2 =
"s" + str(i+2);
452 lock = threading.Lock()
454 for i
in range(switch_count):
455 sid =
"s" + str(i+1);
456 switch_locks[sid] = threading.Lock()
458 def add_link_worker():
460 if ep_tuples.qsize() == 0:
464 success1 = success2 =
False 467 logger.debug(
"==> ++++ lock {}".
format(threading.current_thread().getName()))
468 for _
in range (ep_tuples.qsize()):
469 ept = ep_tuples.get()
471 success1 = switch_locks[ept[0]].acquire(
False)
472 success2 = switch_locks[ept[1]].acquire(
False)
473 logger.debug(
"==> switches ???? {} {} {} {}".
format(ept[0],ept[1],success1, success2))
475 if success1
and success2:
477 if success1: switch_locks[ept[0]].release()
478 if success2: switch_locks[ept[1]].release()
480 ep_tuples.task_done()
483 logger.debug(
"==> ---- lock {}".
format(threading.current_thread().getName()))
488 if success1
and success2:
491 link = TCLink(switches[ept[0]], switches[ept[1]])
492 link.intf1.node.attach(link.intf1)
493 link.intf2.node.attach(link.intf2)
496 logger.debug(
"==> switches ++++ {} {}".
format(ept[0],ept[1]))
499 logger.debug(
"==>==> ## ERROR adding link, putting back on queue={}".
format(
505 switch_locks[ept[0]].release()
506 switch_locks[ept[1]].release()
507 ep_tuples.task_done()
510 if num_worker_threads > len(_link_threads):
511 logger.debug(
"==> Starting Link Threads {} .. {}".
512 format(len(_link_threads),num_worker_threads))
513 for i
in range(len(_link_threads),num_worker_threads):
514 t = threading.Thread(target=add_link_worker)
516 _link_threads.append(t)
519 logger.debug(
"==> Num Link Threads is >= num_worker_threads {},{}".
520 format(len(_link_threads),num_worker_threads))
523 while not ep_tuples.empty():
525 logger.debug(
"==> LINKS: {} ".
format(len(links)))
526 logger.debug(
"==> QUEUE: {} {}".
format(ep_tuples.qsize(), ep_tuples.empty()))
528 response.content_type =
'application/json' 529 return json.dumps({
'status':
'ok'})
532 @
get(
'/switch/<name>')
534 response.content_type =
'application/json' 540 response.content_type =
'appliation/json' 546 validate(request.json, switches_schema)
547 add_switches(request.json[
'switches'])
548 response.content_type =
'application/json' 554 response.content_type =
'application/json' 560 validate(request.json, links_schema)
561 add_links(request.json[
'links'])
562 response.content_type =
'application/json' 568 response.content_type =
'application/json' 574 validate(request.json, controllers_schema)
575 add_controllers(request.json[
'controllers'])
576 response.content_type =
'application/json' 583 logger.info(
"*** Clean Topology" )
585 logger.info(
"--> calling mininet.stop()" )
589 return {
'status':
'ok'}
594 return {
'status':
'ok'}
598 run(host=
'0.0.0.0', port=port, debug=
True)
613 This is to setup rules for host to switch / switch to host. 614 It honors what we are trying to accomlish with testing Kilda: 615 1) Kilda will put rules on one or more switches 616 2) This code will put rules on the switches outside that set. For instance, if we are 617 testing Kilda in a single switch scenario (s3), then this code will be used 618 to put rules on s2 and s4 so that s3 can be tested properly. 619 3) To keep things simple, we leverage the host attached to s2 and s4 to do a ping 620 4) These rules setup the host to switch port as in, and then the switch to be tested as the 626 result =
"strip_vlan," 629 result =
"push_vlan:0x8100,mod_vlan_vid:{},".
format(out_vlan)
631 result =
"mod_vlan_vid:{},".
format(out_vlan)
636 """add reciprocal rules to a switch to emulate kilda rules""" 638 logger.info(
"** Adding flows to {}".
format(switch_id))
640 in_match =
"" if in_vlan == 0
else ",dl_vlan={}".
format(in_vlan)
641 out_match =
"" if out_vlan == 0
else ",dl_vlan={}".
format(out_vlan)
646 noise =
"idle_timeout=0,priority=1000" 647 in_rule =
"{},in_port={}{},actions={}output:{}".
format(noise, in_port, in_match, in_action, out_port)
648 out_rule =
"{},in_port={}{},actions={}output:{}".
format(noise, out_port, out_match, out_action, in_port)
649 print(
"ingress rule: {}".
format(in_rule))
650 print(
"egress rule: {}".
format(out_rule))
652 subprocess.Popen([
"ovs-ofctl",
"-O",
"OpenFlow13",
"add-flow",switch_id,in_rule],
653 stdout=subprocess.PIPE).wait()
655 subprocess.Popen([
"ovs-ofctl",
"-O",
"OpenFlow13",
"add-flow",switch_id,out_rule],
656 stdout=subprocess.PIPE).wait()
665 """remove rules from switch 3 to emulate kilda clear rules""" 666 print(
"** Remove flows from {}".
format(switch_id))
667 in_rule =
"in_port={}".
format(in_port)
668 out_rule =
"in_port={}".
format(out_port)
669 subprocess.Popen([
"ovs-ofctl",
"-O",
"OpenFlow13",
"del-flows",switch_id,in_rule],
670 stdout=subprocess.PIPE).wait()
671 subprocess.Popen([
"ovs-ofctl",
"-O",
"OpenFlow13",
"del-flows",switch_id,out_rule],
672 stdout=subprocess.PIPE).wait()
681 result = host1.cmd(
'ping -c1 -w1 %s' % (host2.IP()) )
682 lines = result.split(
"\n")
683 if "1 packets received" in lines[3]:
684 print "CONNECTION BETWEEN ", host1.IP(),
"and", host2.IP()
687 print "NO CONNECTION BETWEEN ", host1.IP(),
"and", host2.IP()
691 @
get(
'/checkpingtraffic')
697 1) add host/switch ingress/egress rules on Src and Dst 699 3) remove host/switch ingress/egress rules on Src and Dst 709 src_switch = p[
'srcswitch']
710 src_port = p[
'srcport']
711 src_vlan = int(p[
'srcvlan'])
712 dst_switch = p[
'dstswitch']
713 dst_port = p[
'dstport']
714 dst_vlan = int(p[
'dstvlan'])
716 logger.info(
"** PING request received: src={}:{}x{} dst={}:{}x{}".
format(
717 src_switch,src_port,src_vlan,dst_switch,dst_port,dst_vlan
721 src_host_port = 2
if src_switch ==
"00000001" else 3
724 logger.info (
"--> adding host/switch rules" )
731 logger.info (
"--> ping" )
732 src_host = net.nameToNode[
"h%s" % src_switch]
733 dst_host = net.nameToNode[
"h%s" % dst_switch]
734 successful_ping =
pingable(src_host, dst_host)
736 logger.info (
"--> remove host/switch rules" )
741 response.status = 200
744 response.status = 503
748 ofctl_start=
'ovs-ofctl -O OpenFlow13 add-flow' 750 @
get(
"/add_default_flows")
754 cmd1 =
"%s %s idle_timeout=0,priority=1,actions=drop" % (ofctl_start, switch)
755 cmd2 =
"%s %s idle_timeout=0,priority=2,dl_type=0x88cc,action=output:controller" % (ofctl_start, switch)
756 result1 = os.system(cmd1)
757 result2 = os.system(cmd2)
758 return {
'result1': result1,
'result2': result2}
761 @
get(
"/add_ingress_flow")
767 outport = p[
'outport']
768 priority = p[
'priority']
770 cmd1 =
"%s %s idle_timeout=0,priority=%s,in_port=%s,actions=push_vlan:0x8100,mod_vlan_vid:%s,output:%s" % \
771 (ofctl_start, switch,priority,inport,vlan,outport)
772 result1 = os.system(cmd1)
773 return {
'result1': result1}
776 @
get(
"/add_egress_flow")
782 outport = p[
'outport']
783 priority = p[
'priority']
785 cmd1 =
"%s %s idle_timeout=0,priority=%s,in_port=%s,dl_vlan=%s,actions=strip_vlan,output:%s" % \
786 (ofctl_start, switch,priority,inport,vlan,outport)
787 result1 = os.system(cmd1)
788 return {
'result1': result1}
791 @
get(
"/add_transit_flow")
797 outport = p[
'outport']
798 priority = p[
'priority']
800 cmd1 =
"%s %s idle_timeout=0,priority=%s,in_port=%s,dl_vlan=%s,actions=output:%s" % \
801 (ofctl_start, switch,priority,inport,vlan,outport)
802 result1 = os.system(cmd1)
803 return {
'result1': result1}
807 """Get the global variables defined and initialized""" 810 with open(
"/app/log.json",
"r") as fd: 811 logging.config.dictConfig(json.load(fd)) 813 logger = logging.getLogger() 821 if __name__ ==
'__main__':
def batchStartup(cls, switches, run=errRun)
def start_server(interface, port)
def check_ping_traffic(p)
def controller_info(controller)
def required_parameters(pars)
def pingable(host1, host2)
def get_output_actions(in_vlan, out_vlan)
Flow Debugging Section.
def clear_single_switch_rules(switch_id, in_port, out_port)
def __init__(self, name, params)
def add_single_switch_rules(switch_id, in_port, out_port, in_vlan=0, out_vlan=0)
def batchShutdown(cls, switches, run=errRun)