Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 48 additions & 38 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import traceback
from concurrent.futures.thread import ThreadPoolExecutor
from dataclasses import dataclass
from queue import PriorityQueue, ShutDown
from queue import PriorityQueue
from threading import RLock
from typing import Any

Expand Down Expand Up @@ -279,60 +279,70 @@ def __init__(self, chaoxing: Chaoxing, course: dict[str, Any], speed: float, tas
self.wait_queue: PriorityQueue[ChapterTask] = PriorityQueue()
self.threads: list[threading.Thread] = []
self.workers = worker_num

# 修改后的 run 方法
def run(self):
for task in self.tasks:
self.task_queue.put(task)

for i in range(self.workers):
thread = threading.Thread(target=self.worker_thread, daemon=True)
# daemon=False 让主线程可以等待它们结束
thread = threading.Thread(target=self.worker_thread)
self.threads.append(thread)
thread.start()

# 重试线程仍然可以是守护线程,因为它不处理新任务
threading.Thread(target=self.retry_thread, daemon=True).start()

# 等待所有实际任务被处理完毕
self.task_queue.join()
self.task_queue.shutdown()

# 发送“哨兵”信号 (None),告诉所有工作线程可以退出了
for _ in range(self.workers):
self.task_queue.put(None)

# 等待所有工作线程真正执行完毕并退出
for thread in self.threads:
thread.join()

# 修改后的 worker_thread 方法
@log_error
def worker_thread(self):
while True:

@zsyeh zsyeh Oct 16, 2025

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感谢作者耐心回复,支持保留3.13版本状态

task = self.task_queue.get()

# 收到哨兵信号 (None),退出循环
if task is None:
break

try:
task = self.task_queue.get()
except ShutDown:
logger.info("Worker task done")
return

task.result = process_chapter(self.chaoxing, self.course, task.point, self.speed)
match task.result:
case ChapterResult.SUCCESS:
logger.info("Task success: {}", task.point["title"])
self.task_queue.task_done()

case ChapterResult.NOT_OPEN:
# task.tries += 1
if task.tries >= self.max_tries:
logger.warning("Chapter is not open: {}", task.point["title"])
self.task_queue.task_done()
continue
# self.wait_queue.put(task)
self.retry_queue.put(task)

case ChapterResult.ERROR:
task.tries += 1
logger.warning("Retrying task %s (%d/%d attempts)", task.point["title"], task.tries,
self.max_tries)
if task.tries >= self.max_tries:
logger.error("Max retries reached for task: %s", task.point["title"])
task.result = process_chapter(self.chaoxing, self.course, task.point, self.speed)
match task.result:
case ChapterResult.SUCCESS:
logger.info(f"Task success: {task.point['title']}")

case ChapterResult.NOT_OPEN:
# task.tries += 1 # 章节未开放不应该算作尝试次数
if task.tries >= self.max_tries:
logger.warning(f"Chapter is not open: {task.point['title']}")
continue
self.retry_queue.put(task)

case ChapterResult.ERROR:
task.tries += 1
logger.warning(f"Retrying task {task.point['title']} ({task.tries}/{self.max_tries} attempts)")
if task.tries >= self.max_tries:
logger.error(f"Max retries reached for task: {task.point['title']}")
self.failed_tasks.append(task)
continue
self.retry_queue.put(task)

case _:
logger.error(f"Invalid task state {task.result} for task {task.point['title']}")
self.failed_tasks.append(task)
self.task_queue.task_done()
continue
self.retry_queue.put(task)

case _:
logger.error("Invalid task state %s for task %s", task.result, task.point["title"])
self.failed_tasks.append(task)
self.task_queue.task_done()
finally:
# 关键:无论上面发生什么,都必须调用 task_done()
# 这样 task_queue.join() 才能正确工作
self.task_queue.task_done()



Expand Down