diff --git a/stickergen/actions.py b/stickergen/actions.py index 06b8a3a..7f7af2e 100644 --- a/stickergen/actions.py +++ b/stickergen/actions.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from yaml import safe_load +from typing import Iterable from pathlib import Path import logging @@ -18,59 +19,127 @@ log.debug('path_stickers_root = %s', path_stickers_root) log.debug('path_stickers_data_dir = %s', path_stickers_data_dir) -def mktask(out_format, out_ext, ffmpeg_opts): - def task_stickers(): - log = logging.getLogger('stickers.actions.task_'+out_format) - log.setLevel(logging.DEBUG) - log.debug(f'{list(path_stickers_data_dir.glob("*.yaml"))}') - for sticker_description_file in path_stickers_data_dir.glob('*.yaml'): - ffmpeg_action = ['ffmpeg'] - file_dep = [ sticker_description_file ] - log.debug('input description file: %s', sticker_description_file) - with open(sticker_description_file) as yaml_file: - sticker_description = safe_load(yaml_file) - if sticker_description.get('disabled', False): +class StickerTasks: + defs: Iterable + dst_width: int + dst_height: int + dst_format: str + ffmpeg_opts: list[str] + + def __init__(self, + defs_src: Iterable | None = None, + dst_width: int = 512, + dst_height: int = 512, + dst_format: str = 'png', + dst_ext: str = 'png', + ffmpeg_opts: list[str] = list()): + if defs_src is None: + defs_src = path_stickers_data_dir.glob("*.yaml") + self._log = log.getChild(self.__class__.__name__) + self.defs = defs_src + self.dst_height = dst_height + self.dst_width = dst_width + self.dst_format = dst_format + self.dst_ext = dst_ext + self.ffmpeg_opts = ffmpeg_opts + + def create_doit_tasks(self=None): + log = self._log + if self is None: + log.debug('method called as class method, silently returning') + return + log.info('creating %s stickers with %s dimensions and %s opts', + self.dst_format, f"{self.dst_width}x{self.dst_height}", + self.ffmpeg_opts) + for def_filepath in self.defs: + log.debug("def file: %s", def_filepath) + with open(def_filepath) as stream: + sticker_info = safe_load(stream) + if sticker_info.get("disabled", False): + log.info("file %s is disabled, skipping", def_filepath) continue - log.debug('stuff %s', sticker_description) - out_filename = f'{sticker_description["name"]}.{out_ext}' - target_filename = path_stickers_output_dir / out_filename - if sticker_description['input']['base'].get('format','') not in ('lavfi', 'image2'): - file_dep.append(f'{path_stickers_source_dir/sticker_description["input"]["base"]["input"]}') - if sticker_description['input']['base'].get('format'): - ffmpeg_action.append('-f') - ffmpeg_action.append(sticker_description['input']['base']['format']) - if sticker_description['input']['base'].get('framerate'): - ffmpeg_action.append('-framerate') - ffmpeg_action.append(sticker_description['input']['base']['framerate']) - ffmpeg_action.append('-i') - ffmpeg_action.append(path_stickers_source_dir/sticker_description['input']['base']['input']) - ffmpeg_action.append('-vf') - filterspec = '' - if sticker_description['input'].get('colors',{}).get('transparent'): - transparent = sticker_description['input']['colors']['transparent'] - filterspec += 'geq=\'' - filterspec += f'r=if(lt(alpha(X,Y),128),{transparent[0]},r(X,Y)):' - filterspec += f'g=if(lt(alpha(X,Y),128),{transparent[1]},g(X,Y)):' - filterspec += f'b=if(lt(alpha(X,Y),128),{transparent[2]},b(X,Y)):' - filterspec += f'a=alpha(X,Y)\',' + yield self.create_doit_sticker_task(sticker_info, def_filepath) - out_width = sticker_description.get('output', {}).get('width', 512) - out_height = sticker_description.get('output', {}).get('height', 512) - filterspec += f'scale=w={out_width}:h={out_height}:force_original_aspect_ratio=decrease' - if sticker_description.get('output', {}).get('speed'): - filterspec += f',setpts={sticker_description['output']['speed']}' - ffmpeg_action.append(filterspec) - ffmpeg_action.append('-y') - ffmpeg_action += ffmpeg_opts - ffmpeg_action.append(target_filename) - yield { - 'actions': [ffmpeg_action], - 'name': out_filename, - 'targets': [target_filename], - 'file_dep': file_dep - } - return task_stickers + def create_doit_sticker_task(self, sticker_info, def_filepath): + # Handling logging + sticker_log = self._log.getChild(sticker_info['name']) + out_filename = f'{sticker_info["name"]}.{self.dst_ext}' + target_filename = path_stickers_output_dir / out_filename + sticker_log.debug('target filename: %s', target_filename) + # Handling dependencies + file_dep = [def_filepath] + try: + if ( + sticker_info['input']['base'].get('format', '') + not in ('lavfi', 'image2') + ): + base_input = path_stickers_source_dir / \ + sticker_info["input"]["base"]["input"] + file_dep.append(base_input) + sticker_log.debug("file_dep updated: %s", base_input) + except KeyError: + sticker_log.info("sticker does not have base file input") + base_input = None + # Handling ffmpeg invocation argument array + ffmpeg_argv = ['ffmpeg'] + # ffmpeg: input format + if sticker_info['input']['base'].get('format'): + ffmpeg_argv += ['-f', sticker_info['input']['base']['format']] + # ffmpeg: input frame rate + if sticker_info['input']['base'].get('framerate'): + ffmpeg_argv += ['-framerate', + sticker_info['input']['base']['framerate']] + # ffmpeg: input file itself + ffmpeg_argv += ['-i', base_input] + # ffmpeg: video filter + ffmpeg_argv.append('-vf') + filterspec = '' + if sticker_info['input'].get('colors', {}).get('transparent'): + transparent = sticker_info['input']['colors']['transparent'] + filterspec += 'geq=\'' + filterspec += f'r=if(lt(alpha(X,Y),128),{transparent[0]},r(X,Y)):' + filterspec += f'g=if(lt(alpha(X,Y),128),{transparent[1]},g(X,Y)):' + filterspec += f'b=if(lt(alpha(X,Y),128),{transparent[2]},b(X,Y)):' + filterspec += 'a=alpha(X,Y)\',' -task_png = mktask('png', 'png', ['-preset:v', 'icon', '-frames', '1']) -task_webm = mktask('webm', 'webm', ['-codec:v', 'libvpx-vp9', '-map', 'v', '-pix_fmt', 'yuva420p']) -task_webp = mktask('webp', 'webp', ['-lossless', '1']) + filterspec += f'scale=w={self.dst_width}:h={ + self.dst_height}:force_original_aspect_ratio=decrease' + if sticker_info.get('output', {}).get('speed'): + filterspec += f',setpts={ + sticker_info['output']['speed']}' + ffmpeg_argv.append(filterspec) + ffmpeg_argv.append('-y') + # ffmpeg: extra arguments + ffmpeg_argv += self.ffmpeg_opts + # ffmpeg: specify outut + ffmpeg_argv.append(target_filename) + sticker_log.debug('ffmpeg argv: %s', ffmpeg_argv) + ret = { + 'actions': [ffmpeg_argv], + 'name': out_filename, + 'targets': [target_filename], + 'file_dep': file_dep + } + sticker_log.debug('task spec: %s', ret) + return ret + + +png = StickerTasks(ffmpeg_opts=['-preset:v', 'icon', '-frames:v', '1']) +webm = StickerTasks( + dst_format='webm', + dst_ext='webm', + ffmpeg_opts=['-codec:v', 'libvpx-vp9', '-map', 'v', '-pix_fmt', 'yuva420p'] +) +webp = StickerTasks( + dst_format='webp', + dst_ext='webp', + ffmpeg_opts=['-lossless', '1', '-frames:v', '1'] +) + +emoji_webp = StickerTasks( + dst_format='webp', + dst_ext='emoji.webp', + ffmpeg_opts=['-lossless', '1', '-frames:v', '1'], + dst_height=100, + dst_width=100 +)