import os

# Disable the hf_transfer download backend. huggingface_hub reads its
# environment-variable configuration when the package is first imported, and
# gradio pulls huggingface_hub in as a dependency, so the override has to be in
# place before any of the imports below run.
os.environ["HF_HUB_ENABLE_HF_TRANSFER"] = "0"

import sys
import torch
import numpy as np
import gradio as gr
import soundfile as sf
import tempfile
import hashlib
import requests
import socket
from huggingface_hub import snapshot_download
| |
|
def sync_model_files():
    """Ensure the pretrained weights exist locally, downloading them if needed.

    Offline use comes first: when every required weight path already exists
    relative to the current working directory, nothing is downloaded.
    Otherwise the ``utils/pretrain/*`` files and ``config.json`` are fetched
    from the Hugging Face repo into the current directory.

    A failed download is reported on stdout but never raised, so the caller
    can still try to start (and fail later with a clearer, more specific
    error if the weights are really required).

    Returns:
        None
    """
    repo_id = "shawnpi/HQ-SVC"

    # Every path that must exist for the weights to count as complete. The
    # RMVPE checkpoint is part of this list: it was previously declared but
    # never checked, so a missing RMVPE file was reported as "complete".
    required_paths = (
        "utils/pretrain/250000_step_val_loss_0.50.pth",
        "utils/pretrain/nsf_hifigan/model",
        "utils/pretrain/rmvpe/model.pt",
    )

    if all(os.path.exists(path) for path in required_paths):
        print(">>> [离线模式] 检测到本地权重已完整")
        return

    print(">>> [同步模式] 本地权重不完整,正在检测网络以获取权重...")

    try:
        # NOTE(review): recent huggingface_hub releases mark
        # `local_dir_use_symlinks` and `resume_download` as deprecated --
        # confirm against the pinned version before removing them.
        snapshot_download(
            repo_id=repo_id,
            allow_patterns=["utils/pretrain/*", "config.json"],
            local_dir=".",
            local_dir_use_symlinks=False,
            resume_download=True,
        )
        print(">>> 权重同步完成。")
    except Exception as e:
        # Best-effort by design: this point is only reached when local weights
        # are incomplete, so there is no "local copy is fine" case to report.
        print(f">>> [严重错误] 同步失败且本地缺少权重,程序可能无法运行: {e}")
| |
|
| | |
# Fetch any missing pretrained weights at import time, before the project
# modules further down are imported.
sync_model_files()

# Make both the directory containing this file and its ``utils`` sub-directory
# importable. The first is appended unconditionally; the second only when it
# is not already on the search path.
now_dir = os.path.dirname(os.path.abspath(__file__))
utils_path = os.path.join(now_dir, "utils")
sys.path.append(now_dir)
if sys.path.count(utils_path) == 0:
    sys.path.append(utils_path)
| |
|
| | from logger.utils import load_config |
| | from utils.models.models_v2_beta import load_hq_svc |
| | from utils.vocoder import Vocoder |
| | from utils.data_preprocessing import load_facodec, load_f0_extractor, load_volume_extractor, get_processed_file |
| |
|
| | |
# Module-level inference state. The three handles stay ``None`` until
# ``initialize_models`` fills them in.
NET_G = VOCODER = ARGS = None

# Preprocessing components, keyed by role; populated by ``initialize_models``.
PREPROCESSORS = dict()

# Features of the most recently processed set of target reference files, so a
# repeated conversion against the same references can skip re-extraction.
TARGET_CACHE = dict.fromkeys(("file_hash", "spk_ave", "all_tar_f0"))
| |
|
def initialize_models(config_path):
    """Load the config, vocoder, conversion network and preprocessors.

    Populates the module-level ``ARGS``, ``VOCODER``, ``NET_G`` and
    ``PREPROCESSORS`` globals, in that order, so that ``predict`` can run.

    Args:
        config_path: Path of the configuration file handed to ``load_config``;
            it is also stored back on the loaded config as ``ARGS.config``.
    """
    global NET_G, VOCODER, ARGS, PREPROCESSORS

    ARGS = load_config(config_path)
    ARGS.config = config_path
    target_device = ARGS.device

    VOCODER = Vocoder(
        vocoder_type='nsf-hifigan',
        vocoder_ckpt='utils/pretrain/nsf_hifigan/model',
        device=target_device,
    )

    NET_G = load_hq_svc(
        mode='infer',
        device=target_device,
        model_path=ARGS.model_path,
        args=ARGS,
    )
    NET_G.eval()

    # Load every preprocessing component first and publish the registry in one
    # assignment, so PREPROCESSORS is never left half-filled.
    codec_encoder, codec_decoder = load_facodec(target_device)
    pitch_extractor = load_f0_extractor(ARGS)
    loudness_extractor = load_volume_extractor(ARGS)

    registry = {}
    registry["fa_encoder"] = codec_encoder
    registry["fa_decoder"] = codec_decoder
    registry["f0_extractor"] = pitch_extractor
    registry["volume_extractor"] = loudness_extractor
    # These two slots are reserved but unused here.
    registry["content_encoder"] = None
    registry["spk_encoder"] = None
    PREPROCESSORS = registry
| |
|
| | |
def _extract_features(audio_path):
    """Run the shared preprocessing pipeline on one audio file.

    Thin wrapper around ``get_processed_file`` that supplies the sample rates,
    vocoder, extractors and FACodec modules from the module-level state set up
    by ``initialize_models``. Returns whatever ``get_processed_file`` returns.
    """
    return get_processed_file(
        audio_path,
        ARGS.sample_rate,
        ARGS.encoder_sr,
        VOCODER,
        PREPROCESSORS["volume_extractor"],
        PREPROCESSORS["f0_extractor"],
        PREPROCESSORS["fa_encoder"],
        PREPROCESSORS["fa_decoder"],
        None,
        None,
        device=ARGS.device,
    )


def _collect_target_features(target_files, device):
    """Extract features from up to 20 reference files.

    Returns ``(spk_ave, all_tar_f0)`` -- the mean speaker embedding and the
    concatenated original F0 values -- or ``None`` when no reference file
    could be processed.
    """
    spk_list, f0_list = [], []
    for f in target_files[:20]:
        # Entries may be objects exposing ``.name`` or plain path strings.
        f_path = f.name if hasattr(f, 'name') else f
        if not f_path or not os.path.exists(f_path):
            continue
        t_data = _extract_features(f_path)
        if t_data:
            spk_list.append(t_data['spk'])
            f0_list.append(t_data['f0_origin'])

    if not spk_list:
        return None
    spk_ave = torch.stack(spk_list).mean(dim=0).squeeze().to(device)
    return spk_ave, np.concatenate(f0_list)


def predict(source_audio, target_files, shift_key, adjust_f0):
    """Convert (or reconstruct) the source audio and write the result to a WAV file.

    With no target files the source itself supplies the speaker embedding
    (reported as "Super-Resolution"); otherwise the speaker embedding is
    averaged over up to 20 reference files and cached by their names.

    Args:
        source_audio: Path of the source audio file, or ``None``.
        target_files: Optional list of reference files; each entry is either an
            object with a ``.name`` path attribute or a path string.
        shift_key: Pitch shift in semitones; ``None`` is treated as 0. It is
            replaced by an automatically estimated value when ``adjust_f0`` is
            set and reference files are given.
        adjust_f0: When true (and not reconstructing), derive the shift from
            the ratio of mean voiced F0 between target and source.

    Returns:
        ``(status_message, output_path)``; ``output_path`` is ``None`` whenever
        the request could not be completed.
    """
    if source_audio is None:
        return "⚠️ 系统提示:未检测到源音频。请确保文件已上传完毕。", None

    if not os.path.exists(source_audio):
        return "❌ 系统错误:找不到音频文件,请重新上传。", None

    # The UI can be launched without a loaded config; report that as a status
    # message instead of failing with an AttributeError on ``None``.
    if ARGS is None or NET_G is None or VOCODER is None:
        return "❌ 系统错误:模型尚未初始化,请检查配置文件。", None

    # A cleared numeric field arrives as None.
    if shift_key is None:
        shift_key = 0

    device = ARGS.device

    try:
        with torch.no_grad():
            is_reconstruction = not target_files
            # The cache key is derived from the reference file names only.
            target_names = "".join([f.name if hasattr(f, 'name') else f for f in (target_files or [])])
            current_hash = hashlib.md5(target_names.encode()).hexdigest()

            src_data = None
            if is_reconstruction:
                # The source doubles as the speaker reference: extract its
                # features once and reuse them below instead of running the
                # same file through preprocessing twice.
                src_data = _extract_features(source_audio)
                spk_ave, all_tar_f0 = src_data['spk'].squeeze().to(device), src_data['f0_origin']
                status = "✨ Super-Resolution"
            elif TARGET_CACHE["file_hash"] == current_hash:
                spk_ave, all_tar_f0 = TARGET_CACHE["spk_ave"], TARGET_CACHE["all_tar_f0"]
                status = "🚀 Cache Loaded"
            else:
                target_features = _collect_target_features(target_files, device)
                if target_features is None:
                    return "❌ 终端提示:参考音频处理失败。", None
                spk_ave, all_tar_f0 = target_features
                TARGET_CACHE.update({"file_hash": current_hash, "spk_ave": spk_ave, "all_tar_f0": all_tar_f0})
                status = "✅ VOICE CONVERSION"

            if src_data is None:
                src_data = _extract_features(source_audio)
            f0 = src_data['f0'].unsqueeze(0).to(device)

            if adjust_f0 and not is_reconstruction:
                # Only positive F0 values enter the means.
                src_f0_valid = src_data['f0_origin'][src_data['f0_origin'] > 0]
                tar_f0_valid = all_tar_f0[all_tar_f0 > 0]
                if len(src_f0_valid) > 0 and len(tar_f0_valid) > 0:
                    # 12 semitones per octave: log2 of the frequency ratio, scaled and rounded.
                    shift_key = round(12 * np.log2(tar_f0_valid.mean() / src_f0_valid.mean()))

            f0 = f0 * 2 ** (float(shift_key) / 12)
            mel_g = NET_G(
                src_data['vq_post'].unsqueeze(0).to(device),
                f0,
                src_data['vol'].unsqueeze(0).to(device),
                spk_ave,
                gt_spec=None,
                infer=True,
                infer_speedup=ARGS.infer_speedup,
                method=ARGS.infer_method,
                vocoder=VOCODER,
            )
            wav_g = VOCODER.infer(mel_g, f0) if ARGS.vocoder == 'nsf-hifigan' else VOCODER.infer(mel_g)

            # mktemp() only returns a name and is race-prone; create the file
            # atomically, close it, then let soundfile write to that path.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                out_p = tmp.name
            # Output rate is fixed at 44100 Hz (the UI labels the output 44.1kHz).
            sf.write(out_p, wav_g.squeeze().cpu().numpy(), 44100)
            return f"{status} | Pitch Shifted: {shift_key}", out_p
    except Exception as e:
        # UI boundary: surface any inference failure as a status message.
        return f"❌ 推理运行出错:{str(e)}", None
| |
|
# Stylesheet passed to gr.Blocks for the pixel-art theme: loads the
# "Press Start 2P" font from Google Fonts and applies it to every element,
# squares off all corners, puts a darkened remote image behind the app,
# gives boxes/inputs/buttons a hard black border and drop shadow, recolours
# labels, headings and the primary button, and hides the footer.
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Press+Start+2P&display=swap');
:root { --font: 'Press Start 2P', cursive !important; }
* { font-family: 'Press Start 2P', cursive !important; border-radius: 0px !important; }
.gradio-container {
background: linear-gradient(rgba(0,0,0,0.85), rgba(0,0,0,0.85)),
url('https://img.moegirl.org.cn/common/d/d3/K-ON_key_visual_2.jpg');
background-size: cover;
}
.gr-box, .gr-input, .gr-button { border: 4px solid #000 !important; box-shadow: 8px 8px 0px #000 !important; }
label, p, .time-info { color: #f36c18 !important; font-size: 10px !important; text-transform: uppercase; }
h1 { color: #FFFF00 !important; text-shadow: 4px 4px 0px #000 !important; text-align: center; }
button.primary { background-color: #ff69b4 !important; color: #fff !important; }
footer { display: none !important; }
"""
| |
|
| | |
def build_ui():
    """Assemble the Gradio Blocks interface and return it (not yet launched).

    Layout: a banner image and title on top, then two columns. The left column
    holds the source audio input, the target reference uploader, the pitch
    controls and the run button; the right column holds the status text box
    and the output audio player. The run button is wired to ``predict``.
    """
    banner_html = '<div style="text-align:center; margin:20px 0;"><img src="file/images/kon-new.gif" style="max-width:400px; border:4px solid #000; box-shadow:8px 8px 0px #000;"></div>'
    heading_md = "# 🎸HQ-SVC: SINGING VOICE CONVERSION AND SUPER-RESOLUTION🍰"

    with gr.Blocks(css=custom_css, title="HQ-SVC Pixel Pro") as demo:
        gr.HTML(banner_html)
        gr.Markdown(heading_md)

        with gr.Row():
            # Left column: inputs and controls.
            with gr.Column():
                src_audio = gr.Audio(label="STEP 1: SOURCE VOICE", type="filepath")
                tar_files = gr.File(label="STEP 2: TARGET REFERENCE", file_count="multiple")
                with gr.Row():
                    key_shift = gr.Number(label="PITCH SHIFT", value=0)
                    auto_f0 = gr.Checkbox(label="AUTO PITCH", value=False)
                run_btn = gr.Button("🎤 START CONVERSION!", variant="primary")

            # Right column: status line and result.
            with gr.Column():
                status_box = gr.Textbox(label="SYSTEM TERMINAL", interactive=False)
                result_audio = gr.Audio(label="OUTPUT (44.1kHz HQ)")

        handler_inputs = [src_audio, tar_files, key_shift, auto_f0]
        handler_outputs = [status_box, result_audio]
        run_btn.click(predict, handler_inputs, handler_outputs)

    return demo
| |
|
if __name__ == "__main__":
    # Script entry point: load the models when the inference config is
    # present, then build and launch the web UI.
    config_p = "configs/hq_svc_infer.yaml"
    if os.path.exists(config_p):
        initialize_models(config_p)
    else:
        # Previously this case was skipped silently and the UI came up with no
        # models loaded; say so explicitly so the cause is visible at startup.
        print(f">>> [严重错误] 未找到配置文件 {config_p},模型未加载,推理将不可用。")

    demo = build_ui()

    # Resolve the application directory to an absolute path so the allowed
    # paths do not depend on how the script was invoked.
    app_dir = os.path.dirname(os.path.abspath(__file__))
    temp_dir = tempfile.gettempdir()
    # NOTE(review): share=True publishes the app through a public link while
    # allowed_paths covers the whole application directory and the system temp
    # directory -- confirm that this much file exposure is intended.
    demo.launch(
        share=True,
        allowed_paths=[os.path.join(app_dir, "images"), app_dir, temp_dir],
    )