其他分享
首页 > 其他分享 > 用 Twisted 发送请求

用 Twisted 发送请求

作者:互联网

 

 

# coding: utf-8
# @Time    : 2022-05-18 13:01
# @Author  : AngDH
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

# Plays the client role: the Agent issues HTTP requests through the reactor.
agent = Agent(reactor)

# Issue a GET request without a body; request() hands back a Deferred that
# the callbacks further down are attached to.
defered = agent.request(
    b'GET',
    b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=None,
)


def successCallback(response):
    """Print the status code carried by *response*.

    Args:
        response: Object exposing the HTTP status as its ``code`` attribute.
    """
    status = response.code
    print('Response received:', status)


# Run successCallback with the response when the request succeeds.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print the string form of *error*, prefixed with ``error:``.

    Args:
        error: The failure to report; it is rendered with ``str()``.
    """
    description = str(error)
    print("error:", description)


# Report failures. Because this errback is attached after successCallback,
# it also receives any exception raised inside that callback.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor; the incoming result or failure is not used."""
    reactor.stop()


# addBoth registers the shutdown for both outcomes, success or failure.
defered.addBoth(callbackShutdown)

# Start the event loop; callbackShutdown stops it via reactor.stop().
reactor.run()

 

 

 

 


 

 

 

# coding: utf-8
# @Time    : 2022-05-18 13:01
# @Author  : AngDH
from twisted.internet import reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
from zope.interface import implementer


@implementer(IBodyProducer)
class BytesProducer(object):
    """Request-body producer that delivers one in-memory payload.

    ``body`` holds the payload and ``length`` its size, the attribute the
    body-producer interface uses to announce how much data will be written.
    """

    def __init__(self, body):
        """Store the payload and record its length.

        Args:
            body: The complete request body. It is written to the consumer
                unchanged and ``len(body)`` is taken as its length, so it
                should already be encoded ``bytes``.
        """
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        """Write the whole body to *consumer* in a single call.

        Returns:
            An already-fired Deferred with result ``None``, because all data
            has been written by the time this method returns.
        """
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        """Do nothing: the body is written in one shot, nothing to pause."""

    # The method was originally misspelled "pauseProducting", which left the
    # interface's pauseProducing() undefined. Keep the old name as an alias so
    # any code that referenced it keeps working.
    pauseProducting = pauseProducing

    def resumeProducing(self):
        """Do nothing: production is never suspended, nothing to resume."""

    def stopProducing(self):
        """Do nothing: there is no ongoing production to cancel."""


# Plays the client role: the Agent issues HTTP requests through the reactor.
agent = Agent(reactor)

# Issue a POST request whose body is supplied by a BytesProducer; request()
# hands back a Deferred that the callbacks further down are attached to.
defered = agent.request(
    b'POST',
    b'http://www.baidu.com/s?wd=python',
    headers=Headers({"User-Agent": ["python"]}),
    bodyProducer=BytesProducer('请求体'.encode()),
)


def successCallback(response):
    """Print the status code carried by *response*.

    Args:
        response: Object exposing the HTTP status as its ``code`` attribute.
    """
    status_code = response.code
    print('Response received:', status_code)


# Run successCallback with the response when the request succeeds.
defered.addCallback(successCallback)


def errorCallback(error):
    """Print the string form of *error*, prefixed with ``error:``.

    Args:
        error: The failure to report; it is rendered with ``str()``.
    """
    rendered = str(error)
    print("error:", rendered)


# Report failures. Because this errback is attached after successCallback,
# it also receives any exception raised inside that callback.
defered.addErrback(errorCallback)


def callbackShutdown(ignored):
    """Stop the reactor; the incoming result or failure is not used."""
    reactor.stop()


# addBoth registers the shutdown for both outcomes, success or failure.
defered.addBoth(callbackShutdown)

# Start the event loop; callbackShutdown stops it via reactor.stop().
reactor.run()

 

标签:reactor,self,twisted,发送,defered,import,def,请求
来源: https://www.cnblogs.com/angdh/p/16284344.html