其他分享
首页 > 其他分享> > 温故而知新--day4

温故而知新--day4

作者:互联网

温故而知新--day4

进程与线程

进程本质上就是一段程序的运行过程,由程序、数据集、进程控制块组成。每个进程都有自己的地址空间、数据栈以及其他用于跟踪进程执行的辅助数据。操作系统管理所有的进程,并为他们合理分配资源。
线程是进程中的执行单元,可以共享进程中的资源。
进程之间是相互独立的,所以进程是最小的资源单位。

关于并行和并发

并行:系统能同时处理多个任务
并发:系统可以处理多个任务

线程

简单使用

import threading
import os


def work(num1, num2, name, **kwargs):
    print(num1, num2)   # 12 123
    print(name)         # lczmx
    print(kwargs)       # {'age': 20}
    print("pid:", os.getpid())  # pid: 12932


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(12, 123),
                          kwargs={"name": "lczmx", "age": 20})
    t2 = threading.Thread(target=work, args=(1, 3),
                          kwargs={"name": "xxx", "age": 20})

    t1.start()	# 开始线程活动
    t1.join()	# 等待,直到线程终结

    t2.start()
    t2.join()
    print("pid:", os.getpid())  # pid: 12932

定义一个类,继承threading.Thread、重写run方法也可以
重写__init__方法的话要super.__init__()

import threading


class MyThread(threading.Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        pass

    def func2(self):
        pass


if __name__ == '__main__':
    t = MyThread()
    t.start()
    t.join()

Threading对象的方法

is_alive(): 返回线程是否活动的。
getName(): 返回线程名,也可以在创建时通过name参数指定。
setName(): 设置线程名。

守护线程

默认情况下,主线程会等到所有子线程执行完之后才会退出,但守护线程并不会。
线程
守护线程就是跟随主线程一起结束的线程,守护线程通过setDaemon方法实现,其内部时设置daemon属性,可以被继承,所以daemon默认为False。

import threading
import time


def work(sleep_time=0.5):
    time.sleep(sleep_time)
    print("sleep time: ", sleep_time)


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(1,))
    t1.setDaemon(True)      # setDaemon要在start之前
    t1.start()
    print("exit")

注意以下例子

import threading
import time


def work(sleep_time=0.5):
    time.sleep(sleep_time)
    print("sleep time: ", sleep_time)


if __name__ == "__main__":

    t1 = threading.Thread(target=work, args=(1,))       # 1秒
    t2 = threading.Thread(target=work, args=(3,))       # 3秒
    t1.setDaemon(True)
    t2.setDaemon(True)

    t1.start()
    t2.start()
    time.sleep(2)                                       # 2秒

    print(t1.is_alive())    # False
    print(t2.is_alive())    # True

锁主要时用来解决在cpu切换时造程序取得的数据不同步的问题。
比如这个例子:

from threading import Thread
import os
import time


def work():
    global n
    temp = n
    time.sleep(0.01)
    n = temp - 1


if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 结果可能为99或98,但几乎不为零

为了解决这个问题,我们可以使用串行的方式让所有的代码按照顺序执行,但是这就失去了多线程的意义。那么只要串行部分代码就既能享受多线程的优势,又可以保证数据的安全了。也就是说,锁做的工作就是使操作数据的那部分代码串行。

互斥锁

使用threading.Lock获取一把锁,它由一个acquire()release()方法控制锁定和释放。

from threading import Thread, Lock
import os
import time

lock = Lock()


def work():
    global n
    lock.acquire()
    temp = n
    time.sleep(0.01)
    n = temp - 1
    lock.release()


if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 现在结果为0

死锁

上面说过,锁就是把部分代码变为串行,只有当锁被释放后才能执行后面的代码。死锁的一个原因是互斥,还有可能是粗心大意,忘记release()了。

from threading import Thread, Lock
import time

lockA = Lock()
lockB = Lock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        lockA.acquire()
        print("%s获得锁A" % self.name)

        lockB.acquire()
        print("%s获得锁B" % self.name)
        lockB.release()
        print("%s释放锁B" % self.name)
        lockA.release()
        print("%s释放锁A" % self.name)

    def func2(self):
        lockB.acquire()
        print("%s获得锁B" % self.name)
        time.sleep(2)

        lockA.acquire()
        print("%s获得锁A" % self.name)
        lockA.release()
        print("%s释放锁A" % self.name)

        lockB.release()
        print("%s释放锁B" % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread(name="线程%d" % i)
        t.start()
        """
        线程0获得锁A
        线程0获得锁B
        线程0释放锁B
        线程0释放锁A
        线程0获得锁B
        线程1获得锁A

        卡死了
        """

解决死锁的好方式就是用递归锁,而使用一般的锁的话可以用with关键词,以防忘记释放锁了。

递归锁

递归锁也是锁,其内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

from threading import Thread, RLock
import time

lockA = lockB = RLock()


class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        lockA.acquire()
        print("%s获得锁A" % self.name)

        lockB.acquire()
        print("%s获得锁B" % self.name)
        lockB.release()
        print("%s释放锁B" % self.name)
        lockA.release()
        print("%s释放锁A" % self.name)

    def func2(self):
        lockB.acquire()
        print("%s获得锁B" % self.name)
        time.sleep(2)

        lockA.acquire()
        print("%s获得锁A" % self.name)
        lockA.release()
        print("%s释放锁A" % self.name)

        lockB.release()
        print("%s释放锁B" % self.name)


if __name__ == '__main__':
    for i in range(10):
        t = MyThread(name="线程%d" % i)
        t.start()

semaphore 信号量

信号量就是一把锁,之前我们说的threading.Lock是互斥锁(Mutual exclusion,缩写 Mutex)实质上就是信号量为一的情景。信号量可以用来限定某些资源可以同时由几个线程访问,访问时同样要acquire,出来时同样也release。

import threading
import time

sm = threading.Semaphore(5)


def foo():
    sm.acquire()
    # 打印当前线程的名字
    print("%s ..." % threading.current_thread().getName())
    time.sleep(1)
    sm.release()


if __name__ == "__main__":
    for i in range(9):
        t = threading.Thread(target=foo)
        t.start()

GIL

关于GIL(global interpreter lock),点击这里

线程间通信

  1. event 同步条件
    由于线程之间是相互独立的,彼此不能直接确认状态,为此python提供了threading.Event对象,可以在不同线程间传递状态,其由一下方法:

    • .wait(timeout=None),event变为True,timeout为None时,为阻塞;反之则为等待秒数(非阻塞)

    • .set(),设置event的值为True

    • .clear(),恢复event的状态值为False。

    • .is_set,返回event状态值

      根据上述,可以把Event的情况分为以下几种:
      event

    import threading
    import time
    e = threading.Event()
    
    
    def foo():
    	print("event状态:", e.is_set())
    	print("等待。。。。")
    	if e.wait():    # 默认阻塞
    		print("event状态:", e.is_set())
    		print("收到同步条件,ok")
    
    
    def bar():
    	time.sleep(2)
    	e.set()
    
    
    if __name__ == '__main__':
    	f = threading.Thread(target=foo)
    	b = threading.Thread(target=bar)
    
    	f.start()
    	b.start()
    
  2. queue 线程队列
    线程队列特别适用于消息必须安全地在多线程间交换的线程编程,线程队列有三种类型,在实例化的时候根据需求指定:

    • 先进先出(FILO):queue.Queue(maxsize=0)
    • 后进先出(LIFO):queue.LifoQueue(maxsize=0)
    • 按优先级,使用heapq(堆队列算法),确定优先级:queue.PriorityQueue(maxsize=0)

    注:maxsize参数指定队列的大小,当maxsize <= 0 时,队列的元素个数没有限制。

    这三者都返回queue.Queue对象的方法,因为LifoQueuePriorityQueue都继承queue.Queue,Queue对象拥有以下方法:

    方法 说明
    .put(item, block=True, timeout=None) 将 item 放入队列,block默认为True,表示阻塞。优先级队列的item要包含优先级如:q.put([2, "abc"])
    .get(block=True, timeout=None) 从队列中移除并返回一个项目。block默认为True,表示阻塞
    .qsize() 返回队列的大致大小
    .empty() 如果队列为空,返回 True ,否则返回 False
    .full() 如果队列是满的返回 True ,否则返回 False 。
    .task_done() 完成一个任务后,向队列发信号(join()用到)。
    .join() 阻塞到 队列中所有的元素 都 被 接 收 和 处 理 完毕(根据收到的task_done信号确定)。

    关于task_done与join:
    Queue内部有一个unfinished_tasks属性(默认为0),put时自增1,task_done调用时自减1
    join的逻辑是while self.unfinished_tasks: self.all_tasks_done.wait(),当unfinished_tasks为0的时候就跳出循环,停止阻塞状态。

    import threading
    import queue
    import time
    
    q = queue.Queue(5)      # 只存5个元素
    
    
    def worker():
    	while True:
    		print("qsize: ", q.qsize())
    		item = q.get()          # 队列为空时会阻塞
    
    		print(f'Working on {item}')
    		time.sleep(0.5)         # 模拟处理数据的时间
    		print(f'Finished {item}')
    		q.task_done()           # 已经执行
    
    
    # 开启为worker线程,处理队列,设置为守护线程
    threading.Thread(target=worker, daemon=True).start()
    
    
    print("队列是否为空:", q.empty())
    
    # 往队列中添加元素
    for item in range(10):
    	if q.full():
    		print("已经满了,阻塞。。。。")
    	q.put(item)     # 队列满的时候会阻塞
    
    print('全部元素已经放入队列中')
    
    # 会一直阻塞,知道unfinished_tasks为0
    q.join()
    print('全部任务已完成')
    

    PriorityQueue有点特殊,单独举例:

    import queue
    
    
    q = queue.PriorityQueue()
    
    q.put([3, "c"])
    q.put([1, "a"])
    q.put([2, "b"])
    
    print(q.get())  # [1, 'a']
    print(q.get())  # [2, 'b']
    print(q.get())  # [3, 'c']
    

进程

python中使用multiprocessing模块来实现多进程。

简单使用

import multiprocessing


def work(num, name, age):
    print(f"num: {num}, name: {name}, age: {age}")


if __name__ == '__main__':      # 不要省略了这个,否则报错
    p = multiprocessing.Process(target=work, args=(
        1, ), kwargs={"name": "lczmx", "age": 22})

    p.start()
    p.join()

方法二

要重写__init__方法的话要super.__init__()

import multiprocessing


class MyProcess(multiprocessing.Process):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        print("func1")

    def func2(self):
        print("func2")


if __name__ == "__main__":
    p = MyProcess()
    p.start()

一些常用方法

主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

进程间通信

使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用锁等同步原语。

  1. 进程队列 multiprocessing.Queue(maxsize=0)
    进程队列的方法与queue.Queue的方法很像,常用的方法中就没有task_donejoin(multiprocessing.JoinableQueue(maxsize=0)有这两个方法,但要必须要手动调用task_done,否则用于统计未完成任务的信号量最终会溢出并抛出异常)
    from multiprocessing import Process, Queue
    
    
    def f(q):
    	q.put([42, None, 'hello'])
    
    
    if __name__ == '__main__':
    	q = Queue()
    	p = Process(target=f, args=(q,))
    	p.start()
    	print(q.get())    # [42, None, 'hello']
    	p.join()
    
  2. 管道 multiprocessing.Pipe
    conn1, conn2 = multiprocessing.Pipe([duplex])conn1和conn2是一对 Connection 对象, 分别表示管道的两端。
    如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。
    Connection对象的常用方法:
    • send(obj)
      将一个对象发送到连接的另一端,可以用 recv() 读取。
      发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发 ValueError 异常。
    • recv()
      返回一个由另一端使用 send() 发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出 EOFError 异常。
    • fileno()
      返回由连接对象使用的描述符或者句柄。
    • close()
      关闭连接对象。当连接对象被垃圾回收时会自动调用。
      更多方法详见文档
    from multiprocessing import Process, Pipe
    
    
    def f(conn):
    	print(conn.recv())  # [1, '12', True]
    
    
    if __name__ == '__main__':
    	conn1, conn2 = Pipe()   # 默认为双向
    
    	p = Process(target=f, args=(conn2,))
    	p.start()
    	conn1.send([1, "12", True])
    	p.join()

线程池和进程池

线程和进程的创建、切换、关闭都需要一定的成本,对于某些重复次数多且声明周期短的任务可以使用线/进程池,线/进程池的数量并不是越多越好,太多可能得不偿失,甚至导致python解释器崩溃。
使用线程池要用到concurrent.futures.ThreadPoolExecutor
使用进程池要用到concurrent.futures.ProcessPoolExecutor
线程池和进程池都提供了以下常用方法:

Future对象是submit方法的返回值,其本身也有一些实用的方法:

生产者、消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题,通过一个容器来解决生产者和消费者的强耦合问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。在一般的高并发程序中通常就会有这样的场景出现:生产多快,处理不过来;生产太慢,等半天没得处理。所以要引入生产者消费者模型:

生产者与消费者模型
生产者与消费者模型
多线程实现:

import threading
import queue
import time
import random


class Consumer(threading.Thread):

    """
    消费者
    """

    def __init__(self, q, lock, name):
        super().__init__()
        self.q = q          # 阻塞消息队列
        self.lock = lock    # 互斥锁
        self.name = "消费者-" + str(name)
        self.daemon = True  # 设置为守护线程

    def run(self):

        while True:
            item = self.q.get()  # 有则取,无则阻塞

            with self.lock:         # 使用上下文管理协议使用锁
                print(f"{self.name}: 处理{item}....")
                time.sleep(random.uniform(1, 2))     # 模拟处理时间1~2的浮点数
            self.q.task_done()      # 调用task_done()


class Producer(threading.Thread):
    """
    生产者
    """

    def __init__(self, q, count, name):
        super().__init__()
        self.q = q      # 阻塞队列
        self.count = count  # 生产几个数据
        self.name = "生产者-" + str(name)

    def run(self):
        for num in range(self.count):
            data = "data-%d" % num
            print(f"{self.name}: 生成数据 {data}")
            time.sleep(random.random())     # 模拟处理时间0~1的浮点数
            self.q.put(data)     # 添加数据,满则阻塞

        # 因为消费者是守护线程,其是否可以退出要看生产者
        self.q.join()       # 等待所有的数据都处理完了,才退出


if __name__ == '__main__':
    q = queue.Queue()
    lock = threading.Lock()

    # 使用map生成,并启动消费者
    list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙", "丙"]))

    # 生成者列表
    producer_list = map(lambda name: Producer(q, 20, name), ["大厨", "小厨"])

    for p in producer_list:
        p.start()
    for p in producer_list:
        p.join()
        # 等结束

多进程实现:

import multiprocessing
import time
import random


class Consumer(multiprocessing.Process):

    """
    消费者
    """

    def __init__(self, q, lock, name):
        super().__init__()
        self.q = q          # 阻塞消息队列
        self.lock = lock    # 互斥锁
        self.name = "消费者-" + str(name)
        self.daemon = True  # 设置为守护进程

    def run(self):

        while True:
            item = self.q.get()  # 有则取,无则阻塞

            with self.lock:         # 使用上下文管理协议使用锁
                print(f"{self.name}: 处理{item}....")
                time.sleep(random.uniform(1, 2))     # 模拟处理时间1~2的浮点数
            self.q.task_done()      # 调用task_done()


class Producer(multiprocessing.Process):
    """
    生产者
    """

    def __init__(self, q, count, name):
        super().__init__()
        self.q = q      # 阻塞队列
        self.count = count  # 生产几个数据
        self.name = "生产者-" + str(name)

    def run(self):
        for num in range(self.count):
            data = "data-%d" % num
            print(f"{self.name}: 生成数据 {data}")
            time.sleep(random.random())     # 模拟处理时间0~1的浮点数
            self.q.put(data)     # 添加数据,满则阻塞

        # 因为消费者是守护进程,其是否可以退出要看生产者
        self.q.join()       # 等待所有的数据都处理完了,才退出


if __name__ == '__main__':
    q = multiprocessing.JoinableQueue()
    lock = multiprocessing.Lock()

    # 启动两个消费者进程

    list(map(lambda name: Consumer(q, lock, name).start(), ["甲", "乙"]))

    # 生成者只开一个进程
    p = Producer(q, 20, "大厨")
    p.start()

    p.join()
    # 等结束

协程

点击查看如何使用协程

aiohttp

aiohttp是一个基于asyncio实现对http协议支持的第三方库,点击查看如何使用aiohttp

IO

进程的执行是要靠操作系统调度的,为了保证不影响后面程序的运行,所以在执行过程中遇到阻塞或超过时间轮询时cpu会切换不同的进程执行

当我们写的程序需要数据即有IO的时候可以使用以下的四种模式来解决问题。

IO模式
IO模式

IO多路复用实现

不同平台有不同的实现IO多路复用的模块,windows下支持select仅适用于套接字;Linux下至此selectpollepoll 函数的访问,这些函数在大多数操作系统中是可用的;在 Solaris下为devpoll; BSD 上可用kqueue;在这些操作系统上,适用于套接字和其他文件类型。

水平触发和边缘触发

水平触发
对于读:只要缓冲内容不为空返回读就绪
对于写:只要缓冲区还不满返回写就绪
边缘触发
对于读:缓冲区由空变为不空 或 数据变多 等时候返回读就绪
对于写:缓冲区由满变为空 或 数据变少 等时候返回写就绪
select和poll都是使用的水平触发方式。epoll既支持水平触发也支持边缘触发,默认是水平触发。

在python中要实现IO多路复用,可以使用selectselectorsselectors是对select的进一步封装,使用selectors.DefaultSelector()可以自动选择当前平台最高效的接口。所以推荐使用selectors模块。

使用selectors模块主要要用到以下几个方法:

来自python官方文档的例子

import selectors
import socket

sel = selectors.DefaultSelector()


def accept(sock, mask):
    conn, addr = sock.accept()  # 等连接是读
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)     # 设置非阻塞
    sel.register(conn, selectors.EVENT_READ, read)


def read(conn, mask):
    data = conn.recv(1000)  # 接收消息是读
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)
    else:                   # 断开连接
        print('closing', conn)
        sel.unregister(conn)
        conn.close()


sock = socket.socket()
sock.bind(('localhost', 1234))

sock.listen(100)
sock.setblocking(False)      # 设置非阻塞
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:    # 一直阻塞,直到有数据来
        print("event 循环")
        # mask是位掩码EVENT_READ或EVENT_WRITE

        # 得到回调函数,这里是read或accept
        callback = key.data
        callback(key.fileobj, mask)

运行这段代码,并用其它终端连接:

>>> import socket
>>> sock = socket.socket()
>>> sock.connect(("localhost", 1234))
>>> sock.send(b"hello world")
11

异步非阻塞实现

自定义异步非阻塞web框架

我的github
我的博客
我的笔记

标签:温故而知新,__,name,--,day4,线程,print,import,self
来源: https://www.cnblogs.com/lczmx/p/14364667.html