首页
/ KeymouseGo多线程执行:同时运行多个脚本的方法

KeymouseGo多线程执行:同时运行多个脚本的方法

2026-02-05 05:29:03作者:裴麒琰

一、多线程执行的核心价值

在自动化测试、批量数据处理、游戏多开操作等场景中,单脚本顺序执行已无法满足效率需求。KeymouseGo作为轻量化的键鼠自动化工具,通过多线程技术实现多个脚本并行运行,可将工作效率提升3-5倍。本文将系统讲解实现多线程执行的完整方案,包括原生API调用、线程池管理、冲突解决方案及性能优化策略。

二、多线程执行的技术基础

2.1 线程模型分析

KeymouseGo的脚本执行基于RunScriptClass实现,该类继承QThread并通过信号槽机制实现线程间通信:

class RunScriptClass(QThread, RunScriptMeta):
    logSignal: Signal = Signal(str)       # 日志输出信号
    tnumrdSignal: Signal = Signal(str)    # 状态更新信号
    btnSignal: Signal = Signal(bool)      # 按钮状态信号
    statusSignal: Signal = Signal(bool)   # 执行状态信号
    playtuneSignal: Signal = Signal(str)  # 音效播放信号
    
    def run(self) -> None:
        # 线程主执行函数
        try:
            self.run_script_from_path(self.script_path)
        except Exception as e:
            logger.error(e)

2.2 关键限制与突破点

原生实现中存在两个核心限制:

  1. 单实例绑定:每个RunScriptClass实例绑定单个脚本路径
  2. 状态互斥:通过QMutexQWaitCondition实现的暂停/恢复机制为全局共享资源

突破方案:

  • 创建独立线程上下文
  • 实现资源隔离的状态管理
  • 构建线程池调度机制

三、实现多线程执行的三种方案

3.1 基础方案:手动创建多线程实例

通过直接实例化多个RunScriptClass对象实现并行执行:

from Util.RunScriptClass import RunScriptClass
from PySide6.QtWidgets import QApplication
import sys

def run_script_in_thread(script_path, run_times=1):
    """创建独立线程执行指定脚本"""
    app = QApplication(sys.argv)  # 确保每个线程有独立的QApplication实例
    thread = RunScriptClass(script_path, run_times, StopFlag(False))
    thread.finished.connect(app.quit)  # 执行完毕后退出线程事件循环
    thread.start()
    app.exec()

# 同时运行两个脚本
if __name__ == "__main__":
    import threading
    
    # 脚本1:表单自动填写
    t1 = threading.Thread(target=run_script_in_thread, 
                         args=("scripts/form_filler.kms", 5))
    
    # 脚本2:数据报表生成
    t2 = threading.Thread(target=run_script_in_thread,
                         args=("scripts/report_generator.kms", 1))
    
    t1.start()
    t2.start()
    t1.join()
    t2.join()

注意:该方案需为每个线程创建独立的QApplication实例,内存占用较高(约80-120MB/线程)

3.2 进阶方案:线程池管理

使用concurrent.futures.ThreadPoolExecutor实现线程池管理,优化资源分配:

from concurrent.futures import ThreadPoolExecutor, as_completed
from Util.RunScriptClass import RunScriptCMDClass
from dataclasses import dataclass

@dataclass
class ScriptTask:
    path: str          # 脚本路径
    run_times: int     # 执行次数
    priority: int = 5  # 优先级(1-10)

class ScriptThreadPool:
    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.futures = []
        
    def submit_task(self, task: ScriptTask):
        """提交脚本任务到线程池"""
        stop_flag = StopFlag(False)
        future = self.executor.submit(
            self._run_task, 
            task.path, 
            task.run_times, 
            stop_flag
        )
        self.futures.append((future, stop_flag, task))
        return future
        
    def _run_task(self, script_path, run_times, stop_flag):
        """执行单个脚本任务"""
        runner = RunScriptCMDClass(script_path, run_times, stop_flag)
        runner.run()
        return {
            "script": script_path,
            "status": "completed",
            "run_times": run_times
        }
        
    def cancel_all(self):
        """取消所有任务"""
        for future, stop_flag, _ in self.futures:
            if not future.done():
                stop_flag.value = True  # 触发停止标志
        self.executor.shutdown(wait=False)

# 使用示例
if __name__ == "__main__":
    pool = ScriptThreadPool(max_workers=3)  # 创建最多3个工作线程
    
    # 提交任务
    tasks = [
        ScriptTask("scripts/login.kms", 10),
        ScriptTask("scripts/data_entry.kms", 5),
        ScriptTask("scripts/screenshot.kms", 20)
    ]
    
    for task in tasks:
        pool.submit_task(task)
    
    # 等待所有任务完成
    for future, _, task in pool.futures:
        result = future.result()
        print(f"Task {task.path} completed with {result['run_times']} runs")

3.3 高级方案:进程隔离模式

对于资源密集型脚本,使用multiprocessing模块实现进程级隔离:

import multiprocessing
from Util.RunScriptClass import RunScriptCMDClass
from Util.Global import StopFlag

def process_worker(script_path, run_times):
    """进程工作函数"""
    stop_flag = StopFlag(False)
    runner = RunScriptCMDClass(script_path, run_times, stop_flag)
    runner.run()
    return f"Process {multiprocessing.current_process().name} completed"

# 创建进程池
if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        # 提交两个脚本任务
        results = pool.starmap_async(
            process_worker,
            [
                ("scripts/auto_clicker.kms", 50),
                ("scripts/text_typer.kms", 100)
            ]
        )
        
        # 获取结果
        for result in results.get():
            print(result)

四、线程安全与冲突解决方案

4.1 资源竞争问题

当多个线程同时操作系统资源时会引发冲突,典型场景包括:

  • 剪贴板共享访问
  • 屏幕坐标定位偏差
  • 输入设备抢占

4.2 互斥锁实现

使用Python标准库的threading.Lock实现关键资源保护:

import threading

# 创建全局锁对象
clipboard_lock = threading.Lock()
input_lock = threading.Lock()

class SafeRunScriptClass(RunScriptClass):
    def execute_keyboard_event(self, event):
        """线程安全的键盘事件执行"""
        with input_lock:  # 获取输入锁
            super().execute_keyboard_event(event)
    
    def get_clipboard_content(self):
        """线程安全的剪贴板访问"""
        with clipboard_lock:  # 获取剪贴板锁
            return super().get_clipboard_content()

4.3 坐标偏移策略

多窗口并行操作时,通过动态计算坐标偏移避免点击冲突:

def calculate_offset_coords(base_x, base_y, window_index, grid_cols=2):
    """
    计算网格布局窗口的坐标偏移
    :param base_x: 基准X坐标
    :param base_y: 基准Y坐标
    :param window_index: 窗口索引(从0开始)
    :param grid_cols: 网格列数
    :return: 偏移后的坐标(x, y)
    """
    window_width = 800  # 假设窗口宽度
    window_height = 600  # 假设窗口高度
    
    col = window_index % grid_cols
    row = window_index // grid_cols
    
    offset_x = base_x + col * window_width
    offset_y = base_y + row * window_height
    
    return (offset_x, offset_y)

五、线程池管理工具开发

5.1 任务调度器实现

from dataclasses import dataclass
from enum import Enum
from typing import List, Dict
import time

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class ScheduledTask:
    script_path: str
    run_times: int
    priority: int = 5
    status: TaskStatus = TaskStatus.PENDING
    start_time: float = 0
    end_time: float = 0

class ScriptScheduler:
    def __init__(self, max_workers=3):
        self.pool = ScriptThreadPool(max_workers=max_workers)
        self.tasks: Dict[str, ScheduledTask] = {}
        
    def add_task(self, task: ScheduledTask):
        """添加任务到调度队列"""
        self.tasks[task.script_path] = task
        # 按优先级排序执行
        self._schedule_tasks()
        
    def _schedule_tasks(self):
        """按优先级调度任务"""
        sorted_tasks = sorted(
            [t for t in self.tasks.values() if t.status == TaskStatus.PENDING],
            key=lambda x: x.priority,
            reverse=True
        )
        
        for task in sorted_tasks[:self.pool.executor._max_workers]:
            self._start_task(task)
            
    def _start_task(self, task: ScheduledTask):
        """启动单个任务"""
        task.status = TaskStatus.RUNNING
        task.start_time = time.time()
        
        future = self.pool.submit_task(task)
        future.add_done_callback(
            lambda f: self._task_completed(f, task)
        )
        
    def _task_completed(self, future, task: ScheduledTask):
        """任务完成回调"""
        task.status = TaskStatus.COMPLETED
        task.end_time = time.time()
        # 调度下一个任务
        self._schedule_tasks()
        
    def get_task_stats(self) -> Dict:
        """获取任务统计信息"""
        return {
            "total": len(self.tasks),
            "running": sum(1 for t in self.tasks.values() if t.status == TaskStatus.RUNNING),
            "completed": sum(1 for t in self.tasks.values() if t.status == TaskStatus.COMPLETED),
            "avg_duration": self._calculate_avg_duration()
        }
        
    def _calculate_avg_duration(self) -> float:
        """计算平均任务时长"""
        completed = [t for t in self.tasks.values() if t.status == TaskStatus.COMPLETED]
        if not completed:
            return 0
        return sum(t.end_time - t.start_time for t in completed) / len(completed)

5.2 监控与管理界面

结合PySide6实现多线程监控面板:

from PySide6.QtWidgets import QTableWidget, QTableWidgetItem, QProgressBar

class ThreadMonitorWidget(QTableWidget):
    def __init__(self, scheduler: ScriptScheduler, parent=None):
        super().__init__(parent)
        self.scheduler = scheduler
        self.init_ui()
        self.update_timer.start(1000)  # 每秒更新一次
        
    def init_ui(self):
        """初始化UI组件"""
        self.setColumnCount(5)
        self.setHorizontalHeaderLabels([
            "脚本路径", "状态", "运行次数", "优先级", "进度"
        ])
        self.update_timer = QTimer(self)
        self.update_timer.timeout.connect(self.update_table)
        
    def update_table(self):
        """更新任务状态表格"""
        self.setRowCount(len(self.scheduler.tasks))
        
        for row, task in enumerate(self.scheduler.tasks.values()):
            # 脚本路径
            self.setItem(row, 0, QTableWidgetItem(task.script_path))
            # 任务状态
            self.setItem(row, 1, QTableWidgetItem(task.status.value))
            # 运行次数
            self.setItem(row, 2, QTableWidgetItem(str(task.run_times)))
            # 优先级
            self.setItem(row, 3, QTableWidgetItem(str(task.priority)))
            # 进度条
            progress = QProgressBar()
            progress.setValue(self._calculate_progress(task))
            self.setCellWidget(row, 4, progress)
            
    def _calculate_progress(self, task: ScheduledTask) -> int:
        """计算任务进度百分比"""
        if task.status == TaskStatus.COMPLETED:
            return 100
        if task.status != TaskStatus.RUNNING:
            return 0
        # 简单时间比例估算
        elapsed = time.time() - task.start_time
        avg_duration = self.scheduler.get_task_stats()["avg_duration"] or 1
        return min(int((elapsed / avg_duration) * 100), 99)

六、性能优化与最佳实践

6.1 线程数配置原则

根据硬件配置选择最优线程数:

  • CPU核心数 ≤ 4:建议线程数 = CPU核心数
  • CPU核心数 > 4:建议线程数 = CPU核心数 × 0.75
  • 磁盘IO密集型任务:建议线程数 = CPU核心数 × 1.5

6.2 资源限制策略

# 设置每个线程的资源限制
def configure_thread_resources():
    # 1. 限制内存使用
    import resource
    resource.setrlimit(
        resource.RLIMIT_AS,  # 地址空间限制
        (512 * 1024 * 1024, 512 * 1024 * 1024)  # 512MB上限
    )
    
    # 2. 设置CPU时间限制
    resource.setrlimit(
        resource.RLIMIT_CPU,
        (300, 300)  # 5分钟CPU时间限制
    )

6.3 冲突避免检查表

执行多线程任务前应检查:

  • [ ] 脚本是否依赖固定屏幕坐标
  • [ ] 是否使用全局热键
  • [ ] 是否操作同一文件/窗口
  • [ ] 是否需要网络资源竞争
  • [ ] 是否有共享剪贴板操作

七、完整实现案例

7.1 多线程执行器完整代码

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List, Dict, Callable
from PySide6.QtCore import QObject, Signal

from Util.RunScriptClass import RunScriptCMDClass
from Util.Global import StopFlag

@dataclass
class ScriptJob:
    """脚本任务定义"""
    script_path: str
    run_times: int = 1
    priority: int = 5
    on_complete: Callable = None  # 完成回调函数

class MultiThreadExecutor(QObject):
    """多线程脚本执行器"""
    jobStarted = Signal(str)       # 任务开始信号
    jobCompleted = Signal(str, dict)  # 任务完成信号
    jobError = Signal(str, str)    # 任务错误信号
    
    def __init__(self, max_workers: int = 3):
        super().__init__()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.jobs: Dict[str, ScriptJob] = {}
        self.futures = {}
        
    def submit_job(self, job: ScriptJob) -> None:
        """提交脚本任务"""
        if job.script_path in self.jobs:
            self.jobError.emit(
                job.script_path, 
                "Job already exists"
            )
            return
            
        self.jobs[job.script_path] = job
        self.jobStarted.emit(job.script_path)
        
        # 创建独立的停止标志
        stop_flag = StopFlag(False)
        
        # 提交线程任务
        future = self.executor.submit(
            self._execute_job,
            job,
            stop_flag
        )
        
        self.futures[future] = (job.script_path, stop_flag)
        future.add_done_callback(self._handle_complete)
        
    def _execute_job(self, job: ScriptJob, stop_flag: StopFlag) -> Dict:
        """执行脚本任务"""
        try:
            runner = RunScriptCMDClass(
                job.script_path,
                job.run_times,
                stop_flag
            )
            runner.run()
            return {
                "status": "success",
                "run_times": job.run_times,
                "thread_id": threading.get_ident()
            }
        except Exception as e:
            return {
                "status": "error",
                "message": str(e),
                "thread_id": threading.get_ident()
            }
            
    def _handle_complete(self, future) -> None:
        """处理任务完成"""
        job_path, stop_flag = self.futures.pop(future)
        job = self.jobs.pop(job_path)
        
        try:
            result = future.result()
            if result["status"] == "success":
                self.jobCompleted.emit(job_path, result)
                if job.on_complete:
                    job.on_complete(result)
            else:
                self.jobError.emit(job_path, result["message"])
        except Exception as e:
            self.jobError.emit(job_path, str(e))
            
    def cancel_job(self, script_path: str) -> bool:
        """取消指定任务"""
        for future, (path, stop_flag) in self.futures.items():
            if path == script_path and not future.done():
                stop_flag.value = True
                return True
        return False
        
    def cancel_all(self) -> None:
        """取消所有任务"""
        for future, (_, stop_flag) in self.futures.items():
            if not future.done():
                stop_flag.value = True
        self.executor.shutdown(wait=False)

# 使用示例
if __name__ == "__main__":
    # 创建执行器
    executor = MultiThreadExecutor(max_workers=2)
    
    # 定义完成回调
    def on_job_complete(result):
        print(f"Job completed with result: {result}")
    
    # 提交任务
    jobs = [
        ScriptJob(
            "scripts/auto_login.kms", 
            run_times=5, 
            priority=10,
            on_complete=on_job_complete
        ),
        ScriptJob(
            "scripts/data_process.kms",
            run_times=3,
            priority=5
        )
    ]
    
    for job in jobs:
        executor.submit_job(job)

7.2 典型应用场景配置

应用场景 推荐方案 线程数 资源限制 关键优化
自动化测试 线程池方案 CPU核心数×0.8 内存≤512MB/线程 测试数据隔离
批量数据录入 进程隔离方案 CPU核心数 无限制 输入速率控制
游戏多开操作 基础多线程 2-3线程 显存≤256MB/线程 窗口坐标偏移
监控告警系统 线程池+优先级 4-6线程 内存≤256MB/线程 任务超时控制

八、常见问题与解决方案

8.1 脚本冲突问题

现象:多个脚本同时操作同一窗口导致点击错位
解决方案:实现窗口句柄绑定

def bind_to_window(window_title: str):
    """绑定脚本执行到指定窗口"""
    import win32gui
    hwnd = win32gui.FindWindow(None, window_title)
    if hwnd == 0:
        raise Exception(f"Window {window_title} not found")
        
    # 获取窗口客户区坐标
    left, top, right, bottom = win32gui.GetClientRect(hwnd)
    # 转换为屏幕坐标
    left, top = win32gui.ClientToScreen(hwnd, (left, top))
    
    return lambda x, y: (left + x, top + y)  # 坐标转换函数

8.2 性能下降问题

现象:线程数增加后执行效率反而降低
解决方案:实现动态线程调整

def dynamic_thread_adjustment(executor, current_load: float):
    """根据系统负载动态调整线程数"""
    # current_load为CPU使用率(0.0-1.0)
    if current_load > 0.8 and executor.executor._max_workers > 1:
        # 高负载时减少线程
        new_workers = max(1, executor.executor._max_workers - 1)
        executor.executor._max_workers = new_workers
        print(f"Reduced threads to {new_workers} due to high load")
    elif current_load < 0.3 and executor.executor._max_workers < 8:
        # 低负载时增加线程
        new_workers = executor.executor._max_workers + 1
        executor.executor._max_workers = new_workers
        print(f"Increased threads to {new_workers} due to low load")

8.3 资源泄漏问题

现象:长时间运行后内存占用持续增长
解决方案:实现资源自动回收

import gc

class ResourceManagedRunner(RunScriptClass):
    def __del__(self):
        """析构函数中释放资源"""
        # 强制垃圾回收
        gc.collect()
        # 释放底层资源
        if hasattr(self, 'runner'):
            self.runner.cleanup()
        logger.debug(f"Thread {self.thread_id} resources released")

九、总结与展望

KeymouseGo的多线程执行能力极大扩展了其自动化边界,通过本文介绍的三种方案,可满足从简单并行到复杂任务调度的全场景需求。未来版本可能会原生集成线程池管理功能,提供更友好的多脚本执行界面。

建议根据实际场景选择合适方案:

  • 简单并行:基础多线程方案
  • 任务调度:线程池方案
  • 资源隔离:进程隔离方案

通过合理的线程管理和资源控制,KeymouseGo能够在保持轻量特性的同时,实现高效的多任务自动化执行。

实用工具推荐KeymouseGo脚本市场提供了大量可直接使用的多线程兼容脚本,包括数据采集、自动化测试、游戏辅助等类别,欢迎贡献你的原创脚本!

登录后查看全文
热门项目推荐
相关项目推荐