#!/usr/bin/env python3
"""
DJ Teaser Builder v2 (local, offline-friendly)

- Scans folder for audio tracks (max N)
- Select tracks by index/range or auto-select "bestof"
- Finds highlight segments (energy + onset)
- Snaps to bar grid (DJ phrasing) + optional pre-roll
- Orders clips to minimize tempo jumps + ramp energy
- Renders clips + acrossfades via FFmpeg
- Exports WAV + MP3
- Writes JSON report
- Optional: generate README + promo text via Ollama

Requirements:
    ffmpeg in PATH
    pip install numpy librosa soundfile requests

Examples:
    python dj_teaser_v2.py --tracks-dir ./tracks --select all --mode rollcall --teaser 60 --bars 2
    python dj_teaser_v2.py --tracks-dir ./tracks --select 1-4,7,9 --teaser 60 --bars 2
    python dj_teaser_v2.py --tracks-dir ./tracks --select auto --auto-n 8 --mode bestof --teaser 75 --bars 4
    python dj_teaser_v2.py --tracks-dir ./tracks --select auto --auto-n 8 --ollama http://192.168.2.60:11434 --gen-readme
"""

import argparse
import json
import math
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional, Dict

import numpy as np
import librosa

try:
    import requests
except Exception:
    requests = None  # Ollama is optional

AUDIO_EXTS = {".wav", ".mp3", ".flac", ".m4a", ".aiff", ".aac", ".ogg", ".opus"}


@dataclass
class TrackInfo:
    """Per-track analysis results used for selection, ordering and rendering."""
    path: Path
    index_in_folder: int      # 1-based position within the scanned folder
    duration_s: float
    tempo_bpm: float          # 0.0 when beat tracking failed
    energy_score: float       # overall (for ranking)
    highlight_score: float    # windowed score sum at the chosen highlight
    approx_start_s: float     # raw highlight start before bar snapping
    snapped_start_s: float    # start after bar-grid snap + pre-roll
    clip_dur_s: float


def run(cmd: List[str]) -> None:
    """Run an external command; raise RuntimeError with stderr on failure."""
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if p.returncode != 0:
        raise RuntimeError(f"Command failed:\n{' '.join(cmd)}\n\nSTDERR:\n{p.stderr}")


def ensure_ffmpeg() -> None:
    """Fail fast if ffmpeg is not available on PATH."""
    if shutil.which("ffmpeg") is None:
        raise RuntimeError("ffmpeg not found in PATH. Install ffmpeg and try again.")


def list_tracks(tracks_dir: Path, max_tracks: int) -> List[Path]:
    """Return up to max_tracks audio files from tracks_dir, sorted by name."""
    files = [p for p in sorted(tracks_dir.iterdir())
             if p.is_file() and p.suffix.lower() in AUDIO_EXTS]
    return files[:max_tracks]


def parse_selection(selection: str, num_tracks: int) -> List[int]:
    """
    Parse a track selection string into 0-based indices.

    selection examples:
      "all"
      "1,2,3,7"
      "1-4,7,9-10"
      "auto"

    "all" and "auto" both return every index; auto trimming happens later.
    Out-of-range indices are dropped; duplicates are removed preserving order.
    Raises ValueError if nothing remains.
    """
    s = selection.strip().lower()
    if s in {"all", "auto"}:
        return list(range(num_tracks))

    out: List[int] = []
    for part in (p.strip() for p in s.split(",") if p.strip()):
        if "-" in part:
            a, b = part.split("-", 1)
            a_i = int(a) - 1
            b_i = int(b) - 1
            if a_i > b_i:
                a_i, b_i = b_i, a_i
            out.extend(range(a_i, b_i + 1))
        else:
            out.append(int(part) - 1)

    seen = set()
    filtered = []
    for i in out:
        if 0 <= i < num_tracks and i not in seen:
            seen.add(i)
            filtered.append(i)
    if not filtered:
        raise ValueError("Selection resulted in an empty track list. Check --select.")
    return filtered


def ffmpeg_to_wav(in_path: Path, out_wav: Path, sr: int) -> None:
    """Decode any supported audio file to a stereo WAV at the given sample rate."""
    out_wav.parent.mkdir(parents=True, exist_ok=True)
    run([
        "ffmpeg", "-y",
        "-i", str(in_path),
        "-vn",               # drop any embedded video/artwork stream
        "-ac", "2",
        "-ar", str(sr),
        "-f", "wav",
        str(out_wav),
    ])


def zscore(x: np.ndarray) -> np.ndarray:
    """Standardize an array to zero mean / unit variance (epsilon-guarded)."""
    x = np.asarray(x, dtype=np.float32)
    mu = float(np.mean(x))
    sd = float(np.std(x) + 1e-9)
    return (x - mu) / sd


def compute_metrics(y: np.ndarray, sr: int, hop_length: int) -> Dict[str, np.ndarray]:
    """
    Compute frame-wise RMS energy, onset strength, and a combined score.

    The score weights onsets (rhythmic activity) over raw loudness and is
    clipped at zero so quiet/negative regions never contribute to window sums.
    """
    rms = librosa.feature.rms(y=y, frame_length=2048, hop_length=hop_length)[0]
    onset = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop_length)
    # Trim to the common frame count (feature extractors can differ by a frame).
    n = min(len(rms), len(onset))
    rms, onset = rms[:n], onset[:n]
    # Clip negatives, normalize.
    score = 0.35 * zscore(rms) + 0.65 * zscore(onset)
    score = np.maximum(score, 0.0)
    return {"rms": rms, "onset": onset, "score": score}


def pick_highlight_start(
    score: np.ndarray,
    sr: int,
    hop_length: int,
    clip_s: float,
    avoid_intro_s: float,
    avoid_outro_s: float,
    duration_s: float,
) -> Tuple[float, float]:
    """
    Sliding-window max over score. Returns (approx_start_s, highlight_score_sum).

    NOTE: np.convolve(mode="same") centers the window on each frame, while the
    validity mask treats frame times as clip *starts*; the final clamp keeps
    the result inside [intro, duration - outro - clip] regardless.
    """
    # Track too short to respect the margins: just center the clip.
    if duration_s <= (avoid_intro_s + avoid_outro_s + clip_s + 1.0):
        return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))

    n = len(score)
    clip_frames = max(1, int(round((clip_s * sr) / hop_length)))
    t_seconds = (np.arange(n) * hop_length) / sr

    valid = (t_seconds >= avoid_intro_s) & (t_seconds <= (duration_s - avoid_outro_s - clip_s))
    valid_idxs = np.where(valid)[0]
    if len(valid_idxs) == 0:
        return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))

    window = np.ones(clip_frames, dtype=np.float32)
    summed = np.convolve(score, window, mode="same")

    best_idx = int(valid_idxs[np.argmax(summed[valid_idxs])])
    center_t = float(t_seconds[best_idx])
    start_t = center_t - (clip_s / 2.0)
    start_t = float(max(avoid_intro_s, min(start_t, duration_s - avoid_outro_s - clip_s)))
    return start_t, float(summed[best_idx])


def snap_to_bar_grid(y: np.ndarray, sr: int, approx_start: float,
                     bars: int, beats_per_bar: int) -> Tuple[float, float, Optional[np.ndarray]]:
    """
    Snap start to the nearest bar grid based on beat tracking.

    Returns (snapped_start_s, tempo_bpm, beat_times or None). On beat-tracking
    failure returns the unsnapped start with tempo 0.0 so callers can fall back.
    """
    try:
        tempo_raw, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
        # librosa >= 0.10 returns tempo as a 1-element ndarray; older versions
        # return a plain float. Normalize to a Python float either way.
        tempo = float(np.atleast_1d(tempo_raw)[0])
        if beat_frames is None or len(beat_frames) < 8:
            return approx_start, tempo, None

        beat_times = librosa.frames_to_time(beat_frames, sr=sr)
        i = int(np.argmin(np.abs(beat_times - approx_start)))
        grid = max(1, bars * beats_per_bar)  # beats per chunk
        # Assumes beat 0 is a downbeat; snap the beat index to the bar grid.
        snapped_i = int(round(i / grid) * grid)
        snapped_i = max(0, min(snapped_i, len(beat_times) - 1))
        snapped_t = float(beat_times[snapped_i])

        # Only accept the snap if it stays close to the detected highlight.
        if abs(snapped_t - approx_start) <= 2.0:
            return snapped_t, tempo, beat_times
        return approx_start, tempo, beat_times
    except Exception:
        return approx_start, 0.0, None


def bars_to_seconds(tempo_bpm: float, bars: int, beats_per_bar: int) -> float:
    """Duration in seconds of `bars` bars at the given tempo."""
    beats = bars * beats_per_bar
    return (60.0 / max(1e-6, tempo_bpm)) * beats


def apply_preroll(snapped_start: float, beat_times: Optional[np.ndarray],
                  preroll_bars: int, beats_per_bar: int) -> float:
    """
    Move the start earlier by N bars if beat_times are available.
    Otherwise fall back to a seconds-based guess.
    """
    if preroll_bars <= 0:
        return snapped_start
    if beat_times is None or len(beat_times) < (preroll_bars * beats_per_bar + 2):
        # Fallback: 1 bar ~ 2 sec at 120 bpm; safe-ish.
        return max(0.0, snapped_start - preroll_bars * 2.0)
    # Find nearest beat index to snapped_start, then step back whole bars.
    i = int(np.argmin(np.abs(beat_times - snapped_start)))
    back_beats = preroll_bars * beats_per_bar
    j = max(0, i - back_beats)
    return float(beat_times[j])


def render_clip(in_wav: Path, out_path: Path, start: float, dur: float,
                fade_s: float, target_lufs: float) -> None:
    """Cut a clip from in_wav, apply fades and loudness normalization."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    af = (
        f"atrim=start={start}:duration={dur},"
        f"afade=t=in:st=0:d={fade_s},"
        f"afade=t=out:st={max(0.0, dur - fade_s)}:d={fade_s},"
        f"loudnorm=I={target_lufs}:TP=-1.5:LRA=11"
    )
    run(["ffmpeg", "-y", "-i", str(in_wav), "-vn", "-af", af, str(out_path)])


def build_acrossfade_chain(clips: List[Path], out_wav: Path, crossfade_s: float) -> None:
    """Join rendered clips into one WAV via chained FFmpeg acrossfade filters."""
    if not clips:
        raise ValueError("No clips to concatenate.")
    if len(clips) == 1:
        shutil.copyfile(clips[0], out_wav)
        return

    cmd = ["ffmpeg", "-y"]
    for c in clips:
        cmd += ["-i", str(c)]

    # Chain: [0][1]acrossfade[a1]; [a1][2]acrossfade[a2]; ...
    filter_parts = []
    last = "[0:a]"
    for i in range(1, len(clips)):
        nxt = f"[{i}:a]"
        out = f"[a{i}]"
        filter_parts.append(f"{last}{nxt}acrossfade=d={crossfade_s}:c1=tri:c2=tri{out}")
        last = out

    cmd += ["-filter_complex", ";".join(filter_parts), "-map", last, str(out_wav)]
    run(cmd)


def export_mp3(in_wav: Path, out_mp3: Path, bitrate: str = "320k") -> None:
    """Encode a WAV to MP3 with libmp3lame at the given bitrate."""
    out_mp3.parent.mkdir(parents=True, exist_ok=True)
    run(["ffmpeg", "-y", "-i", str(in_wav), "-vn",
         "-codec:a", "libmp3lame", "-b:a", bitrate, str(out_mp3)])


def order_tracks_dj_style(track_infos: List[TrackInfo], tempo_tolerance: float,
                          prefer_energy_ramp: bool = True) -> List[TrackInfo]:
    """
    DJ ordering heuristic:
      1) group by tempo clusters (within tolerance)
      2) within each cluster: ramp by energy_score (prefer_energy_ramp=True)
         or strongest highlight first (prefer_energy_ramp=False)
      3) order clusters by (median tempo, median energy), ascending
    """
    if not track_infos:
        return []

    # Cluster by tempo; tracks without a tempo estimate sort to the end.
    sorted_by_tempo = sorted(track_infos,
                             key=lambda t: (t.tempo_bpm if t.tempo_bpm > 0 else 1e9))
    clusters: List[List[TrackInfo]] = []
    for t in sorted_by_tempo:
        placed = False
        for c in clusters:
            # Compare to cluster median tempo.
            tempos = [x.tempo_bpm for x in c if x.tempo_bpm > 0]
            med = float(np.median(tempos)) if tempos else t.tempo_bpm
            if t.tempo_bpm > 0 and abs(t.tempo_bpm - med) <= tempo_tolerance:
                c.append(t)
                placed = True
                break
        if not placed:
            clusters.append([t])

    # Sort within each cluster.
    for c in clusters:
        if prefer_energy_ramp:
            c.sort(key=lambda x: x.energy_score)
        else:
            c.sort(key=lambda x: x.highlight_score, reverse=True)

    # Sort clusters by (median tempo, median energy).
    def cluster_key(c: List[TrackInfo]):
        tempos = [x.tempo_bpm for x in c if x.tempo_bpm > 0]
        med_t = float(np.median(tempos)) if tempos else 9999.0
        med_e = float(np.median([x.energy_score for x in c]))
        return (med_t, med_e)

    clusters.sort(key=cluster_key)
    return [t for c in clusters for t in c]


def ollama_generate(ollama_url: str, model: str, prompt: str) -> str:
    """Call Ollama's /api/generate (non-streaming) and return the response text."""
    if requests is None:
        raise RuntimeError("requests not installed. Run: pip install requests")
    url = ollama_url.rstrip("/") + "/api/generate"
    payload = {"model": model, "prompt": prompt, "stream": False}
    r = requests.post(url, json=payload, timeout=60)
    r.raise_for_status()
    data = r.json()
    return data.get("response", "").strip()


def main():
    parser = argparse.ArgumentParser(description="Local DJ Teaser Builder v2 (Python + FFmpeg)")
    parser.add_argument("--tracks-dir", default="./tracks", help="Folder containing audio tracks")
    parser.add_argument("--work-dir", default="./work", help="Temp working folder")
    parser.add_argument("--out-dir", default="./out", help="Output folder")
    parser.add_argument("--max-tracks", type=int, default=20, help="Max tracks to scan (default 20)")
    parser.add_argument("--select", default="all",
                        help='Selection: "all", "1,2,7", "1-4,9", or "auto"')
    parser.add_argument("--auto-n", type=int, default=8,
                        help="If --select auto: how many tracks to keep (best-of)")
    parser.add_argument("--mode", choices=["rollcall", "bestof"], default="rollcall",
                        help="Teaser style")
    parser.add_argument("--teaser", type=float, default=60.0, help="Final teaser length (seconds)")
    parser.add_argument("--bars", type=int, default=2,
                        help="Bars per clip (DJ phrasing). rollcall=2 typical")
    parser.add_argument("--bpb", type=int, default=4, help="Beats per bar (4 for trance)")
    parser.add_argument("--preroll-bars", type=int, default=1,
                        help="Start N bars before highlight (DJ lead-in)")
    parser.add_argument("--crossfade", type=float, default=0.25, help="Acrossfade duration seconds")
    parser.add_argument("--fade", type=float, default=0.08, help="Fade in/out per clip seconds")
    parser.add_argument("--avoid-intro", type=float, default=30.0,
                        help="Skip intro when searching highlights")
    parser.add_argument("--avoid-outro", type=float, default=20.0,
                        help="Skip outro when searching highlights")
    parser.add_argument("--tempo-tol", type=float, default=4.0,
                        help="Tempo clustering tolerance (BPM)")
    parser.add_argument("--target-lufs", type=float, default=-14.0,
                        help="Loudness target LUFS (approx)")
    parser.add_argument("--output-wav", default="album_teaser.wav", help="Output teaser WAV filename")
    parser.add_argument("--output-mp3", default="album_teaser.mp3", help="Output teaser MP3 filename")
    parser.add_argument("--mp3-bitrate", default="320k", help="MP3 bitrate (e.g. 192k, 320k)")
    # Ollama (optional)
    parser.add_argument("--ollama", default="",
                        help="Ollama base URL (e.g. http://192.168.2.60:11434)")
    parser.add_argument("--ollama-model", default="qwen2.5:latest", help="Ollama model name")
    parser.add_argument("--gen-readme", action="store_true",
                        help="Generate README + promo text using Ollama")

    args = parser.parse_args()
    ensure_ffmpeg()

    tracks_dir = Path(args.tracks_dir)
    work_dir = Path(args.work_dir)
    out_dir = Path(args.out_dir)
    work_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)

    tracks = list_tracks(tracks_dir, args.max_tracks)
    if not tracks:
        raise SystemExit(f"No audio tracks found in: {tracks_dir.resolve()}")

    print("\nDiscovered tracks:")
    for i, t in enumerate(tracks, start=1):
        print(f"  {i:02d}. {t.name}")

    selected_idxs = parse_selection(args.select, len(tracks))
    selected_tracks = [tracks[i] for i in selected_idxs]

    # Math: average duration per clip given acrossfade overlap.
    # FIX: in auto mode only --auto-n clips make the final teaser, so budget
    # clip durations for that count, not for every scanned track.
    auto_mode = args.select.strip().lower() == "auto"
    n_selected = len(selected_tracks)
    n_clips = min(int(max(1, args.auto_n)), n_selected) if auto_mode else n_selected
    teaser_s = float(args.teaser)
    cf = float(args.crossfade)
    avg_dur = (teaser_s + (n_clips - 1) * cf) / max(1, n_clips)

    # Analyze each selected track into TrackInfo.
    infos: List[TrackInfo] = []
    for local_idx, track in enumerate(selected_tracks, start=1):
        tmp_wav = work_dir / f"track_{local_idx:02d}.wav"
        ffmpeg_to_wav(track, tmp_wav, sr=22050)

        y, sr = librosa.load(tmp_wav, sr=22050, mono=True)
        duration_s = float(len(y) / sr)

        m = compute_metrics(y, sr, hop_length=512)
        score = m["score"]

        # Overall energy score: mean of the top decile of score (robust).
        # Guarded so an empty score array cannot produce NaN.
        if len(score):
            top = float(np.quantile(score, 0.90))
            mask = score >= top
            energy_score = float(np.mean(score[mask])) if np.any(mask) else float(np.mean(score))
        else:
            energy_score = 0.0

        # Search window size (not necessarily final dur): avg_dur-ish but safe.
        search_clip = float(np.clip(avg_dur, 4.0, 10.0))

        approx_start, highlight_score = pick_highlight_start(
            score=score, sr=sr, hop_length=512, clip_s=search_clip,
            avoid_intro_s=float(args.avoid_intro),
            avoid_outro_s=float(args.avoid_outro),
            duration_s=duration_s
        )

        snapped_start, tempo, beat_times = snap_to_bar_grid(
            y=y, sr=sr, approx_start=approx_start,
            bars=int(args.bars), beats_per_bar=int(args.bpb)
        )

        # Apply pre-roll bars (DJ lead-in).
        snapped_start = apply_preroll(snapped_start, beat_times,
                                      int(args.preroll_bars), int(args.bpb))

        # Duration based on bars + tempo, clamped to the per-clip budget.
        if tempo and tempo > 1.0:
            dur = bars_to_seconds(tempo, int(args.bars), int(args.bpb))
        else:
            dur = avg_dur
        dur = float(np.clip(dur, 2.5, avg_dur))

        infos.append(TrackInfo(
            path=track,
            index_in_folder=int(selected_idxs[local_idx - 1] + 1),
            duration_s=duration_s,
            tempo_bpm=float(tempo),
            energy_score=energy_score,
            highlight_score=float(highlight_score),
            approx_start_s=float(approx_start),
            snapped_start_s=float(snapped_start),
            clip_dur_s=float(dur),
        ))

    # Auto best-of selection (if requested).
    if auto_mode:
        auto_n = int(max(1, min(args.auto_n, len(infos))))
        # Rank by highlight_score primarily, then energy_score.
        infos_sorted = sorted(infos, key=lambda t: (t.highlight_score, t.energy_score),
                              reverse=True)
        infos = infos_sorted[:auto_n]
        print(f"\nAuto-selected best-of: {auto_n} tracks (ranked by highlight score).")

    # DJ ordering. FIX: --mode was previously parsed but never used; "bestof"
    # now puts the strongest highlights first within each tempo cluster, while
    # the default "rollcall" keeps the original energy-ramp behavior.
    ordered = order_tracks_dj_style(infos, tempo_tolerance=float(args.tempo_tol),
                                    prefer_energy_ramp=(args.mode == "rollcall"))

    print("\nFinal clip order:")
    for i, t in enumerate(ordered, start=1):
        print(f"  {i:02d}. [{t.tempo_bpm:.1f} BPM] (E={t.energy_score:.3f}) {t.path.name}")

    # Render clips.
    clip_paths: List[Path] = []
    report_tracks = []
    for i, t in enumerate(ordered, start=1):
        tmp_wav = work_dir / f"track_{i:02d}.wav"
        ffmpeg_to_wav(t.path, tmp_wav, sr=22050)
        clip_out = work_dir / f"clip_{i:02d}.wav"
        render_clip(
            in_wav=tmp_wav,
            out_path=clip_out,
            start=t.snapped_start_s,
            dur=t.clip_dur_s,
            fade_s=float(args.fade),
            target_lufs=float(args.target_lufs)
        )
        clip_paths.append(clip_out)
        report_tracks.append({
            "folder_index": t.index_in_folder,
            "filename": t.path.name,
            "tempo_bpm_est": round(t.tempo_bpm, 2),
            "energy_score": round(t.energy_score, 6),
            "highlight_score": round(t.highlight_score, 6),
            "approx_start_seconds": round(t.approx_start_s, 3),
            "snapped_start_seconds": round(t.snapped_start_s, 3),
            "clip_duration_seconds": round(t.clip_dur_s, 3),
        })

    # Build teaser WAV then MP3.
    out_wav = out_dir / args.output_wav
    out_mp3 = out_dir / args.output_mp3
    build_acrossfade_chain(clip_paths, out_wav, crossfade_s=float(args.crossfade))
    export_mp3(out_wav, out_mp3, bitrate=str(args.mp3_bitrate))

    report = {
        "version": "v2",
        "inputs": {
            "tracks_dir": str(tracks_dir.resolve()),
            "select": args.select,
            "auto_n": int(args.auto_n),
            "mode": args.mode,
        },
        "settings": {
            "teaser_seconds": teaser_s,
            "bars": int(args.bars),
            "beats_per_bar": int(args.bpb),
            "preroll_bars": int(args.preroll_bars),
            "crossfade_seconds": float(args.crossfade),
            "fade_seconds": float(args.fade),
            "avoid_intro_seconds": float(args.avoid_intro),
            "avoid_outro_seconds": float(args.avoid_outro),
            "tempo_tolerance_bpm": float(args.tempo_tol),
            "target_lufs": float(args.target_lufs),
            "mp3_bitrate": str(args.mp3_bitrate),
        },
        "outputs": {
            "wav": str(out_wav.resolve()),
            "mp3": str(out_mp3.resolve()),
        },
        "tracks": report_tracks,
    }

    report_path = out_dir / "teaser_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print(f"\nāœ… Teaser WAV: {out_wav.resolve()}")
    print(f"āœ… Teaser MP3: {out_mp3.resolve()}")
    print(f"šŸ“ Report: {report_path.resolve()}")

    # Optional: generate README / promo text via Ollama.
    if args.gen_readme:
        if not args.ollama:
            raise SystemExit("--gen-readme requires --ollama http://host:11434")
        prompt = (
            "You are helping with a GitHub repo for a local DJ teaser builder.\n"
            "Write a concise README in English with:\n"
            "- What it does\n- Requirements\n- Install\n- Usage examples\n- Tips for old-school trance / DJ phrasing\n"
            "Also write a short promo text (YouTube/Instagram) for an album teaser.\n\n"
            f"Settings:\n{json.dumps(report['settings'], indent=2)}\n\n"
            f"Tracks (order):\n{json.dumps(report_tracks, indent=2)}\n"
        )
        text = ollama_generate(args.ollama, args.ollama_model, prompt)
        readme_path = out_dir / "README_generated.md"
        with open(readme_path, "w", encoding="utf-8") as f:
            f.write(text + "\n")
        print(f"🧠 Ollama README generated: {readme_path.resolve()}")


if __name__ == "__main__":
    main()