编程语言
首页 > 编程语言> > 线程和协程详解-python

线程和协程详解-python

作者:互联网

1、前言

关于基本概念部分这里不再详述,可以参考之前的文章或者自行查阅相关文章。

由于python中线程的创建、属性和方法和进程很相似,这里也不再讲解。

这里重点讲解下多线程访问共享数据的相关问题。

2、多线程数据完全与解决

先看下示例:预测并执行看下结果是否和预测一致

from threading import Thread

sum = 0


def minus():
    global sum
    for i in range(1000000):
        sum += 1


def plus():
    global sum
    for i in range(1000000):
        sum -= 1


if __name__ == '__main__':
    t1 = Thread(target=minus)
    t2 = Thread(target=plus)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

    print('sum=',sum)

如何解决多线程下共享数据的安全问题呢?

2.1、加锁

2.2、信号量:semaphore

import time
import threading

semaphore = threading.BoundedSemaphore(3)


def task():
    semaphore.acquire()
    print('{}---running-{}'.format(threading.current_thread().name,
                                   time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
    time.sleep(1)
    semaphore.release()


if __name__ == '__main__':
    l = []
    for i in range(10):
        t = threading.Thread(target=task, name='task' + str(i))
        t.start()
        l.append(t)

    for i in l:
        i.join()

2.3、Events:事件驱动

import threading, time, random

events = threading.Event()


def lighter():
    if not events.isSet():
        events.set()  # 初始化绿灯Event set
    counter = 0
    while True:
        if counter < 5:
            print('\033[42;0mGreen is lighten...\033[0m')
        elif counter < 10:
            if events.isSet():
                events.clear()
            print('\033[41;0mRed is lighten...\033[0m')

        else:
            counter = 0
            print('\033[42;1m--green light on---\033[0m')
            events.set()
        time.sleep(1)
        counter += 1


def car(i):
    while True:
        if events.isSet():
            print("car[%s] is running..." % i)
            time.sleep(random.randrange(10))
        else:
            print('car is waiting green lighten...')
            events.wait()


if __name__ == '__main__':
    lighter1 = threading.Thread(target=lighter)
    lighter1.start()
    for i in range(3):
        t = threading.Thread(target=car, args=(i,))
        t.start()

2.4、死锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源时,就会造成死锁。
尽管死锁很少发生,但一旦发生就会造成应用的停止响应。
示例代码:

import time
from threading import Thread, Lock

la = Lock()
lb = Lock()


def task1():
    if la.acquire():
        print('task1获取锁A')
        time.sleep(1)
        if lb.acquire():
            print('task1获取锁B')
            lb.release()
            print('task1释放锁B')
        la.release()
        print('task1释放锁A')


def task2():
    if lb.acquire():
        print('task2获取锁B')
        time.sleep(1)
        if la.acquire():
            print('task2获取锁A')
            la.release()
            print('task2释放锁A')
        lb.release()
        print('task2释放锁B')


if __name__ == '__main__':
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

3、线程间通信

线程间通信也是通过Queue来完成,这里不在讲解。

示例代码:

import queue
import random
import time
from threading import Thread


def produce(q):
    i = 0
    while i < 10:
        num = random.randint(1, 100)
        q.put(num)
        print('生产者生产数据:%d' % num)
        time.sleep(0.5)
        i += 1
    q.put(None)
    # 任务结束
    q.task_done()


def consume(q, n):
    while True:
        item = q.get()
        if item is None:
            break
        print('%s获取:%d' % (n, item))
        time.sleep(1)
    # 任务结束
    q.put(None)
    q.task_done()


if __name__ == '__main__':
    q = queue.Queue(10)

    # 创建生产者
    tp = Thread(target=produce, args=(q,))
    tp.start()

    # 创建消费者
    tc1 = Thread(target=consume, args=(q, '消费者1'))
    tc2 = Thread(target=consume, args=(q, '消费者2'))
    tc1.start()
    tc2.start()

    tp.join()
    tc1.join()
    tc2.join()

3、协程

3.1、简介

协程,又称微线程,纤程,是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。

因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态(进入上一次离开时所处逻辑流的位置)

协程与线程类似,每个协程表示一个执行单元,既有自己的本地数据,也与其他协程共享全局数据和其他资源。

协程存在于线程中,需要用户来编写调度逻辑,对CPU而言,不需要考虑协程如何调度,切换上下文。

"原子操作(atomic operation)是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何上下文切换 (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。

3.2、greenlet

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。

greenlet的工作流程:进行访问网络的IO操作时,出现阻塞,greenlet就显式切换到另一段没有被阻塞的代码执行,直到原来的阻塞状况消失以后,再切换回原来代码段继续处理。因此,greenlet是一种合理安排的串行方法。

greenlet.switch()可实现协程的切换,greenlet并不能实现自动切换。

示例:模拟2个耗时任务

import time

from greenlet import greenlet


def task1():
    for i in range(5):
        print('A ' + str(i))
        g2.switch()
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        g1.switch()
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = greenlet(task1)
    g2 = greenlet(task2)

    g1.switch()

3.3、gevent和猴子补丁

gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程。gevent是对greenlet进行封装,实现协程的自动切换。

通过gevent.sleep模仿IO操作,实现协程的切换。

示例:

import time
import gevent


def task1():
    for i in range(5):
        print('A ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)

    g1.join()
    g2.join()

协程切换是在IO操作时自动完成,在启动时通过monkey.patch_all()实现将一些常见的阻塞,如socket,select,urllib等地方实现协程跳转,因为gevent并不能完全识别所有当前操作是否为IO操作,而未切换。

示例:

import time
import gevent
from gevent import monkey
monkey.patch_all()


def task1():
    for i in range(5):
        print('A ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


def task2():
    for i in range(5):
        print('B ' + str(i))
        # 模拟耗时操作
        time.sleep(0.1)


if __name__ == '__main__':
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)

    g1.join()
    g2.join()

3.4、简单应用

示例:模拟爬取3个网页

from gevent import monkey

monkey.patch_all()
import requests
import gevent


def download(url):
    resp = requests.get(url)
    print('下载了{}的数据, 长度:{}'.format(url, len(resp.text)))


if __name__ == '__main__':
    urls = ['https://www.baidu.com', 'https://www.163.com', 'https://www.qq.com']

    arr = []

    for url in urls:
        g = gevent.spawn(download, url)
        arr.append(g)

    gevent.joinall(arr)

gevent还提供对池的支持,当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池,在处理大量的网络或IO操作时非常重要。

示例:注意python版本

from gevent import monkey
monkey.patch_all()
from gevent.pool import Pool
import requests

def run_task(url):
    print('Visit --> %s'% url)
    try:
        res = requests.get(url)
        data = res.text
        print('%s bytes received from %s' %(len(data),url))
    except Exception as value:
        print(value)
    return 'url:%s --> finished' % url

if __name__ == '__main__':
    urls = ['https://www.baidu.com/','https://github.com/','https://www.python.org/']
    pool = Pool(2)
    result = pool.map(run_task, urls)
    print(result)

参考地址

参考视频https://www.bilibili.com/video/BV1R7411F7JV p274~281

源代码仓库地址:https://gitee.com/gaogzhen/python-study

QQ群:433529853

标签:__,协程,python,gevent,线程,time,print,import
来源: https://blog.csdn.net/gaogzhen/article/details/122500375