编程语言
首页 > 编程语言> > 读数笔记_python网络编程3(7)

读数笔记_python网络编程3(7)

作者:互联网

7.服务器架构

7.0.网络服务的编写需要面对两个挑战:

7.0.1. 第一个挑战是核心: 编写出能够正确处理请求,并构造合适响应的代码

7.0.2. 第二个挑战是: 如何将网络代码部署到随OS自动启动的Win-Ser/Unix守护进程中,将日志持久化存储,在无法连接DB/后端数据存储区时,发出警告,为其提供完整的保护,防止所有可能的失败情形,确保其在失败时快速重启。

只使用一个小节的篇幅来介绍部署的问题,把重点放在如何构建网络Serv软件上。
可将网络Serv分为3大类:
1)单线程Serv(类似于UDP和TCP),着重讨论局限性:同一时刻只能为一个Cli提供服务,其他所有Cli都进行等待,即使为该Cli服务,OS的CPU也可能处于近乎空闲的状态。有两个截然相反的解决方案:
1.1)使用多个threading/processing,每个thd/prs内都运行一个单thd-Serv;
1.2)在自己的代码中使用异步网络操作来支持多路复用,而不直接使用OS提供的多路复用。
最新的asyncio模块只支持Py3

7.1. 浅谈部署

可能会把网络Serv部署到单台机器上,也可能部署到多台机器上

7.1.1. 使用单台机器上的服务,Cli只要直接连接到该机器的IP即可;

7.1.2. 要使用运行在多台机器上的服务,一种方法是:将该服务的某个实例的IP/hostname返回给Cli(如与Cli运行在同一机房中的服务实例),该方法没提供任何冗余性,如果服务的实例宕机了,通过hostname/IP硬编码连接,该服务实例的Cli都将无法继续连接

7.1.3. 更健壮的连接方法:要访问某个服务,DNS返回运行该服务的所有IP,如Cli无法连接第一个IP,再连接第二个IP,以此类推。业界已广泛应用该方法:在服务前端配置一个负载均衡器(load balancer),Cli直连到负载均衡器,由负载均衡器将连接转发至实际的Serv。如果某台Serv宕机了,负载均衡器会将转发至该Serv的连接请求予以停止,直到该Serv恢复服务为止。Serv的故障对于大量用户来说是不可见的。

7.1.4. 大型Web服务结合了上述两种方法: 每个机房都配置了一个负载均衡器与Serv群,公共的DNS会返回与用户距离最近的机房中的负载均衡器的IP

7.1.5. 无论Serv架构多么简单/复杂,都需要使用在物理/虚拟机上运行Py-Serv代码,这一过程叫做部署。分为两大类:

1)旧式:为每个Serv程序都编写服务所提供的所有功能:通过两次fork()创建一个Unix守护进程(将自己注册一个Win服务),安排进行系统级的日志操作,支持配置文件及提供启动、关闭、重启的相关机制。可以使用三方库/自己实现来完成Serv程序的编写。
2)"十二要素应用"的宣言提出广为流行,提倡只实现Serv程序必备功能的最小集合。将每个服务实现为普通的前台程序,而不是守护进程。程序从环境变量(Py中sys.environ字典)而不是系统级的配置文件中获取所需的配置选项。通过环境变量中指定的选项,连接到任意的后端服务,且直接将日志信息输出到屏幕/直接使用print()函数。该方法通过打开且监听环境配置指定的任意端口来接收网络请求。这种风格编写的服务很易于开发者直接在shell中运行以进行测试。只要简单地在程序外部使用适当的部署框架,就能将程序改为守护进程/系统服务,也可将其部署到网络Serv集群中。
如,部署框架可从一个集中式的配置服务中,获取环境变量的设置,可将程序的标准输出流和错误流连接到一个远程日志Serv,也可在服务停止响应/暂停时重启服务。程序本身不知道这一切,仍会像平常一样,直接输出到标准输出流,程序员可保证该Serv代码在生产环境下的运行表现与开发环境相同。

7.1.6. 大型的“平台即服务”(Paas)提供商提供了托管这种程序的功能,将程序的几十上百个副本配置在一个公共域名和TCP负载均衡器下,将所有输出的日志聚集起来分析。

某些提供商允许直接提交Py代码,另一些则希望将代码、Py解释器及所有需要的依赖包打入一个容器内(Docker等)。无需在单个服务中提供多个功能,服务中所有冗余和重复都由平台来处理。

7.1.7.有时还是会将程序编写为守护进程。可从阅读PEP 3143(访问http://python.org)开始,其中的"Other daemon implementations"提供了编写守护进程所需步骤的资源列表。supervisord工具和logging模块的源代码也可作参考。

7.1.8.无论是独立的包含完整功能的Py进程还是应用PaaS的网络服务,最有效使用OS网络栈和OS-prc对网络请求进行响应的问题都是一样的。目标是令OS尽可能繁忙,就能把Cli获取网络请求响应前的等待时间减少到最短。

7.2.示例使用最简单的TCP协议作为说明。Cli可以询问3个问题,3个问题都以纯文本的ASCII字符表示。发出请求的问题后,Cli将等待Serv的应答。和HTTP协议一样,只要socket保持打开,Cli发起问题请求的次数没有限制。Cli不再发起请求后,无需发出任何警告即可将连接关闭。每个问题的结尾用ASCII的问号表示结束。

Beautiful is better than?

Serv返回的应答在结尾用句点表示响应信息的结束。

Ugly.

为了基于协议构建一个Cli和多个Serv,在代码7-1中定义了许多规则。7-1本身没有命令行接口。唯一作用就是作为一个支持性的模块,由后续的程序清单导入,后续的程序清单也可重用7-1定义的模式,无需重复编写。

# 7-1 zen_utils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse, socket, time

aphorisms = {b'Beautiful is better than?': b'Ugly.',
             b'Explicit is better than?': b'Implicit.',
             b'Simple is better than?': b'Complex.'}

def get_answer(aphorisms):
    """Return the string response to a particular Zen-of-Python aphorism."""
    time.sleep(0.0) # increase to simulate an expensive operation
    return aphorisms.get(aphorisms, b'Error: unknown aphorism.')

def parse_command_line(description):
    """Parse command line and return a socket address."""
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('host', help='IP or hostname')
    parser.add_argument('-p', metavar='port', type=int, default=1060, help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host, args.p)
    return address

def create_srv_socket(address):
    """build and return a listening server socket."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(address)
    listener.listen(64)
    print('Listening at {}'.format(address))
    return listener

def accept_connections_forever(listener):
    """Forever answer incoming connections on a listening socket."""
    while True:
        sock, address = listener.accept()
        print('Accepted connection from {}'.format(address))
        handle_conversation(sock, address)

def handle_conversation(sock, address):
    """Converse with a client over `sock` until they are done talking."""
    try:
        while True:
            handle_request(sock)
    except EOFError:
        print('Client socket to {} has closed'.format(address))
    finally:
        sock.close()

def handle_request(sock):
    """Receive a single client request on `sock` and send the answer."""
    aphorisms = recv_until(sock, b'?')
    answer = get_answer(aphorisms)
    sock.sendall(answer)

def recv_until(sock, suffix):
    """Receive bytes over socket `sock` until we receive the `suffix`."""
    message = sock.recv(4096)
    if not message:
        raise EOFError('socket closed')
    while not message.endswith(suffix):
        data = sock.recv(4096)
        if not data:
            raise IOError('received {!r} then socket closed'.format(message))
        message += data
    return message

7.2.1. Cli希望Serv理解的3个问题,作为aphorisms字典的键列出,对应的回答则以字典值的形式存储。

7.2.2. get_answer()为了在字典中,安全地查找回答而编写的一个简单快速函数。如果传入的问题无法被识别,会返回一个简短的错误信息。Cli的请求以问号结尾,回答则始终以句号结尾(错误信息也不例外)。这两个标点符号,为这个迷你协议提供了封帧功能

7.2.3. parse_command_line()提供了用于读取命令行参数的通用机制,create_srv_socket()则用于构造TCP的监听socket,Serv通过监听socket接收连接请求。俩个函数提供了所有Serv都会共同使用的一些启动代码

7.2.4. 以下4个函数间的层级调用过程,真正展示Serv进程的核心模式:

7.2.4.1. accept_connections_forever()只包含一个简单的循环,不断通过监听socket接收连接请求,且使用print()把每个连接的Serv打印出来,将连接socket作为参数传递给handle_conversation()

7.2.4.2. handle_conversation()包含一个无线循环,来不断处理请求。会捕捉可能发生的错误,使得Cli-socket的任何问题都不会引起程序的崩溃。如果Cli完成了所有请求,且已经挂起,最内层的数据接收循环,会抛出EOFError异常作为信号传递的方式。并不是一个真正的异常事件,专门在一个单独的except从句汇总捕捉了EOFError异常,将所有其他异常都视为错误,被捕捉后会通过print()函数进行输出。(所有普通的Py错误都继承自Exception,都会被except从句截获)finally能对已关闭的文件及socket对象重复调用close函数,且次数不限,通过这种方式运行close函数始终是安全的。

7.2.4.3. handle_request()能简单读取Cli的问题,然后做出应答,要注意的是,send()调用本身无法保证数据发送的完整性,所以要使用sendall()

7.2.4.4. recv_until()进行封帧,只要不断累加的字节字符串没有形成一个完整的问题,就会不断重复调用socket的recv()方法。

上述的程序就是用来构建各种服务器的工具箱

为测试各种Serv,需要一个Cli程序,7-2提供了一个简单命令行工具作为Cli。

# 7-2 client.py
import argparse, socket, random, zen_utils


def client(address, cause_error=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(address)
    aphorisms = list(zen_utils.aphorisms)
    if cause_error:
        sock.sendall(aphorisms[0][:-1])
        return 
    for aphorisms in random.sample(aphorisms, 3):
        sock.sendall(aphorisms)
        print(aphorisms, zen_utils.recv_until(sock, b'.'))
    sock.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Example client')
    parser.add_argument('host', help='IP or hostname')
    parser.add_argument('-e', action='store_true', help='cause an error')
    parser.add_argument('-p', metavar='port', type=int, default=1060,
                        help='TCP port (default 1060)')
    args = parser.parse_args()
    address = (args.host, args.p)
    client(address, args.e)

7.2.5. 正常情况下,cause_error为False,此时Cli将创建一个TCP-socket,然后发送3句格言作为请求,每发送一个就等待Serv返回相应的答案。有时想知道Serv会如何处理输入有误的情况,该Cli提供了-e选项,用来发送不完整的问题,然后使Serv突然挂起。如果没有提供-e选项,则Serv已经启动并正确运行,就能在Cli看到这3个问题及相应的答案

>python client.py 127.0.0.1
b'Simple is better than?' b'Complex.'
b'Beautiful is better than?' b'Ugly.'
b'Explicit is better than?' b'Implicit.'

如果OS上的1060-port不可用,可使用-p来指定另一个port

7.3. 单线程服务器

7-1中的zen_utils模块提供了丰富的工具程序,减少了编写一个简单的单threading-Serv的工作量。单threading-Serv是最简单的可用设计:

# 7-2 srv-single.py
import zen_utils

if __name__ == '__main__':
    address = zen_utils.parse_command_line('simple single-threaded server')
    listener = zen_utils.create_srv_socket(address)
    zen_utils.accept_connections_forever(listener)

该Serv要求提供一个命令行参数——Serv用来监听连接请求的port,如果要防止LAN/网络中的其他用户访问该Serv,应指定标准本地IP:127.0.0.1作为监听port

>python srv_single.py 127.0.0.1
Listening at ('127.0.0.1', 1060)
Accepted connection from ('127.0.0.1', 53995)
Client socket to ('127.0.0.1', 53995) has closed

如果只有一个Cli连接我们的网络服务,且该Cli在同一时刻只会发起一个连接,那么上一个连接一关闭,这个Serv就可准备进行下一个连接。只要一个连接正在进行,Serv就只可能出于下面两种状态中的一种:
1)可能被recv()调用阻塞,等待更多数据传达并被OS唤醒;
2)也可能正在尽快构造针对某个请求的答案,将答案立刻发送回请求方。只有在Cli还没有准备好接受数据时,send()/sendall()才可能阻塞。Cli一准备好,数据就可被发送,Serv就能从阻塞状态恢复,然后继续调用recv()等待请求。

7.3.1. 如果Serv与一个Cli进行会话期间,另一个Cli也尝试连接该Serv,那么这个单threading的缺点就显现了。

如果listen()函数的int参数大于0,OS至少会通过一个TCP三次握手来接受第二个Cli的连接请求,可节省一些Serv实现通信的准备时间。
只要Serv与第一个Cli的会话没有完成,新建立的连接就会一直处于OS的监听队列中。只有Serv与第一个Cli的会话完成了,且Serv代码进入下一次循环迭代,再次调用accept()函数时,Serv才能接收第二个Cli的连接请求,并生成相应的Cli-socket,并开始对该Cli-socket上的请求作出回答。

7.3.2. 要对这个单threading-Serv进行拒绝服务攻击:连接该Serv,且用不关闭该连接即可。Serv会永远阻塞在recv()调用,等待Cli的数据。Serv可能会通过sock.settimeout()设置了超时参数,防止Serv永远等待。此时只要调整一下拒绝Serv攻击的工具,使之发送的间隔不超过Serv超时参数设置的最长等待时间即可。

7.3.3. 由于单threading的设计无法再等待Cli发送下一个请求时进行其他操作,无法有效利用Serv的CPU和OS资源,可使用标准库的trace模块来运行该单threading-Serv,测试每一行代码花费的时间。为了只输出Serv代码的运行时间,需要令trace模块忽略与标准库有关的输出。

$ python -m trace -tg --ignore-dir=/usr srv_single.py ''

每行输出都给出了对应代码的运行时间,单位精确到秒。输出从Serv启动并运行第一行Py代码开始。可观察到,大多数代码在前一行代码运行完成后立刻开始运行,两行代码的运行时间不超过0.01s。每次Serv需要等待Cli时,就会停止运行,并进行等待。

 --- modulename: srv_single, funcname: <module>
0.00 srv_single.py(4): import zen_utils
 --- modulename: zen_utils, funcname: <module>
0.00 zen_utils.py(3): import argparse, socket, time
0.03 zen_utils.py(5): aphorisms = {b'Beautiful is better than
?': b'Ugly.',
0.03 zen_utils.py(6):              b'Explicit is better than?
': b'Implicit.',
0.03 zen_utils.py(7):              b'Simple is better than?':
 b'Complex.'}
0.03 zen_utils.py(9): def get_answer(aphorism):
0.03 zen_utils.py(14): def parse_command_line(description):
0.03 zen_utils.py(23): def create_srv_socket(address):
0.03 zen_utils.py(32): def accept_connections_forever(listene
r):
0.03 zen_utils.py(39): def handle_conversation(sock, address)
:
0.03 zen_utils.py(49): def handle_request(sock):
0.03 zen_utils.py(55): def recv_until(sock, suffix):
0.03 srv_single.py(6): if __name__ == '__main__':
0.03 srv_single.py(7):     address = zen_utils.parse_command_
line('simple single-threaded server')
 --- modulename: zen_utils, funcname: parse_command_line
0.03 zen_utils.py(16):     parser = argparse.ArgumentParser(d
escription=description)
0.03 zen_utils.py(17):     parser.add_argument('host', help='
IP or hostname')
0.03 zen_utils.py(18):     parser.add_argument('-p', metavar=
'port', type=int, default=1060, help='TCP port (default 1060)
')
0.03 zen_utils.py(19):     args = parser.parse_args()
0.03 zen_utils.py(20):     address = (args.host, args.p)
0.03 zen_utils.py(21):     return address
0.03 srv_single.py(8):     listener = zen_utils.create_srv_so
cket(address)
 --- modulename: zen_utils, funcname: create_srv_socket
0.03 zen_utils.py(25):     listener = socket.socket(socket.AF
_INET, socket.SOCK_STREAM)
0.03 zen_utils.py(26):     listener.setsockopt(socket.SOL_SOC
KET, socket.SO_REUSEADDR, 1)
0.03 zen_utils.py(27):     listener.bind(address)
0.03 zen_utils.py(28):     listener.listen(64)
0.03 zen_utils.py(29):     print('Listening at {}'.format(add
ress))
Listening at ('127.0.0.1', 1060)
0.03 zen_utils.py(30):     return listener
0.03 srv_single.py(9):     zen_utils.accept_connections_forev
er(listener)
 --- modulename: zen_utils, funcname: accept_connections_fore
ver
0.03 zen_utils.py(34):     while True:
0.05 zen_utils.py(35):         sock, address = listener.accep
t()
3.25 zen_utils.py(36):         print('Accepted connection fro
m {}'.format(address))
Accepted connection from ('127.0.0.1', 54895)
3.25 zen_utils.py(37):         handle_conversation(sock, addr
ess)
 --- modulename: zen_utils, funcname: handle_conversation
3.25 zen_utils.py(41):     try:
3.25 zen_utils.py(42):         while True:
3.25 zen_utils.py(43):             handle_request(sock)
 --- modulename: zen_utils, funcname: handle_request
3.25 zen_utils.py(51):     aphorisms = recv_until(sock, b'?')

 --- modulename: zen_utils, funcname: recv_until
3.25 zen_utils.py(57):     message = sock.recv(4096)
3.25 zen_utils.py(58):     if not message:
3.25 zen_utils.py(60):     while not message.endswith(suffix)
:
3.25 zen_utils.py(65):     return message
3.25 zen_utils.py(52):     answer = get_answer(aphorisms)
 --- modulename: zen_utils, funcname: get_answer
3.25 zen_utils.py(11):     time.sleep(0.0) # increase to simu
late an expensive operation
3.27 zen_utils.py(12):     return aphorisms.get(aphorism, b'E
rror: unknown aphorism.')
3.27 zen_utils.py(53):     sock.sendall(answer)
3.27 zen_utils.py(43):             handle_request(sock)
 --- modulename: zen_utils, funcname: handle_request
3.27 zen_utils.py(51):     aphorisms = recv_until(sock, b'?')

 --- modulename: zen_utils, funcname: recv_until
3.27 zen_utils.py(57):     message = sock.recv(4096)
3.27 zen_utils.py(58):     if not message:
3.27 zen_utils.py(60):     while not message.endswith(suffix)
:
3.27 zen_utils.py(65):     return message
3.27 zen_utils.py(52):     answer = get_answer(aphorisms)
 --- modulename: zen_utils, funcname: get_answer
3.27 zen_utils.py(11):     time.sleep(0.0) # increase to simu
late an expensive operation
3.27 zen_utils.py(12):     return aphorisms.get(aphorism, b'E
rror: unknown aphorism.')
3.27 zen_utils.py(53):     sock.sendall(answer)
3.27 zen_utils.py(43):             handle_request(sock)
 --- modulename: zen_utils, funcname: handle_request
3.27 zen_utils.py(51):     aphorisms = recv_until(sock, b'?')

 --- modulename: zen_utils, funcname: recv_until
3.27 zen_utils.py(57):     message = sock.recv(4096)
3.27 zen_utils.py(58):     if not message:
3.27 zen_utils.py(60):     while not message.endswith(suffix)
:
3.27 zen_utils.py(65):     return message
3.27 zen_utils.py(52):     answer = get_answer(aphorisms)
 --- modulename: zen_utils, funcname: get_answer
3.27 zen_utils.py(11):     time.sleep(0.0) # increase to simu
late an expensive operation
3.27 zen_utils.py(12):     return aphorisms.get(aphorism, b'E
rror: unknown aphorism.')
3.27 zen_utils.py(53):     sock.sendall(answer)
3.27 zen_utils.py(43):             handle_request(sock)
 --- modulename: zen_utils, funcname: handle_request
3.27 zen_utils.py(51):     aphorisms = recv_until(sock, b'?')

 --- modulename: zen_utils, funcname: recv_until
3.27 zen_utils.py(57):     message = sock.recv(4096)
3.27 zen_utils.py(58):     if not message:
3.27 zen_utils.py(59):         raise EOFError('socket closed'
)
3.27 zen_utils.py(44):     except EOFError:
3.27 zen_utils.py(45):         print('Client socket to {} has
 closed'.format(address))
Client socket to ('127.0.0.1', 54895) has closed
3.28 zen_utils.py(47):         sock.close()
3.28 zen_utils.py(35):         sock, address = listener.accep
t()

这是client.py与Serv之间的一个完整会话,包含了3个请求与响应。从第一行代码到最后一行代码,总共处理时间为0.03s。三次等待Cli的时间总和为0.02+0.02+0.02=0.06,这0.06s中Serv都是空闲的,意味着Serv的CPU占用率在上述信息交换中,只有0.01/0.06=16.7%,这些比率不是精确的。
故,除非单threading-Serv对每个请求进行大量CPU运算,否则Serv的CPU使用率是极低的,当其他Cli在等待Serv为其服务时,CPU一直是空闲的。

7.3.4. 第一个recv()调用会立即返回,而第二个、第三个直到最后一个recv()调用,在知道socket已经关闭前,都会延迟一段时间才返回数据。第一个recv()调用能立即返回,因为OS的网络栈相当智能,会在建立TCP连接的三次握手时,将第一个请求的文本包含在内。当该连接正式存在并调用了accept()后,其实已经有数据在等待被接收了,recv()就可立即返回了。

7.3.5. 另一个细节是,send()并没有引起任何延迟。因为send()在POSIX系统上的语义认为,只要将数据发送至OS网络栈的发送缓冲区,就可以返回。仅通过send()返回无法保证OS真正发送了任何数据。只有在Serv监听更多的Cli数据,才能强制OS阻塞进程,并观察数据发送的结果。

7.4. 多线程与多进程服务器

如果希望Serv能同时与多个Cli进行会话,可创建多个共享相同内存空间的Threading,也可创建完全独立运行的Processing。
优点是简洁:直接使用单Threading-Serv,创建多个Threading运行它的多份副本
缺点是:Serv能同时通信的Cli数量受OS并发机制规模的限制。即使某个Cli处于空闲状态/运行缓慢状态,也会占用整个thd/prc。就算程序被recv()阻塞,也会占用RAM及进程表中的一个进程槽。当同时运行的thd数量达到几千甚至更多时,OS很少能维持良好的表现。此时OS在切换服务的Cli时,需进行大量上下文切换,使得服务的运行效率大大降低。

7.4.1.OS简化了多thd/多prc-Serv,都需使用一个主控制thd来不断运行accept()循环,然后将新创建的Cli-socket交给等待队列中的工作thd来处理。每个thd都可拥有Serv监听socket的一个副本,并运行自己的accept()函数。OS会将新的Cli连接交由任何运行了accept()函数,并处于等待的thd来处理。如果所有thd都处在繁忙状态,OS会将该连接置于队列中,直到某个thd空闲为止,如7-4。

# 7-4多线程Serv srv_threaded.py
import zen_utils
from threading import Thread

def start_threads(listener, workers=4):
    t = (listener,)
    for i in range(workers):
        Thread(target=zen_utils.accept_connections_forever, args=t).start()
        

if __name__ == '__main__':
    address = zen_utils.parse_command_line('multi-threaded server')
    listener = zen_utils.create_srv_socket(address)
    start_threads(listener)
注意:只是多thd程序的一种可能设计。主thd启动n个Serv-thd,然后退出。主thd认为这n个工作thd将永远运行,因此运行这些thd的prc也会保持运行状态。

此外,还有其他可选设计。
如,主thd可以保持运行,且成为一个Serv-thd。主thd也可作为一个监控线程,每隔一段时间就检查一下n个Serv-thd是否仍然在运行。如果有Serv-thd停止运行了,主thd就将其重启。
如果不使用threading.Thread,而使用multiprocessing.Process,OS会为每个线程分配独立的内存空间及文件描述符,会增加OS的开销,但能更好地隔离prc,进一步降低Serv-thd造成主监控thd崩溃的概率。
threading和multiprocessing模块有一个共同的基本特点:无论Cli是否在发送请求,都会为每个连接的Cli分配一个开销较大的OS级控制thd。
Serv代码无需任何改变,就能部署到多个thd(假设每个thd都建立了自己的DB连接,并管理自己打开的文件,无需协调不同thd之间的资源),很容易使用多thd方法来处理Serv的工作。内部服务不向公共开放,攻击者无法简单地打开很多空闲的连接,使thd-pool/prc-pool耗尽资源。

7.4.2. 遗留的SocketServer框架

socketserver将多线程模式分成了两个模式:

7.4.2.1. 用于打开监听socket,并接受Cli连接的server模式

7.4.2.2. 用于通过某个打开的socket与特定Cli进行会话的handler模式。

7-5结合使用这两个模式,实例化一个server对象,将一个handler对象作为参数传给server对象

# 7-5 使用标准库服务器模式构建的多线程服务器 srv_legacy1.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn
import zen_utils

class ZenHandler(BaseRequestHandler):
    def handle(self):
        zen_utils.handle_conversation(self.request, self.client_address)

class ZenServer(ThreadingMixIn, TCPServer):
    allow_reuse_address = 1
    # address_family = socket.AF_INET6 # uncomment if you need IPv6
    
if __name__ == '__main__':
    address = zen_utils.parse_command_line('legacy "SocketServer" server')
    server = ZenServer(address, ZenHandler)
    server.serve_forever()

可将ThreadingMixIn改为ForkingMixIn,就可使用完全隔离的prc来处理连接的Cli,而不使用thd
与7-4中的程序相比,7-5的缺点相当明显,7-4中启动了固定数量的thd,thd的数量可由Serv的管理员,根据特定Serv和OS能高效管理的控制thd数量,来指定。而7-5中的则有Serv的Cli连接池来决定启动的thd数量——不限制Serv最终启动的thd数量,使得攻击者很容易令Serv过载,故开发用于生产环境及面向客户的服务时,不推荐使用这个标准库模块。

7.5. 异步服务器

从Serv向Cli发送响应,到接受Cli的下一个请求间,有一段时间的间隔。
如何在不为每个Cli分配一个OS级的控制thd的前提下,保证CPU处于繁忙状态?
可采用一种异步(asynchronous)模式来编写Serv,代码不需等待数据发送至某个特定的Serv/由这个Cli接受。代码可从整个处于等待的Cli-socket列表中读取数据。只要任何一个Cli做好了进行通信的准备,Serv就可向该Cli发送响应。

7.5.0. 现代OS网络栈的两个特点,使该模式的应用成为了现实:

7.5.0.1. 网络栈提供了一个OS调用,支持prc为等待整个Cli-socket列表中的socket而阻塞,而不只是等待一个单独的Cli-socket。可使用一个thd来同时为成千上万的Cli-socket服务

7.5.0.2. 可将一个socket配置为非阻塞socket。非阻塞socket在进行send()/recv()时,用于不会阻塞调用进程。无论会话中是否有进一步的交互,send()或recv()调用都会立刻返回。如果发生延迟,调用方会负责,在稍后Cli准备好继续进行交互时重试。

7.5.0.3. 异步(asynchronous)表示Serv代码不会停下来等待某个特定的Cli,运行的代码的控制thd不是同步(synchronized)的。控制thd不会以锁步的方式等待任何一个进行会话的Cli。异步Serv可在所有连接的Cli间自由切换,并提供相应的服务。

7.5.0.4. OS通过许多调用来支持异步的代码。最古老的就是POSIX的select()。在很多方面都显得效率低下。现代OS出现了select()的替代品。

如:Linux上的poll()和BSD系统上的epoll()调用。

7-6展示了一个简单异步Serv的完整内部细节,用于简单的Zen协议

# 7-6 一个简单的异步事件循环 srv_async.py

import select, zen_utils

def all_event_forever(poll_object):
    while True:
        for fd, event in poll_object.poll():
            yield fd, event

def server(listener):
    sockets = {listener.fileno(): listener}
    addresses = {}
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    poll_object.register(listener, select.POLLIN)

    for fd, event in all_event_forever(poll_object):
        sock = sockets[fd]
        # Socket closed: remove it from our data structures.
        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            address = addresses.pop(sock)
            rb = bytes_received.pop(sock, b'')
            sb = bytes_to_send.pop(sock, b'')
            if rb:
                print('Client {} sent {} but the closed'.format(address, rb))
            elif sb:
                print('Client {} closed before we sent {}'.format(address, sb))
            else:
                print('Client {} closed socket normally'.format(address))
            poll_object.unregister(fd)
            del sockets[fd]

        # New socket: add it to our data structures.
        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocing(False) # force socket.timeout if we blunder
            sockets[sock.fileno()] = sock
            address[sock] = address
            poll_object.register(sock, select.POLLIN)

                # Incoming data: keep receiving until we see the suffix
            
        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('low-level async_server')
    listener = zen_utils.create_srv_socket(address)
    server(listener)

7.5.0.5. 该事件循环的精髓在于:使用了自己的数据结构来维护每个Cli会话的状态,而没有依赖OS在Cli活动改变时,进行上下文切换。

这个Serv实际上有两层循环:

一个不断调用poll()的while循环,一次poll()调用可能返回多个事件,因此该while循环内部还有一个for循环,用于处理poll()返回的每个事件。
将两层迭代隐藏在一个生成器内,避免了主Serv循环因为这两次循环迭代,而多用两个不必要的缩进

7.5.0.6. 这个程序中,维护了一个sockets字典。从poll()获取,表示已准备好进行后续通信的socket的文件描述符n后,就能根据该文件描述符,从sockets字典中,查找到对应的Py-socket了。还存储了socket的地址,即使socket已经关闭,OS无法继续提供已经连接的地址,也能打印出正确的远程地址作为调试信息。

7.5.0.7. 该async-Serv的核心是缓冲区:在等待某个请求完成时,将收到的数据存储在bytes_received字典中。这两个缓冲区与告知poll(),要在每个socket上等待的事件一起,形成了一个完整的状态机,用于一步步处理Cli会话。

1)准备连接的Cli会将自身,视作Serv监听socket上的一个事件,始终将该事件设置为POLLIN(poll input)状态。响应此类事件的方法就是运行accept(),将返回的socket及其地址存储在字典中,并通过register()告知poll对象,已经准备好从新的Cli-socket接收数据了。
2)当socket本身就是Cli-socket,且事件类型为POLLIN时,就能使用recv()接收最多为4KB的数据了。如果还没接收到表示帧尾的问号字符,就将数据保存到bytes_received字典中,并返回至循环顶部,进行下一个poll()。反之,表示已经接收到了一个完整的问题,就可处理该Cli请求,使用zen_utils的get_answer()函数查询对应的回答,并将结果存储到bytes_to_send字典汇总。包含了一个很重要的操作:将socket的模式从POLLIN切换至POLLOUT。
POLLIN模式: 表示要接收更多数据,
POLLOUT模式: 表示在发送缓冲区空闲时,立刻通知OS,此时socket不用于接收,而是用于发送
3)socket模式设置为POLLOUT后,只要Cli-socket的发送缓冲区,还能接收一个/多个字节,poll()就会立刻通知我们。作为响应,使用send()发送余下的需要发送的内容。要发送的数据超出了,发送缓冲区的容量,就将超出部分保存至bytes_to_send
4)如socket模式为POLLOUT,且send()完成了所有数据的发送,此时就完成了一个完整的请求-响应循环,将socket模式切回POLLIN,用于下一个请求。
5)如果Cli-socket返回了错误信息/关闭状态,将该Cli-socket及发送缓冲区与接收缓冲区丢弃。至此,至少完整地完成了,众多可能同时进行的会话中的一个。
该异步的关键在于,可在一个控制thd中,处理成千上万的Cli会话。当每个Cli-socket准备好下一个事件时,代码就执行该socket的下一个操作,接收/发送数据,然后立刻返回到poll()调用,监控更多事件。
使用这种单thd的异步方法,不需进行任何OS上下文切换(除为了进行poll()、recv()、send()、close()系统调用,而在进入OS时进行的特权模式切换外)。通过将所有Cli会话的状态保存在一系列字典中,且将Cli-socket作为键进行索引,成功在单个控制thd中处理大量Cli。
本质上来说,就是使用Py字典支持的键值查找功能,替代了OS成熟的上下文切换机制,而多thd/多prc-Serv,在位不同的Cli提供服务时,需要的正是这种机制。

7.5.0.8. 技术角度来说,不使用sock.setblocking(False)将所有accept()返回的Cli-socket设置为非阻塞模式,7-6依然能正确运行。因为7-6只有在等待数据时,才调用recv(),只要有数据输入,recv()就不会阻塞。同样,只在有数据可传输时,send()才被调用,只要有数据能被写入到OS的发送缓冲区,send()就不会阻塞。

7.5.0.9. 使用setblocking(),可以防止代码出错。如果没有将socket设置为非阻塞,只要在错误的位置调用了send()/recv(),就可能造成阻塞,且除了造成阻塞的Cli外,其他所有Cli都无法得到Serv的响应。使用了setblocking()后,如果Serv出现问题而发响应的话,就会抛出socket.timeout异常。就可知道OS无法立刻对我们进行的某个调用作出响应。

7.5.0.10. 如启动多个Cli,来与这个Serv进行通信,就能发现,这个单thd-Serv能自如的处理所有同时进行的会话。

为编写7-6中的Serv,需深入了解一些OS的内部细节。如想将注意力放在Cli上,而将select()、poll()/epoll()有关的细节交给别人负责,该怎样做?

7.5.1. 回调风格的asyncio

Py3.4将asynico框架引入标准库,为基于select()、epoll()及其他类似机制的事件循环,提供标准接口。

7.5.1.1. asyncio维护了一个select风格的核心循环,将所有进行I/O操作的socket保存在了一个表中,有需要时,会在select循环里,向表中添加/删除socket。一旦socket关闭,asyncio就将其清除或丢弃。最后,当接收到实际数据时,将由用户代码来决定要返回的正确响应。

asynico框架支持两种编程风格:1)回调风格;2)协程风格

1)类似于Py2中的Twisted框架,用户通过对象实例来维护每个打开的Cli连接。这种设计模式中,使用对象实例上的方法调用,代替了7-6中用来家属Cli会话的各步骤。
7-7中,可看到一个熟悉的流程:读取问题,然后给出响应,直接使用了asyncio框架。

# 7-7 回调风格的asyncio服务器 srv_asyncio1.py
import asyncio, zen_utils

class ZenServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            answer = zen_utils.get_answer(self.data)
            self.transport.write(answer)
            self.data = b''
    
    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address, exc))
        elif self.data:
            print('Client {} sent {} but then closed'.format(self.address, self.data))
        else:
            print('Client {} closed socket'.format(self.address))
    
if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using callbacks')        
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ZenServer, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

7-7中,真正的socket对象被隐藏了。可通过该框架来获取远程地址,而不直接通过socket来获取。数据通过一个方法来调用传输的,该方法只需将接收到的字符作为参数即可。

7.5.1.2. 当需要发送响应时,只要将回答,传递给框架transport.write()方法即可,无需将相关代码写到主事件循环中。回答数据被真正传递给OS,并发送回Cli这一过程,其实是在主循环执行期间进行的。框架能确保尽快完成数据的传输,且不会阻塞其他进行中的Cli连接。

异步通信的过程,比7-7复杂的多。如,构造响应信息的过程,并不像7-7那样,而是可能会涉及文件系统上的文件读取/对DB等后端数据服务的查询。

这种情况下,代码要处理两个方向上的数据传输:
asyncio框架,既会负责Serv与文件系统/数据库间的数据发送和接收。此时可能会在回调方法中,构造一些futures对象,用于更深一层的回调,以供DB或文件系统的I/O最终完成时触发。

7.5.2. 协程风格的asyncio

asynico提供的另一种构造协议代码的方法是,使用协程(coroutine)。是一个函数,进行I/O操作时,不会阻塞,而是会暂停,并将控制权转移回调用方。

7.5.2.1. Py支持协程的一种标准形式就是生成器(generator)——在内部包含一个/多个yield语句的函数。这类函数不会再运行了一条返回语句后就退出,而是会返回一个序列。

通用的生成器中的yield语句,只用来生成一系列的项,供调用方使用。
asyncio利用了PEP 380中提出的扩展yield句法。
扩展句法不仅允许使用yield from语句,利用另一个生成器来生成序列,还允许yield将返回值返回给一个协程,甚至能在调用方需要的时候,抛出异常。
使我们能在协程中,使用result = yield的形式。yield后面的对象,描述了想进行的操作,可以是另一个socket的内容,也可以是读取文件系统。如果操作成功,就会将yield的结果存储在result中,反之,就直接在协程内触发异常。
7-8说明了,通过协程实现的Zen协议。

# 7-8 协程风格的asynico服务器
import asyncio, zen_utils

@asyncio.coroutine
def handle_conversation(reader, writer):
    address = writer.get_extra_info('peername')
    print('Accepted connection from {}'.format(address))
    while True:
        data = b''
        while not data.endswith(b'?'):
            more_data = yield from reader.read(4096)
            if not more_data:
                if data:
                    print('Client {} send {!r} but then closed'.format(address, data))
                else:
                    print('Client {} closed socket normally'.format(address))
                return 
            data += more_data
        answer = zen_utils.get_answer(data)
        writer.write(answer)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asycio server using coroutine')
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_conversation, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

将7-8和之前编写Serv的方法对比。while循环使用之前提到的封帧方法,不断调用recv(),将响应写入并发送给等待的Cli。所有操作都封装在while循环中。该循环会尽可能对各Cli发出的请求作出响应。

7.5.2.2. 协程风格的Serv与之前有一个重要的区别,7-8使用了生成器的形式,使用yield from代替了之前所有进行阻塞操作,并等待OS响应的地方。这一区别,使得生成器能直接应用到asyncio子系统中,且不会将OS阻塞,也不会同时处理多个Cli连接。

7.5.2.3. 使用该方法,更容易找到生成器可能暂停的地方,所以PEP 380推荐在协程中使用该方法。每次调用yield时,该协程都可能会在一段不确定的时间内暂停运行。

Py2中有一些框架会使用普通的网络代码,如gevent和eventlet,这些框架使用普通的阻塞I/O调用,在合适的地方,截获这些调用并完成真正意义上的异步I/O

7.5.2.4. asyncio的协程方法有点啰嗦,但意思明确,每个可能暂停的地方,都有yield语句。对于另外几个框架,代码意思不明确,但更紧凑,使用了像recv()的调用。这些调用在代码中,就像普通的方法调用一样,但是会在返回时,将控制权交还给异步I/O循环

7.5.3. 遗留模块asyncore

# 7-9 使用旧式asyncore框架 srv_legacy2.py
import asyncore, asynchat, zen_utils

class ZenRequestHandler(asynchat.async_chat):
    def __init__(self, sock):
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator(b'?')
        self.data = b''

    def collect_incoming_data(self, more_data):
        self.data += more_data

    def found_terminator(self):
        answer = zen_utils.get_answer(self.data + b'?')
        self.push(answer)
        self.initiate_send()
        self.data = b''

class ZenServer(asyncore.dispatcher):
    def handle_accept(self):
        sock, address = self.accept()
        ZenRequestHandler(sock)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('legacy "asyncore" server')
    listener = zen_utils.create_srv_socket(address)
    server = ZenServer(listener)
    server.accepting = True # we already called listen()
    asyncore.loop()

以上代码是有问题的。ZenServ对象没有被传递给asyncore.loop(),也没有进行任何显示注册,但是控制循环却获取了可用的服务。

7.5.3.1. 显然,这个模块不当地使用了一些模块级别的全局变量,但是通过某些不良的方法,建立了主循环、Serv对象、请求处理程序间的联系,但却无法看到这一联系的建立方法。

asyncore隐藏的许多步骤与asyncio显式处理的步骤还是有许多相同之处的。每个Cli连接都会新建一个ZenRequestHandler实例。
所有这些异步框架中,数据的接收和发送间都是不对称的。
接收数据的过程中,需要返回控制权并将控制权交还给框架。只要还有输入数据需要接收,就要重新调用recv()。但,发送数据后就不需再理会什么了。可将需要发送的所有数据都交个框架处理,框架会确保使用必要的send()调用去尽量将数据发送出去。

7.5.3.2. 除非使用包含了隐藏操作的gevent或eventlet,否则要使用异步框架来编写Serv代码,需要遵循不同的编程风格,与7-3中的简单Serv是不相同的。

多thd与多prc可以直接使用未经修改的单thd代码,而异步方法需要对代码进行分解,使每部分都可以非阻塞运行。
回调风格的异步程序会把每一个非阻塞代码片段封装在方法中;
协程风格的异步程序则会将所有非阻塞操作放在yield/yield from 语句中

7.5.4. 两全其美的方法

异步Serv可以在服务的不同Cli会话间迅速切换。要完成切换,只要扫描协议对象即可(7-6中,只要扫描字典键值对)。比起OS的上下文切换,这种方法为Cli提供服务的花销要小很多。
异步Serv是有硬件限制的,
所有操作都在单个OS线程中完成,一旦CPU使用率到达100%,异步Serv就无法再为任何Cli提供服务。即使Serv有多核,所有工作也只能在单个CPU上完成(对原始形式的异步Serv来说)
需要高性能时的解决方案(需询问MA):

7.5.4.1. 使用异步的回调对象/协程来编写服务,

7.5.4.2. 通过某个异步框架启动服务

7.5.4.3. 配置一些运行Serv的OS,检查OS的CPU内核数目。有多少核,启动多少个事件循环。

以上即可享受到两种方法的优点了,在每个给定CPU上,异步框架都可以不断在打开的Cli-socket间来回切换,无需prc间的上下文切换。OS可将新建立的连接分配给某个Serv-prc,理想情况下能平衡整个Serv的负载。

7.6. 在inetd下运行

几乎所有Linux都提供了inetd守护进程,在web发展的早期阶段发明,用于解决:在一台特定的Serv上,在OS启动时,启动n个不同的后台prc,用于提供n个不同的网络服务。可在/etc/inetd.conf中将所有要监听的port列出。

7.6.1. inetd在列出的每个port上,都调用了bind()和listen(),不过只在Cli真正连接时,才启动一个Serv-prc。

这种模式来支持port较小的服务(普通用户运行的)是很容易的,因为inetd-prc本身打开的port也是较小的。inetd可以为每个Cli都启动一个prc,也可在Serc接受了Cli连接后,使用同一prc监听下一个连接请求。

7.6.2. 为每个连接都建立一个prc的花销很大,会降低Serv的利用率,但该方法更简单。只需在inetd.com中将第4个字段设为nowait即可。

1060 stream tcp nowait brandon /usr/bin/python3 /usr/bin/python3 in_zen1.py

服务一启用,stdin、stdout、stderr便被连接到了CLi-socket。服务只需与连接的Cli通信,然后退出即可。

# 7-10 in_zen1.py 响应一个将socket连接到stdin/stdout/stderr的Cli
# -- coding: utf-8 --
import socket, sys, zen_utils

if __name__ == '__main__':
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sys.stdin = open('/dev/null', 'r')
    sys.stdout = sys.stderr = open('log.txt', 'a', buffering=1)
    address = sock.getpeername()
    print('Accepted connection from {}'.format(address))
    zen_utils.handle_conversation(sock, address)

很少会希望Py/Python库将原始的追踪和状态信息,输出到stdout/stderr中,中断与Cli的会话,该脚本将Py的stdin、stdout、stdout设置为合适的已打开文件。这种方法只操作了sys内的文件对象,没有操作真正的文件描述符,只是在Py内部重新设置了I/O。

7.6.3. Serv调用了处理I/O的底层C库,就需关闭表示stdin、stdout、stderr的文件描述符0、1、2,这种情况下,将开始处理沙箱类的工作,supervisord模块/“平台即服务"的容器都已经更好的完成了这一工作。

只要port较小,就可在普通用户命令行中运行inetd -d inet.conf来测试7-10中的程序。inet.conf包含了之前提到的配置行。按照运行client.py连接到该服务的port即可。

7.6.4. 另一种模式,将inetd.conf的第4个字段指定为wait,会将监听socket提供给脚本。脚本需调用accept(),用于接受正在等待的Cli的连接请求。

这一模式的优势在于,Serv可保持运行状态,不断运行accept()来接受更多的Cli连接请求,这个过程不需要inetd的介入。如果Cli暂停连接,Serv也可自由调用exit(),来降低Serv的内存占用。在Cli再次需要Serv时,再启动Serv即可。inetd会检测到服务已退出,会由inetd来负责监听。
7-11是使用wait模式来涉及到。能永远接受新的连接请求,也可发生超时/退出行为。如果几秒内都没有任何Cli连接,Serv便无需一直将该程序置于RAM中

7-11 in_zen2.py 对一个/多个Cli连接做出响应,最终发生超时

import socket, sys, zen_utils

if __name__ == '__main__':
    listener = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM)
    sys.stdin = open('dev/null', 'r')
    sys.stdou = sys.stderr = open('log.txt', 'a', buffering=1)
    listener.settimeout(8.0)
    try:
        zen_utils.accept_connections_forever(listener)
    except socket.timeout:
        print('Waited8 seconds with no further connections; shutting down')

这个Serv使用了原始的单thd设计。生产环境中,要能在一个已经处于监听状态的socket上不停运行accept()。如果不介意使用inetd启动Serv-prc后便不再退出该prc,较容易。但是希望Serv在连续一段时间内没有活动的情况下,提供超时和关闭功能,就会复杂些。因为,对于一组thd/prc来说,确认最近一段时间内,既没有与Cli通信,也没有收到任何Cli的连接请求,从而无需保持运行状态的过程是相当棘手的。

7.7. 小结

7.7.1. 使用多thd时,通常可不加修改地使用单thd-Serv程序,OS会负责隐式地完成切换,使等待中的Cli能快速得到响应,空闲的Cli则不会消耗Serv的CPU。这一技术允许同时进行多个Cli会话,且很好地利用了Serv的CPU。对于原始的单thd-Serv,大多数时间都在等待Cli的操作,CPU很多时候都是空闲的。

7.7.2. 更复杂但更强大的方法是,使用异步编程,在单个控制thd中完成对大量Cli的服务切换。向OS提供了当前正在进行会话的完整socket列表。复杂处在于,需要将读取Cli请求,然后构造响应的过程,分割为小型的非阻塞代码块,就能在等待Cli操作时,将控制权交还给异步框架。尽管可通过select()/poll()来手动编写异步Serv,但是Py3.4以上的标准库中内置的asyncio框架可提供异步功能。

7.7.3. 将编写的服务,安装到Serv上,且在OS启动时运行Serv的过程叫部署。可使用许多现代机制进行自动化部署,如OS启动时,运行supervisord工具/将控制权交给一个“平台即服务”容器。在基本的Linux-Serv上,可用最简单的方法: 古老的inetd守护进程,提供极其简单的方法,在Cli需要连接时,保证服务处于启动状态。

标签:Serv,读数,Cli,python,utils,编程,zen,py,socket
来源: https://www.cnblogs.com/wangxue533/p/12032816.html