编程语言
首页 > 编程语言> > 20190918 Python 网络与线程

20190918 Python 网络与线程

作者:互联网

多进程

进程的概念

Process 类

Process(target , name , args)
参数介绍
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,)
name为子进程的名称

Process 常用方法

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
p.terminate()(了解)强制终止进程p,不会进行任何清理操作
p.is_alive():如果p仍然运行,返回True.用来判断进程是否还在运行
p.join([timeout]):主进程等待p终止,timeout是可选的超时时间

Python 实现多进程

创建 Process 对象
"""
    1. 导入 multiprocessing 模块(Python 自带的)
    2. 创建 multiprocessing.Process 对象
    3. 执行线程方法   --> start 方法
    4. 挂起主进程,等待子进程执行完毕  -->  join 方法

"""

from multiprocessing import Process
from time import sleep


def run(name):
    """ 子进程执行的函数 """
    sleep(1)
    print("Hello multiprocess {0}!".format(name))


if __name__ == '__main__':
    """ 
            在 window 下,必须加 if __name__ == '__main__': 语句,否则会报错
    """
    pro = Process(target=run, name='my-process', args=('Hi',))
    print('开启子进程 ...')
    pro.start()
    print("子进程名字:", pro.name)  # 打印子进程的名字
    print("子进程PID", pro.pid)
    pro.join()
    print('子进程执行完毕 ...')

子类继承 Process

这第二种 Python 实现多线程的方式,通过子类继承 multiprocessing.Process 类,这种方法实际用的多

from multiprocessing import Process
from time import sleep


class ClockTimer(Process):  # 继承 Process 类
    # 重写 run 方法
    def run(self):
        num = 5
        while num > 0:
            print("num:", num)
            num -= 1
            sleep(0.5)


if __name__ == '__main__':
    ct = ClockTimer()
    ct.start()
    ct.join()
    print('程序执行结束 ...')

多进程默认数据不共享

from multiprocessing import Process

"""
    测试多线程数据不共享
"""

num = 10


def run1(name):
    """方法1"""
    global num
    num += 5
    print("{}: num=".format(name), num)


def run2(name):
    """方法2"""
    global num
    num += 10
    print("{}: num=".format(name), num)


if __name__ == '__main__':
    # 创建 Process 对象
    p1 = Process(target=run1, args=('子进程1', ))
    p2 = Process(target=run2, args=('子进程2', ))
    # 启动子进程
    p1.start()
    p2.start()
    # 挂起主进程
    p1.join()
    p2.join()
    print("num:{}".format(num))



"""
运行结果:
子进程1: num= 15
子进程2: num= 20
num:10
"""

进程池

用来创建多个进程

利用:Python 3提供 multiprocessing 库中 Pool 类

Pool 类的常用方法

apply_async(func[, args[, kwds]]) : 使⽤⾮阻塞⽅式调⽤func(并⾏执⾏, 堵塞⽅式必须等待上⼀个进程退出才能执⾏下⼀个进程) , args为传递给func的参数列表, kwds为传递给func的关键字参数列表;
apply(func[, args[, kwds]])(了解即可几乎不用) 使⽤阻塞⽅式调⽤func
close(): 关闭Pool, 使其不再接受新的任务;
terminate(): 不管任务是否完成, ⽴即终⽌;
join(): 主进程阻塞, 等待⼦进程的退出, 必须在close或terminate之后使⽤; 

测试

from multiprocessing import Pool
from time import sleep



def test(num):
    print("newNum:", num)
    sleep(3)


if __name__ == '__main__':
    # 创建进程池
    pool = Pool(6)  # 若未给定子进程数, 默认子进程数为当前主机 CPU 的核心数,

    for i in range(1, 17):
        pool.apply_async(test, args=(i,))  # 并行执行子进程任务

    pool.close()  # 关闭进程池,不接受新的子进程请求
    pool.join()   # 阻塞主进程,该方法必须位于 close 方法后面执行

    # 在多进程中:主进程一般用来等待,真正的任务都在子进程中执行

进程间通信

可以使⽤multiprocessing模块的Queue实现多进程之间的数据传递
初始化Queue()对象时(例如: q=Queue()) , 若括号中没有指定最⼤可接收的消息数量, 或数量为负值, 那么就代表可接受的消息数量没有上限 
Queue.qsize(): 返回当前队列包含的消息数量
Queue.empty(): 如果队列为空, 返回True, 反之False 
Queue.full(): 如果队列满了, 返回True,反之False
Queue.get([block[, timeout]]): 获取队列中的⼀条消息, 然后将其从列队中移除, block默认值为True
如果block使⽤默认值, 且没有设置timeout(单位秒) , 消息列队如果为空, 此时程序将被阻塞(停在读取状态) , 直到从消息列队读到消息为⽌,如果设置了timeout, 则会等待timeout秒, 若还没读取到任何消息, 则抛出"Queue.Empty"异常 
如果block值为False, 消息列队如果为空, 则会⽴刻抛出“Queue.Empty”异常

Queue.get_nowait(): 相当Queue.get(False)
Queue.put(item,[block[, timeout]]): 将item消息写⼊队列, block默认值为True
如果block使⽤默认值, 且没有设置timeout(单位秒) , 消息列队如果已经没有空间可写⼊, 此时程序将被阻塞(停在写⼊状态) , 直到从消息列队腾出空间为⽌, 如果设置了True和timeout, 则会等待timeout秒, 若还没空间, 则抛出"Queue.Full"异常
如果block值为False, 消息列队如果没有空间可写⼊, 则会⽴刻抛出"Queue.Full"异常
Queue.put_nowait(item): 相当Queue.put(item, False); 

测试

from multiprocessing import Pool, Queue, Process
from time import sleep

"""
    进程间通信- 队列 Queue
"""


def run1(queue, msg):
    """ 将消息放入队列中 """
    sleep(0.5)
    queue.put(msg)
    queue.put(msg)
    queue.put(msg)
    queue.put(msg)
    print("full;", queue.full())


def run2(queue):
    """ 从队列中获取消息"""
    try:
        print("qsize:", queue.qsize())
        # 若队列中没有消息,get 方法默认会阻塞,等待队列中放入消息.
        msg = queue.get(timeout=1)
        print(msg)
        print("qsize:", queue.qsize())
    except Exception as e:
        print(e)


if __name__ == '__main__':
    q = Queue(3)  # 若不给定

    p1 = Process(target=run1, args=(q, '消息1'))
    p2 = Process(target=run2, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

单线程间通信

from multiprocessing import Pool, Queue, Process
from time import sleep


def write(q, msgs):
    """ 将循环消息放入队列中 """
    for msg in msgs:
        q.put(msg)
    sleep(0.5)


def read(q):
    """ 读取队列中的消息 """
    while True:
        if not q.empty():
            print(q.get())
            sleep(0.5)
        else:
            break


if __name__ == '__main__':
    q = Queue()
    w = Process(target=write, args=(q, "abc"))
    r = Process(target=read, args=(q,))
    w.start()
    w.join()   # 阻塞主进程
    r.start()
    r.join()

Pool 间通信

导入 multiprocessing.Manager

from multiprocessing import Pool, Manager
from time import sleep


def write(q, msgs):
    """ 将循环消息放入队列中 """
    for msg in msgs:
        q.put(msg)


def read(q):
    """ 读取队列中的消息 """
    sleep(2)
    while True:
        if not q.empty():
            print(q.get())
        else:
            break


if __name__ == '__main__':
    pool = Pool()
    q = Manager().Queue()

    pool.apply_async(write, args=(q, "abcde"))
    pool.apply_async(read, args=(q,))

    pool.close()  # 关闭进程池,不接受新的子进程请求
    pool.join()  # 阻塞主进程,该方法必须位于 close 方法后面执行


多线程

线程的概念

首先它是实现多任务的另一种方式,轻量级进程(Lightweight Process,LWP),是更小的执行单元

注意:线程不能独立存在,线程必须依赖于进程

线程与进程的区别

进程:用来分配资源

线程:用来具体执行 (即 CPU 调度)

threading.Thread 类

Python 3 实现多线程的方式

  1. 创建 threading.Thread 类的对象
  2. 继承 threading.Thread 类, 重写 run 方法
import threading
from time import sleep


class Act(threading.Thread):
    """  继承 threading.Thread 类 """
    def run(self):
        """ 重写 run 方法 """
        self.sing()
        self.dance()

    def sing(self):
        """ 唱歌函数 """
        for i in range(3):
            print("说好不哭 - 周董")
            sleep(1)

    def dance(self):
        """ 跳舞函数 """
        for i in range(3):
            print("天鹅之梦 ...")
            sleep(1)


if __name__ == '__main__':
    for i in range(2):
        a = Act()
        a.start()

    while True:
        print("现在正在执行的线程数:", len(threading.enumerate()), threading.enumerate())
        # threading.enumerate() 当前正在运行线程的列表
        if len(threading.enumerate()) <= 1:
            break
        else:
            sleep(0.5)

import threading
from time import sleep


def say():
    print("当前线程:{}".format(threading.current_thread().name))
    sleep(2)
    print("Hello !")


if __name__ == '__main__':
    """ 测试多线程 """
    print("当前线程名:{}".format(threading.current_thread().name))
    print("{:=^20}".format(" 分割线 "))
    for i in range(10):  # 开 10个 线程
        t = threading.Thread(target=say)
        t.start()

import threading
from time import sleep


def sing():
    """ 唱歌函数 """
    for i in range(3):
        print("说好不哭 - 周董")
        sleep(1)

def dance():
    """ 跳舞函数 """
    for i in range(3):
        print("天鹅之梦 ...")
        sleep(1)


if __name__ == '__main__':
    s = threading.Thread(target=sing)
    s.start()
    d = threading.Thread(target=dance)
    d.start()

    while True:
        print("现在正在执行的线程数:", len(threading.enumerate()), threading.enumerate())
        # threading.enumerate() 当前正在运行线程的列表
        if len(threading.enumerate()) <= 1:
            break
        else:
            sleep(0.5)

线程间数据共享

多线程不适于数据计算,这样很有可能发生数据错乱,而且,效率没有单线程高
多线程适用于 IO 耗时操作的应用场景

import threading
from time import sleep


"""
    多线程不适于数据计算,这样很有可能发生数据错乱,而且,效率没有单线程高
    多线程适用于 IO 耗时操作的应用场景
"""

num = 0

LOOP_NUM = 100
# LOOP_NUM = 100000


def run1():
    global num
    for i in range(LOOP_NUM):
        num += 1
    sleep(1)
    print("run1-->num:{}".format(num))


def run2():
    global num
    for i in range(LOOP_NUM):
        num += 1
    sleep(1)
    print("run2-->num:{}".format(num))


if __name__ == '__main__':
    r1 = threading.Thread(target=run1)
    r2 = threading.Thread(target=run2)
    r1.start()
    r2.start()
    print("num:{}".format(num))

线程的五种状态

多线程程序的执⾏顺序是不确定的(操作系统决定)

当执⾏到sleep语句时, 线程将被阻塞(Blocked) , 到sleep结束后, 线程进⼊就绪(Runnable) 状态, 等待调度。 ⽽线程调度将⾃⾏选择⼀个线程执⾏。 代码中只能保证每个线程都运⾏完整个run函数, 但是线程的启动顺序、run函数中每次循环的执⾏顺序都不能确定

1、新状态:线程对象已经创建,还没有在其上调用start()方法。
2、可运行状态:当线程有资格运行,但调度程序还没有把它选定为运行线程时线程所处的状态。当start()方法调用时,线程首先进入可运行状态。在线程运行之后或者从阻塞、等待或睡眠状态回来后,也返回到可运行状态。
3、运行状态:线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一一种方式。
4、等待/阻塞/睡眠状态:这是线程有资格运行时它所处的状态。实际上这个三状态组合为一种,其共同点是:线程仍旧是活的(可运行的),但是当前没有条件运行。但是如果某件事件出现,他可能返回到可运行状态。
5、死亡态:当线程的run()方法完成时就认为它死去。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦死亡,就不能复生。如果在一个死去的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

线程同步

互斥锁

互斥锁 说明
mutex = threading.Lock() 创建互斥锁对象
mutex.acquire(): 加锁
mutex.release() 释放锁

使用

import threading
from time import sleep

"""
    线程同步 --->  互斥锁,解决数据不同步的问题,缺点:效率不高
"""

num = 0

# LOOP_NUM = 100
LOOP_NUM = 100000


def run1():
    global num
    if mutex.acquire():  # 加锁
        for i in range(LOOP_NUM):
            num += 1
    mutex.release()  # 释放锁
    sleep(1)
    print("run1-->num:{}".format(num))


def run2():
    global num
    if mutex.acquire(): # 加锁
        for i in range(LOOP_NUM):
            num += 1
    mutex.release()  # 释放锁
    sleep(1)
    print("run2-->num:{}".format(num))


if __name__ == '__main__':
    # 创建锁对象
    mutex = threading.Lock()
    r1 = threading.Thread(target=run1)
    r2 = threading.Thread(target=run2)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    print("num:{}".format(num))

死锁

import threading
from time import sleep

"""
     死锁:  
     mutexA 与 mutexB 相互等待,导致死锁现象发生。
"""


class MyThread1(threading.Thread):

    # 重写 run 方法
    def run(self):
        if mutexA.acquire():
            print("A----{}".format(threading.current_thread().name))
            sleep(1)
            if mutexB.acquire():
                print("B---{}".format(threading.current_thread().name))
                mutexA.release()
        mutexB.release()


class MyThread2(threading.Thread):

    # 重写 run 方法
    def run(self):
        if mutexB.acquire():
            print("B----{}".format(threading.current_thread().name))
            sleep(1)
            if mutexA.acquire():
                print("A---{}".format(threading.current_thread().name))
                mutexB.release()
        mutexA.release()


if __name__ == '__main__':
    """ 测试 """
    # 创建锁
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()

同步和异步

多线程有序执行
import threading
from time import sleep

"""
        线程同步之有序执行
"""


class MyThread(threading.Thread):

    def run(self):
        if mutexA.acquire():
            print("A ----{}".format(threading.current_thread().name))
            sleep(1)
            mutexB.release()


class MyThread2(threading.Thread):

    def run(self):
        if mutexB.acquire():
            print("B -- {}".format(threading.current_thread().name))
            sleep(1)
            mutexC.release()


class MyThread3(threading.Thread):

    def run(self):
        if mutexC.acquire():
            print("C -- {}".format(threading.current_thread().name))
            sleep(1)
            mutexA.release()


if __name__ == '__main__':
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    mutexC = threading.Lock()
    mutexB.acquire()
    mutexC.acquire()

    for i in range(10):
        MyThread().start()
        MyThread2().start()
        MyThread3().start()

多线程无序执行
import threading
from time import sleep

"""
        线程同步之无序执行
"""


class MyThread(threading.Thread):
    num = 1

    def run(self):
        if mutex.acquire():
            print("A{}  ----{}".format(MyThread.num, threading.current_thread().name))
            MyThread.num += 1
            sleep(0.1)
            mutex.release()


class MyThread2(threading.Thread):
    num = 1

    def run(self):
        while True:
            if mutex.acquire(False):
                print("B{}  --- {}".format(MyThread2.num, threading.current_thread().name))
                MyThread2.num += 1
                # sleep(0.1)
                mutex.release()
                break
            else:
                sleep(0.01)  # 休眠 0.1 s,再去获取 mutex 锁


class MyThread3(threading.Thread):
    num = 1

    def run(self):
        while True:
            if mutex.acquire(False):
                print("C{} --- {}".format(MyThread3.num, threading.current_thread().name))
                MyThread3.num += 1
                # sleep(1)
                mutex.release()
                break
            else:
                sleep(0.01)


if __name__ == '__main__':
    mutex = threading.Lock()

    for i in range(10):
        MyThread().start()
        MyThread2().start()
        MyThread3().start()

生产者与消费者

在线程世界⾥, ⽣产者就是⽣产数据的线程, 消费者就是消费数据的线程(做包子,吃包子)
经常会出现生产数据的速度大于消费数据的速度,或者生产速度跟不上消费速度

⽣产者消费者模式是通过⼀个容器(缓冲区)来解决⽣产者和消费者的强耦合问题
例如两个线程共同操作一个列表,一个放数据,一个取数据

⽣产者和消费者彼此之间不直接通讯, ⽽通过阻塞队列来进⾏通讯

Python的Queue模块:实现了3种类型的队列来实现线程同步,包括:
FIFO(先⼊先出) 队列 Queue,等其他

队列 Queue

class queue.Queue(maxsize=0)

FIFO 队列的构造器。maxsize为一个整数,表示队列的最大条目数,可用来限制内存的使用。

一旦队列满,插入将被阻塞直到队列中存在空闲空间。

如果maxsize小于等于0,队列大小为无限。maxsize默认为0

import threading
from time import sleep
from queue import Queue


class Producer(threading.Thread):
    """
        生产者
    """

    def run(self):
        global qu
        while True:
            if qu.qsize() <= 1000:
                for i in range(100):
                    qu.put(str(i))
            else:
                sleep(1)


class Consumer(threading.Thread):
    """
        消费者
    """

    def run(self):
        global qu
        while True:
            if qu.qsize() > 500:
                print("{:=^70}".format("消费者"))
                for _ in range(100):
                    msg = qu.get()
                    print("消费:", msg)
            else:
                sleep(1)


if __name__ == '__main__':
    # 创建队列
    qu = Queue()
    # 初始化产品,先放入 500 个产品
    for i in range(500):
        qu.put(str(i))

    print("当前队列的状态:{}".format(qu.qsize()))

    for i in range(2):
        # 创建生产者线程对象
        producer = Producer()
        producer.start()
    for i in range(5):
        # 创建消费者线程对象
        consumer = Consumer()
        consumer.start()

ThreadLocal 本地变量

作用:应用于线程中函数间参数的传递,实现不同线程间数据的隔离

应用场景:ThreadLocal最常⽤的地⽅就是为每个线程绑定⼀个数据库连接, HTTP请 求, ⽤户身份信息等, 这样⼀个线程的所有调⽤到的处理函数都可以⾮常⽅ 便地访问这些资源

import threading, time


class Student:
    """ 学生类"""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return "Student:{}".format(self.name)


def func():
    # 调用 ThreadLocal 本地变量
    stu = threadLocal.student
    print("当前线程名:{}".format(threading.current_thread().name))


def func2():
    # 调用 ThreadLocal 本地变量
    stu = threadLocal.student
    stu.name = stu.name + "OK"
    print("stu-name:{}".format(stu))

def run(name):
    stu = Student(name)
    # 为本地变量 ThreadLocal 自定义属性 student ,存储 stu 对象
    threadLocal.student = stu
    print("当前线程名:{}, 学生姓名:{}".format(threading.current_thread().name, stu.name))
    # 调用  fun 函数
    func()
    func2()


if __name__ == '__main__':
    # 定义全局线程本地变量
    threadLocal = threading.local()
    # 创建线程对象
    t1 = threading.Thread(target=run, args=("小白",), name="thread-litter-white")
    t2 = threading.Thread(target=run, args=("小黑",), name="thread-litter-black")
    # 开启线程
    t1.start()
    t2.start()

网络通信

网络基础

IP

由网络号和主机号组成

IP地址127. 0. 0. 1~127. 255. 255. 255⽤于回路测试

子网掩码

区分网络号和主机号

分类 子网掩码
A类地址 255.0.0.0
B类地址 255.255.0.0
C 类地址 255.255.255.0

端口号

用来标记区分进程

端口 范围
主机所有端口 0 - 65535
知名端口 0-1023
动态端口 1024-65535

协议

TCP/IP 分层

Socket

通过网络完成进程间通信的方式

UDP TCP
无连接 有连接
速度快 速度较慢
可能发生丢包,不稳定 稳定连接

AF_INET(ipv4协议⽤于 Internet 进程间通信)

套接字类型

from socket import socket, AF_INET, SOCK_DGRAM
from time import  sleep

if __name__ == '__main__':
    """
        发送消息
    """
    client = socket(AF_INET, SOCK_DGRAM)
    client.bind(('', 8000))
    while True:
        client.sendto('Hello 您好'.encode(), ('127.0.0.1', 9090))

        recvData = client.recvfrom(2048)  # 指接收数据的大小 2KB
        print(recvData[0].decode())

        sleep(3)
    # 关闭连接
    client.close()

TFTP 协议

Trivial File Transfer Protocol 简单文件传输协议

实现 TFTP 下载器

下载:从服务器上将一个文件复制到本机上

下载的过程:

from socket import socket, AF_INET, SOCK_DGRAM
import struct

"""
    struct模块可以按照指定格式将Python数据转换为字符串,该字符串为字节流
"""

if __name__ == '__main__':
    # 如何保证操作码(1 / 2 / 3 / 4 / 5)占两个字节?如何保证0占一个字节?
    # !H8sb5sb:
    # ! 表示按照网络传输数据要求的形式来组织数据(占位的格式)
    # H 表示将后面的 1 替换成占两个字节
    # 8s 相当于8个s(ssssssss)占8个字节
    # b 占一个字节
    filename = "server-bus-refresh.png"
    server_IP = '127.0.0.1'
    cmb_buf = struct.pack("!H{}sb5sb".format(len(filename)), 1, filename.encode(), 0, b"octet", 0)
    print(cmb_buf)
    client = socket(AF_INET, SOCK_DGRAM)
    # 发送请求
    addr = (server_IP, 69)
    client.sendto(cmb_buf, addr)

    # 创建本地文件
    with open(filename, 'ab') as f:
        while True:
            recv_data = client.recvfrom(1024)
            # print(recv_data)
            option_code, ack_num = struct.unpack('!HH', recv_data[0][:4])
            random_port = recv_data[1][1]
            # print(opcode, ack_num)
            if int(option_code) == 5:
                print("文件不存在 ...")
                break
            print("操作码: {},ACK:{}, 服务随机端口: {}, 数据长度:{}"
                  .format(option_code, ack_num, random_port, len(recv_data[0][4:])))
            f.write(recv_data[0][4:])  # 将数据写入
            if len(recv_data[0]) < 516:
                break
            ack_data = struct.pack("!HH", 4, ack_num)
            client.sendto(ack_data, (server_IP, random_port))

struct 库

struct模块可以按照指定格式将Python数据转换为字符串,该字符串为字节流

struct模块中最重要的三个函数是pack(), unpack(), calcsize()
# 按照给定的格式(fmt),把数据封装成字符串(实际上是类似于c结构体的字节流)pack(fmt, v1, v2, ...) 
# 按照给定的格式(fmt)解析字节流string,返回解析出来的元组unpack(fmt, string) 
# 计算给定的格式(fmt)占用多少字节的内存 calcsize(fmt)

struct.pack("!HH",4,p_num)
cmdTuple = struct.unpack("!HH", recvData[:4])

广播

from socket import socket, AF_INET, SOCK_DGRAM, SOL_SOCKET, SO_BROADCAST

if __name__ == '__main__':
    dest = ("<broadcast>", 8080)

    bd = socket(AF_INET, SOCK_DGRAM)
    bd.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)  # 设置 socket 为广播模式,默认为关闭

    bd.sendto(b'Hello Everyone!', dest)  # 发送广播消息

    while True:
        recv_data = bd.recv(1024)  #接收消息
        print(recv_data.decode())

网络数据通信

集线器

每次发送数据,会广播的形式发送局域网中其他计算机

当数据到达指定的计算机接收后,会再次以广播的形式回复数据。

交换器

交换器有当前局域网中计算机的 MAC 地址表

第一次发送数据,会广播的形式发送局域网中其他计算机。当数据到达指定的计算机接收后,回复到发送数据的计算机上(即点到点通信)

路由器

连接两个不同网段的计算机

TCP 通信

在通信之前,必须先等待建立连接

三次握手

所谓三次握手(Three-way Handshake),是指建立一个 TCP 连接时,需要客户端和服务器总共发送3个包。

三次握手的目的是连接服务器指定端口,建立 TCP 连接,并同步连接双方的序列号和确认号,交换 TCP 窗口大小信息。在 socket 编程中,客户端执行 connect() 时。将触发三次握手。

四次挥手

TCP 的连接的拆除需要发送四个包,因此称为四次挥手(Four-way handshake),也叫做改进的三次握手。客户端或服务器均可主动发起挥手动作,在 socket 编程中,任何一方执行 close() 操作即可产生挥手操作。

四次挥手的示意图如下:

TCP 服务端

from socket import socket, AF_INET, SOCK_STREAM

if __name__ == '__main__':
    server = socket(AF_INET, SOCK_STREAM)
    server.bind(('', 9999))
    server.listen(5)  # 最大连接数
    print("服务器启动 ...")

    while True:
        newSock, client_ip = server.accept()
        print("建立连接 ...")

        # 接收数据
        while True:
            data = newSock.recv(1024)
            if data:
                print("来自客户端的数据:", data.decode())
                if len(data) < 1024:
                    break
            else:
                break
        # 发送数据
        newSock.send(b'Hello')
        print("与客户端连接关闭 ...")
        newSock.close()

    server.close()

TCP 客户端

from socket import socket, AF_INET, SOCK_STREAM
from multiprocessing import Process, Pool
from time import sleep


def simple_client():
    """ TCP客户端 """
    client = socket(AF_INET, SOCK_STREAM)
    print("与服务器连接 ...")
    addr = ('127.0.0.1', 9999)
    client.connect(addr)
    client.send("Hello Server".encode())
    print("------接收数据中------")
    data = client.recv(2048)
    print("接收数据: {}".format(data.decode()))
    print("与服务断开连接 ...")
    client.close()


def deal_with_client(name):
    """ TCP客户端 """
    client = socket(AF_INET, SOCK_STREAM)
    print("与服务器连接 ...")
    addr = ('127.0.0.1', 9999)
    client.connect(addr)
    client.send("Hello server {}".format(name).encode())
    while True:
        print("------接收数据中------")
        data = client.recv(2048)
        if len(data) > 0:
            print("接收到的数据:{}".format(data.decode()))
            if len(data) < 1024:
                break
        else:
            break
    print("与服务断开连接 ...")
    client.close()


def run(name):
    while True:
        deal_with_client(name)
        # simple_client()
        sleep(2)  # 休眠 3 秒


def usePool():
    pool = Pool(3)

    for i in range(3):
        pool.apply_async(run, args=(i,))

    pool.close()  # 关闭进程池,拒绝新的子进程
    pool.join()  # 阻塞主进程


if __name__ == '__main__':
    # simple_client()
    usePool()

并发服务器

多进程版

from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process

"""
    并发服务器之多进程
"""


def deal_request(newSock, client_ip):
    """ 处理客户端请求 """
    # 接收数据
    while True:
        data = newSock.recv(1024)
        if len(data) > 0:
            print("来自客户端的数据:", data.decode())
            if len(data) < 1024:
                break
        else:
            break
    # print("来自客户端的数据:", newSock.recv(1024).decode())
    # 发送数据
    newSock.send(b'Hello Client')
    print("与客户端连接关闭 ...")
    newSock.close()


def request(server):
    """ 接收请求 """
    try:
        while True:
            newSock, client_ip = server.accept()
            print("建立连接 ...")
            client = Process(target=deal_request, args=(newSock, client_ip))
            client.start()
            # 开启子进程会单独拷贝一份 newSock 对象资源,所主进程中的 newSock 可以直接关闭
            newSock.close()
    finally:
        server.close()


def main():
    server = socket(AF_INET, SOCK_STREAM)
    # 设置套接字,让同时客户端连接服务器端口
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind(('', 9999))
    server.listen(5)  # 最大连接数
    print("服务器启动 ...")
    request(server)


if __name__ == '__main__':
    main()

多线程版

from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
import threading

"""
    并发服务器之多线程
"""


def deal_request(newSock, client_ip):
    """ 处理客户端请求 """
    # 接收数据
    while True:
        data = newSock.recv(1024)
        if len(data) > 0:
            print("来自客户端的数据:", data.decode())
            if len(data) < 1024:
                break
        else:
            break
    print("来自客户端的数据:", newSock.recv(1024).decode())
    # 发送数据
    newSock.send('Hello Client'.encode())
    print("与客户端连接关闭 ...")
    newSock.close()


def main():
    server = socket(AF_INET, SOCK_STREAM)
    # 设置套接字,让同时客户端连接服务器端口
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind(('', 9999))
    server.listen(5)  # 最大连接数
    print("服务器启动 ...")
    try:
        while True:
            newSock, client_ip = server.accept()
            print("建立连接 ...")
            client = threading.Thread(target=deal_request, args=(newSock, client_ip))
            client.start()
    finally:
        server.close()

if __name__ == '__main__':
    main()

INET, SOCK_STREAM)
# 设置套接字,让同时客户端连接服务器端口
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind((’’, 9999))
server.listen(5) # 最大连接数
print(“服务器启动 …”)
request(server)

if name == ‘main’:
main()


#### 多线程版 ####

```python
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
import threading

"""
    并发服务器之多线程
"""


def deal_request(newSock, client_ip):
    """ 处理客户端请求 """
    # 接收数据
    while True:
        data = newSock.recv(1024)
        if len(data) > 0:
            print("来自客户端的数据:", data.decode())
            if len(data) < 1024:
                break
        else:
            break
    print("来自客户端的数据:", newSock.recv(1024).decode())
    # 发送数据
    newSock.send('Hello Client'.encode())
    print("与客户端连接关闭 ...")
    newSock.close()


def main():
    server = socket(AF_INET, SOCK_STREAM)
    # 设置套接字,让同时客户端连接服务器端口
    server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    server.bind(('', 9999))
    server.listen(5)  # 最大连接数
    print("服务器启动 ...")
    try:
        while True:
            newSock, client_ip = server.accept()
            print("建立连接 ...")
            client = threading.Thread(target=deal_request, args=(newSock, client_ip))
            client.start()
    finally:
        server.close()

if __name__ == '__main__':
    main()

标签:__,name,Python,threading,num,线程,print,20190918
来源: https://blog.csdn.net/Sunshine_wz/article/details/101095392