KeymouseGo多线程执行:同时运行多个脚本的方法
一、多线程执行的核心价值
在自动化测试、批量数据处理、游戏多开操作等场景中,单脚本顺序执行已无法满足效率需求。KeymouseGo作为轻量化的键鼠自动化工具,通过多线程技术实现多个脚本并行运行,可将工作效率提升3-5倍。本文将系统讲解实现多线程执行的完整方案,包括原生API调用、线程池管理、冲突解决方案及性能优化策略。
二、多线程执行的技术基础
2.1 线程模型分析
KeymouseGo的脚本执行基于RunScriptClass实现,该类继承QThread并通过信号槽机制实现线程间通信:
class RunScriptClass(QThread, RunScriptMeta):
logSignal: Signal = Signal(str) # 日志输出信号
tnumrdSignal: Signal = Signal(str) # 状态更新信号
btnSignal: Signal = Signal(bool) # 按钮状态信号
statusSignal: Signal = Signal(bool) # 执行状态信号
playtuneSignal: Signal = Signal(str) # 音效播放信号
def run(self) -> None:
# 线程主执行函数
try:
self.run_script_from_path(self.script_path)
except Exception as e:
logger.error(e)
2.2 关键限制与突破点
原生实现中存在两个核心限制:
- 单实例绑定:每个
RunScriptClass实例绑定单个脚本路径 - 状态互斥:通过
QMutex和QWaitCondition实现的暂停/恢复机制为全局共享资源
突破方案:
- 创建独立线程上下文
- 实现资源隔离的状态管理
- 构建线程池调度机制
三、实现多线程执行的三种方案
3.1 基础方案:手动创建多线程实例
通过直接实例化多个RunScriptClass对象实现并行执行:
from Util.RunScriptClass import RunScriptClass
from PySide6.QtWidgets import QApplication
import sys
def run_script_in_thread(script_path, run_times=1):
"""创建独立线程执行指定脚本"""
app = QApplication(sys.argv) # 确保每个线程有独立的QApplication实例
thread = RunScriptClass(script_path, run_times, StopFlag(False))
thread.finished.connect(app.quit) # 执行完毕后退出线程事件循环
thread.start()
app.exec()
# 同时运行两个脚本
if __name__ == "__main__":
import threading
# 脚本1:表单自动填写
t1 = threading.Thread(target=run_script_in_thread,
args=("scripts/form_filler.kms", 5))
# 脚本2:数据报表生成
t2 = threading.Thread(target=run_script_in_thread,
args=("scripts/report_generator.kms", 1))
t1.start()
t2.start()
t1.join()
t2.join()
注意:该方案需为每个线程创建独立的
QApplication实例,内存占用较高(约80-120MB/线程)
3.2 进阶方案:线程池管理
使用concurrent.futures.ThreadPoolExecutor实现线程池管理,优化资源分配:
from concurrent.futures import ThreadPoolExecutor, as_completed
from Util.RunScriptClass import RunScriptCMDClass
from dataclasses import dataclass
@dataclass
class ScriptTask:
path: str # 脚本路径
run_times: int # 执行次数
priority: int = 5 # 优先级(1-10)
class ScriptThreadPool:
def __init__(self, max_workers=4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.futures = []
def submit_task(self, task: ScriptTask):
"""提交脚本任务到线程池"""
stop_flag = StopFlag(False)
future = self.executor.submit(
self._run_task,
task.path,
task.run_times,
stop_flag
)
self.futures.append((future, stop_flag, task))
return future
def _run_task(self, script_path, run_times, stop_flag):
"""执行单个脚本任务"""
runner = RunScriptCMDClass(script_path, run_times, stop_flag)
runner.run()
return {
"script": script_path,
"status": "completed",
"run_times": run_times
}
def cancel_all(self):
"""取消所有任务"""
for future, stop_flag, _ in self.futures:
if not future.done():
stop_flag.value = True # 触发停止标志
self.executor.shutdown(wait=False)
# 使用示例
if __name__ == "__main__":
pool = ScriptThreadPool(max_workers=3) # 创建最多3个工作线程
# 提交任务
tasks = [
ScriptTask("scripts/login.kms", 10),
ScriptTask("scripts/data_entry.kms", 5),
ScriptTask("scripts/screenshot.kms", 20)
]
for task in tasks:
pool.submit_task(task)
# 等待所有任务完成
for future, _, task in pool.futures:
result = future.result()
print(f"Task {task.path} completed with {result['run_times']} runs")
3.3 高级方案:进程隔离模式
对于资源密集型脚本,使用multiprocessing模块实现进程级隔离:
import multiprocessing
from Util.RunScriptClass import RunScriptCMDClass
from Util.Global import StopFlag
def process_worker(script_path, run_times):
"""进程工作函数"""
stop_flag = StopFlag(False)
runner = RunScriptCMDClass(script_path, run_times, stop_flag)
runner.run()
return f"Process {multiprocessing.current_process().name} completed"
# 创建进程池
if __name__ == "__main__":
with multiprocessing.Pool(processes=2) as pool:
# 提交两个脚本任务
results = pool.starmap_async(
process_worker,
[
("scripts/auto_clicker.kms", 50),
("scripts/text_typer.kms", 100)
]
)
# 获取结果
for result in results.get():
print(result)
四、线程安全与冲突解决方案
4.1 资源竞争问题
当多个线程同时操作系统资源时会引发冲突,典型场景包括:
- 剪贴板共享访问
- 屏幕坐标定位偏差
- 输入设备抢占
4.2 互斥锁实现
使用Python标准库的threading.Lock实现关键资源保护:
import threading
# 创建全局锁对象
clipboard_lock = threading.Lock()
input_lock = threading.Lock()
class SafeRunScriptClass(RunScriptClass):
def execute_keyboard_event(self, event):
"""线程安全的键盘事件执行"""
with input_lock: # 获取输入锁
super().execute_keyboard_event(event)
def get_clipboard_content(self):
"""线程安全的剪贴板访问"""
with clipboard_lock: # 获取剪贴板锁
return super().get_clipboard_content()
4.3 坐标偏移策略
多窗口并行操作时,通过动态计算坐标偏移避免点击冲突:
def calculate_offset_coords(base_x, base_y, window_index, grid_cols=2):
"""
计算网格布局窗口的坐标偏移
:param base_x: 基准X坐标
:param base_y: 基准Y坐标
:param window_index: 窗口索引(从0开始)
:param grid_cols: 网格列数
:return: 偏移后的坐标(x, y)
"""
window_width = 800 # 假设窗口宽度
window_height = 600 # 假设窗口高度
col = window_index % grid_cols
row = window_index // grid_cols
offset_x = base_x + col * window_width
offset_y = base_y + row * window_height
return (offset_x, offset_y)
五、线程池管理工具开发
5.1 任务调度器实现
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict
import time
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class ScheduledTask:
script_path: str
run_times: int
priority: int = 5
status: TaskStatus = TaskStatus.PENDING
start_time: float = 0
end_time: float = 0
class ScriptScheduler:
def __init__(self, max_workers=3):
self.pool = ScriptThreadPool(max_workers=max_workers)
self.tasks: Dict[str, ScheduledTask] = {}
def add_task(self, task: ScheduledTask):
"""添加任务到调度队列"""
self.tasks[task.script_path] = task
# 按优先级排序执行
self._schedule_tasks()
def _schedule_tasks(self):
"""按优先级调度任务"""
sorted_tasks = sorted(
[t for t in self.tasks.values() if t.status == TaskStatus.PENDING],
key=lambda x: x.priority,
reverse=True
)
for task in sorted_tasks[:self.pool.executor._max_workers]:
self._start_task(task)
def _start_task(self, task: ScheduledTask):
"""启动单个任务"""
task.status = TaskStatus.RUNNING
task.start_time = time.time()
future = self.pool.submit_task(task)
future.add_done_callback(
lambda f: self._task_completed(f, task)
)
def _task_completed(self, future, task: ScheduledTask):
"""任务完成回调"""
task.status = TaskStatus.COMPLETED
task.end_time = time.time()
# 调度下一个任务
self._schedule_tasks()
def get_task_stats(self) -> Dict:
"""获取任务统计信息"""
return {
"total": len(self.tasks),
"running": sum(1 for t in self.tasks.values() if t.status == TaskStatus.RUNNING),
"completed": sum(1 for t in self.tasks.values() if t.status == TaskStatus.COMPLETED),
"avg_duration": self._calculate_avg_duration()
}
def _calculate_avg_duration(self) -> float:
"""计算平均任务时长"""
completed = [t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]
if not completed:
return 0
return sum(t.end_time - t.start_time for t in completed) / len(completed)
5.2 监控与管理界面
结合PySide6实现多线程监控面板:
from PySide6.QtWidgets import QTableWidget, QTableWidgetItem, QProgressBar
class ThreadMonitorWidget(QTableWidget):
def __init__(self, scheduler: ScriptScheduler, parent=None):
super().__init__(parent)
self.scheduler = scheduler
self.init_ui()
self.update_timer.start(1000) # 每秒更新一次
def init_ui(self):
"""初始化UI组件"""
self.setColumnCount(5)
self.setHorizontalHeaderLabels([
"脚本路径", "状态", "运行次数", "优先级", "进度"
])
self.update_timer = QTimer(self)
self.update_timer.timeout.connect(self.update_table)
def update_table(self):
"""更新任务状态表格"""
self.setRowCount(len(self.scheduler.tasks))
for row, task in enumerate(self.scheduler.tasks.values()):
# 脚本路径
self.setItem(row, 0, QTableWidgetItem(task.script_path))
# 任务状态
self.setItem(row, 1, QTableWidgetItem(task.status.value))
# 运行次数
self.setItem(row, 2, QTableWidgetItem(str(task.run_times)))
# 优先级
self.setItem(row, 3, QTableWidgetItem(str(task.priority)))
# 进度条
progress = QProgressBar()
progress.setValue(self._calculate_progress(task))
self.setCellWidget(row, 4, progress)
def _calculate_progress(self, task: ScheduledTask) -> int:
"""计算任务进度百分比"""
if task.status == TaskStatus.COMPLETED:
return 100
if task.status != TaskStatus.RUNNING:
return 0
# 简单时间比例估算
elapsed = time.time() - task.start_time
avg_duration = self.scheduler.get_task_stats()["avg_duration"] or 1
return min(int((elapsed / avg_duration) * 100), 99)
六、性能优化与最佳实践
6.1 线程数配置原则
根据硬件配置选择最优线程数:
- CPU核心数 ≤ 4:建议线程数 = CPU核心数
- CPU核心数 > 4:建议线程数 = CPU核心数 × 0.75
- 磁盘IO密集型任务:建议线程数 = CPU核心数 × 1.5
6.2 资源限制策略
# 设置每个线程的资源限制
def configure_thread_resources():
# 1. 限制内存使用
import resource
resource.setrlimit(
resource.RLIMIT_AS, # 地址空间限制
(512 * 1024 * 1024, 512 * 1024 * 1024) # 512MB上限
)
# 2. 设置CPU时间限制
resource.setrlimit(
resource.RLIMIT_CPU,
(300, 300) # 5分钟CPU时间限制
)
6.3 冲突避免检查表
执行多线程任务前应检查:
- [ ] 脚本是否依赖固定屏幕坐标
- [ ] 是否使用全局热键
- [ ] 是否操作同一文件/窗口
- [ ] 是否需要网络资源竞争
- [ ] 是否有共享剪贴板操作
七、完整实现案例
7.1 多线程执行器完整代码
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Dict, Callable
from PySide6.QtCore import QObject, Signal
from Util.RunScriptClass import RunScriptCMDClass
from Util.Global import StopFlag
@dataclass
class ScriptJob:
"""脚本任务定义"""
script_path: str
run_times: int = 1
priority: int = 5
on_complete: Callable = None # 完成回调函数
class MultiThreadExecutor(QObject):
"""多线程脚本执行器"""
jobStarted = Signal(str) # 任务开始信号
jobCompleted = Signal(str, dict) # 任务完成信号
jobError = Signal(str, str) # 任务错误信号
def __init__(self, max_workers: int = 3):
super().__init__()
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.jobs: Dict[str, ScriptJob] = {}
self.futures = {}
def submit_job(self, job: ScriptJob) -> None:
"""提交脚本任务"""
if job.script_path in self.jobs:
self.jobError.emit(
job.script_path,
"Job already exists"
)
return
self.jobs[job.script_path] = job
self.jobStarted.emit(job.script_path)
# 创建独立的停止标志
stop_flag = StopFlag(False)
# 提交线程任务
future = self.executor.submit(
self._execute_job,
job,
stop_flag
)
self.futures[future] = (job.script_path, stop_flag)
future.add_done_callback(self._handle_complete)
def _execute_job(self, job: ScriptJob, stop_flag: StopFlag) -> Dict:
"""执行脚本任务"""
try:
runner = RunScriptCMDClass(
job.script_path,
job.run_times,
stop_flag
)
runner.run()
return {
"status": "success",
"run_times": job.run_times,
"thread_id": threading.get_ident()
}
except Exception as e:
return {
"status": "error",
"message": str(e),
"thread_id": threading.get_ident()
}
def _handle_complete(self, future) -> None:
"""处理任务完成"""
job_path, stop_flag = self.futures.pop(future)
job = self.jobs.pop(job_path)
try:
result = future.result()
if result["status"] == "success":
self.jobCompleted.emit(job_path, result)
if job.on_complete:
job.on_complete(result)
else:
self.jobError.emit(job_path, result["message"])
except Exception as e:
self.jobError.emit(job_path, str(e))
def cancel_job(self, script_path: str) -> bool:
"""取消指定任务"""
for future, (path, stop_flag) in self.futures.items():
if path == script_path and not future.done():
stop_flag.value = True
return True
return False
def cancel_all(self) -> None:
"""取消所有任务"""
for future, (_, stop_flag) in self.futures.items():
if not future.done():
stop_flag.value = True
self.executor.shutdown(wait=False)
# 使用示例
if __name__ == "__main__":
# 创建执行器
executor = MultiThreadExecutor(max_workers=2)
# 定义完成回调
def on_job_complete(result):
print(f"Job completed with result: {result}")
# 提交任务
jobs = [
ScriptJob(
"scripts/auto_login.kms",
run_times=5,
priority=10,
on_complete=on_job_complete
),
ScriptJob(
"scripts/data_process.kms",
run_times=3,
priority=5
)
]
for job in jobs:
executor.submit_job(job)
7.2 典型应用场景配置
| 应用场景 | 推荐方案 | 线程数 | 资源限制 | 关键优化 |
|---|---|---|---|---|
| 自动化测试 | 线程池方案 | CPU核心数×0.8 | 内存≤512MB/线程 | 测试数据隔离 |
| 批量数据录入 | 进程隔离方案 | CPU核心数 | 无限制 | 输入速率控制 |
| 游戏多开操作 | 基础多线程 | 2-3线程 | 显存≤256MB/线程 | 窗口坐标偏移 |
| 监控告警系统 | 线程池+优先级 | 4-6线程 | 内存≤256MB/线程 | 任务超时控制 |
八、常见问题与解决方案
8.1 脚本冲突问题
现象:多个脚本同时操作同一窗口导致点击错位
解决方案:实现窗口句柄绑定
def bind_to_window(window_title: str):
"""绑定脚本执行到指定窗口"""
import win32gui
hwnd = win32gui.FindWindow(None, window_title)
if hwnd == 0:
raise Exception(f"Window {window_title} not found")
# 获取窗口客户区坐标
left, top, right, bottom = win32gui.GetClientRect(hwnd)
# 转换为屏幕坐标
left, top = win32gui.ClientToScreen(hwnd, (left, top))
return lambda x, y: (left + x, top + y) # 坐标转换函数
8.2 性能下降问题
现象:线程数增加后执行效率反而降低
解决方案:实现动态线程调整
def dynamic_thread_adjustment(executor, current_load: float):
"""根据系统负载动态调整线程数"""
# current_load为CPU使用率(0.0-1.0)
if current_load > 0.8 and executor.executor._max_workers > 1:
# 高负载时减少线程
new_workers = max(1, executor.executor._max_workers - 1)
executor.executor._max_workers = new_workers
print(f"Reduced threads to {new_workers} due to high load")
elif current_load < 0.3 and executor.executor._max_workers < 8:
# 低负载时增加线程
new_workers = executor.executor._max_workers + 1
executor.executor._max_workers = new_workers
print(f"Increased threads to {new_workers} due to low load")
8.3 资源泄漏问题
现象:长时间运行后内存占用持续增长
解决方案:实现资源自动回收
import gc
class ResourceManagedRunner(RunScriptClass):
def __del__(self):
"""析构函数中释放资源"""
# 强制垃圾回收
gc.collect()
# 释放底层资源
if hasattr(self, 'runner'):
self.runner.cleanup()
logger.debug(f"Thread {self.thread_id} resources released")
九、总结与展望
KeymouseGo的多线程执行能力极大扩展了其自动化边界,通过本文介绍的三种方案,可满足从简单并行到复杂任务调度的全场景需求。未来版本可能会原生集成线程池管理功能,提供更友好的多脚本执行界面。
建议根据实际场景选择合适方案:
- 简单并行:基础多线程方案
- 任务调度:线程池方案
- 资源隔离:进程隔离方案
通过合理的线程管理和资源控制,KeymouseGo能够在保持轻量特性的同时,实现高效的多任务自动化执行。
实用工具推荐:KeymouseGo脚本市场提供了大量可直接使用的多线程兼容脚本,包括数据采集、自动化测试、游戏辅助等类别,欢迎贡献你的原创脚本!
Kimi-K2.5Kimi K2.5 是一款开源的原生多模态智能体模型,它在 Kimi-K2-Base 的基础上,通过对约 15 万亿混合视觉和文本 tokens 进行持续预训练构建而成。该模型将视觉与语言理解、高级智能体能力、即时模式与思考模式,以及对话式与智能体范式无缝融合。Python00- QQwen3-Coder-Next2026年2月4日,正式发布的Qwen3-Coder-Next,一款专为编码智能体和本地开发场景设计的开源语言模型。Python00
xw-cli实现国产算力大模型零门槛部署,一键跑通 Qwen、GLM-4.7、Minimax-2.1、DeepSeek-OCR 等模型Go06
PaddleOCR-VL-1.5PaddleOCR-VL-1.5 是 PaddleOCR-VL 的新一代进阶模型,在 OmniDocBench v1.5 上实现了 94.5% 的全新 state-of-the-art 准确率。 为了严格评估模型在真实物理畸变下的鲁棒性——包括扫描伪影、倾斜、扭曲、屏幕拍摄和光照变化——我们提出了 Real5-OmniDocBench 基准测试集。实验结果表明,该增强模型在新构建的基准测试集上达到了 SOTA 性能。此外,我们通过整合印章识别和文本检测识别(text spotting)任务扩展了模型的能力,同时保持 0.9B 的超紧凑 VLM 规模,具备高效率特性。Python00
KuiklyUI基于KMP技术的高性能、全平台开发框架,具备统一代码库、极致易用性和动态灵活性。 Provide a high-performance, full-platform development framework with unified codebase, ultimate ease of use, and dynamic flexibility. 注意:本仓库为Github仓库镜像,PR或Issue请移步至Github发起,感谢支持!Kotlin08
VLOOKVLOOK™ 是优雅好用的 Typora/Markdown 主题包和增强插件。 VLOOK™ is an elegant and practical THEME PACKAGE × ENHANCEMENT PLUGIN for Typora/Markdown.Less00