Skip to content
Back

GPU Hunter

AI 总结

该脚本用于在GPU空闲时自动执行训练任务。用户可配置多组并行任务,每组任务会在指定GPU使用率低于阈值时启动。脚本会持续监测GPU状态,并在满足条件时按顺序或并行执行命令,同时记录执行日志。这有助于在共享GPU资源时高效安排实验,避免手动等待。

在急着跑实验时,可以使用该脚本挂机监测GPU占用情况,并在满足条件时自动开启训练。

import logging
import os
import subprocess
import threading
import time

# =============================================================================
# GPU Hunter Configuration
# =============================================================================
# Structure:
# CONFIG is a list of task groups. Tasks inside one group run in parallel
# (each task pinned to its own GPUs); the groups themselves run sequentially —
# a group starts only after every task in the previous group has finished.
# Each task is a dict with:
#   "command_list": shell commands run one after another; each launch waits
#                   for the task's GPUs to be idle first
#   "GPU_INDEX":    comma-separated GPU indices (passed to `nvidia-smi -i`)
CONFIG = [
    # Group 1 (tasks run in parallel)
    [
        {
            "command_list": [
                "bash scripts.sh",
            ],
            "GPU_INDEX": "1,2,3"
        },
        {
            "command_list": [
                "bash scripts.sh",
                "bash scripts.sh",
            ],
            "GPU_INDEX": "4,5,6,7"
        }
    ],
    # Group 2 (starts only after group 1 completes)
    [
        {
            "command_list": [
                # NOTE(review): each command runs in its own subprocess shell,
                # so these two activate/deactivate lines cannot affect the
                # environment of the command below — activate the env inside
                # the script itself if it is needed.
                "conda deactivate",
                "conda activate ivtlr-wxr",
                # Fixed the duplicated "bash bash" typo from the original.
                "bash scripts.sh"
            ],
            "GPU_INDEX": "1,2,3,5,6,7"
        }
    ]
    # Add more groups here; each group waits for the previous one to finish.
]

# Global settings
CHECK_INTERVAL = 600      # seconds between GPU idleness checks
GPU_THRESHOLD = 10        # memory-usage threshold (%) below which a GPU counts as idle
LOG_FILE_PATH = "gpu_hunter.log"  # destination file for the execution log

# =============================================================================
# Logger Setup
# =============================================================================
# Configures the ROOT logger (getLogger() with no name), so these settings
# also affect logging done by any imported libraries.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Guard against attaching a second handler if this module is imported twice.
if not logger.handlers:
    file_handler = logging.FileHandler(LOG_FILE_PATH)
    formatter = logging.Formatter('%(asctime)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

print("start hunting")

def get_gpu_memory_usage(gpu_indices="0,1,2,3"):
    """Query memory usage of the given GPUs via nvidia-smi.

    Args:
        gpu_indices: comma-separated GPU indices (forwarded to ``nvidia-smi -i``).

    Returns:
        A list of ``"used, total"`` CSV strings (MiB), one entry per GPU,
        or ``None`` if nvidia-smi failed or is unavailable.
    """
    try:
        output = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=memory.used,memory.total",
             "--format=csv,noheader,nounits", "-i", gpu_indices],
            universal_newlines=True
        )
        # Bug fix: the original split on the literal two-character string
        # backslash+n ('\\n'), which never matches nvidia-smi's real newlines,
        # so multi-GPU output came back as a single element.
        return output.strip().splitlines()
    except Exception as e:
        logger.error(f"Error getting GPU info: {e}")
        print(f"Error getting GPU info: {e}")
        return None

def check_low_usage(gpu_indices="0,1,2,3", threshold=10):
    """Return True iff every listed GPU's memory usage is below ``threshold`` %.

    Returns False when GPU info could not be retrieved, so callers keep
    waiting instead of launching onto GPUs in an unknown state.
    """
    gpu_data = get_gpu_memory_usage(gpu_indices)
    if not gpu_data:
        return False

    for line in gpu_data:
        # Robustness fix: split on ',' alone and let int() strip whitespace,
        # instead of requiring the exact separator ', '.
        used, total = (int(field) for field in line.split(','))
        usage_percent = (used / total) * 100
        if usage_percent >= threshold:
            return False
    return True

def wait_for_gpu_idle(gpu_indices="0,1,2,3", threshold=10, check_interval=180):
    """Block until every GPU in ``gpu_indices`` has memory usage below
    ``threshold`` percent, polling every ``check_interval`` seconds.

    NOTE(review): sleeps once before the first check — presumably to give a
    just-launched job time to claim its GPUs so they are not mistaken for
    idle; confirm this is intentional before shortening it.
    """
    time.sleep(check_interval)
    while not check_low_usage(gpu_indices=gpu_indices, threshold=threshold):
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{stamp}] GPUs [{gpu_indices}] are in use. Waiting {check_interval} seconds...")
        logger.info(f"GPUs [{gpu_indices}] are in use. Waiting {check_interval} seconds...")
        time.sleep(check_interval)
    print(f"GPUs [{gpu_indices}] have memory usage below {threshold}%. Ready for next command.")
    logger.info(f"GPUs [{gpu_indices}] have memory usage below {threshold}%. Ready for next command.")
    return True

def execute_commands_sequentially(command_list, gpu_indices="0,1,2,3", gpu_threshold=10, check_interval=180):
    """Run each command in ``command_list`` in order, waiting for the given
    GPUs to be idle before launching each one.

    Args:
        command_list: shell command strings, executed one after another.
        gpu_indices: comma-separated GPU indices the commands should use.
        gpu_threshold: memory-usage percentage below which a GPU is idle.
        check_interval: seconds between idleness polls.

    Returns:
        A list of per-command result dicts with keys ``index``, ``command``
        (truncated to 200 chars), ``status`` and ``duration`` (seconds).
    """
    completed_commands = []

    for i, command in enumerate(command_list, 1):
        # Bug fix: the original source contained '\\n', printing the literal
        # characters backslash-n instead of a newline.
        print(f"\n{'='*60}")
        print(f"Processing command {i}/{len(command_list)} on GPUs [{gpu_indices}]")
        print(f"{'='*60}")

        # Block until the target GPUs are free.
        wait_for_gpu_idle(gpu_indices=gpu_indices, threshold=gpu_threshold, check_interval=check_interval)

        # Truncate very long commands in the console output.
        shown = f"{command[:100]}..." if len(command) > 100 else command
        print(f"Executing command {i}: {shown}")
        logger.info(f"Starting command {i}/{len(command_list)} on GPUs [{gpu_indices}]")

        try:
            start_time = time.time()
            # check=False: a failing command is recorded rather than raised,
            # so the remaining commands still get a chance to run.
            result = subprocess.run(command, shell=True, check=False)
            execution_time = time.time() - start_time

            if result.returncode == 0:
                print(f"✓ Command {i} completed successfully in {execution_time:.1f} seconds")
                logger.info(f"Command {i} completed successfully in {execution_time:.1f} seconds")
                status = 'success'
            else:
                print(f"✗ Command {i} failed with return code {result.returncode}")
                logger.error(f"Command {i} failed with return code {result.returncode}")
                status = f'failed (code: {result.returncode})'
            completed_commands.append({
                'index': i,
                'command': command[:200],  # keep log entries bounded
                'status': status,
                'duration': execution_time
            })

        except Exception as e:
            print(f"✗ Error executing command {i}: {e}")
            logger.error(f"Error executing command {i}: {e}")
            completed_commands.append({
                'index': i,
                'command': command[:200],
                'status': f'error: {str(e)[:100]}',
                'duration': 0
            })

    return completed_commands

import subprocess
import threading

def execute_parallel_tasks(task_group, group_index, total_groups):
    """Run every task in ``task_group`` concurrently, one thread per task.

    Each task dict supplies its own "command_list" and "GPU_INDEX", so the
    threads wait on their own GPU sets independently. Blocks until every
    thread has finished.

    Returns:
        The combined per-command result dicts from all tasks (ordering
        depends on thread completion order).
    """
    # Bug fix: print a real newline, not the literal characters backslash-n.
    print(f"\n{'='*80}")
    print(f"Processing Task Group {group_index+1}/{total_groups}")
    print(f"{'='*80}")

    threads = []
    # Extended from worker threads; list mutation is atomic under the GIL.
    results_list = []

    def run_task(task, task_index):
        """Thread body: run one task's commands sequentially on its GPUs."""
        command_list = task.get("command_list", [])
        gpu_indices = task.get("GPU_INDEX", "0,1,2,3")

        print(f"\n--- Starting Task {task_index+1}/{len(task_group)} on GPUs [{gpu_indices}] ---")

        results = execute_commands_sequentially(
            command_list=command_list,
            gpu_indices=gpu_indices,
            gpu_threshold=GPU_THRESHOLD,
            check_interval=CHECK_INTERVAL
        )
        results_list.extend(results)

    # Start one thread per task.
    for task_index, task in enumerate(task_group):
        thread = threading.Thread(target=run_task, args=(task, task_index))
        threads.append(thread)
        thread.start()

    # Wait for the whole group before the caller moves to the next group.
    for thread in threads:
        thread.join()

    return results_list

def main():
    """Run every configured task group in order, then print a summary."""
    print(f"GPU Hunter started with {len(CONFIG)} task groups")
    print(f"GPU threshold: {GPU_THRESHOLD}%")
    print(f"Check interval: {CHECK_INTERVAL} seconds")
    logger.info(f"Starting GPU Hunter with {len(CONFIG)} task groups")

    # Groups run sequentially; tasks within a group run in parallel.
    all_completed_commands = []
    for group_index, task_group in enumerate(CONFIG):
        results = execute_parallel_tasks(task_group, group_index, len(CONFIG))
        all_completed_commands.extend(results)

    # Bug fix: the original source contained '\\n', printing the literal
    # characters backslash-n instead of a newline.
    print(f"\n{'='*80}")
    print("Execution Summary:")
    print('='*80)

    success_count = sum(1 for r in all_completed_commands if r['status'] == 'success')
    total_commands = len(all_completed_commands)
    print(f"Total commands executed: {total_commands}")
    print(f"Successfully executed: {success_count}")
    print(f"Failed/Error: {total_commands - success_count}")

    if all_completed_commands:
        print("\nDetailed results:")
        for result in all_completed_commands:
            # Bug fix: both branches of the original produced an empty string,
            # so no icon was ever shown; use ✓/✗ to match the per-command
            # output produced during execution.
            status_icon = "✓" if result['status'] == 'success' else "✗"
            print(f"  {status_icon} Command: {result['status']} "
                  f"({result['duration']:.1f}s)")

    print("\nAll task groups processed. Exiting GPU Hunter.")
    logger.info(f"GPU Hunter completed. Success: {success_count}/{total_commands}")

if __name__ == "__main__":
    main()


Previous
北京百日漂流
Next
Prompts