跳转至

Python 线程池

线程池原理

线程的生命周期:

新建线程系统需要分配资源,终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止开销。

使用线程池好处

  • 提升性能: 因为减去了大量新建、终止线程的开销,重用了线程资源。
  • 使用场景: 适合处理突发大量请求或需要大量线程完成任务、但实际任务处理时间很短。
  • 防御功能: 能够有效避免系统因为创建线程过多,而导致系统负荷过大相应变慢问题。
  • 代码优势: 使用线程池的语法比自己新建线程执行线程更简洁。

线程池的使用

方法一: map 函数,注意 map 的结果和入参的顺序的对应的。

方法二: future 模式,更强大,注意 as_completed 顺序是随机的。

Python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

data = [
    {"CategoryType": "SiteHome",
     "ParentCategoryId": 0,
     "CategoryId": 808,
     "PageIndex": i,
     "TotalPostCount": 2000,
     "ItemListActionName": "AggSitePostList"}
    for i in range(1, 101)
]
url = "https://www.cnblogs.com/AggSite/AggSitePostList"


# 定义线程需要做的任务函数
def reps(datas):
    rep = requests.post(url, json=datas)
    return f"第{datas['PageIndex']}页获取数据:{len(rep.text)}"


# 方法一
def one():
    # 10表示线程池创建10个线程
    with ThreadPoolExecutor(10) as pool:
        results = pool.map(reps, data)
        for result in results:
            print(threading.current_thread().name, result)


# 方法二
def two():
    with ThreadPoolExecutor(10) as pool:
        futures = [
            pool.submit(reps, datas)
            for datas in data
        ]
        # 两种结果的输出方式
        # 第一种按顺序会等待全部线程结束然后for循环遍历结果、
        # 第二种会实时输出结果
        # 第一种
        # for future in futures:
        #     print(threading.current_thread().name, future.result())
        # 第二种
        for future in as_completed(futures):
            print(threading.current_thread().name, future.result())


if __name__ == '__main__':
    # one()
    two()

使用线程池实现 flask web 服务加速

web 服务特点:

  • web 服务对相应时间要求非常高。
  • web 服务有大量的依赖 IO 操作的调用。
  • web 服务经常要处理很多人的同时请求。

不使用线程池:

Python
import json

import flask
import time

app = flask.Flask(__name__)


def read_file():
    time.sleep(0.1)
    return "file result"


def read_db():
    time.sleep(0.3)
    return "db result"


def read_api():
    time.sleep(0.2)
    return "api result"


@app.route("/")
def index():
    result_file = read_file()
    result_db = read_db()
    result_api = read_api()

    return json.dumps({
        "result_file": result_file,
        "result_db": result_db,
        "result_api": result_api
    })


if __name__ == '__main__':
    app.run()

使用线程池:

Python
import json

import flask
import time
from concurrent.futures import ThreadPoolExecutor

app = flask.Flask(__name__)


def read_file():
    time.sleep(0.1)
    return "file result"


def read_db():
    time.sleep(0.3)
    return "db result"


def read_api():
    time.sleep(0.2)
    return "api result"


@app.route("/")
def index():
    result_file = pool.submit(read_file)
    result_db = pool.submit(read_db)
    result_api = pool.submit(read_api)

    return json.dumps({
        "result_file": result_file.result(),
        "result_db": result_db.result(),
        "result_api": result_api.result()
    })


if __name__ == '__main__':
    pool = ThreadPoolExecutor(3)
    app.run()