#!/usr/local/bin/python2.7
import os
+import threading
from addr import *
from scapy.all import *
+class Sniff(threading.Thread):
+ filter = None
+ captured = None
+ packet = None
+ def __init__(self):
+ # clear packets buffered by scapy bpf
+ sniff(iface=LOCAL_IF, timeout=1)
+ super(Sniff, self).__init__()
+ def run(self):
+ self.captured = sniff(iface=LOCAL_IF, filter=self.filter,
+ timeout=3)
+ if self.captured:
+ self.packet = self.captured[0]
+
e=Ether(src=LOCAL_MAC, dst=REMOTE_MAC)
ip6=IPv6(src=FAKE_NET_ADDR6, dst=REMOTE_ADDR6)
tport=os.getpid() & 0xffff
print "Fill our receive buffer."
time.sleep(1)
+# srp1 cannot be used, fragment answer will not match outgoing ICMP6 packet
+sniffer = Sniff()
+sniffer.filter = \
+ "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag"
+sniffer.start()
+time.sleep(1)
+
print "Send ICMP6 packet too big packet with MTU 1272."
icmp6=ICMPv6PacketTooBig(mtu=1272)/data.payload
-# srp1 cannot be used, fragment answer will not match outgoing ICMP6 packet
-if os.fork() == 0:
- time.sleep(1)
- sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/icmp6, iface=LOCAL_IF)
- os._exit(0)
+sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/icmp6, iface=LOCAL_IF)
print "Path MTU discovery will not resend data, ICMP6 packet is ignored."
-ans=sniff(iface=LOCAL_IF, timeout=3, filter=
- "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag")
+sniffer.join(timeout=5)
print "IPv6 atomic fragments must not be generated."
frag=None
-for a in ans:
+for a in sniffer.captured:
fh=a.payload.payload
if fh.offset != 0 or fh.nh != (ip6/syn).nh:
continue
#!/usr/local/bin/python2.7
import os
+import threading
from addr import *
from scapy.all import *
+class Sniff1(threading.Thread):
+ filter = None
+ captured = None
+ packet = None
+ def __init__(self):
+ # clear packets buffered by scapy bpf
+ sniff(iface=LOCAL_IF, timeout=1)
+ super(Sniff1, self).__init__()
+ def run(self):
+ self.captured = sniff(iface=LOCAL_IF, filter=self.filter,
+ count=1, timeout=3)
+ if self.captured:
+ self.packet = self.captured[0]
+
ip=IP(src=FAKE_NET_ADDR, dst=REMOTE_ADDR)
tport=os.getpid() & 0xffff
print "Fill our receive buffer."
time.sleep(1)
+# sr1 cannot be used, TCP data will not match outgoing ICMP packet
+sniffer = Sniff1()
+sniffer.filter = \
+ "ip and src %s and tcp port %u and dst %s and tcp port %u" % \
+ (ip.dst, syn.dport, ip.src, syn.sport)
+sniffer.start()
+time.sleep(1)
+
print "Send ICMP fragmentation needed packet with MTU 1300."
icmp=ICMP(type="dest-unreach", code="fragmentation-needed",
nexthopmtu=1300)/data
-# sr1 cannot be used, TCP data will not match outgoing ICMP packet
-if os.fork() == 0:
- time.sleep(1)
- send(IP(src=LOCAL_ADDR, dst=REMOTE_ADDR)/icmp, iface=LOCAL_IF)
- os._exit(0)
+send(IP(src=LOCAL_ADDR, dst=REMOTE_ADDR)/icmp, iface=LOCAL_IF)
print "Path MTU discovery will resend first data with length 1300."
-ans=sniff(iface=LOCAL_IF, timeout=3, count=1, filter=
- "ip and src %s and tcp port %u and dst %s and tcp port %u" %
- (ip.dst, syn.dport, ip.src, syn.sport))
+sniffer.join(timeout=5)
+ans = sniffer.packet
if len(ans) == 0:
print "ERROR: no data retransmit from chargen server received"
#!/usr/local/bin/python2.7
import os
+import threading
from addr import *
from scapy.all import *
+class Sniff1(threading.Thread):
+ filter = None
+ captured = None
+ packet = None
+ def __init__(self):
+ # clear packets buffered by scapy bpf
+ sniff(iface=LOCAL_IF, timeout=1)
+ super(Sniff1, self).__init__()
+ def run(self):
+ self.captured = sniff(iface=LOCAL_IF, filter=self.filter,
+ count=1, timeout=3)
+ if self.captured:
+ self.packet = self.captured[0]
+
e=Ether(src=LOCAL_MAC, dst=REMOTE_MAC)
ip6=IPv6(src=FAKE_NET_ADDR6, dst=REMOTE_ADDR6)
tport=os.getpid() & 0xffff
print "Fill our receive buffer."
time.sleep(1)
+# srp1 cannot be used, TCP data will not match outgoing ICMP6 packet
+sniffer = Sniff1()
+sniffer.filter = \
+ "ip6 and src %s and tcp port %u and dst %s and tcp port %u" % \
+ (ip6.dst, syn.dport, ip6.src, syn.sport)
+sniffer.start()
+time.sleep(1)
+
print "Send ICMP6 packet too big packet with MTU 1300."
icmp6=ICMPv6PacketTooBig(mtu=1300)/data.payload
-# srp1 cannot be used, TCP data will not match outgoing ICMP6 packet
-if os.fork() == 0:
- time.sleep(1)
- sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/icmp6, iface=LOCAL_IF)
- os._exit(0)
+sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/icmp6, iface=LOCAL_IF)
print "Path MTU discovery will resend first data with length 1300."
-ans=sniff(iface=LOCAL_IF, timeout=3, count=1, filter=
- "ip6 and src %s and tcp port %u and dst %s and tcp port %u" %
- (ip6.dst, syn.dport, ip6.src, syn.sport))
+sniffer.join(timeout=5)
+ans = sniffer.packet
if len(ans) == 0:
print "ERROR: no data retransmit from chargen server received"
#!/usr/local/bin/python2.7
import os
+import threading
import string
import random
from addr import *
from scapy.all import *
+class Sniff(threading.Thread):
+ filter = None
+ captured = None
+ packet = None
+ def __init__(self):
+ # clear packets buffered by scapy bpf
+ sniff(iface=LOCAL_IF, timeout=1)
+ super(Sniff, self).__init__()
+ def run(self):
+ self.captured = sniff(iface=LOCAL_IF, filter=self.filter,
+ timeout=3)
+ if self.captured:
+ self.packet = self.captured[0]
+
e=Ether(src=LOCAL_MAC, dst=REMOTE_MAC)
ip6=IPv6(src=FAKE_NET_ADDR6, dst=REMOTE_ADDR6)
uport=os.getpid() & 0xffff
print "Clear route cache at echo socket by sending from different address."
sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/udp, iface=LOCAL_IF)
-print "Path MTU discovery will not send UDP atomic fragment."
# srp1 cannot be used, fragment answer will not match on outgoing UDP packet
-if os.fork() == 0:
- time.sleep(1)
- sendp(e/ip6/udp, iface=LOCAL_IF)
- os._exit(0)
+sniffer = Sniff()
+sniffer.filter = \
+ "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag"
+sniffer.start()
+time.sleep(1)
-ans=sniff(iface=LOCAL_IF, timeout=3, filter=
- "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag")
+print "Send UDP packet with 1200 octets payload."
+sendp(e/ip6/udp, iface=LOCAL_IF)
+
+print "Path MTU discovery will not send UDP atomic fragment."
+sniffer.join(timeout=5)
print "IPv6 atomic fragments must not be generated."
frag=None
-for a in ans:
+for a in sniffer.captured:
fh=a.payload.payload
if fh.offset != 0 or fh.nh != (ip6/udp).nh:
continue
#!/usr/local/bin/python2.7
import os
+import threading
import string
import random
from addr import *
from scapy.all import *
+class Sniff(threading.Thread):
+ filter = None
+ captured = None
+ packet = None
+ def __init__(self):
+ # clear packets buffered by scapy bpf
+ sniff(iface=LOCAL_IF, timeout=1)
+ super(Sniff, self).__init__()
+ def run(self):
+ self.captured = sniff(iface=LOCAL_IF, filter=self.filter,
+ timeout=3)
+ if self.captured:
+ self.packet = self.captured[0]
+
e=Ether(src=LOCAL_MAC, dst=REMOTE_MAC)
ip6=IPv6(src=FAKE_NET_ADDR6, dst=REMOTE_ADDR6)
uport=os.getpid() & 0xffff
print "Clear route cache at echo socket by sending from different address."
sendp(e/IPv6(src=LOCAL_ADDR6, dst=REMOTE_ADDR6)/udp, iface=LOCAL_IF)
-print "Path MTU discovery will send UDP fragment with maximum length 1300."
# srp1 cannot be used, fragment answer will not match on outgoing UDP packet
-if os.fork() == 0:
- time.sleep(1)
- sendp(e/ip6/udp, iface=LOCAL_IF)
- os._exit(0)
+sniffer = Sniff()
+sniffer.filter = \
+ "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag"
+sniffer.start()
+time.sleep(1)
-ans=sniff(iface=LOCAL_IF, timeout=3, filter=
- "ip6 and src "+ip6.dst+" and dst "+ip6.src+" and proto ipv6-frag")
+print "Send UDP packet with 1400 octets payload."
+sendp(e/ip6/udp, iface=LOCAL_IF)
+
+print "Path MTU discovery will send UDP fragment with maximum length 1300."
+sniffer.join(timeout=5)
-for a in ans:
+for a in sniffer.captured:
fh=a.payload.payload
if fh.offset != 0 or fh.nh != (ip6/udp).nh:
continue