跳转至

Python 多线程

多线程创建方法

Python
# 导入模块
import threading


# 创建执行任务的函数
def my_func(a, b):
    print(threading.current_thread().name)
    print(a, b)


# 多线程的主函数
def thread():
    # 创建一个多线程对象,注意target和args
    task = threading.Thread(target=my_func, args=(1, 2))
    # 启动多线程
    task.start()
    # 等待线程结束然后执行之后的代码,防止之后的代码提前执行,导致意外错误
    task.join()


if __name__ == '__main__':
    # 输出当前的线程名称
    print(threading.current_thread().name)
    thread()

利用多线程爬取网站数据

Python
import threading
import requests
import time


lock = threading.Lock()
url = [f"https://www.cnblogs.com/#p{url}" for url in range(1, 51)]


def reps(urls):
    rep = requests.get(urls)
    with lock:
        print(threading.current_thread().name, end=" ")
        print(urls, len(rep.text))


def thread():
    tasklist = []
    for i in url:
        tasklist.append(threading.Thread(target=reps, args=(i,)))
    for task in tasklist:
        task.start()
    for tasks in tasklist:
        tasks.join()


if __name__ == '__main__':
    last = time.time()
    for s in url:
        reps(s)
    now = time.time()
    print("单线程执行时间: ", now-last)
    last = time.time()
    thread()
    now = time.time()
    print("多线程执行时间: ", now-last)

利用 python 实现生产者和消费者功能

多组件的 pipeline 技术架构,复杂的事情一般都不会一下子做完,而是很多中间步骤一步一步完成,生产者消费者爬虫架构,线程组一(生产者)爬取网页放入队列中,线程组二(消费者)解析生产者队列中的网页数据,然后存储起来。

多线程数据通信

使用模块 queue.queue,用于多线程之间的、线程安全的数据通信。

Python
# 导入模块
import queue

# 创建Queue
q = queue.Queue()

# 添加元素
q.put(item)

# 获取元素
item = q.get()

# 查询状态
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()

# 等待队列为空在执行继续执行
q.join()

代码示例

Python
# 通过生产者和消费者关系,爬取博客园网页数据
import queue
import threading

import requests
from bs4 import BeautifulSoup

lock = threading.Lock()
data = [
    {"CategoryType": "SiteHome",
     "ParentCategoryId": 0,
     "CategoryId": 808,
     "PageIndex": i,
     "TotalPostCount": 2000,
     "ItemListActionName": "AggSitePostList"}
    for i in range(1, 101)
]
url = "https://www.cnblogs.com/AggSite/AggSitePostList"


# 获取网页数据,相当于生产者使用的方法
def reps(datas):
    rep = requests.post(url, json=datas)
    return rep.text, datas["PageIndex"]


# 解析网页数据,相当于消费者使用的方法
def html_output(html):
    # class="post-item-title"
    soup = BeautifulSoup(html, "html.parser")
    links = soup.find_all("a", class_="post-item-title")
    return [(link["href"], link.get_text()) for link in links]


# 定义生产者,获取数据
def do_data(url_queues: queue.Queue, html_queues: queue.Queue):
    while True:
        # 如果需要获取URL队列没有目标,则结束该线程
        if url_queues.empty():
            with lock:
                print("结束", threading.current_thread().name, "消费者队列剩余: ", html_queues.qsize())
            break
        urls = url_queues.get()
        html = reps(urls)
        # 生产者获取的数据放入消费者队列
        html_queues.put(html)
        with lock:
            print(threading.current_thread().name, f"生产者队列剩余:", url_queues.qsize(), ",消费者队列剩余",
                  html_queues.qsize())
        url_queues.task_done()


# 定义消费者,解析数据
def out_data(html_queues: queue.Queue, file):
    while True:
        try:
            # 如果线程10秒依然未从消费者队列获取数据,则结束该线程
            html = html_queues.get(timeout=10)
        except:
            break
        results = html_output(html[0])
        with lock:
            print(threading.current_thread().name,
                  f"消费者已消费一个队列,第{html[1]}获取数据: {len(results)}条 消费者队列剩余:", html_queues.qsize())
        for result in results:
            with lock:
                file.write(str(result).strip() + "\n")
        html_queues.task_done()


if __name__ == '__main__':
    with open("result.txt", "w", encoding="utf-8") as files:
        # 定义生产则消费者队列
        url_queue = queue.Queue()
        html_queue = queue.Queue()
        do = []
        out = []
        # 将需要生产者爬取的数据放入生产者队列
        for data_s in data:
            url_queue.put(data_s)
        # 开启生产者消费者线程
        for idx in range(30):
            do.append(threading.Thread(target=do_data, args=(url_queue, html_queue), name=f"生产者{idx}"))
        for idx in range(4):
            out.append(threading.Thread(target=out_data, args=(html_queue, files), name=f"消费者{idx}"))
        # 启动线程
        for a in do:
            a.start()
        for b in out:
            b.start()
        # 等待线程结束或则等待队列全部取完都可
        # for s in do:
        #     s.join()
        # for g in out:
        #     g.join()
        url_queue.join()
        html_queue.join()
        with lock:
            print("end", url_queue.qsize())
            print("end", html_queue.qsize())

线程安全-锁

线程安全指某个函数、库函数在多线程环境中被调用时,能够正确的处理多个线程之间的共享变量。使程序功能正确完成。由于线程执行随时会发生切换,就造成了不可预料的结果,出现线程的不安全。

解决办法:

Python
import threading


# 方法一
lock = threading.Lock()
lock.acquire()
try:
    # do
finally:
    lock.release()


# 方法二
lock = threading.Lock()
with lock:
    # do