其他分享
首页 > 其他分享> > 玩转Twisted之一:介绍

玩转Twisted之一:介绍

作者:互联网

Python编写的事件驱动的网络引擎

Twisted 支持所有主流操作系统的时间循环 -- select (所有平台), poll (大多数 POSIX 平台), epoll (Linux), kqueue (FreeBSD, macOS), IOCP (Windows), 和多种GUI事件循环 (GTK+2/3, Qt, wxWidgets). 第三方reactors 可以插入 Twisted,并提供多种其他事件循环。

安装

pip install twisted

安装支持tls的版本

pip instal twisted[tls]

示例代码

Echo 服务器

Twisted使实现定制网络应用变得简单。下面是一个TCP服务器,响应回复发给它的所有文本:

from twisted.internet import protocol, reactor, endpoints

class Echo(protocol.Protocol):
    def dataReceived(self, data):
        data = "Echo-Server:".encode()+data
        self.transport.write(data)


class EchoFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Echo()
endpoints.serverFromString(reactor, "tcp:1234").listen(EchoFactory())
reactor.run()

使用netcat进行测试:

$netcat -v 127.0.0.1 1234

Connection to 127.0.0.1 1234 port [tcp/*] succeeded!
你好
Echo-Server:你好
中华人民共和国
Echo-Server:中华人民共和国

Web 服务器

Twisted 包含一个事件驱动web服务器. 下面是一个Web应用样例; 注意资源对象如何持久化到内存,而不是每次请求都重新创建

from twisted.web import server, resource
from twisted.internet import reactor, endpoints


class Counter(resource.Resource):
    isLeaf = True
    numberRequests = 0

    def render_GET(self, request):
        self.numberRequests += 1
        request.setHeader(b"content-type", b"text/plain")
        content = u"我是请求 #{}\n".format(self.numberRequests)
        return content.encode("utf-8")


endpoints.serverFromString(reactor, "tcp:8080").listen(server.Site(Counter()))
reactor.run()

使用curl进行测试

$ curl 127.0.0.1:8080
我是请求 #1
$ curl 127.0.0.1:8080
我是请求 #2
$ curl 127.0.0.1:8080
我是请求 #3

发布/订阅 服务器

下面是一个简单的publish/subscribe 服务器, 客户端可以看到其他客户端发送的所有信息

from twisted.internet import reactor, protocol, endpoints
from twisted.protocols import basic


class PubProtocol(basic.LineReceiver):
    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.clients.add(self)

    def connectionLost(self, reason):
        self.factory.clients.remove(self)

    def lineReceived(self, line):
        for c in self.factory.clients:
            source = u"<{}> ".format(self.transport.getHost()).encode("utf-8")
            c.sendLine(source + line)


class PubFactory(protocol.Factory):
    def __init__(self):
        self.clients = set()

    def buildProtocol(self, addr):
        return PubProtocol(self)


endpoints.serverFromString(reactor, "tcp:1025").listen(PubFactory())
reactor.run()

开启两个终端,使用telnet进行测试:
客户端1:

telnet 127.0.0.1 1025
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
今天天气不错,我是客户端#1
<IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 今天天气不错,我是客户端#1
<IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 是啊是啊!是客户端#2

客户端#2

telnet 127.0.0.1 1025
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
<IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 今天天气不错,我是客户端#1
是啊是啊!我是客户端#2
<IPv4Address(type='TCP', host='127.0.0.1', port=1025)> 是啊是啊!是客户端#2

邮件客户端

Twisted 包含一个高明的(sophisticated) IMAP4 客户端库.


import sys

from twisted.internet import protocol, defer, endpoints, task
from twisted.mail import imap4
from twisted.python import failure


async def main(
    reactor, username="TaceyWong", password="secret", strport="tls:example.com:993"
):
    endpoint = endpoints.clientFromString(reactor, strport)
    factory = protocol.Factory.forProtocol(imap4.IMAP4Client)
    try:
        client = await endpoint.connect(factory)
        await client.login(username.encode("utf-8"),
                           password.encode("utf-8"))
        await client.select("INBOX")
        info = await client.fetchEnvelope(imap4.MessageSet(1))
        print("First message subject:", info[1]["ENVELOPE"][1])
    except:
        print("IMAP4 client interaction failed")
        print(failure.Failure().getTraceback())

task.react(lambda *a, **k: defer.ensureDeferred(main(*a, **k)), sys.argv[1:])
                  

具体可以用自己支持IMAP的邮箱做测试

SSH客户端

Twisted 包含 一个SSH 客户端 和 服务器, "conch" (比如: Twisted Shell).

import sys, os

from twisted.internet import protocol, defer, endpoints, task
from twisted.conch.endpoints import SSHCommandClientEndpoint


async def main(reactor, username="tacey", sshhost="192.168.1.123", portno="22"):
    envAgent = endpoints.UNIXClientEndpoint(reactor, os.environ["SSH_AUTH_SOCK"])
    endpoint = SSHCommandClientEndpoint.newConnection(
        reactor, "echo '你好,世界'", username, sshhost,
        int(portno), agentEndpoint=envAgent,
    )

    class ShowOutput(protocol.Protocol):
        received = b""

        def dataReceived(self, data):
            self.received += data

        def connectionLost(self, reason):
            finished.callback(self.received)

    finished = defer.Deferred()
    factory = protocol.Factory.forProtocol(ShowOutput)
    await endpoint.connect(factory)
    print("SSH 响应:", await finished)


task.react(lambda *a, **k: defer.ensureDeferred(main(*a, **k)), sys.argv[1:])

(可能需要安装bcrypt——pip install bcrypt

标签:reactor,self,twisted,介绍,Twisted,玩转,import,def,客户端
来源: https://www.cnblogs.com/taceywong/p/16413683.html