|
|
|
|
|
import os |
|
|
import sys |
|
|
import traceback |
|
|
import json |
|
|
import argparse |
|
|
import torch |
|
|
from pathlib import Path |
|
|
from distributed_utils import setup_distributed, setup_environment, cleanup_distributed, RankZeroOnly |
|
|
|
|
|
def main():
    """Entry point for the Humigence training launcher.

    Parses CLI arguments, sets up (possibly distributed) training, loads the
    JSON config, injects the runtime topology into it, and runs the
    production pipeline. On a recoverable DDP failure, retries on a single
    GPU via _run_single_gpu_fallback.

    Returns:
        The pipeline results dict, or the fallback's result dict on failure.

    Raises:
        Exception: re-raises any training error that does not warrant a
            single-GPU fallback.
    """
    parser = argparse.ArgumentParser(description="Humigence Training Launcher")
    parser.add_argument("--config", type=str, required=True, help="Path to configuration file")
    parser.add_argument("--fallback_single_gpu", action="store_true", help="Force single GPU training")
    args = parser.parse_args()

    # Fix: this flag was parsed but never consulted. When the user explicitly
    # requests single-GPU training, skip distributed setup entirely.
    if args.fallback_single_gpu:
        return _run_single_gpu_fallback(args.config)

    # Safe defaults in case setup_distributed() raises before assignment.
    ddp = False
    is_main = True
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    setup_environment()

    try:
        ddp, rank, local_rank, world_size, device = setup_distributed()
        is_main = (rank == 0)

        # Only rank 0 prints the topology banner.
        with RankZeroOnly(is_main) as rank_zero:
            rank_zero.print(f"Training Mode: {'DDP' if ddp else 'Single-GPU'} "
                            f"(world_size={world_size}, rank={rank}, local_rank={local_rank}, device={device})")

        with open(args.config, 'r') as f:
            config = json.load(f)

        # Inject the runtime topology consumed by the pipeline.
        config.update({
            "device": str(device),
            "ddp": ddp,
            "rank": rank,
            "world_size": world_size,
            "is_main": is_main,
            "local_rank": local_rank,
        })

        # Imported lazily so argument/config errors surface before the heavy
        # pipeline import.
        from pipelines.production_pipeline import ProductionPipeline

        pipeline = ProductionPipeline(config)
        results = pipeline.run()

        cleanup_distributed()
        return results

    except Exception as e:
        # Always tear down any process group before deciding how to recover.
        cleanup_distributed()

        error_msg = f"Training error: {type(e).__name__}: {e}"
        print(error_msg, file=sys.stderr)

        if _should_fallback_to_single_gpu(e):
            if is_main:
                print("DDP failed, falling back to single-GPU...")
            return _run_single_gpu_fallback(args.config)
        else:
            raise
|
|
|
|
|
def _should_fallback_to_single_gpu(error: Exception) -> bool: |
|
|
"""Determine if error warrants single-GPU fallback""" |
|
|
fallback_errors = ( |
|
|
AttributeError, |
|
|
RuntimeError, |
|
|
ConnectionError, |
|
|
) |
|
|
return isinstance(error, fallback_errors) |
|
|
|
|
|
def _run_single_gpu_fallback(config_path: str): |
|
|
"""Clean single-GPU fallback implementation""" |
|
|
|
|
|
os.environ["CUDA_VISIBLE_DEVICES"] = "0" |
|
|
|
|
|
|
|
|
if torch.distributed.is_initialized(): |
|
|
torch.distributed.destroy_process_group() |
|
|
|
|
|
|
|
|
with open(config_path, 'r') as f: |
|
|
config = json.load(f) |
|
|
|
|
|
|
|
|
config.update({ |
|
|
"device": "cuda:0", |
|
|
"ddp": False, |
|
|
"rank": 0, |
|
|
"world_size": 1, |
|
|
"is_main": True, |
|
|
"local_rank": 0, |
|
|
"multi_gpu": False, |
|
|
"use_distributed": False, |
|
|
}) |
|
|
|
|
|
print("Running single-GPU fallback training...") |
|
|
|
|
|
try: |
|
|
from pipelines.production_pipeline import ProductionPipeline |
|
|
pipeline = ProductionPipeline(config) |
|
|
return pipeline.run() |
|
|
except Exception as e: |
|
|
print(f"Single-GPU fallback also failed: {e}") |
|
|
return {"status": "error", "message": str(e)} |
|
|
|
|
|
# Script entry: exit 0 only when the pipeline reports a successful status;
# any other outcome (failure dict, empty result, interrupt, exception)
# exits 1.
if __name__ == "__main__":
    try:
        outcome = main()
        succeeded = bool(outcome) and outcome.get("status") == "success"
        sys.exit(0 if succeeded else 1)
    except KeyboardInterrupt:
        print("\nTraining interrupted by user")
        sys.exit(1)
    except Exception as exc:
        print(f"Training failed: {exc}")
        traceback.print_exc()
        sys.exit(1)
|
|
|