From 753a4078ad6309ad35a88f6ef9da14c47892b817 Mon Sep 17 00:00:00 2001 From: m5rcel { Marcel } Date: Sat, 15 Nov 2025 21:20:50 +0100 Subject: [PATCH] Update bytebeat_play.py --- bytebeat_play.py | 477 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 376 insertions(+), 101 deletions(-) diff --git a/bytebeat_play.py b/bytebeat_play.py index 5b62170..01360b8 100644 --- a/bytebeat_play.py +++ b/bytebeat_play.py @@ -1,113 +1,388 @@ from __future__ import annotations -import ast, argparse, math, sys, wave, struct, subprocess, tempfile, os, platform +import ast +import argparse +import math +import sys +import wave +import struct +import subprocess +import tempfile +import os +import platform +from typing import Callable, Iterator, Tuple, List, Union, TYPE_CHECKING, Any + +if TYPE_CHECKING: + import numpy as np +else: + np = Any + try: - import numpy as _np + import numpy as np + NUMPY_AVAILABLE = True except ImportError: - _np = None + np = None + NUMPY_AVAILABLE = False + try: - import sounddevice as _sd + import sounddevice as sd + SOUNDDEVICE_AVAILABLE = True except ImportError: - _sd = None -if platform.system().lower()=="windows": - import winsound as _ws + sd = None + SOUNDDEVICE_AVAILABLE = False + +if platform.system().lower() == "windows": + import winsound os.system("") -_Z = {ast.Add,ast.Sub,ast.Mult,ast.Div,ast.FloorDiv,ast.Mod,ast.LShift,ast.RShift,ast.BitOr,ast.BitAnd,ast.BitXor,ast.Pow} -_Y = {ast.UAdd,ast.USub,ast.Invert,ast.Not} -_C = {"r":"\033[0m","g":"\033[92m","c":"\033[96m","y":"\033[93m","R":"\033[91m","B":"\033[1m"} -def _a(n): - if isinstance(n,ast.Expression): _a(n.body); return - if isinstance(n,ast.BinOp): - if type(n.op) not in _Z: raise ValueError(type(n.op).__name__) - _a(n.left); _a(n.right); return - if isinstance(n,ast.UnaryOp): - if type(n.op) not in _Y: raise ValueError(type(n.op).__name__) - _a(n.operand); return - if isinstance(n,ast.Constant): - if not isinstance(n.value,(int,float)): raise ValueError("c") - return - if isinstance(n,ast.Name): - if n.id!='t': raise ValueError(n.id) - return - if isinstance(n,(ast.Call,ast.Attribute,ast.Compare,ast.BoolOp,ast.IfExp)): raise ValueError(type(n).__name__) - if not isinstance(n,(ast.Load,)): raise ValueError(type(n).__name__) -def _b(s): - s=s.replace("/","//"); p=ast.parse(s,mode='eval'); _a(p); co=compile(p,'','eval') - def _e(t): - L={'t':int(t)} + + +class Colors: + RESET = "\033[0m" + BOLD = "\033[1m" + DIM = "\033[2m" + + BLACK = "\033[30m" + RED = "\033[91m" + GREEN = "\033[92m" + YELLOW = "\033[93m" + BLUE = "\033[94m" + MAGENTA = "\033[95m" + CYAN = "\033[96m" + WHITE = "\033[97m" + + BG_BLACK = "\033[40m" + BG_RED = "\033[41m" + BG_GREEN = "\033[42m" + BG_YELLOW = "\033[43m" + BG_BLUE = "\033[44m" + BG_MAGENTA = "\033[45m" + BG_CYAN = "\033[46m" + BG_WHITE = "\033[47m" + + @classmethod + def gradient_bar(cls, progress: float, width: int = 40) -> str: + filled = int(width * progress) + empty = width - filled + bar = f"{cls.CYAN}{'█' * filled}{cls.DIM}{'░' * empty}{cls.RESET}" + return bar + + +class BytebeatValidator: + + ALLOWED_BINARY_OPS = { + ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv, + ast.Mod, ast.LShift, ast.RShift, ast.BitOr, + ast.BitAnd, ast.BitXor, ast.Pow + } + + ALLOWED_UNARY_OPS = { + ast.UAdd, ast.USub, ast.Invert, ast.Not + } + + @staticmethod + def validate_ast_node(node: ast.AST) -> None: + if isinstance(node, ast.Expression): + BytebeatValidator.validate_ast_node(node.body) + return + + if isinstance(node, ast.BinOp): + if type(node.op) not in BytebeatValidator.ALLOWED_BINARY_OPS: + raise ValueError(f"Unsupported binary operator: {type(node.op).__name__}") + BytebeatValidator.validate_ast_node(node.left) + BytebeatValidator.validate_ast_node(node.right) + return + + if isinstance(node, ast.UnaryOp): + if type(node.op) not in BytebeatValidator.ALLOWED_UNARY_OPS: + raise ValueError(f"Unsupported unary operator: {type(node.op).__name__}") + BytebeatValidator.validate_ast_node(node.operand) + return + + if isinstance(node, ast.Constant): + if not isinstance(node.value, (int, float)): + raise ValueError(f"Unsupported constant type: {type(node.value).__name__}") + return + + if isinstance(node, ast.Name): + if node.id != 't': + raise ValueError(f"Only variable 't' is allowed, found: {node.id}") + return + + if isinstance(node, (ast.Call, ast.Attribute, ast.Compare, ast.BoolOp, ast.IfExp)): + raise ValueError(f"Unsupported operation: {type(node).__name__}") + + if not isinstance(node, (ast.Load,)): + raise ValueError(f"Unsupported node type: {type(node).__name__}") + + @staticmethod + def compile_expression(expression: str) -> Callable[[int], int]: + expression = expression.replace("/", "//") + + parsed = ast.parse(expression, mode='eval') + BytebeatValidator.validate_ast_node(parsed) + + compiled_code = compile(parsed, '', 'eval') + + def evaluate(t: int) -> int: + local_vars = {'t': int(t)} + try: + result = eval(compiled_code, {"__builtins__": None, 'math': math}, local_vars) + return int(result) + except Exception: + return 0 + + return evaluate + + +class BytebeatGenerator: + @staticmethod + def generate_chunks( + func: Callable[[int], int], + duration: float = 10.0, + sample_rate: int = 8000, + time_offset: int = 0 + ) -> Iterator[Tuple[float, Union[Any, List[int]]]]: + chunk_size = sample_rate // 4 + total_samples = int(duration * sample_rate) + total_chunks = total_samples // chunk_size + + if NUMPY_AVAILABLE: + for chunk_idx in range(total_chunks): + base_time = time_offset + chunk_idx * chunk_size + chunk = np.empty(chunk_size, dtype=np.int16) + + for sample_idx in range(chunk_size): + value = func(base_time + sample_idx) + chunk[sample_idx] = np.int16(((int(value) & 0xFF) - 128) * 256) + + progress = (chunk_idx + 1) / float(total_chunks) + yield progress, chunk + else: + for chunk_idx in range(total_chunks): + base_time = time_offset + chunk_idx * chunk_size + chunk = [((int(func(base_time + j)) & 0xFF) - 128) * 256 for j in range(chunk_size)] + + progress = (chunk_idx + 1) / float(total_chunks) + yield progress, chunk + + +class AudioPlayer: + @staticmethod + def play_realtime( + func: Callable[[int], int], + duration: float, + sample_rate: int, + time_offset: int + ) -> None: + if not SOUNDDEVICE_AVAILABLE or not NUMPY_AVAILABLE: + print(f"{Colors.YELLOW}⚠ sounddevice/numpy not installed — using WAV fallback{Colors.RESET}") + AudioPlayer.play_wav_fallback(func, duration, sample_rate, time_offset) + return + + print(f"\n{Colors.BOLD}{Colors.GREEN}▶ Playing live audio... {Colors.DIM}(Press Ctrl+C to stop){Colors.RESET}") + + stream = sd.OutputStream(samplerate=sample_rate, channels=1, dtype='float32') + stream.start() + try: - r=eval(co,{"__builtins__":None,'math':math},L); return int(r) - except Exception: - return 0 - return _e -def _g(f,d=10.0,sr=8000,t0=0): - cs=sr//4; n=int(d*sr); tc=n//cs - if _np is not None: - for i in range(tc): - b=t0+i*cs; a=_np.empty(cs,dtype=_np.int16) - for j in range(cs): - v=f(b+j); a[j]=_np.int16(((int(v)&0xFF)-128)*256) - yield i/float(tc),a - else: - for i in range(tc): - b=t0+i*cs - ch=[((int(f(b+j))&0xFF)-128)*256 for j in range(cs)] - yield i/float(tc),ch -def _p(f,d,sr,t0): - if _sd is None or _np is None: - print(f"{_C['y']}sounddevice not installed — using WAV fallback.{_C['r']}") - allv=[] - try: - for pr,ch in _g(f,d,sr,t0): - allv.extend(ch) - bar="█"*int(40*pr)+"-"*int(40*(1-pr)) - print(f"\r{_C['c']}[ {bar} ] {int(pr*100)}%{_C['r']}",end="",flush=True) + for progress, chunk in BytebeatGenerator.generate_chunks(func, duration, sample_rate, time_offset): + bar = Colors.gradient_bar(progress) + percentage = int(progress * 100) + print(f"\r{Colors.CYAN}Progress: {Colors.RESET}[ {bar} {Colors.CYAN}] {percentage}%{Colors.RESET}", end="", flush=True) + + stream.write(chunk.astype('float32') / 32768.0) + except KeyboardInterrupt: - print(f"\n{_C['R']}Stopped by user!{_C['r']}") - print() - _w(allv,sr); return - print(f"{_C['B']}{_C['g']}▶ Playing live... Press Ctrl+C to stop.{_C['r']}") - st=_sd.OutputStream(samplerate=sr,channels=1,dtype='float32'); st.start() + print(f"\n\n{Colors.YELLOW}⏹ Playback interrupted by user{Colors.RESET}") + + finally: + stream.stop() + stream.close() + print(f"\n{Colors.GREEN}✓ Playback complete{Colors.RESET}\n") + + @staticmethod + def play_wav_fallback( + func: Callable[[int], int], + duration: float, + sample_rate: int, + time_offset: int + ) -> None: + print(f"\n{Colors.CYAN}Rendering audio...{Colors.RESET}") + all_samples = [] + + try: + for progress, chunk in BytebeatGenerator.generate_chunks(func, duration, sample_rate, time_offset): + all_samples.extend(chunk) + bar = Colors.gradient_bar(progress) + percentage = int(progress * 100) + print(f"\r{Colors.CYAN}Rendering: {Colors.RESET}[ {bar} {Colors.CYAN}] {percentage}%{Colors.RESET}", end="", flush=True) + + except KeyboardInterrupt: + print(f"\n\n{Colors.YELLOW}⏹ Rendering interrupted{Colors.RESET}") + + print(f"\n{Colors.GREEN}✓ Rendering complete{Colors.RESET}") + AudioPlayer._play_wav_file(all_samples, sample_rate) + + @staticmethod + def _play_wav_file(samples: Union[Any, List[int]], sample_rate: int) -> None: + with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file: + temp_filename = temp_file.name + + if isinstance(samples, list): + frames = b''.join(struct.pack(' None: + print(f""" +{Colors.MAGENTA}██████╗ ██╗ ██╗████████╗███████╗██████╗ ███████╗ █████╗ ████████╗ +██╔══██╗╚██╗ ██╔╝╚══██╔══╝██╔════╝██╔══██╗██╔════╝██╔══██╗╚══██╔══╝ +██████╔╝ ╚████╔╝ ██║ █████╗ ██████╔╝█████╗ ███████║ ██║ +██╔══██╗ ╚██╔╝ ██║ ██╔══╝ ██╔══██╗██╔══╝ ██╔══██║ ██║ +██████╔╝ ██║ ██║ ███████╗██████╔╝███████╗██║ ██║ ██║ +╚═════╝ ╚═╝ ╚═╝ ╚══════╝╚═════╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ +{Colors.YELLOW} ╔═╗╦ ╔═╗╦ ╦╔═╗╦═╗ + ╠═╝║ ╠═╣╚╦╝║╣ ╠╦╝ + ╩ ╩═╝╩ ╩ ╩ ╚═╝╩╚═{Colors.RESET} +""") + + +def print_info(expression: str, sample_rate: int, duration: float, time_offset: int) -> None: + print(f"\n{Colors.BOLD}{Colors.YELLOW}Configuration:{Colors.RESET}") + print(f" {Colors.BOLD}Expression:{Colors.RESET} {Colors.GREEN}{expression}{Colors.RESET}") + print(f" {Colors.BOLD}Sample Rate:{Colors.RESET} {Colors.MAGENTA}{sample_rate}{Colors.RESET} Hz") + print(f" {Colors.BOLD}Duration:{Colors.RESET} {Colors.MAGENTA}{duration:.1f}{Colors.RESET} seconds") + print(f" {Colors.BOLD}Time Offset:{Colors.RESET} {Colors.MAGENTA}{time_offset}{Colors.RESET}") + + status_np = f"{Colors.GREEN}✓{Colors.RESET}" if NUMPY_AVAILABLE else f"{Colors.RED}✗{Colors.RESET}" + status_sd = f"{Colors.GREEN}✓{Colors.RESET}" if SOUNDDEVICE_AVAILABLE else f"{Colors.RED}✗{Colors.RESET}" + + print(f" {Colors.BOLD}Status:{Colors.RESET} NumPy {status_np} Sounddevice {status_sd}") + + +def print_styled_help(): + print(f""" +{Colors.MAGENTA}{Colors.BOLD}BYTEBEAT PLAYER{Colors.RESET} - 8-bit audio from math + +{Colors.BOLD}{Colors.YELLOW}USAGE:{Colors.RESET} + python bytebeat_play.py {Colors.GREEN}{Colors.RESET} [{Colors.CYAN}options{Colors.RESET}] + +{Colors.BOLD}{Colors.YELLOW}ARGUMENTS:{Colors.RESET} + {Colors.GREEN}{Colors.RESET} Path to bytebeat expression file + +{Colors.BOLD}{Colors.YELLOW}OPTIONS:{Colors.RESET} + {Colors.CYAN}--duration{Colors.RESET} {Colors.DIM}{Colors.RESET} Duration in seconds (default: 60.0) + {Colors.CYAN}--sr{Colors.RESET} {Colors.DIM}{Colors.RESET} Sample rate in Hz (default: 8000) + {Colors.CYAN}--tstart{Colors.RESET} {Colors.DIM}{Colors.RESET} Time offset (default: 0) + {Colors.CYAN}-h, --help{Colors.RESET} Show this help message + +{Colors.BOLD}{Colors.YELLOW}EXAMPLES:{Colors.RESET} + python bytebeat_play.py {Colors.GREEN}song.byteb{Colors.RESET} + python bytebeat_play.py {Colors.GREEN}song.byteb{Colors.RESET} {Colors.CYAN}--duration 30{Colors.RESET} + python bytebeat_play.py {Colors.GREEN}song.byteb{Colors.RESET} {Colors.CYAN}--sr 16000{Colors.RESET} + +{Colors.BOLD}{Colors.YELLOW}EXPRESSION EXAMPLE:{Colors.RESET} + {Colors.MAGENTA}t*(t>>8|t>>9)&46&t>>8{Colors.RESET} +""") + + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Bytebeat Player", + formatter_class=argparse.RawDescriptionHelpFormatter, + add_help=False + ) + + parser.add_argument('-h', '--help', action='store_true', help='Show help message') + parser.add_argument('file', nargs='?', help='Path to bytebeat expression file') + parser.add_argument('--duration', type=float, default=60.0, help='Duration in seconds') + parser.add_argument('--sr', type=int, default=8000, help='Sample rate in Hz') + parser.add_argument('--tstart', type=int, default=0, help='Starting time offset') + + args = parser.parse_args() + + if args.help or not args.file: + print_styled_help() + sys.exit(0) + + print_banner() + try: - for pr,ch in _g(f,d,sr,t0): - bar="█"*int(40*pr)+"-"*int(40*(1-pr)) - print(f"\r{_C['c']}[ {bar} ] {int(pr*100)}%{_C['r']}",end="",flush=True) - st.write(ch.astype('float32')/32768.0) - except KeyboardInterrupt: - print(f"\n{_C['R']}⏹ Interrupted by user!{_C['r']}") - finally: - st.stop(); st.close(); print(f"{_C['g']}Done.{_C['r']}") -def _w(samps,sr): - with tempfile.NamedTemporaryFile(suffix='.wav',delete=False) as t: fn=t.name - frames = samps.tobytes() if not isinstance(samps,list) else b''.join(struct.pack('