diff --git a/examples/multi_trans.py b/examples/multi_trans.py index ac3e726..d9cd7a4 100644 --- a/examples/multi_trans.py +++ b/examples/multi_trans.py @@ -31,47 +31,12 @@ nmb_prompts = 20 list_prompts = [] list_prompts.append("high resolution ultra 8K image with lake and forest") -# list_prompts.append("strange and alien desolate lanscapes 8K") -# list_prompts.append("ultra high res psychedelic skyscraper city landscape 8K unreal engine") -list_prompts = list_prompts*10 - -be.set_prompt1(list_prompts[0]) -be.compute_latents1(True) - -#img = pipe(list_prompts[0]).images[0] - - -#%% -# import os -# import random - -# # Directory containing the text files -# dir_prompts = "/raid/data/diffusion/flamengalo/prompts_surreal" - -# # List to store the contents of selected text files -# list_prompts = [] - -# # List all files in the directory -# file_list = os.listdir(dir_prompts) - -# # Shuffle the file list to get random files -# random.shuffle(file_list) - -# # Loop through the first nmb_prompts files and read their contents -# for filename in file_list[:nmb_prompts]: -# file_path = os.path.join(dir_prompts, filename) -# try: -# with open(file_path, 'r') as file: -# content = file.read() -# list_prompts.append(content) -# except Exception as e: -# print(f"except {e}") +list_prompts.append("strange and alien desolate lanscapes 8K") +list_prompts.append("ultra high res psychedelic skyscraper city landscape 8K unreal engine") #%% fp_movie = f'surreal_nmb{len(list_prompts)}.mp4' # Specify the seeds -list_seeds = np.random.randint(0, 1111111111111, len(list_prompts)) - - +list_seeds = np.random.randint(0, np.iinfo(np.int32).max, len(list_prompts)) list_movie_parts = [] for i in range(len(list_prompts) - 1): diff --git a/latentblending/blending_engine.py b/latentblending/blending_engine.py index 6cd004f..7bbdc05 100644 --- a/latentblending/blending_engine.py +++ b/latentblending/blending_engine.py @@ -11,6 +11,7 @@ import lpips import platform from latentblending.diffusers_holder import DiffusersHolder from latentblending.utils import interpolate_spherical, interpolate_linear, add_frames_linear_interp +from lunar_tools import MovieSaver, fill_up_frames_linear_interpolation warnings.filterwarnings('ignore') torch.backends.cudnn.benchmark = False torch.set_grad_enabled(False) @@ -696,7 +697,7 @@ class BlendingEngine(): """ # Let's get more cheap frames via linear interpolation (duration_transition*fps frames) - imgs_transition_ext = add_frames_linear_interp(self.tree_final_imgs, duration_transition, fps) + imgs_transition_ext = fill_up_frames_linear_interpolation(self.tree_final_imgs, duration_transition, fps) # Save as MP4 if os.path.isfile(fp_movie): diff --git a/latentblending/movie_util.py b/latentblending/movie_util.py deleted file mode 100644 index eb7e157..0000000 --- a/latentblending/movie_util.py +++ /dev/null @@ -1,301 +0,0 @@ -# Copyright 2022 Lunar Ring. All rights reserved. -# Written by Johannes Stelzer, email stelzer@lunar-ring.ai twitter @j_stelzer - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import subprocess -import os -import numpy as np -from tqdm import tqdm -import cv2 -from typing import List -import ffmpeg # pip install ffmpeg-python. if error with broken pipe: conda update ffmpeg - - -class MovieSaver(): - def __init__( - self, - fp_out: str, - fps: int = 24, - shape_hw: List[int] = None, - crf: int = 21, - codec: str = 'libx264', - preset: str = 'fast', - pix_fmt: str = 'yuv420p', - silent_ffmpeg: bool = True): - r""" - Initializes movie saver class - a human friendly ffmpeg wrapper. - After you init the class, you can dump numpy arrays x into moviesaver.write_frame(x). - Don't forget toi finalize movie file with moviesaver.finalize(). - Args: - fp_out: str - Output file name. If it already exists, it will be deleted. - fps: int - Frames per second. - shape_hw: List[int, int] - Output shape, optional argument. Can be initialized automatically when first frame is written. - crf: int - ffmpeg doc: the range of the CRF scale is 0–51, where 0 is lossless - (for 8 bit only, for 10 bit use -qp 0), 23 is the default, and 51 is worst quality possible. - A lower value generally leads to higher quality, and a subjectively sane range is 17–28. - Consider 17 or 18 to be visually lossless or nearly so; - it should look the same or nearly the same as the input but it isn't technically lossless. - The range is exponential, so increasing the CRF value +6 results in - roughly half the bitrate / file size, while -6 leads to roughly twice the bitrate. - codec: int - Number of diffusion steps. Larger values will take more compute time. - preset: str - Choose between ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow. - ffmpeg doc: A preset is a collection of options that will provide a certain encoding speed - to compression ratio. A slower preset will provide better compression - (compression is quality per filesize). - This means that, for example, if you target a certain file size or constant bit rate, - you will achieve better quality with a slower preset. Similarly, for constant quality encoding, - you will simply save bitrate by choosing a slower preset. - pix_fmt: str - Pixel format. Run 'ffmpeg -pix_fmts' in your shell to see all options. - silent_ffmpeg: bool - Surpress the output from ffmpeg. - """ - if len(os.path.split(fp_out)[0]) > 0: - assert os.path.isdir(os.path.split(fp_out)[0]), "Directory does not exist!" - - self.fp_out = fp_out - self.fps = fps - self.crf = crf - self.pix_fmt = pix_fmt - self.codec = codec - self.preset = preset - self.silent_ffmpeg = silent_ffmpeg - - if os.path.isfile(fp_out): - os.remove(fp_out) - - self.init_done = False - self.nmb_frames = 0 - if shape_hw is None: - self.shape_hw = [-1, 1] - else: - if len(shape_hw) == 2: - shape_hw.append(3) - self.shape_hw = shape_hw - self.initialize() - - print(f"MovieSaver initialized. fps={fps} crf={crf} pix_fmt={pix_fmt} codec={codec} preset={preset}") - - def initialize(self): - args = ( - ffmpeg - .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(self.shape_hw[1], self.shape_hw[0]), framerate=self.fps) - .output(self.fp_out, crf=self.crf, pix_fmt=self.pix_fmt, c=self.codec, preset=self.preset) - .overwrite_output() - .compile() - ) - if self.silent_ffmpeg: - self.ffmpg_process = subprocess.Popen(args, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) - else: - self.ffmpg_process = subprocess.Popen(args, stdin=subprocess.PIPE) - self.init_done = True - self.shape_hw = tuple(self.shape_hw) - print(f"Initialization done. Movie shape: {self.shape_hw}") - - def write_frame(self, out_frame: np.ndarray): - r""" - Function to dump a numpy array as frame of a movie. - Args: - out_frame: np.ndarray - Numpy array, in np.uint8 format. Convert with np.astype(x, np.uint8). - Dim 0: y - Dim 1: x - Dim 2: RGB - """ - assert out_frame.dtype == np.uint8, "Convert to np.uint8 before" - assert len(out_frame.shape) == 3, "out_frame needs to be three dimensional, Y X C" - assert out_frame.shape[2] == 3, f"need three color channels, but you provided {out_frame.shape[2]}." - - if not self.init_done: - self.shape_hw = out_frame.shape - self.initialize() - - assert self.shape_hw == out_frame.shape, f"You cannot change the image size after init. Initialized with {self.shape_hw}, out_frame {out_frame.shape}" - - # write frame - self.ffmpg_process.stdin.write( - out_frame - .astype(np.uint8) - .tobytes() - ) - - self.nmb_frames += 1 - - def finalize(self): - r""" - Call this function to finalize the movie. If you forget to call it your movie will be garbage. - """ - if self.nmb_frames == 0: - print("You did not write any frames yet! nmb_frames = 0. Cannot save.") - return - self.ffmpg_process.stdin.close() - self.ffmpg_process.wait() - duration = int(self.nmb_frames / self.fps) - print(f"Movie saved, {duration}s playtime, watch here: \n{self.fp_out}") - - -def concatenate_movies(fp_final: str, list_fp_movies: List[str]): - r""" - Concatenate multiple movie segments into one long movie, using ffmpeg. - - Parameters - ---------- - fp_final : str - Full path of the final movie file. Should end with .mp4 - list_fp_movies : list[str] - List of full paths of movie segments. - """ - assert fp_final[-4] == ".", "fp_final seems to miss file extension: {fp_final}" - for fp in list_fp_movies: - assert os.path.isfile(fp), f"Input movie does not exist: {fp}" - assert os.path.getsize(fp) > 100, f"Input movie seems empty: {fp}" - - if os.path.isfile(fp_final): - os.remove(fp_final) - - # make a list for ffmpeg - list_concat = [] - for fp_part in list_fp_movies: - list_concat.append(f"""file '{fp_part}'""") - - # save this list - fp_list = "tmp_move.txt" - with open(fp_list, "w") as fa: - for item in list_concat: - fa.write("%s\n" % item) - - cmd = f'ffmpeg -f concat -safe 0 -i {fp_list} -c copy {fp_final}' - subprocess.call(cmd, shell=True) - os.remove(fp_list) - if os.path.isfile(fp_final): - print(f"concatenate_movies: success! Watch here: {fp_final}") - - -def add_sound(fp_final, fp_silentmovie, fp_sound): - cmd = f'ffmpeg -i {fp_silentmovie} -i {fp_sound} -c copy -map 0:v:0 -map 1:a:0 {fp_final}' - subprocess.call(cmd, shell=True) - if os.path.isfile(fp_final): - print(f"add_sound: success! Watch here: {fp_final}") - - -def add_subtitles_to_video( - fp_input: str, - fp_output: str, - subtitles: list, - fontsize: int = 50, - font_name: str = "Arial", - color: str = 'yellow' - ): - from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip - r""" - Function to add subtitles to a video. - - Args: - fp_input (str): File path of the input video. - fp_output (str): File path of the output video with subtitles. - subtitles (list): List of dictionaries containing subtitle information - (start, duration, text). Example: - subtitles = [ - {"start": 1, "duration": 3, "text": "hello test"}, - {"start": 4, "duration": 2, "text": "this works"}, - ] - fontsize (int): Font size of the subtitles. - font_name (str): Font name of the subtitles. - color (str): Color of the subtitles. - """ - - # Check if the input file exists - if not os.path.isfile(fp_input): - raise FileNotFoundError(f"Input file not found: {fp_input}") - - # Check the subtitles format and sort them by the start time - time_points = [] - for subtitle in subtitles: - if not isinstance(subtitle, dict): - raise ValueError("Each subtitle must be a dictionary containing 'start', 'duration' and 'text'.") - if not all(key in subtitle for key in ["start", "duration", "text"]): - raise ValueError("Each subtitle dictionary must contain 'start', 'duration' and 'text'.") - if subtitle['start'] < 0 or subtitle['duration'] <= 0: - raise ValueError("'start' should be non-negative and 'duration' should be positive.") - time_points.append((subtitle['start'], subtitle['start'] + subtitle['duration'])) - - # Check for overlaps - time_points.sort() - for i in range(1, len(time_points)): - if time_points[i][0] < time_points[i - 1][1]: - raise ValueError("Subtitle time intervals should not overlap.") - - # Load the video clip - video = VideoFileClip(fp_input) - - # Create a list to store subtitle clips - subtitle_clips = [] - - # Loop through the subtitle information and create TextClip for each - for subtitle in subtitles: - text_clip = TextClip(subtitle["text"], fontsize=fontsize, color=color, font=font_name) - text_clip = text_clip.set_position(('center', 'bottom')).set_start(subtitle["start"]).set_duration(subtitle["duration"]) - subtitle_clips.append(text_clip) - - # Overlay the subtitles on the video - video = CompositeVideoClip([video] + subtitle_clips) - - # Write the final clip to a new file - video.write_videofile(fp_output) - - - -class MovieReader(): - r""" - Class to read in a movie. - """ - - def __init__(self, fp_movie): - self.video_player_object = cv2.VideoCapture(fp_movie) - self.nmb_frames = int(self.video_player_object.get(cv2.CAP_PROP_FRAME_COUNT)) - self.fps_movie = int(self.video_player_object.get(cv2.CAP_PROP_FPS)) - self.shape = [100, 100, 3] - self.shape_is_set = False - - def get_next_frame(self): - success, image = self.video_player_object.read() - if success: - if not self.shape_is_set: - self.shape_is_set = True - self.shape = image.shape - return image - else: - return np.zeros(self.shape) - - -if __name__ == "__main__": - fps = 2 - list_fp_movies = [] - for k in range(4): - fp_movie = f"/tmp/my_random_movie_{k}.mp4" - list_fp_movies.append(fp_movie) - ms = MovieSaver(fp_movie, fps=fps) - for fn in tqdm(range(30)): - img = (np.random.rand(512, 1024, 3) * 255).astype(np.uint8) - ms.write_frame(img) - ms.finalize() - - fp_final = "/tmp/my_concatenated_movie.mp4" - concatenate_movies(fp_final, list_fp_movies) diff --git a/requirements.txt b/requirements.txt index 3f4291a..1bd965d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ lpips==0.1.4 opencv-python -ffmpeg-python diffusers==0.25.0 transformers pytest +accelerate \ No newline at end of file diff --git a/setup.py b/setup.py index 8268a93..d5d4ca2 100644 --- a/setup.py +++ b/setup.py @@ -6,14 +6,14 @@ with open('requirements.txt') as f: setup( name='latentblending', - version='0.2', + version='0.3', url='https://github.com/lunarring/latentblending', description='Butter-smooth video transitions', long_description=open('README.md').read(), - install_requires=required, - dependency_links=[ - 'git+https://github.com/lunarring/lunar_tools#egg=lunar_tools' - ], + install_requires=[ + 'lunar_tools @ git+https://github.com/lunarring/lunar_tools.git#egg=lunar_tools' + ] + required, include_package_data=False, ) +