diff --git a/cherry_picknick.py b/cherry_picknick.py deleted file mode 100644 index ec2c9f5..0000000 --- a/cherry_picknick.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 2022 Lunar Ring. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os, sys -import torch -torch.backends.cudnn.benchmark = False -import numpy as np -import warnings -warnings.filterwarnings('ignore') -import warnings -import torch -from tqdm.auto import tqdm -from PIL import Image -import matplotlib.pyplot as plt -import torch -from movie_util import MovieSaver -from typing import Callable, List, Optional, Union -from latent_blending import LatentBlending, add_frames_linear_interp -from stable_diffusion_holder import StableDiffusionHolder -torch.set_grad_enabled(False) - - -#%% First let us spawn a stable diffusion holder -use_inpaint = True - -device = "cuda" -fp_ckpt= "../stable_diffusion_models/ckpt/512-inpainting-ema.ckpt" -fp_config = '../stablediffusion/configs//stable-diffusion/v2-inpainting-inference.yaml' - - -# fp_ckpt = "../stable_diffusion_models/ckpt/768-v-ema.ckpt" -# fp_config = '../stablediffusion/configs/stable-diffusion/v2-inference-v.yaml' - -sdh = StableDiffusionHolder(fp_ckpt, fp_config, device) - - - -#%% Next let's set up all parameters -num_inference_steps = 30 # Number of diffusion interations - -guidance_scale = 5 - -lb = LatentBlending(sdh, num_inference_steps, guidance_scale) - -list_prompts = [] -list_prompts.append("photo of a beautiful forest covered in white flowers, ambient light, very detailed, magic") -list_prompts.append("photo of an golden statue with a funny hat, surrounded by ferns and vines, grainy analog photograph, mystical ambience, incredible detail") - - -for k, prompt in enumerate(list_prompts): - # k = 6 - - # prompt = list_prompts[k] - for i in range(10): - lb.set_prompt1(prompt) - - seed = np.random.randint(999999999) - lb.set_seed(seed) - plt.imshow(lb.run_diffusion(lb.text_embedding1, return_image=True)) - plt.title(f"prompt {k}, seed {i} {seed}") - plt.show() - print(f"prompt {k} seed {seed} trial {i}") - -#%% -#%% Let's make a source image and mask. -k=0 -for i in range(10): - seed = 190791709# np.random.randint(999999999) -# seed0 = 629575320 - - lb = LatentBlending(sdh) - lb.autosetup_branching(quality='medium', depth_strength=0.65) - - prompt1 = "photo of a futuristic alien temple in a desert, mystic, glowing, organic, intricate, sci-fi movie, mesmerizing, scary" - lb.set_prompt1(prompt1) - lb.init_inpainting(init_empty=True) - lb.set_seed(seed) - plt.imshow(lb.run_diffusion(lb.text_embedding1, return_image=True)) - plt.title(f"prompt1 {k}, seed {i} {seed}") - plt.show() - print(f"prompt1 {k} seed {seed} trial {i}") - - xx - -#%% -mask_image = 255*np.ones([512,512], dtype=np.uint8) -mask_image[340:420, 170:280, ] = 0 -mask_image = Image.fromarray(mask_image) - -#%% - -""" -69731932, 504430820 -""" \ No newline at end of file diff --git a/latent_blending.py b/latent_blending.py index a54b4aa..01bc565 100644 --- a/latent_blending.py +++ b/latent_blending.py @@ -27,7 +27,7 @@ import warnings import torch from tqdm.auto import tqdm from PIL import Image -import matplotlib.pyplot as plt +# import matplotlib.pyplot as plt import torch from movie_util import MovieSaver import datetime @@ -41,7 +41,10 @@ from contextlib import nullcontext from ldm.models.diffusion.ddim import DDIMSampler from ldm.util import instantiate_from_config +from ldm.models.diffusion.ddpm import LatentUpscaleDiffusion, LatentInpaintDiffusion from stable_diffusion_holder import StableDiffusionHolder +import yaml + #%% class LatentBlending(): def __init__( @@ -49,7 +52,7 @@ class LatentBlending(): sdh: None, guidance_scale: float = 4, guidance_scale_mid_damper: float = 0.5, - mid_compression_scaler: float = 2.0, + mid_compression_scaler: float = 1.2, ): r""" Initializes the latent blending class. @@ -77,7 +80,8 @@ class LatentBlending(): self.height = self.sdh.height self.guidance_scale_mid_damper = guidance_scale_mid_damper self.mid_compression_scaler = mid_compression_scaler - self.seed = 420 # Run self.set_seed or fixed_seeds argument in run_transition + self.seed1 = 0 + self.seed2 = 0 # Initialize vars self.prompt1 = "" @@ -90,20 +94,25 @@ class LatentBlending(): self.list_injection_idx_prev = [] self.text_embedding1 = None self.text_embedding2 = None + self.image1_lowres = None + self.image2_lowres = None self.stop_diffusion = False self.negative_prompt = None - self.num_inference_steps = -1 + self.num_inference_steps = self.sdh.num_inference_steps + self.noise_level_upscaling = 20 self.list_injection_idx = None self.list_nmb_branches = None self.set_guidance_scale(guidance_scale) self.init_mode() - def init_mode(self, mode='standard'): + def init_mode(self): r""" - Sets the mode of this class, either inpaint of standard. + Sets the operational mode. Currently supported are standard, inpainting and x4 upscaling. """ - if mode == 'inpaint': + if isinstance(self.sdh.model, LatentUpscaleDiffusion): + self.mode = 'upscale' + elif isinstance(self.sdh.model, LatentInpaintDiffusion): self.sdh.image_source = None self.sdh.mask_image = None self.mode = 'inpaint' @@ -152,10 +161,26 @@ class LatentBlending(): self.prompt2 = prompt self.text_embedding2 = self.get_text_embeddings(self.prompt2) - def autosetup_branching( + def set_image1(self, image: Image): + r""" + Sets the first image (keyframe), relevant for the upscaling model transitions. + Args: + image: Image + """ + self.image1_lowres = image + + def set_image2(self, image: Image): + r""" + Sets the second image (keyframe), relevant for the upscaling model transitions. + Args: + image: Image + """ + self.image2_lowres = image + + def load_branching_profile( self, quality: str = 'medium', - deepth_strength: float = 0.65, + depth_strength: float = 0.65, nmb_frames: int = 360, nmb_mindist: int = 3, ): @@ -167,7 +192,7 @@ class LatentBlending(): Determines how many diffusion steps are being made + how many branches in total. Tradeoff between quality and speed of computation. Choose: lowest, low, medium, high, ultra - deepth_strength: float = 0.65, + depth_strength: float = 0.65, Determines how deep the first injection will happen. Deeper injections will cause (unwanted) formation of new structures, more shallow values will go into alpha-blendy land. @@ -175,7 +200,6 @@ class LatentBlending(): total number of frames nmb_mindist: int = 3 minimum distance in terms of diffusion iteratinos between subsequent injections - """ if quality == 'lowest': @@ -193,10 +217,42 @@ class LatentBlending(): elif quality == 'ultra': num_inference_steps = 100 nmb_branches_final = nmb_frames//2 + elif quality == 'upscaling_step1': + num_inference_steps = 40 + nmb_branches_final = 12 + elif quality == 'upscaling_step2': + num_inference_steps = 100 + nmb_branches_final = 4 else: - raise ValueError("quality = '{quality}' not supported") + raise ValueError(f"quality = '{quality}' not supported") + + self.autosetup_branching(depth_strength, num_inference_steps, nmb_branches_final) - idx_injection_first = int(np.round(num_inference_steps*deepth_strength)) + + def autosetup_branching( + self, + depth_strength: float = 0.65, + num_inference_steps: int = 30, + nmb_branches_final: int = 20, + nmb_mindist: int = 3, + ): + r""" + Automatically sets up the branching schedule. + + Args: + depth_strength: float = 0.65, + Determines how deep the first injection will happen. + Deeper injections will cause (unwanted) formation of new structures, + more shallow values will go into alpha-blendy land. + num_inference_steps: int + Number of diffusion steps. Larger values will take more compute time. + nmb_branches_final (int): The number of diffusion-generated images + at the end of the inference. + nmb_mindist (int): The minimum number of diffusion steps + between two injections. + """ + + idx_injection_first = int(np.round(num_inference_steps*depth_strength)) idx_injection_last = num_inference_steps - 3 nmb_injections = int(np.floor(num_inference_steps/5)) - 1 @@ -219,10 +275,6 @@ class LatentBlending(): list_injection_idx = list_injection_idx_clean list_nmb_branches = list_nmb_branches_clean - # print(f"num_inference_steps: {num_inference_steps}") - # print(f"list_injection_idx: {list_injection_idx}") - # print(f"list_nmb_branches: {list_nmb_branches}") - list_nmb_branches = list_nmb_branches list_injection_idx = list_injection_idx self.setup_branching(num_inference_steps, list_nmb_branches=list_nmb_branches, list_injection_idx=list_injection_idx) @@ -313,6 +365,7 @@ class LatentBlending(): recycle_img1: Optional[bool] = False, recycle_img2: Optional[bool] = False, fixed_seeds: Optional[List[int]] = None, + premature_stop: Optional[int] = np.inf, ): r""" Returns a list of transition images using spherical latent blending. @@ -324,6 +377,8 @@ class LatentBlending(): fixed_seeds: Optional[List[int)]: You can supply two seeds that are used for the first and second keyframe (prompt1 and prompt2). Otherwise random seeds will be taken. + premature_stop: Optional[int]: + Stop the computation after premature_stop frames have been computed in the transition """ # Sanity checks first @@ -336,28 +391,16 @@ class LatentBlending(): fixed_seeds = list(np.random.randint(0, 1000000, 2).astype(np.int32)) else: assert len(fixed_seeds)==2, "Supply a list with len = 2" - + + self.seed1 = fixed_seeds[0] + self.seed2 = fixed_seeds[1] + # Process interruption variable self.stop_diffusion = False # Ensure correct num_inference_steps in holder self.sdh.num_inference_steps = self.num_inference_steps - # # Recycling? There are requirements - # if recycle_img1 or recycle_img2: - # # if self.list_nmb_branches_prev == []: - # # print("Warning. You want to recycle but there is nothing here. Disabling recycling.") - # # recycle_img1 = False - # # recycle_img2 = False - # if self.list_nmb_branches_prev != self.list_nmb_branches: - # print("Warning. Cannot change list_nmb_branches if recycling latent. Disabling recycling.") - # recycle_img1 = False - # recycle_img2 = False - # elif self.list_injection_idx_prev != self.list_injection_idx: - # print("Warning. Cannot change list_nmb_branches if recycling latent. Disabling recycling.") - # recycle_img1 = False - # recycle_img2 = False - # Make a backup for future reference self.list_nmb_branches_prev = self.list_nmb_branches[:] self.list_injection_idx_prev = self.list_injection_idx[:] @@ -415,15 +458,19 @@ class LatentBlending(): # Diffusion computations start here time_start = time.time() - for t_block, idx_branch in tqdm(list_compute, desc="computing transition", smoothing=-1): + for t_block, idx_branch in tqdm(list_compute, desc="computing transition", smoothing=0.01): if self.stop_diffusion: print("run_transition: process interrupted") return self.tree_final_imgs + if idx_branch > premature_stop: + print(f"run_transition: premature_stop criterion reached. returning tree with {premature_stop} branches") + return self.tree_final_imgs # print(f"computing t_block {t_block} idx_branch {idx_branch}") idx_stop = self.list_injection_idx_ext[t_block+1] fract_mixing = self.tree_fracts[t_block][idx_branch] - text_embeddings_mix = interpolate_linear(self.text_embedding1, self.text_embedding2, fract_mixing) + + list_conditionings = self.get_mixed_conditioning(fract_mixing) self.set_guidance_mid_dampening(fract_mixing) # print(f"fract_mixing {fract_mixing} guid {self.sdh.guidance_scale}") if t_block == 0: @@ -432,7 +479,7 @@ class LatentBlending(): self.set_seed(fixed_seeds[0]) elif idx_branch == self.list_nmb_branches[0] -1: self.set_seed(fixed_seeds[1]) - list_latents = self.run_diffusion(text_embeddings_mix, idx_stop=idx_stop) + list_latents = self.run_diffusion(list_conditionings, idx_stop=idx_stop) else: # find parents latents b_parent1, b_parent2 = get_closest_idx(fract_mixing, self.tree_fracts[t_block-1]) @@ -444,7 +491,7 @@ class LatentBlending(): idx_start = self.list_injection_idx_ext[t_block] fract_mixing_parental = (fract_mixing - self.tree_fracts[t_block-1][b_parent1]) / (self.tree_fracts[t_block-1][b_parent2] - self.tree_fracts[t_block-1][b_parent1]) latents_for_injection = interpolate_spherical(latents1, latents2, fract_mixing_parental) - list_latents = self.run_diffusion(text_embeddings_mix, latents_for_injection, idx_start=idx_start, idx_stop=idx_stop) + list_latents = self.run_diffusion(list_conditionings, latents_for_injection, idx_start=idx_start, idx_stop=idx_stop) self.tree_latents[t_block][idx_branch] = list_latents self.tree_status[t_block][idx_branch] = 'computed' @@ -459,21 +506,20 @@ class LatentBlending(): def run_multi_transition( self, + fp_movie: str, list_prompts: List[str], list_seeds: List[int] = None, - ms: MovieSaver = None, fps: float = 24, duration_single_trans: float = 15, ): r""" Runs multiple transitions and stitches them together. You can supply the seeds for each prompt. Args: + fp_movie: file path for movie saving list_prompts: List[float]: list of the prompts. There will be a transition starting from the first to the last. list_seeds: List[int] = None: Random Seeds for each prompt. - ms: MovieSaver - You need to spawn a moviesaver instance. fps: float: frames per second duration_single_trans: float: @@ -486,7 +532,8 @@ class LatentBlending(): if list_seeds is None: list_seeds = list(np.random.randint(0, 10e10, len(list_prompts))) - + + ms = MovieSaver(fp_movie, fps=fps) for i in range(len(list_prompts)-1): print(f"Starting movie segment {i+1}/{len(list_prompts)-1}") @@ -516,7 +563,7 @@ class LatentBlending(): @torch.no_grad() def run_diffusion( self, - text_embeddings: torch.FloatTensor, + list_conditionings, latents_for_injection: torch.FloatTensor = None, idx_start: int = -1, idx_stop: int = -1, @@ -527,8 +574,7 @@ class LatentBlending(): Depending on the mode, the correct one will be executed. Args: - text_embeddings: torch.FloatTensor - Text embeddings used for diffusion + list_conditionings: List of all conditionings for the diffusion model. latents_for_injection: torch.FloatTensor Latents that are used for injection idx_start: int @@ -541,15 +587,131 @@ class LatentBlending(): # Ensure correct num_inference_steps in Holder self.sdh.num_inference_steps = self.num_inference_steps + assert type(list_conditionings) is list, "list_conditionings need to be a list" if self.mode == 'standard': + text_embeddings = list_conditionings[0] return self.sdh.run_diffusion_standard(text_embeddings, latents_for_injection=latents_for_injection, idx_start=idx_start, idx_stop=idx_stop, return_image=return_image) elif self.mode == 'inpaint': + text_embeddings = list_conditionings[0] assert self.sdh.image_source is not None, "image_source is None. Please run init_inpainting first." assert self.sdh.mask_image is not None, "image_source is None. Please run init_inpainting first." return self.sdh.run_diffusion_inpaint(text_embeddings, latents_for_injection=latents_for_injection, idx_start=idx_start, idx_stop=idx_stop, return_image=return_image) + elif self.mode == 'upscale': + cond = list_conditionings[0] + uc_full = list_conditionings[1] + return self.sdh.run_diffusion_upscaling(cond, uc_full, latents_for_injection=latents_for_injection, idx_start=idx_start, idx_stop=idx_stop, return_image=return_image) + + def run_upscaling_step1( + self, + dp_img: str, + quality: str = 'upscaling_step1', + depth_strength: float = 0.65, + fixed_seeds: Optional[List[int]] = None, + overwrite_folder: bool = False, + ): + r""" + Initializes inpainting with a source and maks image. + Args: + dp_img: + Path to directory where the low-res images and yaml will be saved to. + This directory cannot exist and will be created here. + quality: str + Determines how many diffusion steps are being made + how many branches in total. + We suggest to leave it with upscaling_step1 which has 10 final branches. + depth_strength: float = 0.65, + Determines how deep the first injection will happen. + Deeper injections will cause (unwanted) formation of new structures, + more shallow values will go into alpha-blendy land. + fixed_seeds: Optional[List[int)]: + You can supply two seeds that are used for the first and second keyframe (prompt1 and prompt2). + Otherwise random seeds will be taken. + """ + assert self.text_embedding1 is not None, 'run set_prompt1(yourprompt1) first' + assert self.text_embedding2 is not None, 'run set_prompt2(yourprompt2) first' + assert not os.path.isdir(dp_img), f"directory already exists: {dp_img}" + + if fixed_seeds is None: + fixed_seeds = list(np.random.randint(0, 1000000, 2).astype(np.int32)) + + # Run latent blending + self.autosetup_branching(quality='upscaling_step1', depth_strength=depth_strength) + imgs_transition = self.run_transition(fixed_seeds=fixed_seeds) + + self.write_imgs_transition(dp_img, imgs_transition) + + + print(f"run_upscaling_step1: completed! {dp_img}") + + + def run_upscaling_step2( + self, + dp_img: str, + quality: str = 'upscaling_step2', + depth_strength: float = 0.65, + fixed_seeds: Optional[List[int]] = None, + overwrite_folder: bool = False, + ): + + fp_yml = os.path.join(dp_img, "lowres.yaml") + fp_movie = os.path.join(dp_img, "movie.mp4") + fps = 24 + ms = MovieSaver(fp_movie, fps=fps) + assert os.path.isfile(fp_yml), "lowres.yaml does not exist. did you forget run_upscaling_step1?" + dict_stuff = yml_load(fp_yml) + + # load lowres images + nmb_images_lowres = dict_stuff['nmb_images'] + prompt1 = dict_stuff['prompt1'] + prompt2 = dict_stuff['prompt2'] + imgs_lowres = [] + for i in range(nmb_images_lowres): + fp_img_lowres = os.path.join(dp_img, f"lowres_img_{str(i).zfill(4)}.jpg") + assert os.path.isfile(fp_img_lowres), f"{fp_img_lowres} does not exist. did you forget run_upscaling_step1?" + imgs_lowres.append(Image.open(fp_img_lowres)) + + + # set up upscaling + text_embeddingA = self.sdh.get_text_embedding(prompt1) + text_embeddingB = self.sdh.get_text_embedding(prompt2) + + self.autosetup_branching(quality='upscaling_step2', depth_strength=depth_strength) + + # list_nmb_branches = [2, 3, 4] + # list_injection_strength = [0.0, 0.6, 0.95] + # num_inference_steps = 100 + # self.setup_branching(num_inference_steps, list_nmb_branches, list_injection_strength) + + duration_single_trans = 3 + list_fract_mixing = np.linspace(0, 1, nmb_images_lowres-1) + + for i in range(nmb_images_lowres-1): + print(f"Starting movie segment {i+1}/{nmb_images_lowres-1}") + + self.text_embedding1 = interpolate_linear(text_embeddingA, text_embeddingB, list_fract_mixing[i]) + self.text_embedding2 = interpolate_linear(text_embeddingA, text_embeddingB, 1-list_fract_mixing[i]) + + if i==0: + recycle_img1 = False + else: + self.swap_forward() + recycle_img1 = True + + self.set_image1(imgs_lowres[i]) + self.set_image2(imgs_lowres[i+1]) + list_imgs = self.run_transition(recycle_img1=recycle_img1) + list_imgs_interp = add_frames_linear_interp(list_imgs, fps, duration_single_trans) + + # Save movie frame + for img in list_imgs_interp: + ms.write_frame(img) + + ms.finalize() + + + def init_inpainting( self, image_source: Union[Image.Image, np.ndarray] = None, @@ -567,10 +729,29 @@ class LatentBlending(): Initialize inpainting with an empty image and mask, effectively disabling inpainting, useful for generating a first image for transitions using diffusion. """ - self.init_mode('inpaint') + self.init_mode() self.sdh.init_inpainting(image_source, mask_image, init_empty) + @torch.no_grad() + def get_mixed_conditioning(self, fract_mixing): + if self.mode == 'standard': + text_embeddings_mix = interpolate_linear(self.text_embedding1, self.text_embedding2, fract_mixing) + list_conditionings = [text_embeddings_mix] + elif self.mode == 'inpaint': + text_embeddings_mix = interpolate_linear(self.text_embedding1, self.text_embedding2, fract_mixing) + list_conditionings = [text_embeddings_mix] + elif self.mode == 'upscale': + text_embeddings_mix = interpolate_linear(self.text_embedding1, self.text_embedding2, fract_mixing) + cond, uc_full = self.sdh.get_cond_upscaling(self.image1_lowres, text_embeddings_mix, self.noise_level_upscaling) + condB, uc_fullB = self.sdh.get_cond_upscaling(self.image2_lowres, text_embeddings_mix, self.noise_level_upscaling) + cond['c_concat'][0] = interpolate_spherical(cond['c_concat'][0], condB['c_concat'][0], fract_mixing) + uc_full['c_concat'][0] = interpolate_spherical(uc_full['c_concat'][0], uc_fullB['c_concat'][0], fract_mixing) + list_conditionings = [cond, uc_full] + else: + raise ValueError(f"mix_conditioning: unknown mode {self.mode}") + return list_conditionings + @torch.no_grad() def get_text_embeddings( self, @@ -587,6 +768,27 @@ class LatentBlending(): return self.sdh.get_text_embedding(prompt) + def write_imgs_transition(self, dp_img, imgs_transition): + r""" + Writes the transition images into the folder dp_img. + """ + os.makedirs(dp_img) + for i, img in enumerate(imgs_transition): + img_leaf = Image.fromarray(img) + img_leaf.save(os.path.join(dp_img, f"lowres_img_{str(i).zfill(4)}.jpg")) + + # Dump everything relevant into yaml + dict_stuff = {} + dict_stuff['prompt1'] = self.prompt1 + dict_stuff['prompt2'] = self.prompt2 + dict_stuff['seed1'] = int(self.seed1) + dict_stuff['seed2'] = int(self.seed2) + dict_stuff['num_inference_steps'] = self.num_inference_steps + dict_stuff['height'] = self.sdh.height + dict_stuff['width'] = self.sdh.width + dict_stuff['nmb_images'] = len(imgs_transition) + yml_save(os.path.join(dp_img, "lowres.yaml"), dict_stuff) + def randomize_seed(self): r""" Set a random seed for a fresh start. @@ -815,7 +1017,7 @@ def add_frames_linear_interp( return list_imgs_interp -def get_spacing(nmb_points:int, scaling: float): +def get_spacing(nmb_points: int, scaling: float): """ Helper function for getting nonlinear spacing between 0 and 1, symmetric around 0.5 Args: @@ -834,9 +1036,7 @@ def get_spacing(nmb_points:int, scaling: float): else: left_side = np.abs(np.linspace(1, 0, nmb_points_per_side)**scaling / 2 - 0.5)[0:-1] right_side = 1-left_side[::-1] - all_fracts = np.hstack([left_side, right_side]) - return all_fracts @@ -861,16 +1061,126 @@ def get_time(resolution=None): return t +def yml_load(fp_yml, print_fields=False): + """ + Helper function for loading yaml files + """ + with open(fp_yml) as f: + data = yaml.load(f, Loader=yaml.loader.SafeLoader) + dict_data = dict(data) + print("load: loaded {}".format(fp_yml)) + return dict_data +def yml_save(fp_yml, dict_stuff): + """ + Helper function for saving yaml files + """ + with open(fp_yml, 'w') as f: + data = yaml.dump(dict_stuff, f, sort_keys=False, default_flow_style=False) + print("yml_save: saved {}".format(fp_yml)) #%% le main if __name__ == "__main__": - pass + # xxxx + # #%% First let us spawn a stable diffusion holder + # device = "cuda:0" + # fp_ckpt = "../stable_diffusion_models/ckpt/v2-1_512-ema-pruned.ckpt" + # fp_config = 'configs/v2-inference.yaml' + # sdh = StableDiffusionHolder(fp_ckpt, fp_config, device, height=384, width=512) + # #%% + # # Spawn latent blending + # self = LatentBlending(sdh) + + # dp_img = '/home/lugo/latentblending/test5' + + # fn1 = '230105_211545_photo_of_a_pyroclastic_ash_cloud_racing_down_mount_etna.txt' + # fn2 = '230105_211815_a_breathtaking_drone_photo_of_a_bizarre_cliff_structure,_lava_streams_flowing_down_into_the_ocean.txt' + + # dp_cherries ='/home/lugo/latentblending/cherries/' + + # dict1 = yml_load(os.path.join(dp_cherries, fn1)) + # dict2 = yml_load(os.path.join(dp_cherries, fn2)) + + # # prompt1 = "painting of a big pine tree" + # # prompt2 = "painting of the full moon shining, mountains in the background, rocks, eery" + # prompt1 = dict1['prompt'] + # prompt2 = dict2['prompt'] + # self.set_prompt1(prompt1) + # self.set_prompt2(prompt2) + # fixed_seeds = [dict1['seed'], dict2['seed']] + # self.run_upscaling_step1(dp_img, fixed_seeds=fixed_seeds, depth_strength=0.6) + + # # FIXME: depth_strength=0.6 CAN cause trouble. why?! + + #%% RUN UPSCALING_STEP2 (highres) + fp_ckpt= "../stable_diffusion_models/ckpt/x4-upscaler-ema.ckpt" + fp_config = 'configs/x4-upscaling.yaml' + sdh = StableDiffusionHolder(fp_ckpt, fp_config) + # self.run_upscaling_step2(dp_img) + #%% /home/lugo/latentblending/230106_210812 / + self = LatentBlending(sdh) + dp_img = '/home/lugo/latentblending/230107_144533' + fp_yml = os.path.join(dp_img, "lowres.yaml") + fp_movie = os.path.join(dp_img, "movie.mp4") + fps = 24 + ms = MovieSaver(fp_movie, fps=fps) + assert os.path.isfile(fp_yml), "lowres.yaml does not exist. did you forget run_upscaling_step1?" + dict_stuff = yml_load(fp_yml) + + # load lowres images + nmb_images_lowres = dict_stuff['nmb_images'] + prompt1 = dict_stuff['prompt1'] + prompt2 = dict_stuff['prompt2'] + imgs_lowres = [] + for i in range(nmb_images_lowres): + fp_img_lowres = os.path.join(dp_img, f"lowres_img_{str(i).zfill(4)}.jpg") + assert os.path.isfile(fp_img_lowres), f"{fp_img_lowres} does not exist. did you forget run_upscaling_step1?" + imgs_lowres.append(Image.open(fp_img_lowres)) + + + # set up upscaling + text_embeddingA = self.sdh.get_text_embedding(prompt1) + text_embeddingB = self.sdh.get_text_embedding(prompt2) + + list_nmb_branches = [2, 3, 6] + list_injection_strength = [0.0, 0.6, 0.95] + num_inference_steps = 100 + duration_single_trans = 3 + self.setup_branching(num_inference_steps, list_nmb_branches, list_injection_strength) + list_fract_mixing = np.linspace(0, 1, nmb_images_lowres-1) + + for i in range(nmb_images_lowres-1): + print(f"Starting movie segment {i+1}/{nmb_images_lowres-1}") + + self.text_embedding1 = interpolate_linear(text_embeddingA, text_embeddingB, list_fract_mixing[i]) + self.text_embedding2 = interpolate_linear(text_embeddingA, text_embeddingB, 1-list_fract_mixing[i]) + + if i==0: + recycle_img1 = False + else: + self.swap_forward() + recycle_img1 = True + + self.set_image1(imgs_lowres[i]) + self.set_image2(imgs_lowres[i+1]) + list_imgs = self.run_transition(recycle_img1=recycle_img1) + self.write_imgs_transition(os.path.join(dp_img, f"highres_{str(i).zfill(4)}"), list_imgs) + list_imgs_interp = add_frames_linear_interp(list_imgs, fps, duration_single_trans) + + # Save movie frame + for img in list_imgs_interp: + ms.write_frame(img) + + ms.finalize() + #%% """ + + TODO Coding: + CHECK IF ALL STUFF WORKS STILL: STANDARD MODEL, INPAINTING RUNNING WITHOUT PROMPT! save value ranges, can it be trashed? in the middle: have more branches + lower guidance scale @@ -878,8 +1188,6 @@ TODO Coding: TODO Other: github write text - requirements - make graphic explaining make colab license twitter et al diff --git a/stable_diffusion_holder.py b/stable_diffusion_holder.py index e6c7db0..17b31ae 100644 --- a/stable_diffusion_holder.py +++ b/stable_diffusion_holder.py @@ -27,7 +27,7 @@ import warnings import torch from tqdm.auto import tqdm from PIL import Image -import matplotlib.pyplot as plt +# import matplotlib.pyplot as plt import torch from movie_util import MovieSaver import datetime @@ -40,29 +40,21 @@ from torch import autocast from contextlib import nullcontext from ldm.util import instantiate_from_config from ldm.models.diffusion.ddim import DDIMSampler -from einops import repeat +from einops import repeat, rearrange + +#%% -def load_model_from_config(config, ckpt, verbose=False): - print(f"Loading model from {ckpt}") - pl_sd = torch.load(ckpt, map_location="cpu") - if "global_step" in pl_sd: - print(f"Global Step: {pl_sd['global_step']}") - sd = pl_sd["state_dict"] - model = instantiate_from_config(config.model) - m, u = model.load_state_dict(sd, strict=False) - if len(m) > 0 and verbose: - print("missing keys:") - print(m) - if len(u) > 0 and verbose: - print("unexpected keys:") - print(u) +def pad_image(input_image): + pad_w, pad_h = np.max(((2, 2), np.ceil( + np.array(input_image.size) / 64).astype(int)), axis=0) * 64 - input_image.size + im_padded = Image.fromarray( + np.pad(np.array(input_image), ((0, pad_h), (0, pad_w), (0, 0)), mode='edge')) + return im_padded - model.cuda() - model.eval() - return model -def make_batch_sd( + +def make_batch_inpaint( image, mask, txt, @@ -89,16 +81,42 @@ def make_batch_sd( } return batch + +def make_batch_superres( + image, + txt, + device, + num_samples=1, + ): + image = np.array(image.convert("RGB")) + image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0 + batch = { + "lr": rearrange(image, 'h w c -> 1 c h w'), + "txt": num_samples * [txt], + } + batch["lr"] = repeat(batch["lr"].to(device=device), + "1 ... -> n ...", n=num_samples) + return batch + + +def make_noise_augmentation(model, batch, noise_level=None): + x_low = batch[model.low_scale_key] + x_low = x_low.to(memory_format=torch.contiguous_format).float() + x_aug, noise_level = model.low_scale_model(x_low, noise_level) + return x_aug, noise_level + + class StableDiffusionHolder: def __init__(self, fp_ckpt: str = None, fp_config: str = None, - device: str = None, + num_inference_steps: int = 30, height: Optional[int] = None, width: Optional[int] = None, - num_inference_steps: int = 30, + device: str = None, precision: str='autocast', ): + self.seed = 42 self.guidance_scale = 5.0 @@ -130,13 +148,15 @@ class StableDiffusionHolder: def init_model(self, fp_ckpt, fp_config): assert os.path.isfile(fp_ckpt), f"Your model checkpoint file does not exist: {fp_ckpt}" assert os.path.isfile(fp_config), f"Your config file does not exist: {fp_config}" - config = OmegaConf.load(fp_config) - self.model = load_model_from_config(config, fp_ckpt) + self.fp_ckpt = fp_ckpt + config = OmegaConf.load(fp_config) + self.model = instantiate_from_config(config.model) + self.model.load_state_dict(torch.load(fp_ckpt)["state_dict"], strict=False) + self.model = self.model.to(self.device) self.sampler = DDIMSampler(self.model) - self.fp_ckpt = fp_ckpt @@ -186,6 +206,26 @@ class StableDiffusionHolder: def get_text_embedding(self, prompt): c = self.model.get_learned_conditioning(prompt) return c + + @torch.no_grad() + def get_cond_upscaling(self, image, text_embedding, noise_level): + r""" + Initializes the conditioning for the x4 upscaling model. + """ + + image = pad_image(image) # resize to integer multiple of 32 + w, h = image.size + noise_level = torch.Tensor(1 * [noise_level]).to(self.sampler.model.device).long() + batch = make_batch_superres(image, txt="placeholder", device=self.device, num_samples=1) + + x_augment, noise_level = make_noise_augmentation(self.model, batch, noise_level) + + cond = {"c_concat": [x_augment], "c_crossattn": [text_embedding], "c_adm": noise_level} + # uncond cond + uc_cross = self.model.get_unconditional_conditioning(1, "") + uc_full = {"c_concat": [x_augment], "c_crossattn": [uc_cross], "c_adm": noise_level} + + return cond, uc_full @torch.no_grad() def run_diffusion_standard( @@ -317,7 +357,7 @@ class StableDiffusionHolder: with precision_scope("cuda"): with self.model.ema_scope(): - batch = make_batch_sd(self.image_source, self.mask_image, txt="willbereplaced", device=self.device, num_samples=1) + batch = make_batch_inpaint(self.image_source, self.mask_image, txt="willbereplaced", device=self.device, num_samples=1) c = text_embeddings c_cat = list() for ck in self.model.concat_keys: @@ -383,7 +423,93 @@ class StableDiffusionHolder: return self.latent2image(latents) else: return list_latents_out + + @torch.no_grad() + def run_diffusion_upscaling( + self, + cond, + uc_full, + latents_for_injection: torch.FloatTensor = None, + idx_start: int = -1, + idx_stop: int = -1, + return_image: Optional[bool] = False + ): + r""" + Wrapper function for run_diffusion_standard and run_diffusion_inpaint. + Depending on the mode, the correct one will be executed. + + Args: + ?? + latents_for_injection: torch.FloatTensor + Latents that are used for injection + idx_start: int + Index of the diffusion process start and where the latents_for_injection are injected + idx_stop: int + Index of the diffusion process end. + return_image: Optional[bool] + Optionally return image directly + """ + + + if latents_for_injection is None: + do_inject_latents = False + else: + do_inject_latents = True + + precision_scope = autocast if self.precision == "autocast" else nullcontext + generator = torch.Generator(device=self.device).manual_seed(int(self.seed)) + + h = uc_full['c_concat'][0].shape[2] + w = uc_full['c_concat'][0].shape[3] + + with precision_scope("cuda"): + with self.model.ema_scope(): + + + shape_latents = [self.model.channels, h, w] + + self.sampler.make_schedule(ddim_num_steps=self.num_inference_steps-1, ddim_eta=self.ddim_eta, verbose=False) + C, H, W = shape_latents + size = (1, C, H, W) + b = size[0] + + latents = torch.randn(size, generator=generator, device=self.device) + + timesteps = self.sampler.ddim_timesteps + + time_range = np.flip(timesteps) + total_steps = timesteps.shape[0] + + # collect latents + list_latents_out = [] + for i, step in enumerate(time_range): + if do_inject_latents: + # Inject latent at right place + if i < idx_start: + continue + elif i == idx_start: + latents = latents_for_injection.clone() + if i == idx_stop: + return list_latents_out + + # print(f"diffusion iter {i}") + index = total_steps - i - 1 + ts = torch.full((b,), step, device=self.device, dtype=torch.long) + outs = self.sampler.p_sample_ddim(latents, cond, ts, index=index, use_original_steps=False, + quantize_denoised=False, temperature=1.0, + noise_dropout=0.0, score_corrector=None, + corrector_kwargs=None, + unconditional_guidance_scale=self.guidance_scale, + unconditional_conditioning=uc_full, + dynamic_threshold=None) + latents, pred_x0 = outs + list_latents_out.append(latents.clone()) + + if return_image: + return self.latent2image(latents) + else: + return list_latents_out @torch.no_grad() def latent2image( @@ -405,47 +531,178 @@ class StableDiffusionHolder: if __name__ == "__main__": - num_inference_steps = 20 # Number of diffusion interations - - # fp_ckpt = "../stable_diffusion_models/ckpt/768-v-ema.ckpt" - # fp_config = '../stablediffusion/configs/stable-diffusion/v2-inference-v.yaml' - - fp_ckpt= "../stable_diffusion_models/ckpt/512-inpainting-ema.ckpt" - fp_config = '../stablediffusion/configs//stable-diffusion/v2-inpainting-inference.yaml' - - sdh = StableDiffusionHolder(fp_ckpt, fp_config, num_inference_steps) - # fp_ckpt= "../stable_diffusion_models/ckpt/512-base-ema.ckpt" - # fp_config = '../stablediffusion/configs//stable-diffusion/v2-inference.yaml' - - - - #%% INPAINT PREPS - image_source = Image.fromarray((255*np.random.rand(512,512,3)).astype(np.uint8)) - mask = 255*np.ones([512,512], dtype=np.uint8) - mask[0:50, 0:50] = 0 - mask = Image.fromarray(mask) - - sdh.init_inpainting(image_source, mask) - text_embedding = sdh.get_text_embedding("photo of a strange house, surreal painting") - list_latents = sdh.run_diffusion_inpaint(text_embedding) - - #%% - idx_inject = 3 - img_orig = sdh.latent2image(list_latents[-1]) - list_inject = sdh.run_diffusion_inpaint(text_embedding, list_latents[idx_inject], idx_start=idx_inject+1) - img_inject = sdh.latent2image(list_inject[-1]) - - img_diff = img_orig - img_inject - import matplotlib.pyplot as plt - plt.imshow(np.concatenate((img_orig, img_inject, img_diff), axis=1)) + + fp_ckpt= "../stable_diffusion_models/ckpt/x4-upscaler-ema.ckpt" + fp_config = 'configs/x4-upscaling.yaml' + num_inference_steps = 100 + self = StableDiffusionHolder(fp_ckpt, fp_config, num_inference_steps=num_inference_steps) + xxx + #%% image A + image = Image.open('/home/lugo/latentblending/test1/img_0007.jpg') + image = image.resize((32*20, 32*12)) + promptA = "photo of a an ancient castle surrounded by a forest" + noise_level = 20 #gradio min=0, max=350, value=20 + text_embeddingA = self.get_text_embedding(promptA) + cond, uc_full = self.get_cond_upscaling(image, text_embeddingA, noise_level) + list_samplesA = self.run_diffusion_upscaling(cond, uc_full) + image_result = Image.fromarray(self.latent2image(list_samplesA[-1])) + image_result.save('/home/lugo/latentblending/test1/high/imgA.jpg') + + + #%% image B + from latent_blending import interpolate_linear, interpolate_spherical + image = Image.open('/home/lugo/latentblending/test1/img_0006.jpg') + image = image.resize((32*20, 32*12)) + promptA = "photo of a an ancient castle surrounded by a forest" + promptB = "photo of a beautiful island on the horizon, blue sea with waves" + noise_level = 20 #gradio min=0, max=350, value=20 + text_embeddingA = self.get_text_embedding(promptA) + text_embeddingB = self.get_text_embedding(promptB) + text_embedding = interpolate_linear(text_embeddingA, text_embeddingB, 1/8) + + cond, uc_full = self.get_cond_upscaling(image, text_embedding, noise_level) + + list_samplesB = self.run_diffusion_upscaling(cond, uc_full) + image_result = Image.fromarray(self.latent2image(list_samplesB[-1])) + image_result.save('/home/lugo/latentblending/test1/high/imgB.jpg') + + + #%% reality check: run only for 50 iter. + image = Image.open('/home/lugo/latentblending/test1/img_0007.jpg') + image = image.resize((32*20, 32*12)) + promptA = "photo of a an ancient castle surrounded by a forest" + noise_level = 20 #gradio min=0, max=350, value=20 + text_embeddingA = self.get_text_embedding(promptA) + cond, uc_full = self.get_cond_upscaling(image, text_embeddingA, noise_level) + + latents_inject = list_samplesA[50] + list_samplesAx = self.run_diffusion_upscaling(cond, uc_full, latents_inject, idx_start=50) + image_result = Image.fromarray(self.latent2image(list_samplesAx[-1])) + image_result.save('/home/lugo/latentblending/test1/high/imgA_restart.jpg') + + # RESULTS ARE NOT EXACTLY IDENTICAL! INVESTIGATE WHY + + #%% mix in the middle! which uc_full should be taken? + # expA: take the one from A + idx_start = 90 + latentsA = list_samplesA[idx_start] + latentsB = list_samplesB[idx_start] + latents_inject = interpolate_spherical(latentsA, latentsB, 0.5) + + image = Image.open('/home/lugo/latentblending/test1/img_0007.jpg') + image = image.resize((32*20, 32*12)) + promptA = "photo of a an ancient castle surrounded by a forest" + noise_level = 20 #gradio min=0, max=350, value=20 + text_embeddingA = self.get_text_embedding(promptA) + cond, uc_full = self.get_cond_upscaling(image, text_embeddingA, noise_level) + + list_samples = self.run_diffusion_upscaling(cond, uc_full, latents_inject, idx_start=idx_start) + image_result = Image.fromarray(self.latent2image(list_samples[-1])) + image_result.save('/home/lugo/latentblending/test1/high/img_mix_expA_late.jpg') + + + #%% mix in the middle! which uc_full should be taken? + # expA: take the one from B + idx_start = 90 + latentsA = list_samplesA[idx_start] + latentsB = list_samplesB[idx_start] + latents_inject = interpolate_spherical(latentsA, latentsB, 0.5) + + image = Image.open('/home/lugo/latentblending/test1/img_0006.jpg').resize((32*20, 32*12)) + promptA = "photo of a an ancient castle surrounded by a forest" + promptB = "photo of a beautiful island on the horizon, blue sea with waves" + noise_level = 20 #gradio min=0, max=350, value=20 + text_embeddingA = self.get_text_embedding(promptA) + text_embeddingB = self.get_text_embedding(promptB) + text_embedding = interpolate_linear(text_embeddingA, text_embeddingB, 1/8) + cond, uc_full = self.get_cond_upscaling(image, text_embedding, noise_level) + + list_samples = self.run_diffusion_upscaling(cond, uc_full, latents_inject, idx_start=idx_start) + image_result = Image.fromarray(self.latent2image(list_samples[-1])) + image_result.save('/home/lugo/latentblending/test1/high/img_mix_expB_late.jpg') + + + + + #%% lets blend the uc_full too! + # expC + + idx_start = 50 + list_mix = np.linspace(0, 1, 20) + for fract_mix in list_mix: + # fract_mix = 0.75 + latentsA = list_samplesA[idx_start] + latentsB = list_samplesB[idx_start] + latents_inject = interpolate_spherical(latentsA, latentsB, fract_mix) + + text_embeddingA = self.get_text_embedding(promptA) + text_embeddingB = self.get_text_embedding(promptB) + text_embedding = interpolate_linear(text_embeddingA, text_embeddingB, 1/8) + + imageA = Image.open('/home/lugo/latentblending/test1/img_0007.jpg').resize((32*20, 32*12)) + condA, uc_fullA = self.get_cond_upscaling(imageA, text_embedding, noise_level) + + imageB = Image.open('/home/lugo/latentblending/test1/img_0006.jpg').resize((32*20, 32*12)) + condB, uc_fullB = self.get_cond_upscaling(imageB, text_embedding, noise_level) + + condA['c_concat'][0] = interpolate_spherical(condA['c_concat'][0], condB['c_concat'][0], fract_mix) + uc_fullA['c_concat'][0] = interpolate_spherical(uc_fullA['c_concat'][0], uc_fullB['c_concat'][0], fract_mix) + + list_samples = self.run_diffusion_upscaling(condA, uc_fullA, latents_inject, idx_start=idx_start) + image_result = Image.fromarray(self.latent2image(list_samples[-1])) + image_result.save(f'/home/lugo/latentblending/test1/high/img_mix_expC_{fract_mix}_start{idx_start}.jpg') + + + +#%% + +list_imgs = os.listdir('/home/lugo/latentblending/test1/high/') +list_imgs = [l for l in list_imgs if "expC" in l] +list_imgs.pop(0) + +lx = [] +for fn in list_imgs: + Image.open #%% + + + + if False: + + num_inference_steps = 20 # Number of diffusion interations + + # fp_ckpt = "../stable_diffusion_models/ckpt/768-v-ema.ckpt" + # fp_config = '../stablediffusion/configs/stable-diffusion/v2-inference-v.yaml' + + fp_ckpt= "../stable_diffusion_models/ckpt/512-inpainting-ema.ckpt" + fp_config = '../stablediffusion/configs//stable-diffusion/v2-inpainting-inference.yaml' + + sdh = StableDiffusionHolder(fp_ckpt, fp_config, num_inference_steps) + # fp_ckpt= "../stable_diffusion_models/ckpt/512-base-ema.ckpt" + # fp_config = '../stablediffusion/configs//stable-diffusion/v2-inference.yaml' + + + + image_source = Image.fromarray((255*np.random.rand(512,512,3)).astype(np.uint8)) + mask = 255*np.ones([512,512], dtype=np.uint8) + mask[0:50, 0:50] = 0 + mask = Image.fromarray(mask) + + sdh.init_inpainting(image_source, mask) + text_embedding = sdh.get_text_embedding("photo of a strange house, surreal painting") + list_latents = sdh.run_diffusion_inpaint(text_embedding) + + idx_inject = 3 + img_orig = sdh.latent2image(list_latents[-1]) + list_inject = sdh.run_diffusion_inpaint(text_embedding, list_latents[idx_inject], idx_start=idx_inject+1) + img_inject = sdh.latent2image(list_inject[-1]) + + img_diff = img_orig - img_inject + import matplotlib.pyplot as plt + plt.imshow(np.concatenate((img_orig, img_inject, img_diff), axis=1)) + + - -""" -next steps: - incorporate into lb - incorporate into outpaint -"""