update
This commit is contained in:
parent
6d70d54f44
commit
5cc29919e3
2 changed files with 286 additions and 8 deletions
49
nicesong
49
nicesong
|
|
@ -1,11 +1,25 @@
|
||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
|
|
||||||
query_artist="$(playerctl -i firefox metadata --format '{{ artist }}')"
|
|
||||||
query_title="$(playerctl -i firefox metadata --format '{{ title }}')"
|
|
||||||
query="$query_artist - $query_title"
|
|
||||||
|
|
||||||
tmpdir="$(mktemp -d)"
|
tmpdir="$(mktemp -d)"
|
||||||
|
monitor="$(pactl get-default-sink).monitor"
|
||||||
|
|
||||||
|
echo "Listening on $monitor (Shazam)..."
|
||||||
|
result="$(songrec recognize -d "$monitor" -j)"
|
||||||
|
|
||||||
|
artist="$(printf '%s' "$result" | jq -r '.track.subtitle // empty')"
|
||||||
|
title="$(printf '%s' "$result" | jq -r '.track.title // empty')"
|
||||||
|
|
||||||
|
if [ -z "$artist" ] || [ -z "$title" ]; then
|
||||||
|
echo "Could not identify song. Even chaos has limits."
|
||||||
|
rm -rf "$tmpdir"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
query="$artist - $title"
|
||||||
|
|
||||||
|
echo
|
||||||
|
echo "Identified: $query"
|
||||||
|
echo
|
||||||
echo "Searching YouTube for:"
|
echo "Searching YouTube for:"
|
||||||
echo " $query"
|
echo " $query"
|
||||||
echo
|
echo
|
||||||
|
|
@ -24,19 +38,38 @@ read -p "Download this? [y/N] " confirm
|
||||||
if [[ "$confirm" =~ ^[Yy]$ ]]; then
|
if [[ "$confirm" =~ ^[Yy]$ ]]; then
|
||||||
yt-dlp -x --audio-format m4a -f bestaudio \
|
yt-dlp -x --audio-format m4a -f bestaudio \
|
||||||
"ytsearch1:$query" \
|
"ytsearch1:$query" \
|
||||||
--embed-metadata \
|
-o "$tmpdir/download.%(ext)s"
|
||||||
-o "$tmpdir/%(artist)s - %(albums)s - %(title)s.%(ext)s"
|
|
||||||
|
|
||||||
file="$(ls "$tmpdir"/* 2>/dev/null | head -n1)"
|
file="$(ls "$tmpdir"/download.* 2>/dev/null | head -n1)"
|
||||||
|
|
||||||
if [[ -n "$file" ]]; then
|
if [[ -n "$file" ]]; then
|
||||||
beet import --from-scratch "$file"
|
script_dir="$(cd "$(dirname "$0")" && pwd)"
|
||||||
|
python3 "$script_dir/retag.py" "$tmpdir" --commit --no-backup
|
||||||
|
|
||||||
|
tag_artist="$(ffprobe -v error -show_entries format_tags=artist -of default=nw=1:nk=1 "$file")"
|
||||||
|
tag_album="$(ffprobe -v error -show_entries format_tags=album -of default=nw=1:nk=1 "$file")"
|
||||||
|
tag_title="$(ffprobe -v error -show_entries format_tags=title -of default=nw=1:nk=1 "$file")"
|
||||||
|
|
||||||
|
a="${tag_artist:-$artist}"
|
||||||
|
b="${tag_album:-Unknown Album}"
|
||||||
|
t="${tag_title:-$title}"
|
||||||
|
|
||||||
|
# filesystem-safe: replace path separator
|
||||||
|
a="${a//\//_}"
|
||||||
|
b="${b//\//_}"
|
||||||
|
t="${t//\//_}"
|
||||||
|
|
||||||
|
dest="$HOME/Music/$a/$b"
|
||||||
|
mkdir -p "$dest"
|
||||||
|
mv "$file" "$dest/$t.m4a"
|
||||||
|
echo "Saved to: $dest/$t.m4a"
|
||||||
else
|
else
|
||||||
echo "Download failed. Even chaos has limits."
|
echo "Download failed. Even chaos has limits."
|
||||||
fi
|
fi
|
||||||
|
|
||||||
rm -rf "$tmpdir"
|
rm -rf "$tmpdir"
|
||||||
else
|
else
|
||||||
|
rm -rf "$tmpdir"
|
||||||
echo "Aborted."
|
echo "Aborted."
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
|
||||||
245
retag.py
Normal file
245
retag.py
Normal file
|
|
@ -0,0 +1,245 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
retag.py — recursively fingerprint a folder of audio and overwrite tags
|
||||||
|
with whatever AcoustID + MusicBrainz cough up.
|
||||||
|
|
||||||
|
Dependencies:
|
||||||
|
pip install pyacoustid musicbrainzngs mutagen
|
||||||
|
sudo apt install libchromaprint-tools # provides `fpcalc`
|
||||||
|
|
||||||
|
You also need a free AcoustID API key:
|
||||||
|
https://acoustid.org/new-application
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
export ACOUSTID_API_KEY=your_key_here
|
||||||
|
python3 retag.py /path/to/music # dry run by default
|
||||||
|
python3 retag.py /path/to/music --commit # actually writes tags
|
||||||
|
python3 retag.py /path/to/music --commit --no-backup # YOLO mode
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import acoustid
|
||||||
|
import musicbrainzngs
|
||||||
|
from mutagen import File as MutagenFile
|
||||||
|
from mutagen.easyid3 import EasyID3
|
||||||
|
from mutagen.flac import FLAC
|
||||||
|
from mutagen.mp4 import MP4
|
||||||
|
from mutagen.oggvorbis import OggVorbis
|
||||||
|
from mutagen.id3 import ID3NoHeaderError
|
||||||
|
|
||||||
|
# --- config ----------------------------------------------------------------
|
||||||
|
|
||||||
|
AUDIO_EXTS = {".mp3", ".flac", ".m4a", ".mp4", ".ogg", ".oga", ".opus", ".wav"}
|
||||||
|
MIN_SCORE = 0.85 # AcoustID match confidence floor. Below this, skip.
|
||||||
|
RATE_LIMIT_SLEEP = 1.0 # AcoustID free tier: 3 req/sec. Be a polite citizen.
|
||||||
|
|
||||||
|
musicbrainzngs.set_useragent("retag.py", "0.1", "https://example.invalid")
|
||||||
|
|
||||||
|
|
||||||
|
# --- helpers ---------------------------------------------------------------
|
||||||
|
|
||||||
|
def get_api_key() -> str:
|
||||||
|
key = os.environ.get("ACOUSTID_API_KEY")
|
||||||
|
if not key:
|
||||||
|
sys.exit("ERROR: set ACOUSTID_API_KEY env var. See acoustid.org/new-application")
|
||||||
|
return key
|
||||||
|
|
||||||
|
|
||||||
|
def iter_audio_files(root: Path):
|
||||||
|
for p in root.rglob("*"):
|
||||||
|
if p.is_file() and p.suffix.lower() in AUDIO_EXTS:
|
||||||
|
yield p
|
||||||
|
|
||||||
|
|
||||||
|
def fingerprint_lookup(api_key: str, path: Path) -> Optional[dict]:
|
||||||
|
"""Return best match dict {recording_id, score, title, artist} or None."""
|
||||||
|
try:
|
||||||
|
results = acoustid.match(api_key, str(path), parse=False)
|
||||||
|
except acoustid.NoBackendError:
|
||||||
|
sys.exit("ERROR: fpcalc not found. Install chromaprint-tools.")
|
||||||
|
except acoustid.FingerprintGenerationError as e:
|
||||||
|
print(f" ! fingerprint failed: {e}")
|
||||||
|
return None
|
||||||
|
except acoustid.WebServiceError as e:
|
||||||
|
print(f" ! acoustid web error: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
best = None
|
||||||
|
for result in results.get("results", []):
|
||||||
|
score = result.get("score", 0)
|
||||||
|
if score < MIN_SCORE:
|
||||||
|
continue
|
||||||
|
for recording in result.get("recordings", []) or []:
|
||||||
|
best = {
|
||||||
|
"recording_id": recording.get("id"),
|
||||||
|
"score": score,
|
||||||
|
"title": recording.get("title"),
|
||||||
|
"artists": recording.get("artists", []),
|
||||||
|
}
|
||||||
|
return best # first match above threshold wins
|
||||||
|
return best
|
||||||
|
|
||||||
|
|
||||||
|
def fetch_release_data(recording_id: str) -> Optional[dict]:
|
||||||
|
"""Pull richer metadata (album, date, track number) from MusicBrainz."""
|
||||||
|
try:
|
||||||
|
data = musicbrainzngs.get_recording_by_id(
|
||||||
|
recording_id,
|
||||||
|
includes=["releases", "artist-credits", "tags"],
|
||||||
|
)
|
||||||
|
except musicbrainzngs.WebServiceError as e:
|
||||||
|
print(f" ! musicbrainz error: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
rec = data.get("recording", {})
|
||||||
|
releases = rec.get("release-list", []) or []
|
||||||
|
# Prefer official album releases over compilations / singles when available.
|
||||||
|
releases.sort(key=lambda r: (
|
||||||
|
0 if r.get("status") == "Official" else 1,
|
||||||
|
0 if "Album" in (r.get("release-group", {}).get("primary-type", "") or "") else 1,
|
||||||
|
))
|
||||||
|
release = releases[0] if releases else {}
|
||||||
|
|
||||||
|
artist_credit = rec.get("artist-credit", [])
|
||||||
|
artist = "".join(
|
||||||
|
(ac.get("artist", {}).get("name", "") if isinstance(ac, dict) else str(ac))
|
||||||
|
for ac in artist_credit
|
||||||
|
) or None
|
||||||
|
|
||||||
|
return {
|
||||||
|
"title": rec.get("title"),
|
||||||
|
"artist": artist,
|
||||||
|
"album": release.get("title"),
|
||||||
|
"date": release.get("date"),
|
||||||
|
"albumartist": (release.get("artist-credit-phrase")
|
||||||
|
or (release.get("artist-credit", [{}])[0].get("artist", {}).get("name")
|
||||||
|
if release.get("artist-credit") else None)),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# --- tag IO ---------------------------------------------------------------
|
||||||
|
|
||||||
|
def read_existing_tags(path: Path) -> dict:
|
||||||
|
try:
|
||||||
|
audio = MutagenFile(str(path), easy=True)
|
||||||
|
if audio is None or audio.tags is None:
|
||||||
|
return {}
|
||||||
|
return {k: list(v) for k, v in audio.tags.items()}
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ! couldn't read tags: {e}")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def write_tags(path: Path, meta: dict) -> bool:
|
||||||
|
"""Overwrite tags. Returns True on success."""
|
||||||
|
ext = path.suffix.lower()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if ext == ".mp3":
|
||||||
|
try:
|
||||||
|
audio = EasyID3(str(path))
|
||||||
|
except ID3NoHeaderError:
|
||||||
|
audio = EasyID3()
|
||||||
|
audio.save(str(path))
|
||||||
|
audio = EasyID3(str(path))
|
||||||
|
audio.delete()
|
||||||
|
elif ext == ".flac":
|
||||||
|
audio = FLAC(str(path))
|
||||||
|
audio.delete()
|
||||||
|
elif ext in (".m4a", ".mp4"):
|
||||||
|
audio = MP4(str(path))
|
||||||
|
# MP4 uses its own atom keys — translate.
|
||||||
|
mp4_map = {
|
||||||
|
"title": "\xa9nam",
|
||||||
|
"artist": "\xa9ART",
|
||||||
|
"album": "\xa9alb",
|
||||||
|
"date": "\xa9day",
|
||||||
|
"albumartist": "aART",
|
||||||
|
}
|
||||||
|
audio.clear()
|
||||||
|
for field, value in meta.items():
|
||||||
|
if value and field in mp4_map:
|
||||||
|
audio[mp4_map[field]] = [value]
|
||||||
|
audio.save()
|
||||||
|
return True
|
||||||
|
elif ext in (".ogg", ".oga", ".opus"):
|
||||||
|
audio = OggVorbis(str(path))
|
||||||
|
audio.delete()
|
||||||
|
else:
|
||||||
|
print(f" ! unsupported extension for writing: {ext}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
for field, value in meta.items():
|
||||||
|
if value:
|
||||||
|
audio[field] = [value]
|
||||||
|
audio.save()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ! tag write failed: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# --- main loop -------------------------------------------------------------
|
||||||
|
|
||||||
|
def process(path: Path, api_key: str, commit: bool, backup: bool) -> str:
|
||||||
|
print(f"\n→ {path}")
|
||||||
|
original = read_existing_tags(path)
|
||||||
|
|
||||||
|
match = fingerprint_lookup(api_key, path)
|
||||||
|
if not match:
|
||||||
|
return "no-match"
|
||||||
|
|
||||||
|
print(f" ✓ acoustid match (score={match['score']:.2f}): {match['title']}")
|
||||||
|
|
||||||
|
mb = fetch_release_data(match["recording_id"])
|
||||||
|
if not mb:
|
||||||
|
return "no-mb"
|
||||||
|
|
||||||
|
meta = {k: v for k, v in mb.items() if v}
|
||||||
|
print(f" → {meta.get('artist')} — {meta.get('title')} [{meta.get('album')}] ({meta.get('date')})")
|
||||||
|
|
||||||
|
if not commit:
|
||||||
|
return "dry-run"
|
||||||
|
|
||||||
|
if backup:
|
||||||
|
backup_path = path.with_suffix(path.suffix + ".tags.bak.json")
|
||||||
|
backup_path.write_text(json.dumps(original, indent=2, ensure_ascii=False))
|
||||||
|
|
||||||
|
return "written" if write_tags(path, meta) else "write-failed"
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser(description="Fingerprint-based retag, scorched-earth edition.")
|
||||||
|
ap.add_argument("folder", type=Path, help="Root folder to walk.")
|
||||||
|
ap.add_argument("--commit", action="store_true", help="Actually write tags. Default is dry run.")
|
||||||
|
ap.add_argument("--no-backup", action="store_true", help="Skip writing .tags.bak.json sidecars.")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
if not args.folder.is_dir():
|
||||||
|
sys.exit(f"Not a directory: {args.folder}")
|
||||||
|
|
||||||
|
api_key = get_api_key()
|
||||||
|
stats = {}
|
||||||
|
for path in iter_audio_files(args.folder):
|
||||||
|
outcome = process(path, api_key, args.commit, backup=not args.no_backup)
|
||||||
|
stats[outcome] = stats.get(outcome, 0) + 1
|
||||||
|
time.sleep(RATE_LIMIT_SLEEP)
|
||||||
|
|
||||||
|
print("\n--- summary ---")
|
||||||
|
for k, v in sorted(stats.items()):
|
||||||
|
print(f" {k}: {v}")
|
||||||
|
if not args.commit:
|
||||||
|
print("\n(this was a dry run. add --commit to actually write tags.)")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in a new issue