#!/usr/bin/env python3
|
|
"""
|
|
retag.py — recursively fingerprint a folder of audio and overwrite tags
with whatever AcoustID + MusicBrainz cough up.

Dependencies:
    pip install pyacoustid musicbrainzngs mutagen
    sudo apt install libchromaprint-tools   # provides `fpcalc`

You also need a free AcoustID API key:
    https://acoustid.org/new-application

Usage:
    export ACOUSTID_API_KEY=your_key_here
    python3 retag.py /path/to/music                       # dry run by default
    python3 retag.py /path/to/music --commit              # actually writes tags
    python3 retag.py /path/to/music --commit --no-backup  # YOLO mode
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import acoustid
|
|
import musicbrainzngs
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.easyid3 import EasyID3
|
|
from mutagen.flac import FLAC
|
|
from mutagen.mp4 import MP4
|
|
from mutagen.oggvorbis import OggVorbis
|
|
from mutagen.id3 import ID3NoHeaderError
|
|
|
|
# --- config ----------------------------------------------------------------
|
|
|
|
# File suffixes (lower-case, leading dot) that the directory walk treats
# as audio.
AUDIO_EXTS = {
    ".mp3",
    ".flac",
    ".m4a",
    ".mp4",
    ".ogg",
    ".oga",
    ".opus",
    ".wav",
}

# Lowest AcoustID match score that is accepted; weaker matches are skipped.
MIN_SCORE = 0.85

# Seconds to pause after each file. AcoustID's free tier allows 3 req/sec,
# so one second per file stays politely under that limit.
RATE_LIMIT_SLEEP = 1.0
|
|
|
|
# Register the user agent for MusicBrainz requests at import time.
# NOTE(review): the third argument looks like a placeholder contact URL —
# confirm a real one is supplied before running against large libraries.
musicbrainzngs.set_useragent("retag.py", "0.1", "https://example.invalid")
|
|
|
|
|
|
# --- helpers ---------------------------------------------------------------
|
|
|
|
def get_api_key() -> str:
    """Return the AcoustID API key from the ``ACOUSTID_API_KEY`` env var.

    Exits the process with an error message when the variable is unset
    or empty.
    """
    api_key = os.environ.get("ACOUSTID_API_KEY")
    if api_key:
        return api_key
    sys.exit("ERROR: set ACOUSTID_API_KEY env var. See acoustid.org/new-application")
|
|
|
|
|
|
def iter_audio_files(root: Path):
    """Yield every regular file under ``root`` whose suffix is in AUDIO_EXTS.

    The walk is recursive and the suffix comparison is case-insensitive.
    """
    for candidate in root.rglob("*"):
        # Directories and other non-regular entries are never audio.
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in AUDIO_EXTS:
            continue
        yield candidate
|
|
|
|
|
|
def fingerprint_lookup(api_key: str, path: Path) -> Optional[dict]:
    """Fingerprint ``path`` and return the best AcoustID match, or None.

    Args:
        api_key: AcoustID application key.
        path: Audio file to fingerprint.

    Returns:
        A dict with keys ``recording_id``, ``score``, ``title`` and
        ``artists`` taken from the highest-scoring result at or above
        ``MIN_SCORE`` that carries a recording with an id. ``None`` when
        fingerprinting or the web request fails, when the service
        reports an error, or when nothing qualifies.

    Exits the process when the ``fpcalc`` backend is not installed.
    """
    try:
        response = acoustid.match(api_key, str(path), parse=False)
    except acoustid.NoBackendError:
        sys.exit("ERROR: fpcalc not found. Install chromaprint-tools.")
    except acoustid.FingerprintGenerationError as e:
        print(f" ! fingerprint failed: {e}")
        return None
    except acoustid.WebServiceError as e:
        print(f" ! acoustid web error: {e}")
        return None

    # With parse=False the raw JSON comes back unvalidated, so API-level
    # failures (e.g. an invalid key) arrive as a status field instead of
    # an exception. Surface them rather than reporting a silent no-match.
    status = response.get("status", "ok")
    if status != "ok":
        detail = (response.get("error") or {}).get("message") or status
        print(f" ! acoustid error response: {detail}")
        return None

    qualifying = [
        result
        for result in response.get("results") or []
        if (result.get("score") or 0) >= MIN_SCORE
    ]
    # Do not rely on the service's ordering: pick the strongest match.
    # The sort is stable, so equal scores keep their original order.
    qualifying.sort(key=lambda result: result.get("score") or 0, reverse=True)

    for result in qualifying:
        for recording in result.get("recordings") or []:
            # A recording without an id cannot be looked up on MusicBrainz.
            if not recording.get("id"):
                continue
            return {
                "recording_id": recording.get("id"),
                "score": result.get("score") or 0,
                "title": recording.get("title"),
                "artists": recording.get("artists", []),
            }
    return None
|
|
|
|
|
|
def fetch_release_data(recording_id: str) -> Optional[dict]:
    """Look up a recording on MusicBrainz and flatten it into tag fields.

    Returns a dict with the keys ``title``, ``artist``, ``album``,
    ``date`` and ``albumartist`` (any value may be None), or None when
    the web service call fails.
    """
    try:
        response = musicbrainzngs.get_recording_by_id(
            recording_id,
            includes=["releases", "artist-credits", "tags"],
        )
    except musicbrainzngs.WebServiceError as e:
        print(f" ! musicbrainz error: {e}")
        return None

    recording = response.get("recording", {})

    def release_rank(candidate):
        # Lower tuples sort first: official releases, then album-type ones.
        official_rank = 0 if candidate.get("status") == "Official" else 1
        primary_type = candidate.get("release-group", {}).get("primary-type", "") or ""
        album_rank = 0 if "Album" in primary_type else 1
        return (official_rank, album_rank)

    ranked = sorted(recording.get("release-list", []) or [], key=release_rank)
    chosen = ranked[0] if ranked else {}

    # Credit entries are either dicts (an artist) or plain strings (join
    # phrases); concatenating them in order gives the display name.
    name_parts = []
    for credit in recording.get("artist-credit", []):
        if isinstance(credit, dict):
            name_parts.append(credit.get("artist", {}).get("name", ""))
        else:
            name_parts.append(str(credit))
    artist = "".join(name_parts) or None

    # Prefer the ready-made credit phrase; otherwise fall back to the
    # first credited artist's name.
    album_artist = chosen.get("artist-credit-phrase")
    if not album_artist:
        release_credits = chosen.get("artist-credit")
        if release_credits:
            album_artist = release_credits[0].get("artist", {}).get("name")
        else:
            album_artist = None

    return {
        "title": recording.get("title"),
        "artist": artist,
        "album": chosen.get("title"),
        "date": chosen.get("date"),
        "albumartist": album_artist,
    }
|
|
|
|
|
|
# --- tag IO ---------------------------------------------------------------
|
|
|
|
def read_existing_tags(path: Path) -> dict:
    """Return the file's current tags as ``{key: [values...]}``.

    Best-effort: an unrecognised file, a file without tags, or any
    error while reading yields an empty dict (errors are printed).
    """
    snapshot = {}
    try:
        handle = MutagenFile(str(path), easy=True)
        tags = None if handle is None else handle.tags
        if tags is not None:
            for key, values in tags.items():
                snapshot[key] = list(values)
    except Exception as err:
        print(f" ! couldn't read tags: {err}")
        return {}
    return snapshot
|
|
|
|
|
|
def write_tags(path: Path, meta: dict) -> bool:
    """Replace the file's existing tags with ``meta``.

    Args:
        path: Audio file to rewrite. The handler is chosen by suffix:
            ``.mp3`` (ID3), ``.flac``, ``.m4a``/``.mp4`` and the Ogg
            family (``.ogg``/``.oga``/``.opus``). Anything else
            (including ``.wav``) is reported as unsupported.
        meta: Mapping of field name (``title``, ``artist``, ``album``,
            ``date``, ``albumartist``) to value. Falsy values are skipped.

    Returns:
        True when the new tags were saved. False when the extension is
        unsupported, ``meta`` holds nothing to write, or any error occurs
        (errors are printed, never raised).
    """
    ext = path.suffix.lower()

    # The existing tags are wiped before the new ones are written, so an
    # empty payload would only destroy data. Refuse it up front.
    if not any(meta.values()):
        print(" ! no metadata to write, leaving existing tags untouched")
        return False

    try:
        if ext == ".mp3":
            try:
                audio = EasyID3(str(path))
            except ID3NoHeaderError:
                # No ID3 header yet: write an empty one, then reopen it.
                audio = EasyID3()
                audio.save(str(path))
                audio = EasyID3(str(path))
            audio.delete()
        elif ext == ".flac":
            audio = FLAC(str(path))
            audio.delete()
        elif ext in (".m4a", ".mp4"):
            audio = MP4(str(path))
            # MP4 uses its own atom keys — translate.
            mp4_map = {
                "title": "\xa9nam",
                "artist": "\xa9ART",
                "album": "\xa9alb",
                "date": "\xa9day",
                "albumartist": "aART",
            }
            audio.clear()
            for field, value in meta.items():
                if value and field in mp4_map:
                    audio[mp4_map[field]] = [value]
            audio.save()
            return True
        elif ext in (".ogg", ".oga", ".opus"):
            # An Ogg container may hold Vorbis, Opus, FLAC, ... and the
            # suffix does not say which. OggVorbis cannot parse an Opus
            # stream, so let mutagen detect the concrete type instead.
            from mutagen.ogg import OggFileType

            audio = MutagenFile(str(path))
            if not isinstance(audio, OggFileType):
                # Checked before delete() so a mislabelled file keeps
                # its tags instead of being wiped and then failing.
                print(f" ! not a recognised Ogg stream, skipping write: {path.name}")
                return False
            audio.delete()
        else:
            print(f" ! unsupported extension for writing: {ext}")
            return False

        for field, value in meta.items():
            if value:
                audio[field] = [value]
        audio.save()
        return True
    except Exception as e:
        # Deliberate catch-all: one bad file must not abort the batch.
        print(f" ! tag write failed: {e}")
        return False
|
|
|
|
|
|
# --- main loop -------------------------------------------------------------
|
|
|
|
def process(path: Path, api_key: str, commit: bool, backup: bool) -> str:
    """Identify one audio file and, when committing, rewrite its tags.

    Args:
        path: Audio file to process.
        api_key: AcoustID application key.
        commit: When False, only report what would be written.
        backup: When True, save the pre-existing tags to a
            ``<name>.tags.bak.json`` sidecar before writing.

    Returns:
        An outcome label: ``"no-match"``, ``"no-mb"``, ``"dry-run"``,
        ``"backup-failed"``, ``"written"`` or ``"write-failed"``.
    """
    print(f"\n→ {path}")
    original = read_existing_tags(path)

    match = fingerprint_lookup(api_key, path)
    if not match:
        return "no-match"

    print(f" ✓ acoustid match (score={match['score']:.2f}): {match['title']}")

    mb = fetch_release_data(match["recording_id"])
    if not mb:
        return "no-mb"

    meta = {k: v for k, v in mb.items() if v}
    if not meta:
        # Every field came back empty; writing would only erase tags.
        print(" ! musicbrainz returned no usable fields")
        return "no-mb"
    print(f" → {meta.get('artist')} — {meta.get('title')} [{meta.get('album')}] ({meta.get('date')})")

    if not commit:
        return "dry-run"

    if backup:
        backup_path = path.with_suffix(path.suffix + ".tags.bak.json")
        # Keep the first backup: on a re-run the file already carries
        # the rewritten tags, and overwriting would lose the originals.
        if not backup_path.exists():
            try:
                payload = json.dumps(original, indent=2, ensure_ascii=False)
                # ensure_ascii=False emits raw non-ASCII text, so the
                # encoding must be explicit rather than locale-dependent.
                backup_path.write_text(payload, encoding="utf-8")
            except (OSError, TypeError, ValueError) as e:
                # Without a backup the destructive write is not safe.
                print(f" ! backup failed, leaving tags untouched: {e}")
                return "backup-failed"

    return "written" if write_tags(path, meta) else "write-failed"
|
|
|
|
|
|
def main():
    """Parse the command line, process every audio file, print a summary."""
    parser = argparse.ArgumentParser(description="Fingerprint-based retag, scorched-earth edition.")
    parser.add_argument("folder", type=Path, help="Root folder to walk.")
    parser.add_argument("--commit", action="store_true", help="Actually write tags. Default is dry run.")
    parser.add_argument("--no-backup", action="store_true", help="Skip writing .tags.bak.json sidecars.")
    opts = parser.parse_args()

    root = opts.folder
    if not root.is_dir():
        sys.exit(f"Not a directory: {root}")

    api_key = get_api_key()
    keep_backup = not opts.no_backup

    # Count of files per outcome label returned by process().
    tally = {}
    for audio_path in iter_audio_files(root):
        outcome = process(audio_path, api_key, opts.commit, backup=keep_backup)
        tally[outcome] = tally.get(outcome, 0) + 1
        # Pause between files to stay under the web services' rate limits.
        time.sleep(RATE_LIMIT_SLEEP)

    print("\n--- summary ---")
    for outcome in sorted(tally):
        print(f" {outcome}: {tally[outcome]}")
    if not opts.commit:
        print("\n(this was a dry run. add --commit to actually write tags.)")
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()
|