#!/usr/bin/env python
# -*- coding: utf-8 -*-

# This script relies on large code sections located at:
# https://github.com/comfyanonymous/ComfyUI/blob/master/script_examples/websockets_api_example.py
# Other code sections are (C) Copyright 2023, P. Lutus: https://www.arachnoid.com
# and are released under the GPL: https://www.gnu.org/licenses/gpl-3.0.en.html

# This is the ComfyUI version of the morph script.

# Requires: pip install websocket-client

import websocket
import uuid
import json
import urllib.request
import urllib.parse
import random
import io
import time
from PIL import Image
from PIL.PngImagePlugin import PngInfo
from dataclasses import dataclass
import asyncio
import os


class MultiDraw:

    def __init__(self):
        self.server = 'localhost:8188'
        self.client_id = str(uuid.uuid4())
        # Acquire this JSON from the ComfyUI user interface with the
        # "Save (API Format)" button. To make this option visible, click the
        # gear icon, then enable "dev mode options".
        json_text = """
        {
          "3": {
            "inputs": {
              "seed": 333,
              "steps": 24,
              "cfg": 7,
              "sampler_name": "dpmpp_3m_sde_gpu",
              "scheduler": "karras",
              "denoise": 1,
              "model": ["4", 0],
              "positive": ["10", 0],
              "negative": ["7", 0],
              "latent_image": ["5", 0]
            },
            "class_type": "KSampler"
          },
          "4": {
            "inputs": {
              "ckpt_name": "sd_xl_base_1.0.safetensors"
            },
            "class_type": "CheckpointLoaderSimple"
          },
          "5": {
            "inputs": {
              "width": 1024,
              "height": 1024,
              "batch_size": 1
            },
            "class_type": "EmptyLatentImage"
          },
          "6": {
            "inputs": {
              "text": "photorealistic detailed cat ",
              "clip": ["4", 1]
            },
            "class_type": "CLIPTextEncode"
          },
          "7": {
            "inputs": {
              "text": "",
              "clip": ["4", 1]
            },
            "class_type": "CLIPTextEncode"
          },
          "8": {
            "inputs": {
              "samples": ["3", 0],
              "vae": ["4", 2]
            },
            "class_type": "VAEDecode"
          },
          "9": {
            "inputs": {
              "filename_prefix": "ComfyUI",
              "images": ["8", 0]
            },
            "class_type": "SaveImage"
          },
          "10": {
            "inputs": {
              "conditioning_to_strength": 0.5,
              "conditioning_to": ["11", 0],
              "conditioning_from": ["6", 0]
            },
            "class_type": "ConditioningAverage "
          },
          "11": {
            "inputs": {
              "text": "photorealistic detailed dog",
              "clip": ["4", 1]
            },
            "class_type": "CLIPTextEncode"
          }
        }
        """
        self.json_prompt = json.loads(json_text)

    def queue_prompt(self, prompt):
        p = {"prompt": prompt, "client_id": self.client_id}
        data = json.dumps(p).encode('utf-8')
        req = urllib.request.Request(f"http://{self.server}/prompt", data=data)
        return json.loads(urllib.request.urlopen(req).read())

    def get_image(self, filename, subfolder, folder_type):
        data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
        url_values = urllib.parse.urlencode(data)
        with urllib.request.urlopen(f"http://{self.server}/view?{url_values}") as response:
            return response.read()

    def get_history(self, prompt_id):
        with urllib.request.urlopen(f"http://{self.server}/history/{prompt_id}") as response:
            return json.loads(response.read())

    def get_images(self, ws, prompt):
        prompt_id = self.queue_prompt(prompt)['prompt_id']
        output_images = {}
        while True:
            # print("looping in get_images ...")
            out = ws.recv()
            if isinstance(out, str):
                message = json.loads(out)
                if message['type'] == 'executing':
                    data = message['data']
                    if data['node'] is None and data['prompt_id'] == prompt_id:
                        break  # execution is done
            else:
                continue  # binary preview data, ignore
        history = self.get_history(prompt_id)[prompt_id]
        prompt = history['prompt'][2]
        metadata = PngInfo()
        metadata.add_text('prompt', str(prompt))
        for node_id in history['outputs']:
            node_output = history['outputs'][node_id]
            if 'images' in node_output:
                images_output = []
                for image in node_output['images']:
                    image_data = self.get_image(image['filename'], image['subfolder'], image['type'])
                    images_output.append(image_data)
                output_images[node_id] = images_output
        return output_images, metadata

    def draw_save_morph(self, item, draw=True):
        filepath = f'{item.savepath}/output_index_{item.index:03d}.png'
        # print(f'Drawing {filepath} ...')
        if draw:
            # positive prompt A
            self.json_prompt["6"]["inputs"]["text"] = f"{item.preshared} {item.promptA} {item.postshared}"
            # positive prompt B
            self.json_prompt["11"]["inputs"]["text"] = f"{item.preshared} {item.promptB} {item.postshared}"
            # negative prompt
            self.json_prompt["7"]["inputs"]["text"] = item.negative_prompt
            # conditioning, a float between 0 and 1
            self.json_prompt["10"]["inputs"]["conditioning_to_strength"] = round(item.ratio, 3)
            self.json_prompt["3"]["inputs"]["seed"] = 333
            self.json_prompt["3"]["inputs"]["cfg"] = 7
            self.json_prompt["3"]["inputs"]["steps"] = 24
            self.json_prompt["3"]["inputs"]["sampler_name"] = "dpm_2"
            self.json_prompt["3"]["inputs"]["scheduler"] = "karras"
            self.json_prompt["4"]["inputs"]["ckpt_name"] = "sd_xl_base_1.0.safetensors"
            self.json_prompt["5"]["inputs"]["height"] = 1024
            self.json_prompt["5"]["inputs"]["width"] = 1024
            ws = websocket.WebSocket()
            ws.connect(f"ws://{self.server}/ws?clientId={self.client_id}")
            images, metadata = self.get_images(ws, self.json_prompt)
            ws.close()
            for node_id in images:
                for image_data in images[node_id]:
                    image = Image.open(io.BytesIO(image_data))
                    image.save(filepath, pnginfo=metadata)
        return filepath


@dataclass
class MorphImageData:
    savepath: str
    preshared: str
    promptA: str
    promptB: str
    postshared: str
    negative_prompt: str
    ratio: float
    index: int


# interpolation function: maps range { xa : xb } -> { ya : yb }
def ntrp(x, xa, xb, ya, yb):
    return (x - xa) * (yb - ya) / (xb - xa) + ya


def draw_morph(size, savepath, preshared, pa, pb, postshared="", negative="", start=0.01, end=0.99):
    md = MultiDraw()
    os.system(f'mkdir {savepath} > /dev/null 2>&1')
    os.system(f'rm {savepath}/*.png > /dev/null 2>&1')
    for index in range(size):
        ratio = ntrp(index, 0, size - 1, start, end)
        item = MorphImageData(savepath, preshared, pa, pb, postshared, negative, ratio, index)
        start_time = time.time()
        # note the available draw argument (True/False):
        # passing draw=False saves time until prompt debugging is complete
        md.draw_save_morph(item)
        duration = float(time.time() - start_time)
        print(f'{savepath}: index {index:03d} ratio:{ratio:7.3f} processing time: {duration:7.03f} sec.')
        # break

# Cat/Dog example:
# draw_morph(10, "cat_dog", "hyper-realistic", "cat", "dog", "in forest setting")
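

# A minimal, illustrative entry point (an addition, not part of the original
# script). It assumes a ComfyUI server is already running at localhost:8188
# (the address hard-coded in MultiDraw) and that the
# "sd_xl_base_1.0.safetensors" checkpoint named in the workflow is installed.
# It simply runs the cat/dog example above, writing ten PNG frames,
# output_index_000.png through output_index_009.png, into ./cat_dog/.
if __name__ == '__main__':
    draw_morph(10, "cat_dog", "hyper-realistic", "cat", "dog", "in forest setting")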