Spaces:
Runtime error
Runtime error
| from typing import List, Dict | |
| import torch | |
| from loguru import logger | |
| import numpy as np | |
| from iopaint.download import scan_models | |
| from iopaint.helper import switch_mps_device | |
| from iopaint.model import models, ControlNet, SD, SDXL | |
| from iopaint.model.utils import torch_gc, is_local_files_only | |
| from iopaint.schema import InpaintRequest, ModelInfo, ModelType | |
| class ModelManager: | |
| def __init__(self, name: str, device: torch.device, **kwargs): | |
| self.name = name | |
| self.device = device | |
| self.kwargs = kwargs | |
| self.available_models: Dict[str, ModelInfo] = {} | |
| self.scan_models() | |
| self.enable_controlnet = kwargs.get("enable_controlnet", False) | |
| controlnet_method = kwargs.get("controlnet_method", None) | |
| if ( | |
| controlnet_method is None | |
| and name in self.available_models | |
| and self.available_models[name].support_controlnet | |
| ): | |
| controlnet_method = self.available_models[name].controlnets[0] | |
| self.controlnet_method = controlnet_method | |
| self.model = self.init_model(name, device, **kwargs) | |
| def current_model(self) -> ModelInfo: | |
| return self.available_models[self.name] | |
| def init_model(self, name: str, device, **kwargs): | |
| logger.info(f"Loading model: {name}") | |
| if name not in self.available_models: | |
| raise NotImplementedError( | |
| f"Unsupported model: {name}. Available models: {list(self.available_models.keys())}" | |
| ) | |
| model_info = self.available_models[name] | |
| kwargs = { | |
| **kwargs, | |
| "model_info": model_info, | |
| "enable_controlnet": self.enable_controlnet, | |
| "controlnet_method": self.controlnet_method, | |
| } | |
| if model_info.support_controlnet and self.enable_controlnet: | |
| return ControlNet(device, **kwargs) | |
| elif model_info.name in models: | |
| return models[name](device, **kwargs) | |
| else: | |
| if model_info.model_type in [ | |
| ModelType.DIFFUSERS_SD_INPAINT, | |
| ModelType.DIFFUSERS_SD, | |
| ]: | |
| return SD(device, **kwargs) | |
| if model_info.model_type in [ | |
| ModelType.DIFFUSERS_SDXL_INPAINT, | |
| ModelType.DIFFUSERS_SDXL, | |
| ]: | |
| return SDXL(device, **kwargs) | |
| raise NotImplementedError(f"Unsupported model: {name}") | |
| def __call__(self, image, mask, config: InpaintRequest): | |
| """ | |
| Args: | |
| image: [H, W, C] RGB | |
| mask: [H, W, 1] 255 means area to repaint | |
| config: | |
| Returns: | |
| BGR image | |
| """ | |
| self.switch_controlnet_method(config) | |
| self.enable_disable_freeu(config) | |
| self.enable_disable_lcm_lora(config) | |
| return self.model(image, mask, config).astype(np.uint8) | |
| def scan_models(self) -> List[ModelInfo]: | |
| available_models = scan_models() | |
| self.available_models = {it.name: it for it in available_models} | |
| return available_models | |
| def switch(self, new_name: str): | |
| if new_name == self.name: | |
| return | |
| old_name = self.name | |
| old_controlnet_method = self.controlnet_method | |
| self.name = new_name | |
| if ( | |
| self.available_models[new_name].support_controlnet | |
| and self.controlnet_method | |
| not in self.available_models[new_name].controlnets | |
| ): | |
| self.controlnet_method = self.available_models[new_name].controlnets[0] | |
| try: | |
| # TODO: enable/disable controlnet without reload model | |
| del self.model | |
| torch_gc() | |
| self.model = self.init_model( | |
| new_name, switch_mps_device(new_name, self.device), **self.kwargs | |
| ) | |
| except Exception as e: | |
| self.name = old_name | |
| self.controlnet_method = old_controlnet_method | |
| logger.info(f"Switch model from {old_name} to {new_name} failed, rollback") | |
| self.model = self.init_model( | |
| old_name, switch_mps_device(old_name, self.device), **self.kwargs | |
| ) | |
| raise e | |
| def switch_controlnet_method(self, config): | |
| if not self.available_models[self.name].support_controlnet: | |
| return | |
| if ( | |
| self.enable_controlnet | |
| and config.controlnet_method | |
| and self.controlnet_method != config.controlnet_method | |
| ): | |
| old_controlnet_method = self.controlnet_method | |
| self.controlnet_method = config.controlnet_method | |
| self.model.switch_controlnet_method(config.controlnet_method) | |
| logger.info( | |
| f"Switch Controlnet method from {old_controlnet_method} to {config.controlnet_method}" | |
| ) | |
| elif self.enable_controlnet != config.enable_controlnet: | |
| self.enable_controlnet = config.enable_controlnet | |
| self.controlnet_method = config.controlnet_method | |
| pipe_components = { | |
| "vae": self.model.model.vae, | |
| "text_encoder": self.model.model.text_encoder, | |
| "unet": self.model.model.unet, | |
| } | |
| if hasattr(self.model.model, "text_encoder_2"): | |
| pipe_components["text_encoder_2"] = self.model.model.text_encoder_2 | |
| self.model = self.init_model( | |
| self.name, | |
| switch_mps_device(self.name, self.device), | |
| pipe_components=pipe_components, | |
| **self.kwargs, | |
| ) | |
| if not config.enable_controlnet: | |
| logger.info(f"Disable controlnet") | |
| else: | |
| logger.info(f"Enable controlnet: {config.controlnet_method}") | |
| def enable_disable_freeu(self, config: InpaintRequest): | |
| if str(self.model.device) == "mps": | |
| return | |
| if self.available_models[self.name].support_freeu: | |
| if config.sd_freeu: | |
| freeu_config = config.sd_freeu_config | |
| self.model.model.enable_freeu( | |
| s1=freeu_config.s1, | |
| s2=freeu_config.s2, | |
| b1=freeu_config.b1, | |
| b2=freeu_config.b2, | |
| ) | |
| else: | |
| self.model.model.disable_freeu() | |
| def enable_disable_lcm_lora(self, config: InpaintRequest): | |
| if self.available_models[self.name].support_lcm_lora: | |
| # TODO: change this if load other lora is supported | |
| lcm_lora_loaded = bool(self.model.model.get_list_adapters()) | |
| if config.sd_lcm_lora: | |
| if not lcm_lora_loaded: | |
| self.model.model.load_lora_weights( | |
| self.model.lcm_lora_id, | |
| weight_name="pytorch_lora_weights.safetensors", | |
| local_files_only=is_local_files_only(), | |
| ) | |
| else: | |
| if lcm_lora_loaded: | |
| self.model.model.disable_lora() | |