Python多线程编程技术总结

蚂蚁学Python

共 4251字,需浏览 9分钟

 ·

2021-01-28 21:04

学习并发编程

  • 加速程序的运行
  • 高薪程序员必备能力

程序运行的5种并发粒度

  • 单线程
  • 单线程多协程
  • 多线程
  • 多进程
  • 多机器

怎样选择并发技术

  • 如果单机无法搞定

    • 大数据计算
  • IO密集型

    • CPU经常在等待IO
    • 比如网络爬虫
    • 选择1:多协程 coroutine
    • 选择2:多线程 threading
  • CPU密集型

    • 计算密集型,CPU计算为主
    • 比如加密解密
    • 使用多进程multithreading

线程池和进程池

  • 原理:提前创建好线程/进程放在池子里,新的task到来可以重用这些资源,减少了新建、终止线程/进程的开销

  • 池化的好处

    • 提升性能:因为减去了大量新建、终止线程的开销,重用了线程资源
    • 适用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际任务处理时间较短
    • 防御功能:能有效避免系统因为创建线程过多,而导致系统负荷过大响应变慢等问题
    • 代码简洁:使用线程池的语法比自己新建线程执行线程更加简洁

全局解释器锁GIL

任何时刻仅有一个线程在执行。

在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程

GIL目的:为了解决多线程之间数据完整性和状态同步问题

GIL带来的问题

  • 即使使用了多线程,同一时刻也只有单个线程使用CPU,导致多核CPU的浪费
  • GIL只会对CPU密集型的程序产生影响
  • 如果程序主要是在做I/O操作,比如处理网络连接,那么多线程技术常常是一个明智的选择

规避GIL的方法

  • 规避方法2:使用multiprocessing多进程,对CPU密集型计算,单独启动子进程解释器去执行
  • 规避方法2:将计算密集型的任务转移到C语言中,因为C语言比Python快得多,注意要在C语言中自己释放GIL

多线程编程

应用于IO密集型计算,比如几乎所有的网络后台服务、网络爬虫

引入模块

from threading import Thread

新建、启动、等待结束

    t=Thread(target=func, args=(100, ))
    t.start()
    t.join()

数据通信

import queue
q = queue.Queue()
q.put(item)
item = q.get()

线程安全加锁

from threading import Lock
lock = Lock()
with lock:
    # do something

信号量限制并发

from threading import Semaphore
semaphre = Semaphore(10)
with semaphre:
    # do something

使用线程池

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1,2,3])

    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

多进程编程

应用于CPU密集型计算,只有发现多线程编程有性能问题时,才求助于该模块

引入模块

from multiprocessing import Process

新建、启动、等待结束

p = Process(target=f, args=('bob',))
p.start()
p.join()

数据通信

from multiprocessing import Queue
q = Queue()
q.put([42None'hello'])
item = q.get()

线程安全加锁

from multiprocessing import Lock
lock = Lock()
with lock:
    # do something

信号量限制并发

from multiprocessing import Semaphore
semaphore = Semaphore(10)
with semaphore:
    # do something

进程池

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    # 方法1
    results = executor.map(func, [1,2,3])

    # 方法2
    future = executor.submit(func, 1)
    result = future.result()

多协程编程

异步编程的威力

  • Nginx作为 Web 服务器:打败了 同步阻塞服务器 Apache, 使用更少的资源支持更多的并发连接,体现更高的效率,能够支持高达 50,000 个并发连接数的响应,使用 epoll and kqueue 作为开发模型
  • Redis为什么这么快:处理网络请求采用单线程+使用多路I/O复用模型,非阻塞IO ,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;
  • Node.js的优势:采用事件驱动、异步编程,为网络服务而设计。其实Javascript的匿名函数和闭包特性非常适合事件驱动、异步编程 Node.js非阻塞模式的IO处理带来在相对低系统资源耗用下的高性能与出众的负载能力,非常适合用作依赖其它IO资源的中间层服务
  • Go语言的一个优势:Go 使用Goroutine 和 channel为生成协程和使用信道提供了轻量级的语法,使得编写高并发的服务端软件变得相当容易,很多情况下完全不需要考虑锁机制以及由此带来的各种问题,相比Python单个 Go 应用也能有效的利用多个 CPU 核,并行执行的性能好

异步编程的原理

  • 核心原理1:超级循环

    • 在单线程内实现并发
    • 用一个超级循环(其实就是while true)循环,里面每次轮询处理所有的task
    • 记忆口诀:《the one loop》至尊循环驭众生、至尊循环寻众生、至尊循环引众生、普照众生欣欣荣
  • 核心原理2:IO多路复用

    • select

    • poll

    • epool

    • 数据结构:bitmap
    • 最大连接数:1024
    • fd拷贝:每次调用select拷贝
    • 工作效率:轮询O(N)
    • 数据结构:数组
    • 最大连接数:无上限
    • fd拷贝:每次调用poll拷贝
    • 工作效率:轮询O(N)
    • 数据结构:红黑树
    • 最大连接数:无上限
    • fd拷贝:fd首次调用epool_ctl拷贝,每次调用epoll_wait不拷贝
    • 工作效率:回调O(1)
    • 是一种同步IO模型,实现一个线程可以监视多个文件句柄;
    • 一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;
    • 没有文件句柄就绪时会阻塞应用程序,交出cpu
    • 多路是指网络连接,复用指的是同一个线程
    • 原理

    • 3种实现方式

Python官方异步库:asyncio

代码例子

 import asyncio

    # 获取事件循环
    loop = asyncio.get_event_loop()

    # 定义协程
    async def myfunc(url):
        await get_url(url)

    # 创建task列表
    tasks = [loop.create_task(myfunc(url)) for url in urls]

    # 执行爬虫事件列表
    loop.run_until_complete(asyncio.wait(tasks))
  • 优点:

    • 官方库支持
    • 明确使用asyncio、await关键字编程,直观易读
  • 缺点:

    • 很多库不支持,比如requests

Python第三方异步库:Gevent

代码例子

 import gevent.monkey

    gevent.monkey.patch_all()

    import gevent
    import blog_spider
    import time

    begin = time.time()
    for url in blog_spider.urls:
        blog_spider.craw(url)
    end = time.time()
    print("single thread, cost = ", end - begin)

    begin = time.time()
    tasks = [gevent.spawn(blog_spider.craw, url) for url in blog_spider.urls]
    gevent.joinall(tasks)
    end = time.time()
    print("gevent, cost = ", end - begin)
  • 原理

    • 提供猴子补丁MonkeyPatch方法,通过该方法gevent能够 修改标准库里面大部分的阻塞式系统调用,包括socket、ssl、threading和 select等模块,而变为协作式运行
  • 优点

    • 只需要monkey.patch_all(),就能自动修改阻塞为非阻塞
    • 提供了pywsgi异步服务器可以封装flask
  • 缺点

    • 不知道它具体patch了哪些库修改了哪些模块、类、函数
    • 创造了“隐式的副作用”,如果出现问题很多时候极难调试

Gevent改造Flask为异步服务

代码例子

from gevent import monkey

monkey.patch_all()

from flask import Flask
from gevent import pywsgi

app = Flask(__name__)


@app.route("/")
def index():
    return "success"


if __name__ == "__main__":
    # app.run()
    server = pywsgi.WSGIServer(("0.0.0.0"8888), app)
    server.serve_forever()



浏览 73
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报