Python实现网络工具
作者:互联网
使用python编写网络工具
基础内容
介绍基本的网络编程
Socket编程
Socket又称"套接字",应用程序通常通过"套接字"向网络发出请求或者应答网络请求,使主机间或者一台计算机上的进程间可以通讯。使用Python中的socket库就可以进行网络相关的编程。
函数 | 描述 |
---|---|
服务器端套接字 | |
s.bind() | 绑定地址(host,port)到套接字, 在 AF_INET下,以元组(host,port)的形式表示地址。 |
s.listen() | 开始 TCP 监听。backlog 指定在拒绝连接之前,操作系统可以挂起的最大连接数量。该值至少为 1,大部分应用程序设为 5 就可以了。 |
s.accept() | 被动接受TCP客户端连接,(阻塞式)等待连接的到来 |
客户端套接字 | |
s.connect() | 主动初始化TCP服务器连接,。一般address的格式为元组(hostname,port),如果连接出错,返回socket.error错误。 |
s.connect_ex() | connect()函数的扩展版本,出错时返回出错码,而不是抛出异常 |
公共用途的套接字函数 | |
s.recv() | 接收 TCP 数据,数据以字符串形式返回,bufsize 指定要接收的最大数据量。flag 提供有关消息的其他信息,通常可以忽略。 |
s.send() | 发送 TCP 数据,将 string 中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于 string 的字节大小。 |
s.sendall() | 完整发送 TCP 数据。将 string 中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回 None,失败则抛出异常。 |
s.recvfrom() | 接收 UDP 数据,与 recv() 类似,但返回值是(data,address)。其中 data 是包含接收数据的字符串,address 是发送数据的套接字地址。 |
s.sendto() | 发送 UDP 数据,将数据发送到套接字,address 是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。 |
s.close() | 关闭套接字 |
TCP客户端
import socket
host = "www.baidu.com"
port = 80
# 创建socket对象
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 连接服务器
client.connect((host,port))
# 发送数据 HTTP请求
client.send(b"GET / HTTP/1.1\r\nHost: baidu.com\r\n\r\n")
# 接收响应数据
response = client.recv(4096)
# 打印数据
print(response.decode())
client.close()
使用socket函数创建一个socket对象。AF_INET表示使用标准的IPv4地址或主机名,SOCK_STREAM是流式套接字,表示使用TCP。上述程序向www.baidu.com对应的主机发送了一个HTTP GET请求,并且得到了响应
HTTP/1.1 200 OK
Accept-Ranges: bytes
Cache-Control: no-cache
Connection: keep-alive
Content-Length: 9508
Content-Type: text/html
...
UDP客户端
import socket
host = "127.0.0.1"
port = 9091
# 创建socket对象
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
# 发送数据
client.sendto(b"AAAAAAAAAA",(host,port))
# 接收数据
data,addr = client.recvfrom(4096)
# 打印响应信息
print(data.decode())
client.close()
使用UDP时,socket中的函数中的第二个参数变成了SOCK_DGRAM即数据报套接字,并且发送数据时使用sendto函数,接收数据时使用recvfrom函数。
TCP服务端
import socket
import threading
from urllib import request
IP = '0.0.0.0'
PORT = 9999
def handler(client_socket):
with client_socket as sock:
request = sock.recv(1024)
print(f'[*] Received: {request.decode("utf-8")}')
sock.send(b"ACK")
def main():
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind((IP,PORT))
server.listen(5)
print(f"[*] Listening on {IP}:{PORT}")
while True:
client,address = server.accept()
print(f"[*] Accepted connection from {address[0]:{address[1]}}")
client_handler = threading.Thread(target=handler,args=(client,))
client_handler.start()
if __name__ == "__main__":
main()
指定服务器应该监听哪个IP地址和端口,接着让服务器开始监听,并且将最大连接数设置为5。下一步让服务器进入主循环,并在该循环中等待外来连接。当一个客户端成功建立连接时,将接收到的客户端socket对象保存到client变量中,将远程连接的详细信息保存到address变量中。然后,创建一个新的线程,让它指向handler函数,并传入client变量。创建好后,启动这个线程来处理接收到的连接,与此同时服务端的主循环也已经准备好处理下一个外来连接,而handler函数会调用recv接收数据,并给客户端发送一段简单的回复。
ipaddress处理IP地址
文档: ipaddress模块介绍
在下面案例中会用到的几个重要方法
import ipaddress
addr = ipaddress.ip_address('192.0.2.1') # 返回一个object
print(addr.version) # 打印IP版本
net4 = ipaddress.ip_network('192.0.2.0/24') # 返回接口
print(net4.num_addresses) # 答打印地址数
for host in net4.hosts(): # 遍历
print(host)
输出
4
256
192.0.2.1
192.0.2.2
192.0.2.3
192.0.2.4
192.0.2.5
192.0.2.6
192.0.2.7
192.0.2.8
192.0.2.9
192.0.2.10
192.0.2.11
如上代码迭代显示该网段上的"可用"的独立地址
基于SOCKET的网络工具
大部分操作系统都会执行一个操作: 向一台主机发送一格UDP数据包时,如果主机上的UDP端口没有开启,一般会返回一个ICMP包来提示目标端口不可访问,可以以此来判断主机是否存活。
嗅探流量包
使用Python的socket库,代码如下
import socket
import os
host = '10.81.226.234'
def main():
# 判断系统是否为windows
if os.name == 'nt':
# windwos允许嗅探任何协议
socket_protocol = socket.IPPROTO_IP
else:
# Linux强制指定一个协议进行嗅探
socket_protocol = socket.IPPROTO_ICMP
# 创建socket对象
sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
sniffer.bind((host,0))
# 设置包含IP头
sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)
# 对windows机器 额外启用混杂模式
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)
# 读取一个数据包
print(sniffer.recvfrom(65565))
# 关闭混杂模式
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)
if __name__ == '__main__':
main()
host变量为本机IP地址,然后创建一个socket对象,使用bind函数绑定socket对象,使用setsockopt函数修改socket设置,使其在抓包时包含IP头部。之后会判断平台是否为windows,如果是则启动网卡的混杂模式。
运行输出
$ sudo python3 sock_sniffer.py
(b'E\xc0\x00X\x88j\x00\x00@\x01\x17\x04\nQ\xe2\xea\nQ\xe2\xea\x03\x01?+\x00\x00\x00\x00E\x00\x00<\n)@\x00@\x06\xc8\x0f\nQ\xe2\xeao\xb2\x0b\x96\xaf\x1e\x00P\x1f\xd8Y\xfd\x00\x00\x00\x00\xa0\x02\xfa\xf0h\xb2\x00\x00\x02\x04\x05\xb4\x04\x02\x08\n\x9e\x9d\xda}\x00\x00\x00\x00\x01\x03\x03\x07', ('10.81.226.234', 0))
毫无疑问,这些输出相当凌乱,可以将其进行解码
解码IP层
进行更深层次解析分析,提取诸如协议类型、源IP地址和目的IP地址等有用的信息
如下是IPv4报文头
因为是二进制数据,要对IP头各个数据段进行分割
使用ctypes库或者struct库
Ctypes库拆解IP字段
ctypes提供了各种兼容C语言的数据结构,可调用符合C语言标准的共享库中的函数
文档介绍: ctypes — A foreign function library for Python
示例代码
from ctypes import *
import socket
import struct
from numpy import uint32
class IP(Structure):
_fields_ = [
("ihl", c_ubyte, 4),
("version", c_ubyte, 4),
("tos", c_ushort, 8),
("len", c_ushort, 16),
("id", c_ushort, 16),
("offset", c_ushort, 16),
("ttl", c_ubyte, 8),
("protocol_num", c_ubyte, 8),
("sum", c_ushort, 16),
("src", c_uint32, 32),
("dst", c-uint32, 32)
]
def __new__(cls, socket_buffer=None):
return cls.from_buffer_copy(socket_buffer)
def __init__(self,socket_buffert=None):
self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
这个类创建了一个名为_fields_的结构,用于定义IP头各个部分。该结构使用了ctypes里定义的C语言数据类型,例如代表unsigned char 类型的c_ubyte,代表unsigned short的c_ushort等。上述代码中定义的字段和上图中的IP头部中的字段一一对应。各个字段的定义有3个字段组成: 字段名称、数据类型以及字段位数。设置字段位数使得能够以位为单位指定数据长度,这意味着能够自由指定想要的长度。
在上述代码中,IP类继承自ctypes库的Structure类,要求创建对象前必须定义_fields_结构。为了向fields结构里填充数据,Structure类利用了_new_函数。此函数的第一个参数是指向当前类的引用,new函数用该引用创建当前类的第一个对象。之后这个对象被传给_init_函数进行初始化。
struct库拆解IP字段
该库提供了一些格式字符,用来定义二进制数据的结构
import ipaddress
import struct
class IP:
def __init__(self,buff=None):
header = struct.unpack('<BBHHHBBH4s4s',buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# 映射
self.protocol_map = {1:"ICMP",6:"TCP",17:"UDP"}
<BBHHHBBH4s4s
中的<
表示数据字节序,这里表示小端序。
BBHHHBBH4s4s
表示IP头的各部分。struct库中提供了若干格式字符。B是1字节,H是2字节,ns是n个字节的数组。
因为不能按位指定想要的长度了,但这里先获取version,只要取第一个字节的高4位,所以使用>>4
进行偏移,将高4位向低位偏移4位,原来的高位用0补齐,原来的低4为被覆盖,这样就得到了version。ihl是第一个字节的低4位,将第一个字节与二进制数00001111
相与即可得到,对其他字段的获取只要顺次按下标获取即可。
IP解码器
如下是完整代码实现
import ipaddress
import os
from shutil import ExecError
import socket
import struct
import sys
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# 映射
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
print(f'No protocol {self.protocol_num}')
self.protocol = str(self.protocol_num)
def sniff(host):
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
try:
while True:
raw_buffer = sniffer.recvfrom(65535)[0]
ip_header = IP(raw_buffer[0:20])
print(
f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}")
except KeyboardInterrupt:
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
sys.exit()
if __name__ == "__main__":
if len(sys.argv) == 2:
host = sys.argv[1]
else:
host = "127.0.0.1"
sniff(host)
首先实现IP类,它定义了一个Python结构,可以把数据包的前20字节映射到读/写的IP头对象,将IP数据包中的字段拆解出来,可供用户识别。将之前实现的抓包程序合并进来,就实现了完整的流程。
测试运行:
因为是Linux平台,所以只能看到ICMP协议数据,运行程序后
使用ping命令 ping baidu.com
如下是程序输出
$ sudo python3 ip_decoder.py 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
ICMP解码器
解码了IP层数据后还要解码ICMP响应。不同的ICMP消息之间千差万别,但一定存在3个字段:类型、代码和校验和。类型和代码表示了要接收的ICMP消息是什么类型的,也就指明了如何正确地解码其中的数据。
字段 | 长度 | 含义 |
---|---|---|
Type | 1字节 | 消息类型: - 0:回显应答报文 - 8:请求回显报文 |
Code | 1字节 | 消息代码,此处值为0。 |
Checksum | 2字节 | 检验和。 |
Identifier | 2字节 | 标识符,发送端标示此发送的报文 |
Sequence Number | 2字节 | 序列号,发送端发送的报文的顺序号。每发送一次顺序号就加1。 |
Data | 可变 | 选项数据,是一个可变长的字段,其中包含要返回给发送者的数据。回显应答通常返回与所收到的数据完全相同的数据。 |
如上所示,数据包开头的8个二进制位代表类型,其后的8个二进制位代表ICMP消息代码。
完整代码实现
import ipaddress
import os
import socket
import struct
import sys
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# 映射
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
print(f'No protocol {self.protocol_num}')
self.protocol = str(self.protocol_num)
class ICMP:
def __init__(self,buff):
header = struct.unpack('<BBHHH',buff)
self.type = header[0]
self.code = header[1]
self.sum = header[2]
self.id = header[3]
self.seq = header[4]
def sniff(host):
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
try:
while True:
raw_buffer = sniffer.recvfrom(65535)[0]
ip_header = IP(raw_buffer[0:20])
if ip_header.protocol == "ICMP":
print(f"Protocol: {ip_header.protocol} {ip_header.src_address} -> {ip_header.dst_address}")
print(f"Version: {ip_header.ver}")
print(f"Header Length: {ip_header.ihl} TTL: {ip_header.ttl}")
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset+8]
icmp_header = ICMP(buf)
print(f"ICMP -> Type: {icmp_header.type} Code: {icmp_header.code}")
except KeyboardInterrupt:
if os.name == 'nt':
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
sys.exit()
if __name__ == "__main__":
if len(sys.argv) == 2:
host = sys.argv[1]
else:
host = "127.0.0.1"
sniff(host)
如上代码中增加了ICMP消息结构及其解析。在sniff中也增加了一系列流程:
在接受数据包的主循环中判断接收到的数据包是否为ICMP数据包,然后计算出ICMP数据在原始数据包中的偏移,最后将数据按照ICMP结构进行解析,输出其中的类型(type)和代码(code)字段。IP头的长度是基于IP头中的ihl字段计算的,该字段记录了IP头中有多少个32位(4字节)长的数据块,将这个字段乘以4就能计算出IP头的大小,以及数据包中下一网络层开始的位置。
运行测试
$ sudo python3 icmp_decoder.py 10.81.226.234
Protocol: ICMP 10.81.226.234 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 64
ICMP -> Type: 3 Code: 1
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 52
ICMP -> Type: 0 Code: 0
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4
Header Length: 5 TTL: 52
ICMP -> Type: 0 Code: 0
Protocol: ICMP 220.181.38.148 -> 10.81.226.234
Version: 4
基于UDP的主机扫描器
完整代码如下
import ipaddress
import os
import socket
import struct
import sys
import threading
import time
SUBNET = '10.81.226.0/24'
MESSAGE = 'PYTHONRULES' # 签名
# IP消息结构
class IP:
def __init__(self, buff=None):
header = struct.unpack('<BBHHHBBH4s4s', buff)
self.ver = header[0] >> 4
self.ihl = header[0] & 0xF
self.tos = header[1]
self.len = header[2]
self.id = header[3]
self.offset = header[4]
self.ttl = header[5]
self.protocol_num = header[6]
self.sum = header[7]
self.src = header[8]
self.dst = header[9]
self.src_address = ipaddress.ip_address(self.src)
self.dst_address = ipaddress.ip_address(self.dst)
# 映射
self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}
try:
self.protocol = self.protocol_map[self.protocol_num]
except:
print(f'No protocol {self.protocol_num}')
self.protocol = str(self.protocol_num)
# ICMP消息结构
class ICMP:
def __init__(self, buff):
header = struct.unpack('<BBHHH', buff)
self.type = header[0]
self.code = header[1]
self.sum = header[2]
self.id = header[3]
self.seq = header[4]
# 发送UDP数据包
def udp_sender():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sender:
for ip in ipaddress.ip_network(SUBNET).hosts(): # 遍历子网IP地址
sender.sendto(bytes(MESSAGE, 'utf8'), (str(ip), 65212))
# 扫描
class Scanner:
def __init__(self, host):
self.host = host
if os.name == 'nt':
socket_protocol = socket.IPPROTO_IP
else:
socket_protocol = socket.IPPROTO_ICMP
self.socket = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket_protocol)
self.socket.bind((host, 0))
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
# 获取数据包
def sniff(self):
hosts_up = set([f"{str(self.host)}"])
try:
while True:
raw_buffer = self.socket.recvfrom(65535)[0]
ip_header = IP(raw_buffer[0:20])
if ip_header.protocol == "ICMP":
offset = ip_header.ihl * 4
buff = raw_buffer[offset:offset+8]
icmp_header = ICMP(buff)
if icmp_header.code == 3 and icmp_header.type == 3:
if ipaddress.ip_address(ip_header.src_address) in ipaddress.IPv4Network(SUBNET):
if raw_buffer[len(raw_buffer) - len(MESSAGE):] == bytes(MESSAGE, 'utf8'):
target = str(ip_header.src_address)
if target != self.host and target not in hosts_up:
hosts_up.add(str(ip_header.src_address))
print(f"Host UP: {target}")
# 处理键盘中断
except KeyboardInterrupt:
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
print('\nUser interrupted')
if hosts_up:
print(f'\n\nSummary: Hosts up on {SUBNET}')
for host in sorted(hosts_up):
print(f'{host}')
print('')
sys.exit()
if __name__ == "__main__":
if len(sys.argv) == 2:
host = sys.argv[1]
else:
host = '127.0.0.1'
scan = Scanner(host)
time.sleep(3)
thread = threading.Thread(target=udp_sender)
thread.start()
scan.sniff()
程序的开头定义了一个字符串作为签名,udp_sender负责遍历子网下的IP地址,发送UDP数据包。接着定义了一个名叫Scanner的类,向其中传入扫描器所在主机的IP地址来初始化它。sniff函数负责嗅探网络上的数据,解析并提取报文格式,并将在线的主机记录下来。接收到ICMP消息时,要检查这个响应是不是来自正在扫描的网段,然后检查ICMP消息里有没有自定义的签名("PYTHONRULES!")。如果检查通过则打印发送这条消息的主机IP地址,如果使用Ctrl+C中断该程序,就会把目前扫描出来的结果打印。
运行测试
$ sudo python3 scanner.py 10.81.226.234
Host UP: 10.81.226.3
Host UP: 10.81.226.1
Host UP: 10.81.226.251
Host UP: 10.81.226.14
Host UP: 10.81.226.51
Host UP: 10.81.226.37
Host UP: 10.81.226.34
Host UP: 10.81.226.39
Host UP: 10.81.226.23
...
Scapy的使用
使用Scapy进行流量嗅探
安装python3版本:$ sudo pip3 install scapy
捕获数据
编写一个能分解并输出数据包内容的基础嗅探器。Scapy提供了一个接口函数sniff:
sniff(filter="",iface="any",prn=function,count=N)
filter
参数允许指定一个Berkeley数据包过滤器,用于过滤嗅探到的数据包。将此参数留空则代表要嗅探所有数据包。
iface
参数用于指定嗅探器要嗅探的网卡,如果不设置的话,默认会嗅探所有网卡。
prn
参数用于指定一个回调函数,每当遇到符合过滤条件的数据包时,嗅探器会将该数据包传给这个回调函数,这是该函数接收的唯一参数。
count
参数可以用来指定要嗅探的数据包数量
from scapy.all import sniff
# 回调函数 接收数据包
def packet_callback(packet):
print(packet.show())
def main():
sniff(prn=packet_callback,count=1)
if __name__ == "__main__":
main()
如上代码所示,该程序使用sniff获取一个数据包并显示
使用管理员权限运行
$ sudo python3 sniff.py
###[ Ethernet ]###
dst = 14:14:4b:81:96:23
src = 34:cf:f6:89:e3:83
type = IPv4
###[ IP ]###
version = 4
ihl = 5
tos = 0x0
len = 60
id = 44584
flags = DF
frag = 0
ttl = 64
proto = tcp
chksum = 0x934c
src = 10.81.226.234
dst = 8.8.4.4
\options \
###[ TCP ]###
sport = 54080
dport = https
seq = 2355326062
ack = 0
dataofs = 10
reserved = 0
flags = S
window = 64240
chksum = 0xf975
urgptr = 0
options = [('MSS', 1460), ('SAckOK', b''), ('Timestamp', (889222417, 0)), ('NOP', None), ('WScale', 7)]
None
使用show方法将数据包显示在终端。可以看到打印出的信息按照数据链路层、网络层、传输层的层次显示。
下面解释上述部分信息的含义:
Ethernet
是以太网,该层下的信息对应了以太网帧结构的一部分: 目的地址、源地址、数据字段。以太网帧中的数据字段存放了IP数据报,到IP层被解析
IP
层下对应的信息是IP数据包中的关键字段:
version
是版本号,4即IPv4,ihl
是报文头长度,以字节为单位
tos
是服务类型,该字段使不同类型的数据报能区分开来,len
是数据报长度
id flags frag
是标识、标志、片偏移,与IP分片有关
ttl
是寿命,即Time-To-Live
,这确保了数据报不会永远在网络中循环,当TTL=0,则该数据报会被丢弃
proto
是协议,该字段值指示了IP数据报的数据部分应交给哪个特定的运输层协议
checksum
是首部校验和,帮助路由器检测数据报中的比特错误
src
和dst
分别是源IP和目的IP
上述打印结果的最后一层是TCP
字段报文结构,sport
是源端口号,dport
是目的端口。
seq
和ack
是序号和确认号,这些字段被用于实现TCP的可靠数据传输服务(三次握手)
BPF语法
概念 | 描述 | 示例 |
---|---|---|
描述词 | 想匹配的数据 | host, net, port |
数据流方向 | 数据行进的方向 | src, dst, src or dst |
通信协议 | 发送数据所用协议 | ip, ip6, tcp, udp |
examples:
只捕获来源于网络中某一IP的主机流量: src host 192.168.10.1
只捕获80端口: port 80
只捕获ICMP流量: ICMP
保存数据
示例代码
from scapy.all import *
def capture(packet):
print("*"*30)
print(f"source:{packet[IP].src}:{packet.sport} target:{packet[IP].dst}:{packet.sport}")
print(packet.show())
print("*"*30)
if __name__ == "__main__":
packets = sniff(filter="tcp and port 80",prn=capture,count=10)
# 保存输出文件
wrpcap("res.pcap",packets)
该程序只获取TCP端口为80的数据包,使用wrpcap函数进行保存,pcap文件可以使用wireshark打开。
发送数据
send(IP(dst="www.baidu.com",ttl=1)/ICMP())
使用send函数发送ICMP数据包,可以设置字段值,指定协议名称,从第三层发送数据包。
sendp(Ether()/IP(dst="127.0.0.1",ttl=1)/ICMP())
使用sendp函数,从第二层发送数据包,上面两个函数都没有接收功能。
sr函数可以发送数据包,并且接收数据,会返回2个列表,第一个是有应答的answer,第二个是无应答的answer
from scapy.all import sr,sr1,IP,ICMP
a,b=sr(IP(dst="www.baidu.com",ttl=1)/ICMP())
print(a.show())
print(b.show())
输出
$ sudo python3 send.py
Begin emission:
Finished sending 1 packets.
.*
Received 2 packets, got 1 answers, remaining 0 packets
0000 IP / ICMP 10.81.226.234 > 180.101.49.11 echo-request 0 ==> IP / ICMP 10.81.255.254 > 10.81.226.234 time-exceeded ttl-zero-during-transit / IPerror / ICMPerror / Padding
None
None
sr1函数只返回1个列表,有应答的answer。
from scapy.all import sr,sr1,IP,ICMP
p =sr1(IP(dst="www.baidu.com",ttl=1)/ICMP())
print(p.show())
输出
$ sudo python3 send.py
Begin emission:
Finished sending 1 packets.
.*
Received 2 packets, got 1 answers, remaining 0 packets
###[ IP ]###
version = 4
ihl = 5
tos = 0xc0
len = 70
id = 27585
flags =
frag = 0
ttl = 64
proto = icmp
chksum = 0x16ab
src = 10.81.255.254
dst = 10.81.226.234
\options \
###[ ICMP ]###
type = time-exceeded
code = ttl-zero-during-transit
chksum = 0xf4ff
reserved = 0
length = 0
unused = 0
###[ IP in ICMP ]###
version = 4
ihl = 5
tos = 0x0
len = 28
id = 1
flags =
frag = 0
ttl = 1
proto = icmp
chksum = 0xe734
src = 10.81.226.234
dst = 180.101.49.11
\options \
###[ ICMP in ICMP ]###
type = echo-request
code = 0
chksum = 0xf7ff
id = 0x0
seq = 0x0
unused = ''
###[ Padding ]###
load = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
None
基于ICMP的主机探测
发送一个ICMP请求消息给目标主机,若源主机收到目标主机的应答响应消息,则表示目标可达。
from socket import timeout
from tabnanny import verbose
from scapy.all import IP,ICMP,sr1
from random import randint
from optparse import OptionParser
import ipaddress
import logging
# 关闭报错
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
hosts = [] # 存放结果
def scan(ip):
# 生成随机字段
ip_id = randint(1,65535) # IP报文标识符
icmp_id = randint(1,65535) # ICMP报文标识符
icmp_seq = randint(1,65535) # ICMP序列号
packet = IP(dst=ip,ttl=64,id=ip_id)/ICMP(id=icmp_id,seq=icmp_seq)/b'rootkit'
result = sr1(packet,timeout=1,verbose=False) # 发送并接收数据包
if result:
for rcv in result:
scan_ip = rcv[IP].src
print(scan_ip,'Host is up')
hosts.append(scan_ip)
else:
print(ip,'Host is down')
def main():
# 命令行参数选项
parser = OptionParser("Usage: %prog -i <target host>") # 输出帮助信息
parser.add_option('-i',type='string',dest='IP',help='specify target host') # 获取IP地址参数
options,args = parser.parse_args() # 解析参数
print("Scan report for " + options.IP + "\n")
# 判断单台主机还是网段
if '/' not in options.IP:
scan(options.IP)
else:
net4 = ipaddress.ip_network(options.IP)
for ip in net4.hosts():
scan(str(ip))
print("scan result:")
for host in hosts:
print(host)
if __name__ == "__main__":
main()
该程序首先解析命令行参数,然后判断参数是单个IP还是网段,如果是网段则遍历其子网IP进行处理。
标签:__,socket,header,Python,IP,self,网络工具,实现,ICMP 来源: https://www.cnblogs.com/N3ptune/p/16322703.html