|
|
"""Reachy Mini Controller - Fun, Queue-Based Control Interface""" |
|
|
|
|
|
import sys |
|
|
import subprocess |
|
|
import threading |
|
|
import time |
|
|
from dataclasses import dataclass |
|
|
from typing import List, Optional |
|
|
|
|
|
import gradio as gr |
|
|
import numpy as np |
|
|
import cv2 |
|
|
|
|
|
from reachy_mini import ReachyMini |
|
|
from reachy_mini.daemon.backend.mujoco.video_tcp import TCPJPEGFrameClient |
|
|
from reachy_mini.utils import create_head_pose |
|
|
|
|
|
|
|
|
@dataclass
class Movement:
    """A single target pose that can be placed on the movement queue.

    The playback loop in this file hands the head fields to
    ``create_head_pose(..., degrees=True, mm=True)``, so translations are
    expressed in millimetres and rotations in degrees. Antenna angles are
    converted with ``np.deg2rad`` before being sent, so they are degrees too.
    """

    # Label shown in the queue display.
    name: str

    # Head translation (mm).
    x: float = 0
    y: float = 0
    z: float = 0

    # Head orientation (degrees).
    roll: float = 0
    pitch: float = 0
    yaw: float = 0

    # Antenna angles (degrees). The playback loop only sends antenna targets
    # when BOTH values are set; otherwise just the head is moved.
    left_antenna: Optional[float] = None
    right_antenna: Optional[float] = None

    # Nominal time for the move in seconds, before the speed multiplier.
    duration: float = 1.0
|
|
|
|
|
|
|
|
|
|
|
# Built-in single-pose presets, keyed by the preset's own display name.
# Only the fields that differ from the Movement defaults are spelled out.
# Dict order matters: the UI lays the preset buttons out in this order.
PRESET_MOVEMENTS = {
    preset.name: preset
    for preset in (
        Movement("Home", left_antenna=0, right_antenna=0),
        Movement("Look Left", yaw=-30),
        Movement("Look Right", yaw=30),
        Movement("Look Up", pitch=-20),
        Movement("Look Down", pitch=15),
        Movement("Tilt Left", roll=-20),
        Movement("Tilt Right", roll=20),
        Movement(
            "Curious",
            x=10, z=10,
            roll=15, pitch=-10, yaw=-15,
            left_antenna=45, right_antenna=-45,
        ),
        Movement(
            "Excited",
            z=20,
            pitch=-15,
            left_antenna=90, right_antenna=90,
        ),
        Movement(
            "Shy",
            x=-10, z=-10,
            roll=10, pitch=10, yaw=20,
            left_antenna=-30, right_antenna=30,
        ),
    )
}
|
|
|
|
|
|
|
|
# Named choreographies. Each value is an ordered list of PRESET_MOVEMENTS keys
# that get appended to the queue one after another.
PRESET_SEQUENCES = {
    "Wave": [
        "Home",
        "Look Left",
        "Look Right",
        "Look Left",
        "Look Right",
        "Home",
    ],
    "Nod": [
        "Home",
        "Look Down",
        "Look Up",
        "Look Down",
        "Home",
    ],
    "Excited Dance": [
        "Home",
        "Excited",
        "Tilt Left",
        "Tilt Right",
        "Tilt Left",
        "Home",
    ],
    "Look Around": [
        "Home",
        "Look Left",
        "Look Up",
        "Look Right",
        "Look Down",
        "Home",
    ],
    "Curious Peek": [
        "Home",
        "Curious",
        "Look Right",
        "Look Left",
        "Home",
    ],
}
|
|
|
|
|
|
|
|
class ReachyController:
    """Glue between the Gradio UI and a Reachy Mini robot/daemon.

    The controller owns four pieces of state:

    * an optional locally spawned daemon process ("local" mode only),
    * the ``ReachyMini`` connection used to send targets,
    * a background thread that pulls video frames from a TCP JPEG stream,
    * a list of queued ``Movement`` objects plus a background playback thread.

    Methods wired to UI events return (or yield) plain strings / tuples of
    strings that are displayed in the status and queue text boxes.
    """

    # Pause after a failed frame read so a dead stream does not busy-spin.
    _RECV_ERROR_BACKOFF_S = 0.5

    def __init__(self):
        # Daemon / robot / video handles (all created lazily).
        self.daemon_process = None
        self.reachy_mini = None
        self.receiver = None
        self.frame_thread = None
        self.running = False
        self.frame_received_event = threading.Event()
        # Black placeholder shown until the first real frame arrives.
        self.current_frame = np.zeros((1080, 1080, 3), dtype=np.uint8)

        # Movement queue and playback state.
        self.movement_queue: List[Movement] = []
        self.is_playing = False
        self.playback_speed = 1.0
        self.play_thread = None
        self.auto_play = True

        # Connection settings ("local" spawns a daemon, "external" connects).
        self.connection_mode = "external"
        self.external_ip = "192.168.1.100"

    # ------------------------------------------------------------------
    # Daemon / video / robot lifecycle
    # ------------------------------------------------------------------

    def start_daemon(self):
        """Start the Reachy Mini daemon (local mode only).

        Returns:
            A human-readable status string. Errors are reported in the string
            rather than raised.
        """
        try:
            if self.connection_mode != "local":
                return "ℹ️ External mode: daemon should be running on remote host"

            if self.daemon_process is not None:
                if self.daemon_process.poll() is None:
                    return "⚠️ Daemon already running"
                # The previous daemon has exited; forget it so we can relaunch.
                self.daemon_process = None

            python_cmd = "mjpython" if sys.platform == "darwin" else sys.executable
            # Output goes to DEVNULL: nothing ever reads the pipes, and an
            # unread PIPE can block the child once its buffer fills. The
            # daemon is already asked to log to daemon.log.
            self.daemon_process = subprocess.Popen(
                [python_cmd, "-m", "reachy_mini.daemon.app.main", "--sim",
                 "--scene", "minimal", "--headless", "--stream-robot-view",
                 "--log-file", "daemon.log", "--deactivate-audio"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

            # Drop any stale "frame seen" flag so the wait below reflects
            # frames from THIS daemon instance.
            self.frame_received_event.clear()
            self.start_frame_listener()
            frame_received = self.frame_received_event.wait(timeout=10)

            if self.daemon_process.poll() is not None:
                # Reset so a later retry is not rejected as "already running".
                self.daemon_process = None
                return "❌ Daemon failed to start"

            return "✅ Daemon started" if frame_received else "⚠️ Daemon started (no video)"
        except Exception as e:
            return f"❌ Daemon error: {str(e)}"

    def start_frame_listener(self):
        """Create the video receiver if needed and start the receive thread."""
        if self.receiver is None:
            server_ip = self.external_ip if self.connection_mode == "external" else "127.0.0.1"
            self.receiver = TCPJPEGFrameClient(server_ip=server_ip, server_port=5010)

        if self.frame_thread and self.frame_thread.is_alive():
            return

        self.running = True
        self.frame_thread = threading.Thread(target=self._recv_loop, daemon=True)
        self.frame_thread.start()

    def _recv_loop(self):
        """Background loop: pull frames, resize them, publish the latest one."""
        # Bind to the receiver this thread was started for. If stop_all()
        # swaps it out while we are blocked, we exit instead of resuming on a
        # receiver that belongs to a newer thread.
        receiver = self.receiver
        num_frames = 0
        last_log = time.time()
        print("[ReachyController] _recv_loop started")

        while self.running and self.receiver is receiver:
            try:
                frame = receiver.recv_frame()
            except Exception as e:
                print(f"[ReachyController] Error in recv_frame: {e}")
                # Back off briefly; otherwise a dead stream spins a CPU core.
                time.sleep(self._RECV_ERROR_BACKOFF_S)
                continue

            if frame is not None:
                num_frames += 1
                self.current_frame = cv2.resize(
                    frame, (1080, 1080), interpolation=cv2.INTER_CUBIC
                )
                self.frame_received_event.set()

            now = time.time()
            if now - last_log > 5:
                print(f"[ReachyController] FPS in last 5s: {num_frames/(now - last_log):.1f}")
                num_frames = 0
                last_log = now

    def stream_frames(self):
        """Endless generator yielding the most recent frame about every 40 ms."""
        while True:
            yield self.current_frame
            time.sleep(0.04)

    def initialize_robot(self):
        """Initialize robot connection.

        Returns:
            A status string; connection errors are reported in the string.
        """
        try:
            if self.reachy_mini is not None:
                return "⚠️ Robot already connected"

            localhost_only = self.connection_mode == "local"
            external_ip = self.external_ip if self.connection_mode == "external" else None

            print("[DEBUG] Initializing robot with:")
            print(f" connection_mode: {self.connection_mode}")
            print(f" localhost_only: {localhost_only}")
            print(f" external_ip: {external_ip}")

            # Allow more time when connecting to an external host.
            timeout = 15.0 if self.connection_mode == "external" else 5.0

            self.reachy_mini = ReachyMini(
                media_backend="default_no_video",
                localhost_only=localhost_only,
                external_ip=external_ip,
                timeout=timeout,
            )
            mode_text = "local" if localhost_only else f"external ({self.external_ip})"
            return f"✅ Robot connected ({mode_text}) - Ready to control!"
        except Exception as e:
            return f"❌ Connection failed: {str(e)}"

    def auto_start(self):
        """Auto-start daemon and robot connection.

        Yields:
            The accumulated status text after each step, so the UI can show
            progress while the (potentially slow) steps run.
        """
        status_msgs = []

        if self.connection_mode == "local":
            status_msgs.append(self.start_daemon())
            yield "\n".join(status_msgs)
        else:
            # No daemon to launch; just attach to the remote video stream.
            self.start_frame_listener()
            status_msgs.append(f"ℹ️ External mode: connecting to {self.external_ip}")
            yield "\n".join(status_msgs)

        status_msgs.append(self.initialize_robot())
        yield "\n".join(status_msgs)

    def restart_system(self):
        """Restart daemon and robot connection.

        Yields:
            Status strings: the stop result, a "restarting" notice, then
            everything produced by ``auto_start``.
        """
        # stop_all() returns a single string. Delegating with ``yield from``
        # would iterate it character by character, so yield it as one value.
        yield self.stop_all()
        yield "🔄 Restarting..."
        yield from self.auto_start()

    def stop_all(self):
        """Stop everything: playback, daemon, video thread and robot link.

        Teardown is best effort: a failure in one step is logged and the
        remaining steps still run, so the controller ends in a clean state.

        Returns:
            The status string ``"✅ Stopped"``.
        """
        self.is_playing = False

        if self.daemon_process:
            self.daemon_process.terminate()
            try:
                self.daemon_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Daemon ignored SIGTERM; force it so we do not leak it.
                self.daemon_process.kill()
                self.daemon_process.wait()
            self.daemon_process = None

        self.running = False

        if self.frame_thread:
            self.frame_thread.join(timeout=2)
            self.frame_thread = None

        if self.receiver:
            try:
                self.receiver.close()
            except Exception as e:
                print(f"[ReachyController] Error while closing receiver: {e}")
            time.sleep(0.5)
            self.receiver = None

        # The next start must wait for a fresh frame, not a stale flag.
        self.frame_received_event.clear()

        if self.reachy_mini:
            try:
                self.reachy_mini.__exit__(None, None, None)
            except Exception as e:
                print(f"[ReachyController] Error while disconnecting robot: {e}")
            self.reachy_mini = None

        return "✅ Stopped"

    # ------------------------------------------------------------------
    # Queue editing
    # ------------------------------------------------------------------

    def _maybe_auto_play(self):
        """Kick off playback if auto-play is on and nothing is playing."""
        if self.auto_play and not self.is_playing:
            self._start_auto_play()

    def add_to_queue(self, movement_name, x, y, z, roll, pitch, yaw,
                     left_ant, right_ant, duration):
        """Add a movement to the queue.

        An empty ``movement_name`` is replaced by ``"Custom <n>"`` where
        ``n`` is the position the movement will take in the queue.

        Returns:
            ``(queue_text, status_text)``.
        """
        movement = Movement(
            name=movement_name or f"Custom {len(self.movement_queue) + 1}",
            x=x, y=y, z=z,
            roll=roll, pitch=pitch, yaw=yaw,
            left_antenna=left_ant,
            right_antenna=right_ant,
            duration=duration,
        )
        self.movement_queue.append(movement)
        self._maybe_auto_play()
        return self.format_queue(), f"✅ Added: {movement.name}"

    def add_preset(self, preset_name):
        """Add a preset movement to queue.

        Returns:
            ``(queue_text, status_text)``; unknown names yield an error
            status and leave the queue untouched.
        """
        if preset_name not in PRESET_MOVEMENTS:
            return self.format_queue(), f"❌ Unknown preset: {preset_name}"

        self.movement_queue.append(PRESET_MOVEMENTS[preset_name])
        self._maybe_auto_play()
        return self.format_queue(), f"✅ Added: {preset_name}"

    def add_sequence(self, sequence_name):
        """Add a preset sequence to queue.

        Returns:
            ``(queue_text, status_text)``; unknown names yield an error
            status and leave the queue untouched.
        """
        if sequence_name not in PRESET_SEQUENCES:
            return self.format_queue(), "❌ Unknown sequence"

        # Resolve every step first so a bad name cannot leave a partial
        # sequence in the queue.
        steps = [PRESET_MOVEMENTS[name] for name in PRESET_SEQUENCES[sequence_name]]
        self.movement_queue.extend(steps)
        self._maybe_auto_play()
        return self.format_queue(), f"✅ Added sequence: {sequence_name}"

    def clear_queue(self):
        """Clear the movement queue and ask any running playback to stop."""
        self.movement_queue.clear()
        self.is_playing = False
        return self.format_queue(), "🗑️ Queue cleared"

    def remove_last(self):
        """Remove last movement from queue.

        Returns:
            ``(queue_text, status_text)``.
        """
        if self.movement_queue:
            removed = self.movement_queue.pop()
            return self.format_queue(), f"🗑️ Removed: {removed.name}"
        return self.format_queue(), "⚠️ Queue is empty"

    def format_queue(self):
        """Format queue for display.

        Returns:
            A multi-line string listing every queued movement, the summed
            duration, and the auto-play state; or a hint when the queue is
            empty.
        """
        # Snapshot: the playback thread and UI callbacks share this list.
        queue = list(self.movement_queue)
        if not queue:
            return "📋 Queue is empty\n\nAdd movements using presets or custom controls"

        lines = ["📋 Movement Queue:\n"]
        total_duration = 0

        for i, mov in enumerate(queue, 1):
            total_duration += mov.duration
            emoji = "▶️" if i == 1 else "⏸️"

            head_str = f"Head: x={mov.x:.0f} y={mov.y:.0f} z={mov.z:.0f} r={mov.roll:.0f}° p={mov.pitch:.0f}° y={mov.yaw:.0f}°"

            ant_str = ""
            if mov.left_antenna is not None and mov.right_antenna is not None:
                ant_str = f"\n Antennas: L={mov.left_antenna:.0f}° R={mov.right_antenna:.0f}°"

            lines.append(
                f"{emoji} {i}. {mov.name} ({mov.duration}s)\n"
                f" {head_str}{ant_str}"
            )

        lines.append(f"\n⏱️ Total duration: {total_duration:.1f}s")
        lines.append(f"{'🔄 Auto-play: ON' if self.auto_play else '⏸️ Auto-play: OFF'}")
        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Playback
    # ------------------------------------------------------------------

    @staticmethod
    def _is_valid_speed(speed):
        """Return True when ``speed`` is a number strictly greater than zero."""
        try:
            return float(speed) > 0
        except (TypeError, ValueError):
            return False

    def _launch_play_thread(self):
        """Mark playback active and start a fresh playback thread."""
        self.is_playing = True
        worker = threading.Thread(target=self._play_loop, daemon=True)
        # Published before start() so the loop can recognise itself as the
        # current playback thread.
        self.play_thread = worker
        worker.start()

    def play_queue(self, speed):
        """Execute the movement queue.

        Args:
            speed: Playback speed multiplier; must be greater than zero.

        Returns:
            ``(queue_text, status_text)``. Nothing is started when the queue
            is empty, the robot is not connected, playback is already
            running, or ``speed`` is not positive.
        """
        if not self.movement_queue:
            return self.format_queue(), "⚠️ Queue is empty"

        if self.reachy_mini is None:
            return self.format_queue(), "❌ Robot not initialized"

        if self.is_playing:
            return self.format_queue(), "⚠️ Already playing"

        if not self._is_valid_speed(speed):
            # Durations are divided by the speed; zero would crash playback.
            return self.format_queue(), f"⚠️ Invalid speed: {speed} (must be greater than 0)"

        self.playback_speed = speed
        self._launch_play_thread()

        return self.format_queue(), f"▶️ Playing at {speed}x speed..."

    def _execute_movement(self, movement):
        """Send one movement to the robot, scaled by the current speed."""
        pose = create_head_pose(
            x=movement.x, y=movement.y, z=movement.z,
            roll=movement.roll, pitch=movement.pitch, yaw=movement.yaw,
            degrees=True, mm=True,
        )
        actual_duration = movement.duration / self.playback_speed

        if movement.left_antenna is not None and movement.right_antenna is not None:
            # Antennas are passed as [right, left] in radians.
            # NOTE(review): presumably the order the SDK expects - confirm.
            self.reachy_mini.goto_target(
                head=pose,
                antennas=[
                    np.deg2rad(movement.right_antenna),
                    np.deg2rad(movement.left_antenna),
                ],
                duration=actual_duration,
            )
        else:
            self.reachy_mini.goto_target(head=pose, duration=actual_duration)

    def _play_loop(self):
        """Background thread to execute movements.

        Walks the queue from its first entry without removing anything. When
        the end is reached it stops (auto-play off) or polls for newly added
        movements (auto-play on). The loop ends when ``is_playing`` is
        cleared or a newer playback thread has taken over.
        """
        me = threading.current_thread()
        next_index = 0
        try:
            # ``self.play_thread is me`` retires a thread that outlived a
            # stop_playback() join timeout once a new thread has started.
            while self.is_playing and self.play_thread is me:
                pending = self.movement_queue
                # The queue may have shrunk (remove_last); clamp so the next
                # appended movement is not skipped.
                next_index = min(next_index, len(pending))
                try:
                    movement = pending[next_index]
                except IndexError:
                    movement = None

                if movement is None:
                    if not self.auto_play:
                        break
                    # Idle: wait briefly for new movements to be queued.
                    time.sleep(0.1)
                    continue

                self._execute_movement(movement)
                next_index += 1
        except Exception as e:
            print(f"Error during playback: {e}")
        finally:
            # Only the current playback thread may clear the flag; a retired
            # thread must not switch off its successor.
            if self.play_thread is me:
                self.is_playing = False

    def _start_auto_play(self):
        """Start auto-play mode (no-op without a robot or if already playing)."""
        if self.reachy_mini is None:
            return

        if not self.is_playing:
            self._launch_play_thread()

    def toggle_auto_play(self, enabled):
        """Toggle auto-play mode.

        Returns:
            ``(queue_text, status_text)``.
        """
        self.auto_play = enabled

        if self.auto_play and self.movement_queue and not self.is_playing:
            self._start_auto_play()

        return self.format_queue(), f"{'🔄 Auto-play enabled' if enabled else '⏸️ Auto-play disabled'}"

    def update_speed(self, speed):
        """Update playback speed in real-time.

        Non-positive or non-numeric values are rejected and the current
        speed is kept, because movement durations are divided by the speed.

        Returns:
            A status string.
        """
        if not self._is_valid_speed(speed):
            return f"⚠️ Invalid speed: {speed} (must be greater than 0)"

        self.playback_speed = speed
        return f"⚡ Speed: {speed}x"

    def stop_playback(self):
        """Stop current playback.

        Waits up to two seconds for the playback thread to finish.

        Returns:
            ``(queue_text, status_text)``.
        """
        self.is_playing = False
        if self.play_thread:
            self.play_thread.join(timeout=2)

        msg = "⏹️ Stopped"
        if self.auto_play:
            msg += " (auto-play still enabled)"

        return self.format_queue(), msg

    def set_connection_mode(self, mode, external_ip=None):
        """Set connection mode and optionally external IP, then restart.

        Args:
            mode: ``"local"`` or ``"external"``.
            external_ip: Optional host to use in external mode. Surrounding
                whitespace is ignored; an empty value keeps the current host.

        Yields:
            Status strings: a confirmation, then everything produced by
            ``restart_system``.
        """
        self.connection_mode = mode
        print(f"External IP: {external_ip}")

        # Text box input often carries stray whitespace.
        cleaned_ip = external_ip.strip() if isinstance(external_ip, str) else external_ip
        if cleaned_ip:
            self.external_ip = cleaned_ip

        yield f"🔄 Connection mode set to: {mode}\n🔄 Restarting..."
        yield from self.restart_system()
|
|
|
|
|
|
|
|
|
|
|
# Single shared controller instance used by every UI callback below.
manager = ReachyController()


with gr.Blocks(title="Reachy Controller", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 Reachy Mini Controller")
    gr.Markdown("Create fun movement sequences for your robot!")

    with gr.Row():
        # ---- Left column: connection, status, playback, queue ----
        with gr.Column(scale=2):
            gr.Markdown("### 🔌 Connection Settings")
            connection_mode = gr.Radio(
                choices=["local", "external"],
                value="external",
                label="Connection Mode",
                info="Local: Launch daemon locally | External: Connect to remote daemon",
            )
            external_ip_input = gr.Textbox(
                label="External IP or Hostname",
                value="192.168.1.100",
                placeholder="192.168.1.100 or 8.tcp.ngrok.io:18951",
                visible=True,
                info="IP/hostname of remote daemon. Supports custom ports (e.g., ngrok.io:18951)",
            )

            def toggle_external_ip_visibility(mode):
                """Show the host text box only while "external" is selected."""
                return gr.update(visible=(mode == "external"))

            connection_mode.change(
                fn=toggle_external_ip_visibility,
                inputs=[connection_mode],
                outputs=[external_ip_input],
            )

            apply_connection_btn = gr.Button(
                "✅ Apply Connection Settings", variant="primary", size="sm"
            )

            gr.Markdown("### 📡 System Status")
            status = gr.Textbox(
                label="Status",
                lines=3,
                interactive=False,
                value="🔄 Initializing system...",
            )
            restart_btn = gr.Button("🔄 Restart System", variant="secondary", size="sm")

            gr.Markdown("### 🎮 Playback Controls")

            auto_play_toggle = gr.Checkbox(
                label="🔄 Auto-play",
                value=True,
                info="Execute movements automatically when added",
            )

            speed_slider = gr.Slider(
                minimum=0.25,
                maximum=3.0,
                value=1.0,
                label="⚡ Speed Multiplier",
                info="Adjust playback speed",
            )

            with gr.Row():
                play_btn = gr.Button("▶️ Play All", variant="primary", scale=2)
                stop_play_btn = gr.Button("⏹️ Stop", scale=1)

            with gr.Row():
                clear_btn = gr.Button("🗑️ Clear All")
                remove_btn = gr.Button("↶ Remove Last")

            queue_display = gr.Textbox(
                label="📋 Movement Queue",
                lines=20,
                interactive=False,
                value=manager.format_queue(),
            )

        # ---- Right column: live view of the robot ----
        with gr.Column(scale=3):
            sim_view = gr.Image(
                label="🎬 Robot Simulation",
                type="numpy",
                height=1080,
                width=1080,
                show_label=True,
            )

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 🎨 Quick Presets")

            # Two rows of buttons: the first five presets, then the rest.
            preset_names = list(PRESET_MOVEMENTS.keys())
            preset_btns = []
            for row_names in (preset_names[:5], preset_names[5:]):
                with gr.Row():
                    for preset in row_names:
                        preset_btns.append((gr.Button(preset, size="sm"), preset))

        with gr.Column():
            gr.Markdown("### 🎬 Sequences")
            with gr.Row():
                sequence_dropdown = gr.Dropdown(
                    choices=list(PRESET_SEQUENCES.keys()),
                    label="Select Sequence",
                    value=None,
                    scale=3,
                )
                add_seq_btn = gr.Button("➕ Add", scale=1)

    with gr.Accordion("🎯 Custom Movement Builder", open=False):
        custom_name = gr.Textbox(label="Movement Name", placeholder="My Move")

        with gr.Row():
            x = gr.Slider(minimum=-50, maximum=50, value=0, label="X (mm)", step=5)
            y = gr.Slider(minimum=-50, maximum=50, value=0, label="Y (mm)", step=5)
            z = gr.Slider(minimum=-20, maximum=50, value=0, label="Z (mm)", step=5)

        with gr.Row():
            roll = gr.Slider(minimum=-30, maximum=30, value=0, label="Roll (°)", step=5)
            pitch = gr.Slider(minimum=-30, maximum=30, value=0, label="Pitch (°)", step=5)
            yaw = gr.Slider(minimum=-45, maximum=45, value=0, label="Yaw (°)", step=5)

        with gr.Row():
            left_ant = gr.Slider(
                minimum=-180, maximum=180, value=0, label="Left Antenna (°)", step=15
            )
            right_ant = gr.Slider(
                minimum=-180, maximum=180, value=0, label="Right Antenna (°)", step=15
            )

        duration = gr.Slider(
            minimum=0.3, maximum=3.0, value=1.0, label="Duration (s)", step=0.1
        )

        add_custom_btn = gr.Button("➕ Add to Queue", variant="primary")

    # ---- Event wiring ----

    # Stream the latest frame into the image as soon as the page loads.
    demo.load(fn=manager.stream_frames, outputs=sim_view)

    # Connection management.
    apply_connection_btn.click(
        fn=manager.set_connection_mode,
        inputs=[connection_mode, external_ip_input],
        outputs=[status],
    )
    restart_btn.click(fn=manager.restart_system, outputs=[status])

    # Playback controls.
    auto_play_toggle.change(
        fn=manager.toggle_auto_play,
        inputs=[auto_play_toggle],
        outputs=[queue_display, status],
    )
    speed_slider.change(
        fn=manager.update_speed,
        inputs=[speed_slider],
        outputs=[status],
    )
    play_btn.click(
        fn=manager.play_queue,
        inputs=[speed_slider],
        outputs=[queue_display, status],
    )
    for button, handler in (
        (stop_play_btn, manager.stop_playback),
        (clear_btn, manager.clear_queue),
        (remove_btn, manager.remove_last),
    ):
        button.click(fn=handler, outputs=[queue_display, status])

    # Preset buttons: bind the name as a default so each lambda keeps its own.
    for btn, preset_name in preset_btns:
        btn.click(
            fn=lambda p=preset_name: manager.add_preset(p),
            outputs=[queue_display, status],
        )

    # Sequences.
    add_seq_btn.click(
        fn=manager.add_sequence,
        inputs=[sequence_dropdown],
        outputs=[queue_display, status],
    )

    # Custom movement builder.
    add_custom_btn.click(
        fn=manager.add_to_queue,
        inputs=[custom_name, x, y, z, roll, pitch, yaw, left_ant, right_ant, duration],
        outputs=[queue_display, status],
    )


if __name__ == "__main__":
    # Bind to all interfaces so the UI is reachable from other machines.
    demo.launch(server_name="0.0.0.0", server_port=7860)