# 通过生产者和消费者关系,爬取博客园网页数据
import queue
import threading
import requests
from bs4 import BeautifulSoup
# Shared lock serializing console and file output across all worker threads.
lock = threading.Lock()

# POST payloads for pages 1..100 of the cnblogs aggregated post list;
# only "PageIndex" varies between requests.
data = [
    {
        "CategoryType": "SiteHome",
        "ParentCategoryId": 0,
        "CategoryId": 808,
        "PageIndex": page,
        "TotalPostCount": 2000,
        "ItemListActionName": "AggSitePostList",
    }
    for page in range(1, 101)
]

# Endpoint that returns one page of the aggregated post list as HTML.
url = "https://www.cnblogs.com/AggSite/AggSitePostList"
# Fetch one page of post-list HTML (producer-side worker function).
def reps(datas, timeout=10):
    """POST one page payload to the cnblogs endpoint.

    datas: one payload dict from `data`; must contain "PageIndex".
    timeout: seconds before the request is aborted. Without a timeout,
        requests can block forever on a stalled connection, which would
        hang the producer thread (and the queue joins in main).

    Returns (response_body_text, page_index).
    """
    rep = requests.post(url, json=datas, timeout=timeout)
    return rep.text, datas["PageIndex"]
# Parse one page of HTML and extract post links (consumer-side worker function).
def html_output(html):
    """Return a list of (href, title) tuples, one per post anchor.

    Anchors are located by the class "post-item-title" used on the
    cnblogs post-list markup.
    """
    parsed = BeautifulSoup(html, "html.parser")
    anchors = parsed.find_all("a", class_="post-item-title")
    results = []
    for anchor in anchors:
        results.append((anchor["href"], anchor.get_text()))
    return results
# Producer: take payloads off the URL queue, fetch the page, enqueue the HTML.
def do_data(url_queues: queue.Queue, html_queues: queue.Queue):
    """Drain url_queues, pushing (html_text, page_index) onto html_queues.

    The thread exits once url_queues is empty. The original pattern
    ``if url_queues.empty(): break`` followed by a blocking ``get()`` is
    racy: two producers can both see a single remaining item, one takes
    it, and the other blocks on get() forever. get_nowait() removes the
    check-then-act window entirely.
    """
    while True:
        try:
            # Non-blocking take: raises queue.Empty instead of deadlocking
            # when another producer grabbed the last item first.
            urls = url_queues.get_nowait()
        except queue.Empty:
            with lock:
                print("结束", threading.current_thread().name, "消费者队列剩余: ", html_queues.qsize())
            break
        html = reps(urls)
        # Hand the fetched page over to the consumer threads.
        html_queues.put(html)
        with lock:
            print(threading.current_thread().name, f"生产者队列剩余:", url_queues.qsize(), ",消费者队列剩余",
                  html_queues.qsize())
        # Mark this payload processed so url_queue.join() in main can return.
        url_queues.task_done()
# 定义消费者,解析数据
# Consumer: parse fetched HTML from the queue and append the links to `file`.
def out_data(html_queues: queue.Queue, file):
    """Parse (html_text, page_index) items from html_queues into `file`.

    The thread exits after 10 seconds with no new item. Catching
    queue.Empty specifically — instead of the original bare ``except:`` —
    keeps KeyboardInterrupt and genuine parsing/IO errors from being
    silently mistaken for "queue drained".
    """
    while True:
        try:
            # If nothing arrives within 10 seconds, assume the producers
            # are finished and stop this consumer thread.
            html = html_queues.get(timeout=10)
        except queue.Empty:
            break
        results = html_output(html[0])
        with lock:
            print(threading.current_thread().name,
                  f"消费者已消费一个队列,第{html[1]}获取数据: {len(results)}条 消费者队列剩余:", html_queues.qsize())
        for result in results:
            with lock:
                file.write(str(result).strip() + "\n")
        # Mark this page processed so html_queue.join() in main can return.
        html_queues.task_done()
if __name__ == '__main__':
    with open("result.txt", "w", encoding="utf-8") as files:
        # Producer queue (payloads to fetch) and consumer queue (fetched HTML).
        url_queue = queue.Queue()
        html_queue = queue.Queue()
        # Seed the producer queue with every page payload.
        for payload in data:
            url_queue.put(payload)
        # 30 producer threads fetch pages; 4 consumer threads parse and write.
        do = [threading.Thread(target=do_data, args=(url_queue, html_queue), name=f"生产者{idx}")
              for idx in range(30)]
        out = [threading.Thread(target=out_data, args=(html_queue, files), name=f"消费者{idx}")
               for idx in range(4)]
        # Start producers first, then consumers.
        for worker in do + out:
            worker.start()
        # Waiting on both queues (every put matched by a task_done) is
        # equivalent here to joining the threads themselves.
        url_queue.join()
        html_queue.join()
        with lock:
            print("end", url_queue.qsize())
            print("end", html_queue.qsize())