# Unattended helper for rushed experiments: watches GPU memory usage and
# launches training commands as soon as the GPUs become idle.
import logging
import os
import subprocess
import threading
import time
# =============================================================================
# GPU Hunter Configuration
# =============================================================================
# Structure:
# CONFIG is a list of task groups. Each group is a list of task dicts with
# "command_list" and "GPU_INDEX" keys.
# Tasks inside the same group run in parallel (on different GPU sets);
# groups run serially (a group starts only after the previous one finishes).
CONFIG = [
    # Group 1 (tasks run in parallel)
    [
        {
            "command_list": [
                "bash scripts.sh",
            ],
            "GPU_INDEX": "1,2,3"
        },
        {
            "command_list": [
                "bash scripts.sh",
                "bash scripts.sh",
            ],
            "GPU_INDEX": "4,5,6,7"
        }
    ],
    # Group 2 (starts after group 1 fully completes)
    [
        {
            "command_list": [
                # BUG FIX: each entry runs in its own subshell, so standalone
                # "conda deactivate" / "conda activate" lines could not affect
                # the training command; chain them into ONE shell line instead.
                # Also fixed the typo "bash bash scripts.sh".
                # NOTE(review): "conda activate" in a non-interactive shell may
                # require sourcing conda.sh first — confirm on the target host.
                "conda deactivate && conda activate ivtlr-wxr && bash scripts.sh",
            ],
            "GPU_INDEX": "1,2,3,5,6,7"
        }
    ]
    # Add more groups here...
    # Each group begins only after the previous group has fully completed.
]
# Global settings
CHECK_INTERVAL = 600   # seconds between GPU polls
GPU_THRESHOLD = 10     # memory-usage threshold (%) below which a GPU counts as idle
LOG_FILE_PATH = "gpu_hunter.log"
# =============================================================================
# Logger Setup
# =============================================================================
# Configure the root logger once; the handler guard prevents duplicate
# file handlers if this module is imported more than once.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    handler = logging.FileHandler(LOG_FILE_PATH)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
    logger.addHandler(handler)
print("start hunting")
def get_gpu_memory_usage(gpu_indices="0,1,2,3"):
    """Query memory usage of the given GPUs via nvidia-smi.

    Args:
        gpu_indices: comma-separated GPU ids passed to ``nvidia-smi -i``.

    Returns:
        A list of ``"used, total"`` strings (MiB, one entry per GPU),
        or ``None`` if nvidia-smi fails or is unavailable.
    """
    try:
        output = subprocess.check_output(
            ["nvidia-smi", "--query-gpu=memory.used,memory.total",
             "--format=csv,noheader,nounits", "-i", gpu_indices],
            universal_newlines=True
        )
        # BUG FIX: was split('\\n'), which splits on the two literal
        # characters backslash+n — multi-GPU output came back as a single
        # element and crashed the int() parsing downstream.
        return output.strip().split('\n')
    except Exception as e:
        logger.error(f"Error getting GPU info: {e}")
        print(f"Error getting GPU info: {e}")
        return None
def check_low_usage(gpu_indices="0,1,2,3", threshold=10):
    """Return True when every listed GPU's memory usage is below *threshold* %."""
    lines = get_gpu_memory_usage(gpu_indices)
    if not lines:
        # Query failed — treat the GPUs as busy.
        return False

    def usage_percent(line):
        # Each line is "used, total" in MiB.
        used, total = (int(field) for field in line.split(', '))
        return (used / total) * 100

    return all(usage_percent(line) < threshold for line in lines)
def wait_for_gpu_idle(gpu_indices="0,1,2,3", threshold=10, check_interval=180):
    """Block until every GPU in *gpu_indices* has memory usage below *threshold* %.

    Sleeps *check_interval* seconds before the first poll and between polls.
    Always returns True (only returns once the GPUs are idle).
    """
    time.sleep(check_interval)
    while not check_low_usage(gpu_indices=gpu_indices, threshold=threshold):
        current_time = time.strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{current_time}] GPUs [{gpu_indices}] are in use. Waiting {check_interval} seconds...")
        logger.info(f"GPUs [{gpu_indices}] are in use. Waiting {check_interval} seconds...")
        time.sleep(check_interval)
    print(f"GPUs [{gpu_indices}] have memory usage below {threshold}%. Ready for next command.")
    logger.info(f"GPUs [{gpu_indices}] have memory usage below {threshold}%. Ready for next command.")
    return True
def execute_commands_sequentially(command_list, gpu_indices="0,1,2,3", gpu_threshold=10, check_interval=180):
    """Run shell commands one at a time, waiting for the GPUs to go idle first.

    Args:
        command_list: shell command strings executed in order.
        gpu_indices: comma-separated GPU ids to monitor/use.
        gpu_threshold: memory-usage percentage below which GPUs count as idle.
        check_interval: seconds between idle polls.

    Returns:
        A list of per-command result dicts with keys 'index', 'command'
        (truncated), 'status', and 'duration' (seconds).
    """
    completed_commands = []
    for i, command in enumerate(command_list, 1):
        # BUG FIX: "\\n" printed a literal backslash-n; use real newlines.
        print(f"\n{'='*60}")
        print(f"Processing command {i}/{len(command_list)} on GPUs [{gpu_indices}]")
        print(f"{'='*60}")
        # Block until the target GPUs are idle.
        wait_for_gpu_idle(gpu_indices=gpu_indices, threshold=gpu_threshold, check_interval=check_interval)
        # Truncate very long commands for display.
        shown = f"{command[:100]}..." if len(command) > 100 else command
        print(f"Executing command {i}: {shown}")
        logger.info(f"Starting command {i}/{len(command_list)} on GPUs [{gpu_indices}]")
        try:
            start_time = time.time()
            # shell=True is required: the entries are user-authored shell lines.
            result = subprocess.run(command, shell=True, check=False)
            execution_time = time.time() - start_time
            if result.returncode == 0:
                print(f"✓ Command {i} completed successfully in {execution_time:.1f} seconds")
                logger.info(f"Command {i} completed successfully in {execution_time:.1f} seconds")
                completed_commands.append({
                    'index': i,
                    'command': command[:200],  # keep the log record short
                    'status': 'success',
                    'duration': execution_time
                })
            else:
                print(f"✗ Command {i} failed with return code {result.returncode}")
                logger.error(f"Command {i} failed with return code {result.returncode}")
                completed_commands.append({
                    'index': i,
                    'command': command[:200],
                    'status': f'failed (code: {result.returncode})',
                    'duration': execution_time
                })
        except Exception as e:
            # subprocess.run itself raised (e.g. OSError) — record and continue.
            print(f"✗ Error executing command {i}: {e}")
            logger.error(f"Error executing command {i}: {e}")
            completed_commands.append({
                'index': i,
                'command': command[:200],
                'status': f'error: {str(e)[:100]}',
                'duration': 0
            })
    return completed_commands
def execute_parallel_tasks(task_group, group_index, total_groups):
    """Run every task in *task_group* in parallel (one thread per task).

    Each task dict supplies its own "command_list" and "GPU_INDEX", so the
    threads wait on and occupy separate GPU sets.
    (Cleanup: the duplicate mid-file "import subprocess"/"import threading"
    lines were moved to the top of the file; "\\n" prints emitted a literal
    backslash-n and now use real newlines.)

    Returns:
        The combined per-command result dicts from all tasks in the group.
    """
    print(f"\n{'='*80}")
    print(f"Processing Task Group {group_index+1}/{total_groups}")
    print(f"{'='*80}")
    threads = []
    results_list = []  # shared accumulator; list.extend is atomic in CPython

    def run_task(task, task_index):
        """Thread body: run one task's commands sequentially on its GPUs."""
        command_list = task.get("command_list", [])
        gpu_indices = task.get("GPU_INDEX", "0,1,2,3")
        print(f"\n--- Starting Task {task_index+1}/{len(task_group)} on GPUs [{gpu_indices}] ---")
        results = execute_commands_sequentially(
            command_list=command_list,
            gpu_indices=gpu_indices,
            gpu_threshold=GPU_THRESHOLD,
            check_interval=CHECK_INTERVAL
        )
        results_list.extend(results)

    # Start one thread per task.
    for task_index, task in enumerate(task_group):
        thread = threading.Thread(target=run_task, args=(task, task_index))
        threads.append(thread)
        thread.start()
    # Wait for every task in the group before returning.
    for thread in threads:
        thread.join()
    return results_list
def main():
    """Run every task group in CONFIG serially and print a final summary."""
    print(f"GPU Hunter started with {len(CONFIG)} task groups")
    print(f"GPU threshold: {GPU_THRESHOLD}%")
    print(f"Check interval: {CHECK_INTERVAL} seconds")
    logger.info(f"Starting GPU Hunter with {len(CONFIG)} task groups")
    # Groups run serially; tasks inside each group run in parallel.
    all_completed_commands = []
    for group_index, task_group in enumerate(CONFIG):
        results = execute_parallel_tasks(task_group, group_index, len(CONFIG))
        all_completed_commands.extend(results)
    # BUG FIX: "\\n" printed a literal backslash-n; use real newlines below.
    print(f"\n{'='*80}")
    print("Execution Summary:")
    print('='*80)
    success_count = sum(1 for r in all_completed_commands if r['status'] == 'success')
    total_commands = len(all_completed_commands)
    print(f"Total commands executed: {total_commands}")
    print(f"Successfully executed: {success_count}")
    print(f"Failed/Error: {total_commands - success_count}")
    if all_completed_commands:
        print("\nDetailed results:")
        for result in all_completed_commands:
            status_icon = "✓" if result['status'] == 'success' else "✗"
            print(f"  {status_icon} Command: {result['status']} "
                  f"({result['duration']:.1f}s)")
    print("\nAll task groups processed. Exiting GPU Hunter.")
    logger.info(f"GPU Hunter completed. Success: {success_count}/{total_commands}")

if __name__ == "__main__":
    main()