Files
DJ_Teaser_Clipper/V_2/dj_teaser.py
Thomas abf2109171 First Upload
All Files
2026-01-29 10:48:02 +01:00

554 lines
20 KiB
Python

#!/usr/bin/env python3
"""
DJ Teaser Builder v2 (local, offline-friendly)
- Scans folder for audio tracks (max N)
- Select tracks by index/range or auto-select "bestof"
- Finds highlight segments (energy + onset)
- Snaps to bar grid (DJ phrasing) + optional pre-roll
- Orders clips to minimize tempo jumps + ramp energy
- Renders clips + acrossfades via FFmpeg
- Exports WAV + MP3
- Writes JSON report
- Optional: generate README + promo text via Ollama
Requirements:
ffmpeg in PATH
pip install numpy librosa soundfile requests
Examples:
python dj_teaser.py --tracks-dir ./tracks --select all --mode rollcall --teaser 60 --bars 2
python dj_teaser.py --tracks-dir ./tracks --select 1-4,7,9 --teaser 60 --bars 2
python dj_teaser.py --tracks-dir ./tracks --select auto --auto-n 8 --mode bestof --teaser 75 --bars 4
python dj_teaser.py --tracks-dir ./tracks --select auto --auto-n 8 --ollama http://192.168.2.60:11434 --gen-readme
"""
import argparse
import json
import math
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import List, Tuple, Optional, Dict
import numpy as np
import librosa
try:
import requests
except Exception:
requests = None # Ollama is optional
AUDIO_EXTS = {".wav", ".mp3", ".flac", ".m4a", ".aiff", ".aac", ".ogg", ".opus"}
@dataclass
class TrackInfo:
    """Per-track analysis results used for ordering and rendering clips."""
    path: Path                # source audio file
    index_in_folder: int      # 1-based position in the scanned folder
    duration_s: float         # full track length in seconds
    tempo_bpm: float          # beat-tracked tempo estimate (0.0 when tracking failed)
    energy_score: float       # overall (for ranking)
    highlight_score: float    # windowed score sum at the chosen highlight
    approx_start_s: float     # raw highlight start before bar snapping
    snapped_start_s: float    # start after bar-grid snap + preroll
    clip_dur_s: float         # rendered clip duration in seconds
def run(cmd: List[str]) -> None:
    """Execute *cmd* and raise RuntimeError (with stderr attached) on failure."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode == 0:
        return
    raise RuntimeError(f"Command failed:\n{' '.join(cmd)}\n\nSTDERR:\n{proc.stderr}")
def ensure_ffmpeg() -> None:
    """Abort early with a clear error when ffmpeg is not on the PATH."""
    if shutil.which("ffmpeg") is not None:
        return
    raise RuntimeError("ffmpeg not found in PATH. Install ffmpeg and try again.")
def list_tracks(tracks_dir: Path, max_tracks: int) -> List[Path]:
    """Return up to *max_tracks* audio files from *tracks_dir*, sorted by name."""
    found: List[Path] = []
    for entry in sorted(tracks_dir.iterdir()):
        if entry.is_file() and entry.suffix.lower() in AUDIO_EXTS:
            found.append(entry)
    return found[:max_tracks]
def parse_selection(selection: str, num_tracks: int) -> List[int]:
    """
    Translate a --select expression into 0-based track indices.

    Accepted forms: "all" / "auto" (both mean every track), comma lists
    like "1,2,3,7", and inclusive 1-based ranges like "1-4,7,9-10".
    Out-of-range indices are dropped; duplicates keep their first
    occurrence. Raises ValueError when nothing survives filtering.
    """
    text = selection.strip().lower()
    if text in {"all", "auto"}:
        return list(range(num_tracks))
    raw: List[int] = []
    for token in (t.strip() for t in text.split(",")):
        if not token:
            continue
        if "-" in token:
            lo_s, hi_s = token.split("-", 1)
            lo, hi = int(lo_s) - 1, int(hi_s) - 1
            if lo > hi:
                lo, hi = hi, lo
            raw.extend(range(lo, hi + 1))
        else:
            raw.append(int(token) - 1)
    # de-duplicate while preserving order and dropping out-of-range entries
    seen = set()
    result: List[int] = []
    for idx in raw:
        if 0 <= idx < num_tracks and idx not in seen:
            seen.add(idx)
            result.append(idx)
    if not result:
        raise ValueError("Selection resulted in an empty track list. Check --select.")
    return result
def ffmpeg_to_wav(in_path: Path, out_wav: Path, sr: int) -> None:
    """Decode any supported input to a stereo WAV at sample rate *sr*."""
    out_wav.parent.mkdir(parents=True, exist_ok=True)
    cmd = [
        "ffmpeg", "-y",
        "-i", str(in_path),
        "-vn",              # drop any video/cover-art stream
        "-ac", "2",
        "-ar", str(sr),
        "-f", "wav",
        str(out_wav),
    ]
    run(cmd)
def zscore(x: np.ndarray) -> np.ndarray:
    """Standardize *x* to zero mean / unit variance (epsilon-guarded std)."""
    arr = np.asarray(x, dtype=np.float32)
    mean = float(np.mean(arr))
    std = float(np.std(arr) + 1e-9)  # epsilon avoids /0 on constant input
    return (arr - mean) / std
def compute_metrics(y: np.ndarray, sr: int, hop_length: int) -> Dict[str, np.ndarray]:
    """Per-frame RMS, onset strength, and a combined non-negative score."""
    rms_curve = librosa.feature.rms(y=y, frame_length=2048, hop_length=hop_length)[0]
    onset_curve = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop_length)
    # the two features can disagree by a frame or two; align to the shorter
    length = min(len(rms_curve), len(onset_curve))
    rms_curve = rms_curve[:length]
    onset_curve = onset_curve[:length]
    # onset weighted higher than loudness; negatives clipped to zero
    combined = np.maximum(0.35 * zscore(rms_curve) + 0.65 * zscore(onset_curve), 0.0)
    return {
        "rms": rms_curve,
        "onset": onset_curve,
        "score": combined,
    }
def pick_highlight_start(
score: np.ndarray,
sr: int,
hop_length: int,
clip_s: float,
avoid_intro_s: float,
avoid_outro_s: float,
duration_s: float
) -> Tuple[float, float]:
"""
Sliding window max over score.
Returns (approx_start_s, highlight_score_sum).
"""
if duration_s <= (avoid_intro_s + avoid_outro_s + clip_s + 1.0):
return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))
n = len(score)
clip_frames = max(1, int(round((clip_s * sr) / hop_length)))
t_seconds = (np.arange(n) * hop_length) / sr
valid = (t_seconds >= avoid_intro_s) & (t_seconds <= (duration_s - avoid_outro_s - clip_s))
valid_idxs = np.where(valid)[0]
if len(valid_idxs) == 0:
return max(0.0, (duration_s - clip_s) / 2.0), float(np.sum(score))
window = np.ones(clip_frames, dtype=np.float32)
summed = np.convolve(score, window, mode="same")
best_idx = int(valid_idxs[np.argmax(summed[valid_idxs])])
center_t = float(t_seconds[best_idx])
start_t = center_t - (clip_s / 2.0)
start_t = float(max(avoid_intro_s, min(start_t, duration_s - avoid_outro_s - clip_s)))
return start_t, float(summed[best_idx])
def snap_to_bar_grid(y: np.ndarray, sr: int, approx_start: float, bars: int, beats_per_bar: int) -> Tuple[float, float, Optional[np.ndarray]]:
    """
    Snap *approx_start* to the nearest bar boundary using beat tracking.

    Returns (snapped_start_s, tempo_bpm, beat_times or None). On any
    analysis failure this degrades gracefully to the unsnapped start with
    tempo 0.0 so the caller can fall back to duration-based clip sizing.
    """
    try:
        tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
        # librosa >= 0.10 returns tempo as a 1-element ndarray, and
        # NumPy >= 1.25 deprecates float(ndarray) — unwrap explicitly.
        tempo_arr = np.atleast_1d(np.asarray(tempo, dtype=float))
        tempo = float(tempo_arr[0]) if tempo_arr.size else 0.0
        if beat_frames is None or len(beat_frames) < 8:
            # too few beats for a trustworthy grid
            return approx_start, tempo, None
        beat_times = librosa.frames_to_time(beat_frames, sr=sr)
        i = int(np.argmin(np.abs(beat_times - approx_start)))
        grid = max(1, bars * beats_per_bar)  # beats per phrase chunk
        snapped_i = int(round(i / grid) * grid)
        snapped_i = max(0, min(snapped_i, len(beat_times) - 1))
        snapped_t = float(beat_times[snapped_i])
        # accept the snap only when it stays close to the detected highlight
        if abs(snapped_t - approx_start) <= 2.0:
            return snapped_t, tempo, beat_times
        return approx_start, tempo, beat_times
    except Exception:
        # best-effort: beat tracking is optional, never fatal
        return approx_start, 0.0, None
def bars_to_seconds(tempo_bpm: float, bars: int, beats_per_bar: int) -> float:
    """Length in seconds of *bars* bars at *tempo_bpm* (tempo floored to avoid /0)."""
    total_beats = bars * beats_per_bar
    seconds_per_beat = 60.0 / max(1e-6, tempo_bpm)
    return seconds_per_beat * total_beats
def apply_preroll(snapped_start: float, beat_times: Optional[np.ndarray], preroll_bars: int, beats_per_bar: int) -> float:
    """
    Shift the clip start *preroll_bars* bars earlier (DJ lead-in).

    Uses the tracked beat grid when it is long enough; otherwise falls
    back to a rough 2 s/bar estimate (~120 BPM), clamped at 0.
    """
    if preroll_bars <= 0:
        return snapped_start
    needed = preroll_bars * beats_per_bar + 2
    if beat_times is None or len(beat_times) < needed:
        # no usable grid: 1 bar ~ 2 s at 120 BPM is a safe-ish guess
        return max(0.0, snapped_start - preroll_bars * 2.0)
    nearest = int(np.argmin(np.abs(beat_times - snapped_start)))
    target = max(0, nearest - preroll_bars * beats_per_bar)
    return float(beat_times[target])
def render_clip(in_wav: Path, out_path: Path, start: float, dur: float, fade_s: float, target_lufs: float) -> None:
    """Cut [start, start+dur] from *in_wav*, fade both edges, loudness-normalize."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    fade_out_at = max(0.0, dur - fade_s)
    filters = ",".join([
        f"atrim=start={start}:duration={dur}",
        f"afade=t=in:st=0:d={fade_s}",
        f"afade=t=out:st={fade_out_at}:d={fade_s}",
        f"loudnorm=I={target_lufs}:TP=-1.5:LRA=11",
    ])
    run(["ffmpeg", "-y", "-i", str(in_wav), "-vn", "-af", filters, str(out_path)])
def build_acrossfade_chain(clips: List[Path], out_wav: Path, crossfade_s: float) -> None:
    """
    Concatenate rendered clips into *out_wav* with ffmpeg acrossfade joins.

    A single clip is copied verbatim (no crossfade needed). Raises
    ValueError on an empty clip list instead of silently building an
    invalid ffmpeg command.
    """
    if not clips:
        raise ValueError("No clips to join; cannot build teaser.")
    if len(clips) == 1:
        shutil.copyfile(clips[0], out_wav)
        return
    cmd = ["ffmpeg", "-y"]
    for c in clips:
        cmd += ["-i", str(c)]
    # chain pairwise: [0:a][1:a] -> [a1], [a1][2:a] -> [a2], ...
    filter_parts = []
    last = "[0:a]"
    for i in range(1, len(clips)):
        nxt = f"[{i}:a]"
        out = f"[a{i}]"
        filter_parts.append(f"{last}{nxt}acrossfade=d={crossfade_s}:c1=tri:c2=tri{out}")
        last = out
    cmd += ["-filter_complex", ";".join(filter_parts), "-map", last, str(out_wav)]
    run(cmd)
def export_mp3(in_wav: Path, out_mp3: Path, bitrate: str = "320k") -> None:
    """Encode *in_wav* to MP3 via libmp3lame at the requested bitrate."""
    out_mp3.parent.mkdir(parents=True, exist_ok=True)
    cmd = ["ffmpeg", "-y", "-i", str(in_wav), "-vn", "-codec:a", "libmp3lame", "-b:a", bitrate, str(out_mp3)]
    run(cmd)
def order_tracks_dj_style(track_infos: List[TrackInfo], tempo_tolerance: float, prefer_energy_ramp: bool = True) -> List[TrackInfo]:
    """
    Order clips the way a DJ would sequence a set.

    Greedily clusters tracks whose BPM lies within *tempo_tolerance* of a
    cluster's running median, ramps energy inside each cluster (or ranks
    by highlight score when prefer_energy_ramp is False), then plays the
    clusters from slowest/softest to fastest/loudest.
    """
    if not track_infos:
        return []
    # unknown tempo (<= 0) sorts last so it cannot seed a low-BPM cluster
    by_tempo = sorted(track_infos, key=lambda t: (t.tempo_bpm if t.tempo_bpm > 0 else 1e9))
    clusters: List[List[TrackInfo]] = []
    for info in by_tempo:
        home = None
        for cluster in clusters:
            known = [m.tempo_bpm for m in cluster if m.tempo_bpm > 0]
            center = float(np.median(known)) if known else info.tempo_bpm
            if info.tempo_bpm > 0 and abs(info.tempo_bpm - center) <= tempo_tolerance:
                home = cluster
                break
        if home is None:
            clusters.append([info])
        else:
            home.append(info)
    for cluster in clusters:
        if prefer_energy_ramp:
            cluster.sort(key=lambda m: m.energy_score)
        else:
            cluster.sort(key=lambda m: m.highlight_score, reverse=True)

    def cluster_rank(cluster: List[TrackInfo]):
        known = [m.tempo_bpm for m in cluster if m.tempo_bpm > 0]
        med_tempo = float(np.median(known)) if known else 9999.0
        med_energy = float(np.median([m.energy_score for m in cluster]))
        return (med_tempo, med_energy)

    clusters.sort(key=cluster_rank)
    return [info for cluster in clusters for info in cluster]
def ollama_generate(ollama_url: str, model: str, prompt: str) -> str:
    """POST a non-streaming generation request to an Ollama server and return the text."""
    if requests is None:
        raise RuntimeError("requests not installed. Run: pip install requests")
    endpoint = ollama_url.rstrip("/") + "/api/generate"
    body = {"model": model, "prompt": prompt, "stream": False}
    resp = requests.post(endpoint, json=body, timeout=60)
    resp.raise_for_status()
    return resp.json().get("response", "").strip()
def main():
    """CLI entry point: scan, select, analyze, order, render, and report."""
    parser = argparse.ArgumentParser(description="Local DJ Teaser Builder v2 (Python + FFmpeg)")
    parser.add_argument("--tracks-dir", default="./tracks", help="Folder containing audio tracks")
    parser.add_argument("--work-dir", default="./work", help="Temp working folder")
    parser.add_argument("--out-dir", default="./out", help="Output folder")
    parser.add_argument("--max-tracks", type=int, default=20, help="Max tracks to scan (default 20)")
    parser.add_argument("--select", default="all", help='Selection: "all", "1,2,7", "1-4,9", or "auto"')
    parser.add_argument("--auto-n", type=int, default=8, help="If --select auto: how many tracks to keep (best-of)")
    parser.add_argument("--mode", choices=["rollcall", "bestof"], default="rollcall", help="Teaser style")
    parser.add_argument("--teaser", type=float, default=60.0, help="Final teaser length (seconds)")
    parser.add_argument("--bars", type=int, default=2, help="Bars per clip (DJ phrasing). rollcall=2 typical")
    parser.add_argument("--bpb", type=int, default=4, help="Beats per bar (4 for trance)")
    parser.add_argument("--preroll-bars", type=int, default=1, help="Start N bars before highlight (DJ lead-in)")
    parser.add_argument("--crossfade", type=float, default=0.25, help="Acrossfade duration seconds")
    parser.add_argument("--fade", type=float, default=0.08, help="Fade in/out per clip seconds")
    parser.add_argument("--avoid-intro", type=float, default=30.0, help="Skip intro when searching highlights")
    parser.add_argument("--avoid-outro", type=float, default=20.0, help="Skip outro when searching highlights")
    parser.add_argument("--tempo-tol", type=float, default=4.0, help="Tempo clustering tolerance (BPM)")
    parser.add_argument("--target-lufs", type=float, default=-14.0, help="Loudness target LUFS (approx)")
    parser.add_argument("--output-wav", default="album_teaser.wav", help="Output teaser WAV filename")
    parser.add_argument("--output-mp3", default="album_teaser.mp3", help="Output teaser MP3 filename")
    parser.add_argument("--mp3-bitrate", default="320k", help="MP3 bitrate (e.g. 192k, 320k)")
    # Ollama (optional)
    parser.add_argument("--ollama", default="", help="Ollama base URL (e.g. http://192.168.2.60:11434)")
    parser.add_argument("--ollama-model", default="qwen2.5:latest", help="Ollama model name")
    parser.add_argument("--gen-readme", action="store_true", help="Generate README + promo text using Ollama")
    args = parser.parse_args()
    ensure_ffmpeg()
    tracks_dir = Path(args.tracks_dir)
    work_dir = Path(args.work_dir)
    out_dir = Path(args.out_dir)
    work_dir.mkdir(parents=True, exist_ok=True)
    out_dir.mkdir(parents=True, exist_ok=True)
    tracks = list_tracks(tracks_dir, args.max_tracks)
    if not tracks:
        raise SystemExit(f"No audio tracks found in: {tracks_dir.resolve()}")
    print("\nDiscovered tracks:")
    for i, t in enumerate(tracks, start=1):
        print(f" {i:02d}. {t.name}")
    selected_idxs = parse_selection(args.select, len(tracks))
    selected_tracks = [tracks[i] for i in selected_idxs]
    # math: avg duration per clip given acrossfades
    n = len(selected_tracks)
    teaser_s = float(args.teaser)
    cf = float(args.crossfade)
    avg_dur = (teaser_s + (n - 1) * cf) / max(1, n)
    # Analyze each selected track into TrackInfo
    infos: List[TrackInfo] = []
    for local_idx, track in enumerate(selected_tracks, start=1):
        # decode to a mono-loadable 22.05 kHz WAV for analysis
        tmp_wav = work_dir / f"track_{local_idx:02d}.wav"
        ffmpeg_to_wav(track, tmp_wav, sr=22050)
        y, sr = librosa.load(tmp_wav, sr=22050, mono=True)
        duration_s = float(len(y) / sr)
        m = compute_metrics(y, sr, hop_length=512)
        score = m["score"]
        # overall energy score: mean of top percentile of score (robust)
        top = np.quantile(score, 0.90) if len(score) else 0.0
        energy_score = float(np.mean(score[score >= top])) if np.any(score >= top) else float(np.mean(score) if len(score) else 0.0)
        # choose a search window size (not necessarily final dur): use avg_dur-ish but safe
        search_clip = float(np.clip(avg_dur, 4.0, 10.0))
        approx_start, highlight_score = pick_highlight_start(
            score=score,
            sr=sr,
            hop_length=512,
            clip_s=search_clip,
            avoid_intro_s=float(args.avoid_intro),
            avoid_outro_s=float(args.avoid_outro),
            duration_s=duration_s
        )
        snapped_start, tempo, beat_times = snap_to_bar_grid(
            y=y, sr=sr,
            approx_start=approx_start,
            bars=int(args.bars),
            beats_per_bar=int(args.bpb)
        )
        # apply preroll bars (DJ lead-in)
        snapped_start = apply_preroll(snapped_start, beat_times, int(args.preroll_bars), int(args.bpb))
        # duration based on bars + tempo, clamped to avg_dur
        # (tempo == 0.0 means beat tracking failed; fall back to avg_dur)
        if tempo and tempo > 1.0:
            dur = bars_to_seconds(tempo, int(args.bars), int(args.bpb))
        else:
            dur = avg_dur
        dur = float(np.clip(dur, 2.5, avg_dur))
        infos.append(TrackInfo(
            path=track,
            index_in_folder=int(selected_idxs[local_idx - 1] + 1),
            duration_s=duration_s,
            tempo_bpm=float(tempo),
            energy_score=energy_score,
            highlight_score=float(highlight_score),
            approx_start_s=float(approx_start),
            snapped_start_s=float(snapped_start),
            clip_dur_s=float(dur),
        ))
    # Auto best-of selection (if requested)
    # NOTE(review): --mode bestof does not trigger this; only --select auto
    # does — confirm whether that is the intended CLI contract.
    if args.select.strip().lower() == "auto":
        auto_n = int(max(1, min(args.auto_n, len(infos))))
        # rank by highlight_score primarily, then energy_score
        infos_sorted = sorted(infos, key=lambda t: (t.highlight_score, t.energy_score), reverse=True)
        infos = infos_sorted[:auto_n]
        print(f"\nAuto-selected best-of: {auto_n} tracks (ranked by highlight score).")
    # DJ ordering
    ordered = order_tracks_dj_style(infos, tempo_tolerance=float(args.tempo_tol), prefer_energy_ramp=True)
    print("\nFinal clip order:")
    for i, t in enumerate(ordered, start=1):
        print(f" {i:02d}. [{t.tempo_bpm:.1f} BPM] (E={t.energy_score:.3f}) {t.path.name}")
    # Render clips
    clip_paths: List[Path] = []
    report_tracks = []
    for i, t in enumerate(ordered, start=1):
        # re-decode each track: the render order differs from the analysis
        # pass, so the analysis-phase track_NN.wav files are overwritten here
        tmp_wav = work_dir / f"track_{i:02d}.wav"
        ffmpeg_to_wav(t.path, tmp_wav, sr=22050)
        clip_out = work_dir / f"clip_{i:02d}.wav"
        render_clip(
            in_wav=tmp_wav,
            out_path=clip_out,
            start=t.snapped_start_s,
            dur=t.clip_dur_s,
            fade_s=float(args.fade),
            target_lufs=float(args.target_lufs)
        )
        clip_paths.append(clip_out)
        report_tracks.append({
            "folder_index": t.index_in_folder,
            "filename": t.path.name,
            "tempo_bpm_est": round(t.tempo_bpm, 2),
            "energy_score": round(t.energy_score, 6),
            "highlight_score": round(t.highlight_score, 6),
            "approx_start_seconds": round(t.approx_start_s, 3),
            "snapped_start_seconds": round(t.snapped_start_s, 3),
            "clip_duration_seconds": round(t.clip_dur_s, 3),
        })
    # Build teaser WAV then MP3
    out_wav = out_dir / args.output_wav
    out_mp3 = out_dir / args.output_mp3
    build_acrossfade_chain(clip_paths, out_wav, crossfade_s=float(args.crossfade))
    export_mp3(out_wav, out_mp3, bitrate=str(args.mp3_bitrate))
    # JSON report of everything that went into the render
    report = {
        "version": "v2",
        "inputs": {
            "tracks_dir": str(tracks_dir.resolve()),
            "select": args.select,
            "auto_n": int(args.auto_n),
            "mode": args.mode,
        },
        "settings": {
            "teaser_seconds": teaser_s,
            "bars": int(args.bars),
            "beats_per_bar": int(args.bpb),
            "preroll_bars": int(args.preroll_bars),
            "crossfade_seconds": float(args.crossfade),
            "fade_seconds": float(args.fade),
            "avoid_intro_seconds": float(args.avoid_intro),
            "avoid_outro_seconds": float(args.avoid_outro),
            "tempo_tolerance_bpm": float(args.tempo_tol),
            "target_lufs": float(args.target_lufs),
            "mp3_bitrate": str(args.mp3_bitrate),
        },
        "outputs": {
            "wav": str(out_wav.resolve()),
            "mp3": str(out_mp3.resolve()),
        },
        "tracks": report_tracks
    }
    report_path = out_dir / "teaser_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"\n✅ Teaser WAV: {out_wav.resolve()}")
    print(f"✅ Teaser MP3: {out_mp3.resolve()}")
    print(f"📝 Report: {report_path.resolve()}")
    # Optional: generate README / promo text via Ollama
    if args.gen_readme:
        if not args.ollama:
            raise SystemExit("--gen-readme requires --ollama http://host:11434")
        prompt = (
            "You are helping with a GitHub repo for a local DJ teaser builder.\n"
            "Write a concise README in English with:\n"
            "- What it does\n- Requirements\n- Install\n- Usage examples\n- Tips for old-school trance / DJ phrasing\n"
            "Also write a short promo text (YouTube/Instagram) for an album teaser.\n\n"
            f"Settings:\n{json.dumps(report['settings'], indent=2)}\n\n"
            f"Tracks (order):\n{json.dumps(report_tracks, indent=2)}\n"
        )
        text = ollama_generate(args.ollama, args.ollama_model, prompt)
        readme_path = out_dir / "README_generated.md"
        with open(readme_path, "w", encoding="utf-8") as f:
            f.write(text + "\n")
        print(f"🧠 Ollama README generated: {readme_path.resolve()}")
# Script entry point.
if __name__ == "__main__":
    main()