#!/usr/bin/env python3 """ DJ Teaser Builder v3 (local, offline-friendly) Adds: - Key detection (Krumhansl-Schmuckler on chroma) + Camelot mapping - Harmonic ordering (Camelot adjacent keys) + tempo clustering + energy ramp - Downbeat-ish snap (bar start scoring) on top of beat grid - 2-pass EBU R128 loudnorm per clip for consistent loudness - Exports WAV + MP3 + report JSON Requirements: - ffmpeg in PATH - pip install numpy librosa soundfile requests (requests only needed if you use Ollama) Examples: python dj_teaser_v3.py --tracks-dir ./tracks --select all --teaser 60 --bars 2 --preroll-bars 1 python dj_teaser_v3.py --tracks-dir ./tracks --select auto --auto-n 8 --teaser 75 --bars 4 --harmonic """ import argparse import json import math import shutil import subprocess from dataclasses import dataclass from pathlib import Path from typing import List, Tuple, Optional, Dict import numpy as np import librosa AUDIO_EXTS = {".wav", ".mp3", ".flac", ".m4a", ".aiff", ".aac", ".ogg", ".opus"} # --------------------------- # Key profiles (Krumhansl) # --------------------------- KRUMHANSL_MAJOR = np.array([6.35, 2.23, 3.48, 2.33, 4.38, 4.09, 2.52, 5.19, 2.39, 3.66, 2.29, 2.88], dtype=np.float32) KRUMHANSL_MINOR = np.array([6.33, 2.68, 3.52, 5.38, 2.60, 3.53, 2.54, 4.75, 3.98, 2.69, 3.34, 3.17], dtype=np.float32) PITCHES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"] # Camelot mappings (simplified) # We map major keys to "B" and minor keys to "A" numbers. # Common Camelot wheel: # 8B = C Major, 5A = C Minor, etc. # We'll use a standard mapping table for pitch class -> camelot number. 
# Pitch class -> Camelot code, major ("B" side) and minor ("A" side).
CAMELOT_MAJOR = {
    "C": "8B", "G": "9B", "D": "10B", "A": "11B", "E": "12B", "B": "1B",
    "F#": "2B", "C#": "3B", "G#": "4B", "D#": "5B", "A#": "6B", "F": "7B",
}
CAMELOT_MINOR = {
    "A": "8A", "E": "9A", "B": "10A", "F#": "11A", "C#": "12A", "G#": "1A",
    "D#": "2A", "A#": "3A", "F": "4A", "C": "5A", "G": "6A", "D": "7A",
}


def run(cmd: List[str]) -> Tuple[str, str]:
    """Run an external command and return (stdout, stderr).

    Raises RuntimeError (with the command line and captured stderr) on a
    non-zero exit status.
    """
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if p.returncode != 0:
        raise RuntimeError(f"Command failed:\n{' '.join(cmd)}\n\nSTDERR:\n{p.stderr}")
    return p.stdout, p.stderr


def ensure_ffmpeg() -> None:
    """Fail fast if ffmpeg is not on PATH — every render step shells out to it."""
    if shutil.which("ffmpeg") is None:
        raise RuntimeError("ffmpeg not found in PATH. Install ffmpeg and try again.")


def list_tracks(tracks_dir: Path, max_tracks: int) -> List[Path]:
    """Return up to max_tracks audio files in tracks_dir, sorted by filename."""
    files = [p for p in sorted(tracks_dir.iterdir())
             if p.is_file() and p.suffix.lower() in AUDIO_EXTS]
    return files[:max_tracks]


def parse_selection(selection: str, num_tracks: int) -> List[int]:
    """Parse a user selection string into 0-based track indices.

    Accepts "all"/"auto" (every track), comma-separated 1-based indices
    ("1,2,7") and inclusive ranges ("1-4", in either order). Duplicates and
    out-of-range entries are dropped; first-seen order is preserved.

    Raises ValueError if nothing valid remains.
    """
    s = selection.strip().lower()
    if s in {"all", "auto"}:
        return list(range(num_tracks))
    parts = [p.strip() for p in s.split(",") if p.strip()]
    out: List[int] = []
    for part in parts:
        if "-" in part:
            a, b = part.split("-", 1)
            a_i = int(a) - 1
            b_i = int(b) - 1
            if a_i > b_i:
                a_i, b_i = b_i, a_i
            out.extend(list(range(a_i, b_i + 1)))
        else:
            out.append(int(part) - 1)
    seen = set()
    filtered = []
    for i in out:
        if 0 <= i < num_tracks and i not in seen:
            seen.add(i)
            filtered.append(i)
    if not filtered:
        raise ValueError("Selection resulted in an empty track list. Check --select.")
    return filtered


def ffmpeg_to_wav(in_path: Path, out_wav: Path, sr: int) -> None:
    """Decode any supported input to a stereo WAV at the given sample rate."""
    out_wav.parent.mkdir(parents=True, exist_ok=True)
    run([
        "ffmpeg", "-y",
        "-i", str(in_path),
        "-vn",
        "-ac", "2",
        "-ar", str(sr),
        "-f", "wav",
        str(out_wav),
    ])


def zscore(x: np.ndarray) -> np.ndarray:
    """Standardize x to zero mean / unit variance (epsilon guards sd == 0)."""
    x = np.asarray(x, dtype=np.float32)
    mu = float(np.mean(x))
    sd = float(np.std(x) + 1e-9)
    return (x - mu) / sd


@dataclass
class TrackInfo:
    """Per-track analysis results used for ordering, rendering and the report."""
    path: Path
    folder_index: int  # 1-based index within the discovered-tracks listing
    duration_s: float
    tempo_bpm: float
    energy_score: float
    highlight_score: float
    approx_start_s: float   # highlight start before beat snapping
    snapped_start_s: float  # highlight start after downbeat snap + preroll
    clip_dur_s: float
    key_name: str
    camelot: str


def compute_score(y: np.ndarray, sr: int, hop_length: int) -> np.ndarray:
    """Frame-wise 'interest' score: weighted blend of RMS and onset strength.

    Negative z-scores are clipped to 0 so quiet/flat regions never win.
    """
    rms = librosa.feature.rms(y=y, frame_length=2048, hop_length=hop_length)[0]
    onset = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop_length)
    n = min(len(rms), len(onset))  # the two features can differ by a frame
    rms, onset = rms[:n], onset[:n]
    score = 0.35 * zscore(rms) + 0.65 * zscore(onset)
    return np.maximum(score, 0.0)


def pick_highlight_start(score: np.ndarray, sr: int, hop_length: int, clip_s: float,
                         avoid_intro_s: float, avoid_outro_s: float,
                         duration_s: float) -> Tuple[float, float]:
    """Pick the clip start with the highest summed score inside the valid zone.

    Returns (start_seconds, window_score). Falls back to the track center when
    the track is too short or no frame survives the intro/outro exclusions.
    """
    if duration_s <= (avoid_intro_s + avoid_outro_s + clip_s + 1.0):
        return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))
    n = len(score)
    clip_frames = max(1, int(round((clip_s * sr) / hop_length)))
    t_seconds = (np.arange(n) * hop_length) / sr
    valid = (t_seconds >= avoid_intro_s) & (t_seconds <= (duration_s - avoid_outro_s - clip_s))
    valid_idxs = np.where(valid)[0]
    if len(valid_idxs) == 0:
        return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))
    # mode="same" centers the window on each frame, so best_idx is a clip CENTER.
    window = np.ones(clip_frames, dtype=np.float32)
    summed = np.convolve(score, window, mode="same")
    best_idx = int(valid_idxs[np.argmax(summed[valid_idxs])])
    center_t = float(t_seconds[best_idx])
    start_t = center_t - (clip_s / 2.0)
    start_t = float(max(avoid_intro_s, min(start_t, duration_s - avoid_outro_s - clip_s)))
    return start_t, float(summed[best_idx])


def estimate_key(y: np.ndarray, sr: int) -> Tuple[str, str, float]:
    """
    Krumhansl-Schmuckler key estimation using average chroma.
    Returns (key_name, camelot, confidence)
    """
    # Use harmonic component for more stable key
    yh = librosa.effects.harmonic(y)
    chroma = librosa.feature.chroma_cqt(y=yh, sr=sr)
    chroma_mean = np.mean(chroma, axis=1)
    chroma_mean /= (np.sum(chroma_mean) + 1e-9)

    def corr_profile(profile):
        # Correlate the mean chroma against the profile rotated to each tonic.
        corrs = []
        for shift in range(12):
            prof = np.roll(profile, shift)
            corrs.append(np.corrcoef(chroma_mean, prof)[0, 1])
        # Constant chroma (e.g. silence) makes corrcoef NaN; neutralize so
        # argmax below stays well-defined.
        return np.nan_to_num(np.array(corrs, dtype=np.float32))

    major_corr = corr_profile(KRUMHANSL_MAJOR)
    minor_corr = corr_profile(KRUMHANSL_MINOR)
    best_major = int(np.argmax(major_corr))
    best_minor = int(np.argmax(minor_corr))
    maj_val = float(major_corr[best_major])
    min_val = float(minor_corr[best_minor])
    if maj_val >= min_val:
        tonic = PITCHES[best_major]
        key_name = f"{tonic} Major"
        camelot = CAMELOT_MAJOR.get(tonic, "")
        conf = maj_val
    else:
        tonic = PITCHES[best_minor]
        key_name = f"{tonic} Minor"
        camelot = CAMELOT_MINOR.get(tonic, "")
        conf = min_val
    if not camelot:
        camelot = "??"
    return key_name, camelot, conf


def bars_to_seconds(tempo_bpm: float, bars: int, beats_per_bar: int) -> float:
    """Convert a bar count to seconds at the given tempo (epsilon-guarded)."""
    beats = bars * beats_per_bar
    return (60.0 / max(1e-6, tempo_bpm)) * beats


def snap_to_downbeat_like(y: np.ndarray, sr: int, approx_start: float, bars: int,
                          beats_per_bar: int,
                          onset_weight: float = 1.0) -> Tuple[float, float, Optional[np.ndarray]]:
    """
    "Downbeat-ish" snap:
    - get beat_times
    - build a bar-grid (every beats_per_bar beats)
    - score each bar start around approx_start by local onset strength
    - pick best bar start near approx_start
    Returns (snapped_start, tempo, beat_times)
    """
    try:
        tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
        # librosa >= 0.10 returns tempo as a 1-element ndarray; older versions
        # return a plain float. float() on an ndarray is deprecated in NumPy,
        # so normalize explicitly.
        tempo = float(np.atleast_1d(tempo)[0])
        if beat_frames is None or len(beat_frames) < (beats_per_bar * 4):
            return approx_start, tempo, None
        beat_times = librosa.frames_to_time(beat_frames, sr=sr)

        # Onset envelope used to score candidate bar starts.
        onset_env = librosa.onset.onset_strength(y=y, sr=sr)
        onset_t = librosa.times_like(onset_env, sr=sr)

        # Candidate bar starts: every beats_per_bar-th beat.
        bar_stride = beats_per_bar
        bar_idxs = np.arange(0, len(beat_times), bar_stride)

        # Focus on bar starts within +/- 8 seconds of the approximate start.
        region = []
        for bi in bar_idxs:
            t0 = float(beat_times[bi])
            if abs(t0 - approx_start) <= 8.0:
                region.append(bi)
        if not region:
            # Fallback: nearest bar start overall.
            nearest = int(bar_idxs[np.argmin(np.abs(beat_times[bar_idxs] - approx_start))])
            return float(beat_times[nearest]), tempo, beat_times

        # Score each candidate by onset energy just after the bar start
        # (kick/transient) plus a closeness bonus toward approx_start.
        best_bi = region[0]
        best_val = -1.0
        for bi in region:
            t0 = float(beat_times[bi])
            mask = (onset_t >= t0) & (onset_t <= (t0 + 0.35))
            val = float(np.mean(onset_env[mask])) if np.any(mask) else 0.0
            closeness = 1.0 - min(1.0, abs(t0 - approx_start) / 8.0)
            val = onset_weight * val + 0.25 * closeness
            if val > best_val:
                best_val = val
                best_bi = bi
        snapped = float(beat_times[best_bi])

        # Additionally align to a phrase grid of (bars * beats_per_bar) beats,
        # but only if it doesn't drift too far from the highlight.
        chunk = max(1, bars * beats_per_bar)
        chunk_bi = int(round(best_bi / chunk) * chunk)
        chunk_bi = max(0, min(chunk_bi, len(beat_times) - 1))
        snapped2 = float(beat_times[chunk_bi])
        if abs(snapped2 - approx_start) <= 2.5:
            return snapped2, tempo, beat_times
        return snapped, tempo, beat_times
    except Exception:
        # Beat tracking is best-effort; fall back to the raw highlight start
        # with an unknown tempo rather than aborting the whole build.
        return approx_start, 0.0, None


def apply_preroll(snapped_start: float, beat_times: Optional[np.ndarray],
                  preroll_bars: int, beats_per_bar: int) -> float:
    """Shift the start back by preroll_bars bars along the beat grid.

    Without a usable beat grid, approximate each bar as 2 seconds.
    """
    if preroll_bars <= 0:
        return snapped_start
    if beat_times is None or len(beat_times) < (preroll_bars * beats_per_bar + 2):
        return max(0.0, snapped_start - preroll_bars * 2.0)
    i = int(np.argmin(np.abs(beat_times - snapped_start)))
    back_beats = preroll_bars * beats_per_bar
    j = max(0, i - back_beats)
    return float(beat_times[j])


# ---------------------------
# 2-pass loudnorm helpers
# ---------------------------

def loudnorm_2pass_filter(infile: Path, start: float, dur: float, fade_s: float,
                          target_lufs: float) -> str:
    """
    Build a 2-pass loudnorm filter for a trimmed segment.
    Pass1: measure JSON from ffmpeg stderr
    Pass2: apply measured params
    """
    # Pass 1: trim + fades, then loudnorm in measurement mode.
    pre = f"atrim=start={start}:duration={dur},afade=t=in:st=0:d={fade_s},afade=t=out:st={max(0.0, dur - fade_s)}:d={fade_s}"
    measure = f"{pre},loudnorm=I={target_lufs}:TP=-1.5:LRA=11:print_format=json"
    _, err = run(["ffmpeg", "-y", "-i", str(infile), "-vn", "-af", measure, "-f", "null", "-"])
    # loudnorm prints a flat JSON object as the last braced block on stderr.
    j_start = err.rfind("{")
    j_end = err.rfind("}")
    if j_start == -1 or j_end == -1 or j_end < j_start:
        raise RuntimeError(f"loudnorm measurement JSON not found in ffmpeg output for {infile}")
    data = json.loads(err[j_start:j_end + 1])
    # Pass 2: apply the measured values (linear mode for transparent gain).
    applied = (
        f"{pre},loudnorm=I={target_lufs}:TP=-1.5:LRA=11:"
        f"measured_I={data['input_i']}:measured_TP={data['input_tp']}:measured_LRA={data['input_lra']}:"
        f"measured_thresh={data['input_thresh']}:offset={data['target_offset']}:linear=true:print_format=summary"
    )
    return applied


def render_clip_2pass(in_wav: Path, out_path: Path, start: float, dur: float,
                      fade_s: float, target_lufs: float,
                      sample_rate: int = 22050) -> None:
    """Render one loudness-normalized clip (trim + fades + 2-pass loudnorm).

    loudnorm internally upsamples its output to 192 kHz, so we pin the output
    rate back to sample_rate (default matches the pipeline's working rate).
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    af2 = loudnorm_2pass_filter(in_wav, start, dur, fade_s, target_lufs)
    run(["ffmpeg", "-y", "-i", str(in_wav), "-vn", "-af", af2,
         "-ar", str(sample_rate), str(out_path)])


def build_acrossfade_chain(clips: List[Path], out_wav: Path, crossfade_s: float) -> None:
    """Concatenate clips into out_wav with pairwise acrossfade transitions.

    Raises ValueError on an empty clip list (an empty list would otherwise
    produce a malformed ffmpeg command).
    """
    if not clips:
        raise ValueError("No clips to concatenate.")
    if len(clips) == 1:
        shutil.copyfile(clips[0], out_wav)
        return
    cmd = ["ffmpeg", "-y"]
    for c in clips:
        cmd += ["-i", str(c)]
    # Chain: [0][1]acrossfade[a1]; [a1][2]acrossfade[a2]; ...
    filter_parts = []
    last = "[0:a]"
    for i in range(1, len(clips)):
        nxt = f"[{i}:a]"
        out = f"[a{i}]"
        filter_parts.append(f"{last}{nxt}acrossfade=d={crossfade_s}:c1=tri:c2=tri{out}")
        last = out
    cmd += ["-filter_complex", ";".join(filter_parts), "-map", last, str(out_wav)]
    run(cmd)


def export_mp3(in_wav: Path, out_mp3: Path, bitrate: str) -> None:
    """Encode the final WAV to MP3 (libmp3lame) at the requested bitrate."""
    out_mp3.parent.mkdir(parents=True, exist_ok=True)
    run(["ffmpeg", "-y", "-i", str(in_wav), "-vn", "-codec:a", "libmp3lame",
         "-b:a", bitrate, str(out_mp3)])


# ---------------------------
# Harmonic / DJ ordering
# ---------------------------

def camelot_neighbors(c: str) -> List[str]:
    """
    Camelot adjacency: same number A<->B, +/-1 same letter.
    Example: 8A neighbors -> 8B, 7A, 9A

    Non-Camelot codes (including the "??" placeholder from estimate_key)
    yield an empty list instead of raising.
    """
    if len(c) != 2 and len(c) != 3:  # handle 10A/11B/12A
        return []
    num_part = c[:-1]
    letter = c[-1].upper()
    # Guard: estimate_key emits "??" for unmapped keys; int("?") would raise.
    if not num_part.isdigit() or letter not in ("A", "B"):
        return []
    num = int(num_part)

    def wrap(n):
        return 12 if n == 0 else (1 if n == 13 else n)

    neigh = []
    neigh.append(f"{num}{'A' if letter == 'B' else 'B'}")
    neigh.append(f"{wrap(num - 1)}{letter}")
    neigh.append(f"{wrap(num + 1)}{letter}")
    return neigh


def harmonic_path_order(infos: List[TrackInfo]) -> List[TrackInfo]:
    """
    Greedy harmonic chaining:
    start from a low-energy track, then pick next that is Camelot-neighbor
    if possible, otherwise fall back to closest tempo + energy.
    """
    if not infos:
        return []
    remaining = infos[:]
    remaining.sort(key=lambda t: t.energy_score)  # start calm
    ordered = [remaining.pop(0)]
    while remaining:
        cur = ordered[-1]
        neigh = set(camelot_neighbors(cur.camelot))
        # Prefer harmonic neighbors when any exist.
        candidates = [t for t in remaining if t.camelot in neigh]
        if not candidates:
            candidates = remaining

        # Rank by tempo closeness, then penalize energy drops (prefer a ramp).
        def keyfn(t: TrackInfo):
            tempo_pen = abs((t.tempo_bpm or 0) - (cur.tempo_bpm or 0))
            energy_pen = max(0.0, cur.energy_score - t.energy_score)
            return (tempo_pen, energy_pen, -t.energy_score)

        pick = min(candidates, key=keyfn)
        remaining.remove(pick)
        ordered.append(pick)
    return ordered


def tempo_cluster_energy_ramp(infos: List[TrackInfo], tempo_tol: float) -> List[TrackInfo]:
    """Group tracks into tempo clusters (median within tempo_tol BPM), sort
    each cluster by rising energy, then order clusters by (tempo, energy)."""
    infos_sorted = sorted(infos, key=lambda t: (t.tempo_bpm if t.tempo_bpm > 0 else 1e9))
    clusters: List[List[TrackInfo]] = []
    for t in infos_sorted:
        placed = False
        for c in clusters:
            tempos = [x.tempo_bpm for x in c if x.tempo_bpm > 0]
            med = float(np.median(tempos)) if tempos else t.tempo_bpm
            if t.tempo_bpm > 0 and abs(t.tempo_bpm - med) <= tempo_tol:
                c.append(t)
                placed = True
                break
        if not placed:
            clusters.append([t])
    for c in clusters:
        c.sort(key=lambda x: x.energy_score)  # energy ramp inside a cluster

    def ckey(c):
        tempos = [x.tempo_bpm for x in c if x.tempo_bpm > 0]
        med_t = float(np.median(tempos)) if tempos else 9999.0
        med_e = float(np.median([x.energy_score for x in c]))
        return (med_t, med_e)

    clusters.sort(key=ckey)
    return [t for c in clusters for t in c]


def main():
    parser = argparse.ArgumentParser(description="Local DJ Teaser Builder v3")
    parser.add_argument("--tracks-dir", default="./tracks")
    parser.add_argument("--work-dir", default="./work")
    parser.add_argument("--out-dir", default="./out")
    parser.add_argument("--max-tracks", type=int, default=20)
    parser.add_argument("--select", default="all", help='all | auto | "1,2,7" | "1-4,9"')
    parser.add_argument("--auto-n", type=int, default=8, help="when --select auto: keep N best tracks")
    parser.add_argument("--teaser", type=float, default=60.0)
    parser.add_argument("--bars", type=int, default=2)
    parser.add_argument("--bpb", type=int, default=4)
    parser.add_argument("--preroll-bars", type=int, default=1)
    parser.add_argument("--crossfade", type=float, default=0.25)
    parser.add_argument("--fade", type=float, default=0.08)
    parser.add_argument("--avoid-intro", type=float, default=30.0)
    parser.add_argument("--avoid-outro", type=float, default=20.0)
    parser.add_argument("--tempo-tol", type=float, default=4.0)
    parser.add_argument("--target-lufs", type=float, default=-14.0)
    parser.add_argument("--output-wav", default="album_teaser.wav")
    parser.add_argument("--output-mp3", default="album_teaser.mp3")
    parser.add_argument("--mp3-bitrate", default="320k")
    parser.add_argument("--harmonic", action="store_true", help="Enable Camelot harmonic ordering (recommended for trance)")
    args = parser.parse_args()

    ensure_ffmpeg()
    tracks_dir = Path(args.tracks_dir)
    work_dir = Path(args.work_dir)
    out_dir = Path(args.out_dir)
    work_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)

    tracks = list_tracks(tracks_dir, args.max_tracks)
    if not tracks:
        raise SystemExit(f"No tracks found in {tracks_dir.resolve()}")
    print("\nDiscovered tracks:")
    for i, t in enumerate(tracks, start=1):
        print(f" {i:02d}. {t.name}")

    selected_idxs = parse_selection(args.select, len(tracks))
    selected_tracks = [tracks[i] for i in selected_idxs]
    n = len(selected_tracks)
    teaser_s = float(args.teaser)
    cf = float(args.crossfade)
    # Crossfades overlap, so each clip may be slightly longer than teaser/n.
    avg_dur = (teaser_s + (n - 1) * cf) / max(1, n)

    # --- Analysis pass: tempo, key, energy and highlight position per track.
    infos: List[TrackInfo] = []
    for local_idx, track in enumerate(selected_tracks, start=1):
        tmp_wav = work_dir / f"src_{local_idx:02d}.wav"
        ffmpeg_to_wav(track, tmp_wav, sr=22050)
        y, sr = librosa.load(tmp_wav, sr=22050, mono=True)
        duration_s = float(len(y) / sr)
        score = compute_score(y, sr, hop_length=512)
        # Robust energy score: mean of the top decile of frame scores.
        q = np.quantile(score, 0.90) if len(score) else 0.0
        energy_score = float(np.mean(score[score >= q])) if np.any(score >= q) else float(np.mean(score) if len(score) else 0.0)
        search_clip = float(np.clip(avg_dur, 4.0, 12.0))
        approx_start, highlight_score = pick_highlight_start(
            score=score, sr=sr, hop_length=512, clip_s=search_clip,
            avoid_intro_s=float(args.avoid_intro), avoid_outro_s=float(args.avoid_outro),
            duration_s=duration_s
        )
        snapped_start, tempo, beat_times = snap_to_downbeat_like(
            y=y, sr=sr, approx_start=approx_start,
            bars=int(args.bars), beats_per_bar=int(args.bpb)
        )
        snapped_start = apply_preroll(snapped_start, beat_times, int(args.preroll_bars), int(args.bpb))
        # Clip length: whole bars at the detected tempo, else the average slot.
        if tempo and tempo > 1.0:
            dur = bars_to_seconds(tempo, int(args.bars), int(args.bpb))
        else:
            dur = avg_dur
        dur = float(np.clip(dur, 2.5, avg_dur))
        key_name, camelot, conf = estimate_key(y, sr)
        infos.append(TrackInfo(
            path=track,
            folder_index=int(selected_idxs[local_idx - 1] + 1),
            duration_s=duration_s,
            tempo_bpm=float(tempo),
            energy_score=energy_score,
            highlight_score=float(highlight_score),
            approx_start_s=float(approx_start),
            snapped_start_s=float(snapped_start),
            clip_dur_s=float(dur),
            key_name=key_name,
            camelot=camelot
        ))

    # Auto best-of: keep the N tracks with the strongest highlights.
    if args.select.strip().lower() == "auto":
        auto_n = int(max(1, min(args.auto_n, len(infos))))
        infos.sort(key=lambda t: (t.highlight_score, t.energy_score), reverse=True)
        infos = infos[:auto_n]
        print(f"\nAuto-selected best-of: {auto_n} tracks.")

    # Ordering
    if args.harmonic:
        # Harmonic path, but keep tempo smooth-ish by pre-sorting with tempo clusters first.
        pre = tempo_cluster_energy_ramp(infos, tempo_tol=float(args.tempo_tol))
        ordered = harmonic_path_order(pre)
        print("\nOrdering: harmonic (Camelot neighbors) + tempo/energy heuristics")
    else:
        ordered = tempo_cluster_energy_ramp(infos, tempo_tol=float(args.tempo_tol))
        print("\nOrdering: tempo clustering + energy ramp")

    print("\nFinal clip order:")
    for i, t in enumerate(ordered, start=1):
        print(f" {i:02d}. [{t.tempo_bpm:6.1f} BPM] [{t.camelot:>3}] (E={t.energy_score:.3f}) {t.path.name}")

    # --- Render pass: 2-pass loudnorm per clip, then crossfade-concat.
    clip_paths: List[Path] = []
    report_tracks = []
    for i, t in enumerate(ordered, start=1):
        src = work_dir / f"ord_{i:02d}.wav"
        ffmpeg_to_wav(t.path, src, sr=22050)
        clip_out = work_dir / f"clip_{i:02d}.wav"
        render_clip_2pass(
            in_wav=src, out_path=clip_out,
            start=t.snapped_start_s, dur=t.clip_dur_s,
            fade_s=float(args.fade), target_lufs=float(args.target_lufs)
        )
        clip_paths.append(clip_out)
        report_tracks.append({
            "folder_index": t.folder_index,
            "filename": t.path.name,
            "tempo_bpm_est": round(t.tempo_bpm, 2),
            "key": t.key_name,
            "camelot": t.camelot,
            "energy_score": round(t.energy_score, 6),
            "highlight_score": round(t.highlight_score, 6),
            "approx_start_seconds": round(t.approx_start_s, 3),
            "snapped_start_seconds": round(t.snapped_start_s, 3),
            "clip_duration_seconds": round(t.clip_dur_s, 3),
        })

    out_wav = out_dir / args.output_wav
    out_mp3 = out_dir / args.output_mp3
    build_acrossfade_chain(clip_paths, out_wav, crossfade_s=float(args.crossfade))
    export_mp3(out_wav, out_mp3, bitrate=str(args.mp3_bitrate))

    report = {
        "version": "v3",
        "settings": {
            "teaser_seconds": float(args.teaser),
            "bars": int(args.bars),
            "beats_per_bar": int(args.bpb),
            "preroll_bars": int(args.preroll_bars),
            "harmonic": bool(args.harmonic),
            "tempo_tolerance_bpm": float(args.tempo_tol),
            "crossfade_seconds": float(args.crossfade),
            "fade_seconds": float(args.fade),
            "avoid_intro_seconds": float(args.avoid_intro),
            "avoid_outro_seconds": float(args.avoid_outro),
            "target_lufs": float(args.target_lufs),
            "mp3_bitrate": str(args.mp3_bitrate),
        },
        "outputs": {
            "wav": str(out_wav.resolve()),
            "mp3": str(out_mp3.resolve()),
        },
        "tracks": report_tracks
    }
    report_path = out_dir / "teaser_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print(f"\nāœ… Teaser WAV: {out_wav.resolve()}")
    print(f"āœ… Teaser MP3: {out_mp3.resolve()}")
    print(f"šŸ“ Report: {report_path.resolve()}\n")


if __name__ == "__main__":
    main()