mirror of
https://github.com/advplyr/audiobookshelf.git
synced 2026-03-01 05:29:41 +00:00
Add scripts to allow some library merging
This commit is contained in:
parent
1cd6473b26
commit
771f8c586f
2 changed files with 385 additions and 0 deletions
124
scripts/merge_book_parts.py
Normal file
124
scripts/merge_book_parts.py
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
import os
|
||||
import shutil
|
||||
import argparse
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
|
||||
# Trailing run of separators/punctuation. ``\w`` is Unicode-aware, so accented
# or non-Latin letters at the end of a title survive; the underscore is listed
# explicitly because ``\w`` would otherwise keep it. ``]`` and ``)`` are kept
# so names such as "Title (Unabridged)" or "[Series 04]" stay intact.
_TRAILING_JUNK_RE = re.compile(r'(?:[^\w\]\)]|_)+$')

# Part/track markers that may be stacked at the end of a folder name.
# Keyword markers come first so they win over the bare-number patterns.
_PART_SUFFIX_RES = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\s+Part\s?\d+$',
        r'\s+CD\s?\d+$',
        r'\s+Disc\s?\d+$',
        r'\s+Disk\s?\d+$',
        r'\s+-\s+\d+(?:-\d+)?$',
        r'\s+\d+(?:-\d+)?$',
    )
]


def get_base_name(name):
    """
    Identifies the 'base' name of a folder by stripping trailing part/track identifiers
    and handling redundant title repetitions.

    Args:
        name: Folder name, e.g. "My Book Disc 1 - 02".

    Returns:
        The name with trailing separators and part markers removed, e.g.
        "My Book". May be an empty string when the name consists only of
        separators. A trailing, purely numeric segment is always treated as a
        part/track number.
    """
    # 1. Clean trailing garbage and separators
    base = _TRAILING_JUNK_RE.sub('', name).strip()

    # 2. Strip common part/track suffixes, one at a time. After every removal
    #    the separators it leaves behind are cleaned up and the scan restarts,
    #    so "Book - 01" becomes "Book" (not "Book -") and a keyword marker
    #    hidden behind a track number ("Disc 1 - 02") is still recognised.
    stripped = True
    while stripped:
        stripped = False
        for suffix_re in _PART_SUFFIX_RES:
            candidate = _TRAILING_JUNK_RE.sub('', suffix_re.sub('', base)).strip()
            if candidate != base:
                base = candidate
                stripped = True
                break

    # 3. Handle redundancy: "Title - Title" collapses to "Title".
    segments = [seg.strip() for seg in base.split(' - ') if seg.strip()]
    if len(segments) > 1:
        last = segments[-1].lower()
        for index, segment in enumerate(segments[:-1]):
            if segment.lower() == last:
                # If we find a repeat, the base is everything up to that repeat
                return " - ".join(segments[:index + 1])

    return base
|
||||
|
||||
def _unique_destination(target_dir, item, part_id):
    """Return a not-yet-existing path in *target_dir* for the colliding *item*."""
    stem, ext = os.path.splitext(item)
    candidate = os.path.join(target_dir, f"{stem} ({part_id}){ext}")
    counter = 2
    # The first renamed candidate can itself collide (e.g. a re-run of the
    # script); keep counting instead of overwriting an existing file.
    while os.path.lexists(candidate):
        candidate = os.path.join(target_dir, f"{stem} ({part_id} {counter}){ext}")
        counter += 1
    return candidate


def merge_folders(root, dry_run=False):
    """
    Merge sibling folders that look like parts of the same book.

    Every direct sub-folder of *root* is reduced to a base name with
    ``get_base_name``; folders sharing a base name are merged into
    ``<root>/<base>``. Emptied source folders are removed.

    Args:
        root: Directory containing the split folders.
        dry_run: When true, only log the planned merges.

    Returns:
        None. Progress and failures are reported through ``logging``.
    """
    root = os.path.abspath(root)
    if not os.path.exists(root):
        logging.error(f"Root path does not exist: {root}")
        return

    try:
        folders = [
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        ]
    except Exception as e:
        logging.error(f"Failed to list directory {root}: {e}")
        return

    groups = {}  # base_name -> list of original_folder_names
    for folder in folders:
        base = get_base_name(folder)
        if base:
            groups.setdefault(base, []).append(folder)

    # Only keep groups that actually have more than one folder to merge
    merge_groups = {k: v for k, v in groups.items() if len(v) > 1}

    if not merge_groups:
        logging.info("No split folder groups detected.")
        return

    logging.info(f"Found {len(merge_groups)} groups to merge.")

    failures = 0
    for base, originals in merge_groups.items():
        target_dir = os.path.join(root, base)
        logging.info(f"GROUP: '{base}' ({len(originals)} parts)")

        for folder in sorted(originals):
            src_path = os.path.join(root, folder)

            # A folder that already carries the base name IS the merge target.
            # Merging it into itself would rename every file to "name ().ext"
            # and then fail to remove the (non-empty) directory.
            if os.path.normcase(src_path) == os.path.normcase(target_dir):
                logging.info(f" - KEEP: '{folder}' is already the merge target")
                continue

            logging.info(f" - MERGE: '{folder}' -> '{base}'")
            if dry_run:
                continue

            # Part identifier used to disambiguate colliding file names.
            # Fall back to the full folder name when the base is not a literal
            # prefix or nothing is left after stripping separators.
            part_id = ""
            if folder.lower().startswith(base.lower()):
                part_id = folder[len(base):].strip(' -_')
            part_id = part_id or folder

            try:
                os.makedirs(target_dir, exist_ok=True)

                for item in os.listdir(src_path):
                    src_item = os.path.join(src_path, item)
                    dst_item = os.path.join(target_dir, item)

                    # Handle name collisions (e.g. multiple 'cover.jpg' or 'track.mp3')
                    if os.path.lexists(dst_item):
                        dst_item = _unique_destination(target_dir, item, part_id)
                        new_name = os.path.basename(dst_item)
                        logging.debug(f" Collision: Renaming '{item}' to '{new_name}'")

                    shutil.move(src_item, dst_item)

                # Cleanup the now empty source folder
                os.rmdir(src_path)
            except Exception as e:
                failures += 1
                logging.error(f" Operation failed for '{folder}': {e}")

    if dry_run:
        logging.info("Dry run complete. No files were moved.")
    elif failures:
        logging.warning(f"Merge operations completed with {failures} failed folder(s).")
    else:
        logging.info("Merge operations completed successfully.")
|
||||
|
||||
if __name__ == "__main__":
    # Command-line entry point: parse options, configure logging, run the merge.
    cli = argparse.ArgumentParser(
        description="Merge split audiobook part-folders into single folders.",
    )
    cli.add_argument("path", help="Root directory containing the split folders")
    for flag, help_text in (
        ("--dry-run", "Show planned merges without executing"),
        ("--verbose", "Show detailed collision info"),
    ):
        cli.add_argument(flag, action="store_true", help=help_text)

    options = cli.parse_args()

    # --verbose lowers the threshold so collision renames (DEBUG) are shown.
    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format='%(levelname)s: %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    merge_folders(options.path, options.dry_run)
|
||||
261
scripts/reorganize_library.py
Normal file
261
scripts/reorganize_library.py
Normal file
|
|
@ -0,0 +1,261 @@
|
|||
import os
|
||||
import shutil
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import re
|
||||
|
||||
# Supported audio extensions in Audiobookshelf.
# Entries are lower-case and include the leading dot, matching what
# os.path.splitext(...)[1].lower() produces.
AUDIO_EXTENSIONS = {
    '.aac',
    '.aif',
    '.aiff',
    '.awb',
    '.caf',
    '.flac',
    '.m4a',
    '.m4b',
    '.mka',
    '.mp3',
    '.mp4',
    '.mpeg',
    '.mpg',
    '.oga',
    '.ogg',
    '.opus',
    '.wav',
    '.webm',
    '.webma',
    '.wma',
}
|
||||
|
||||
def clean_author_name(name):
    """Strips common suffixes from author names for cleaner folder names.

    Suffixes are matched case-insensitively and removed repeatedly until none
    is left, so stacked suffixes are handled regardless of their order
    ("X Collection Complete" and "X Complete Collection" both give "X").

    Args:
        name: Raw author folder name.

    Returns:
        The trimmed author name with the trailing suffixes removed.
    """
    suffixes = (
        " Collection", " Anthology", " Series", " Books", " Works",
        " Complete", " (All Chaptered)",
    )
    clean = name.strip()

    stripped = True
    while stripped:
        stripped = False
        lowered = clean.lower()
        for suffix in suffixes:
            if lowered.endswith(suffix.lower()):
                # Every suffix starts with a space, so at least one character
                # of the actual name always remains in front of it.
                clean = clean[:-len(suffix)].strip()
                stripped = True
                break
    return clean
|
||||
|
||||
def clean_title(title):
    """
    Cleans a title/filename by removing common metadata suffixes.
    e.g. "1997 - A Spy in Europa (Hauenstein) 32k 00.51.22 {11.8mb}" -> "1997 - A Spy in Europa"

    Args:
        title: A title or a file name (with or without extension).

    Returns:
        The cleaned title; may be empty when nothing but metadata was present.
    """
    # Remove the file extension if present. os.path.splitext() splits at ANY
    # last dot, so "... {11.8mb}" or "Mr. Smith" would be cut in the middle.
    # Only accept a short alphanumeric suffix containing a letter as an
    # extension (".mp3", ".m4b", ...).
    stem, ext = os.path.splitext(title)
    if re.fullmatch(r'\.(?=\d*[A-Za-z])[A-Za-z0-9]{1,5}', ext):
        base = stem
    else:
        base = title

    # Underscores count as word characters, which would defeat the \b anchors
    # below ("Book_128kbps"), so turn them into spaces first.
    base = base.replace('_', ' ')

    # Remove metadata in curly braces {11.8mb}
    base = re.sub(r'\{.*?\}', '', base)
    # Remove metadata in parentheses (Hauenstein)
    base = re.sub(r'\(.*?\)', '', base)
    # Remove bitrates like 32k, 128kbps
    base = re.sub(r'\b\d{1,3}\s?k(bps)?\b', '', base, flags=re.IGNORECASE)
    # Remove durations like 00.51.22 or 12.34
    base = re.sub(r'\b\d{1,2}[\.:]\d{2}([\.:]\d{2})?\b', '', base)

    # Clean up extra spaces and dashes
    base = base.strip()
    base = re.sub(r'\s+-\s+', ' - ', base)
    base = re.sub(r'\s+', ' ', base)
    return base.strip(' -')
|
||||
|
||||
def _is_name_boundary(char):
    """Return True when *char* cannot be the continuation of a word."""
    return not (char.isalnum() or char in "'\u2019")


def _strip_leading_name(text, name):
    """Drop *name* from the start of *text* when it is a whole-word prefix.

    Returns *text* unchanged when it does not start with *name* or when the
    match ends in the middle of a word ("Al" vs. "Alice ...").
    """
    if not text.lower().startswith(name.lower()):
        return text
    remainder = text[len(name):]
    if remainder and not _is_name_boundary(remainder[0]):
        return text
    return remainder.strip(' -_')


def _extends_at_boundary(longer, shorter):
    """True when *longer* equals *shorter* or continues it after a word boundary."""
    if not longer.startswith(shorter):
        return False
    return len(longer) == len(shorter) or _is_name_boundary(longer[len(shorter)])


def _ends_with_at_boundary(text, tail):
    """True when *text* equals *tail* or ends with it right after a word boundary."""
    if not text.endswith(tail):
        return False
    return len(text) == len(tail) or _is_name_boundary(text[-len(tail) - 1])


def get_target_name(rel_path, author_override=None, item_name=None):
    """
    Generates a flat target name from path parts and optionally an item name.

    Args:
        rel_path: Path relative to the library root; "/" and "\\" are both
            accepted as separators. Its first component is the author folder.
        author_override: Author to use instead of the cleaned first component.
        item_name: Optional file name/title appended as the last segment
            (after ``clean_title``), used when splitting multi-work folders.

    Returns:
        The segments joined with " - ", e.g. "Author - Series - Title".

    Raises:
        IndexError: If *rel_path* has no components and no override is given.
    """
    # 1. Determine Author
    path_parts = [p.strip() for p in rel_path.replace('\\', '/').split('/') if p.strip()]
    author = author_override or clean_author_name(path_parts[0])

    # 2. Collect segments, starting with the author
    segments = [author]

    # Add intermediate path segments, minus a redundant leading author name.
    # The prefix is only removed on a word boundary so that author "Al" does
    # not turn "Alice in Wonderland" into "ice in Wonderland".
    for part in path_parts[1:]:
        clean_part = _strip_leading_name(part, author)
        if clean_part:
            segments.append(clean_part)

    # Add the specific item name if we are splitting
    if item_name:
        clean_item = _strip_leading_name(clean_title(item_name), author)
        if clean_item:
            segments.append(clean_item)

    # 3. Final cleanup and deduplication
    final_segments = []
    for segment in segments:
        s_clean = segment.strip(' -_')
        if not s_clean:
            continue

        # Don't add if it's redundant with the previous segment. Redundancy is
        # judged on word boundaries: "Series Book 1" extends "Series", but
        # "1" is not a redundant tail of "Book 11".
        if final_segments:
            prev = final_segments[-1].lower()
            curr = s_clean.lower()
            extends_prev = _extends_at_boundary(curr, prev)
            if extends_prev or _ends_with_at_boundary(prev, curr):
                # If current is longer and starts with prev, replace prev
                if extends_prev and len(curr) > len(prev):
                    final_segments[-1] = s_clean
                continue

        final_segments.append(s_clean)

    return " - ".join(final_segments)
|
||||
|
||||
def is_multi_work_dir(path):
    """
    Heuristic to detect if a folder contains multiple independent books.

    A folder qualifies when it directly holds at least two audio files and
    fewer than half of them start with a track-style prefix (a 1-3 digit
    number, or Part/Disc/Disk/CD followed by a number).

    Args:
        path: Directory to inspect (only its direct children are examined).

    Returns:
        True if the folder looks like a container of separate works, False
        otherwise or when the directory cannot be listed.
    """
    try:
        items = os.listdir(path)
    except OSError:
        # Unreadable or vanished directory: treat as "not multi-work".
        return False

    audio_files = [
        entry for entry in items
        if os.path.isfile(os.path.join(path, entry))
        and os.path.splitext(entry)[1].lower() in AUDIO_EXTENSIONS
    ]
    if len(audio_files) < 2:
        return False

    # Check for sequential track numbering (01, 02, Part 1...)
    # If most files start with a small number, it's likely a single book.
    # The look-ahead rejects a following letter or digit instead of using \b,
    # because "_" is a word character and "01_intro.mp3" must still count;
    # 4-digit years ("1997 - ...") and titles like "1Q84" stay unnumbered.
    numbered_re = re.compile(
        r'(?:\d{1,3}|(?:Part|Disc|Disk|CD)[\s_]?\d+)(?![^\W_])',
        re.IGNORECASE,
    )
    numbered_count = sum(1 for name in audio_files if numbered_re.match(name))

    # If less than 50% are numbered, treat as multi-work
    return numbered_count / len(audio_files) < 0.5
|
||||
|
||||
# "CD", "Disc" or "Disk" NOT followed by another letter: matches "CD1",
# "Disc 2", "disk_03" but not "Discworld 01" or "Discovery".
_DISC_DIR_RE = re.compile(r'(?:cd|dis[ck])(?![^\W\d_])', re.IGNORECASE)


def _is_disc_dir_name(name):
    """Return True when a folder name looks like a CD/disc sub-folder."""
    return _DISC_DIR_RE.match(name) is not None


def _tree_has_audio(path):
    """Return True when *path* or any directory below it contains an audio file."""
    return any(
        os.path.splitext(filename)[1].lower() in AUDIO_EXTENSIONS
        for _dirpath, _dirnames, filenames in os.walk(path)
        for filename in filenames
    )


def is_book_dir(path):
    """
    Checks if a directory is a single book folder.

    A directory is a book when either
    * it directly contains audio files, is not a multi-work container, and
      none of its non-disc sub-folders contain audio; or
    * it has no audio files of its own and every sub-folder that contains
      audio is a CD/disc folder.

    Args:
        path: Directory to inspect.

    Returns:
        True for a single-book folder, False otherwise or when the directory
        cannot be listed.
    """
    try:
        items = os.listdir(path)
    except OSError:
        return False

    audio_files = [
        entry for entry in items
        if os.path.isfile(os.path.join(path, entry))
        and os.path.splitext(entry)[1].lower() in AUDIO_EXTENSIONS
    ]
    subdirs = [entry for entry in items if os.path.isdir(os.path.join(path, entry))]

    if audio_files:
        # A folder with audio is a book if it's NOT a multi-work container
        if is_multi_work_dir(path):
            return False

        # Also check if it has deeper books: audio inside a non-disc
        # sub-folder means this folder might just be a container.
        for subdir in subdirs:
            if _is_disc_dir_name(subdir):
                continue
            if _tree_has_audio(os.path.join(path, subdir)):
                return False
        return True

    if subdirs:
        # Check if it's a multi-disc book (only CD subfolders containing audio)
        audio_subdirs = [
            subdir for subdir in subdirs
            if _tree_has_audio(os.path.join(path, subdir))
        ]
        if audio_subdirs and all(_is_disc_dir_name(subdir) for subdir in audio_subdirs):
            return True

    return False
|
||||
|
||||
def migrate(root, dry_run=False, split=False):
    """
    Flatten a nested audiobook library into ``<root>/<target name>`` folders.

    The tree below *root* is scanned for book folders (``is_book_dir``) and,
    when *split* is set, multi-work folders (``is_multi_work_dir``). Book
    folders are moved as a whole; each audio file of a multi-work folder is
    moved into its own target folder. Afterwards empty directories are removed.

    Args:
        root: Library root directory.
        dry_run: When true, only log the planned operations.
        split: When true, split folders holding several independent works.

    Returns:
        None. Progress, conflicts and failures are reported through ``logging``.
    """
    root = os.path.abspath(root)
    if not os.path.exists(root):
        logging.error(f"Root path does not exist: {root}")
        return

    logging.info(f"Scanning library at: {root}")

    book_dirs = []
    multi_work_dirs = []

    for dirpath, dirnames, _filenames in os.walk(root):
        if dirpath == root:
            continue

        rel_path = os.path.relpath(dirpath, root)

        if split and is_multi_work_dir(dirpath):
            logging.debug(f"Found multi-work directory: {rel_path}")
            multi_work_dirs.append(dirpath)
            dirnames[:] = []  # Stop recursion
        elif is_book_dir(dirpath):
            logging.debug(f"Found book directory: {rel_path}")
            book_dirs.append(dirpath)
            dirnames[:] = []  # Stop recursion

    moves = []  # List of (src, dst, is_file)

    # 1. Plan Moves for Multi-Work files
    for mw_path in multi_work_dirs:
        rel_path = os.path.relpath(mw_path, root)
        files = [
            entry for entry in os.listdir(mw_path)
            if os.path.isfile(os.path.join(mw_path, entry))
            and os.path.splitext(entry)[1].lower() in AUDIO_EXTENSIONS
        ]

        for filename in files:
            src = os.path.join(mw_path, filename)
            # Create a target folder name for this specific file
            target_name = get_target_name(rel_path, item_name=filename)
            target_dir = os.path.join(root, target_name)
            moves.append((src, os.path.join(target_dir, filename), True))

    # 2. Plan Moves for Book Folders
    for book_path in book_dirs:
        rel_path = os.path.relpath(book_path, root)
        target_path = os.path.join(root, get_target_name(rel_path))

        if os.path.abspath(book_path) == os.path.abspath(target_path):
            continue

        moves.append((book_path, target_path, False))

    logging.info(f"Planned operations: {len(moves)}")

    # Execute moves
    successful_moves = 0
    failed_moves = 0
    planned_moves = 0
    # In a dry run nothing appears on disk, so remember the destinations that
    # were already planned; otherwise two sources mapping to the same target
    # would both be reported as fine. In a real run this set stays empty and
    # the filesystem itself is the source of truth.
    claimed_targets = set()

    for src, dst, is_file_move in moves:
        src_rel = os.path.relpath(src, root)
        dst_rel = os.path.relpath(dst, root)

        if os.path.exists(dst) or dst in claimed_targets:
            if os.path.abspath(src) == os.path.abspath(dst):
                continue
            logging.warning(f"Conflict: Target already exists: {dst_rel}. Skipping {src_rel}")
            continue

        logging.info(f"PLAN: '{src_rel}' -> '{dst_rel}'")
        planned_moves += 1

        if dry_run:
            claimed_targets.add(dst)
            continue

        try:
            if is_file_move:
                os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.move(src, dst)
            successful_moves += 1
        except Exception as e:
            failed_moves += 1
            logging.error(f"Failed to move '{src_rel}': {e}")

    if dry_run:
        logging.info(f"Dry run complete. {planned_moves} operations would be performed.")
        return

    logging.info(f"Moves finished: {successful_moves} succeeded, {failed_moves} failed.")

    logging.info("Cleaning up empty directories...")
    # Bottom-up so that parents emptied by removing their children go too.
    for dirpath, _dirnames, _filenames in os.walk(root, topdown=False):
        if dirpath == root:
            continue
        try:
            if not os.listdir(dirpath):
                os.rmdir(dirpath)
        except OSError as e:
            # Best effort: a directory we cannot list or remove is left alone,
            # but Ctrl-C and programming errors are no longer swallowed.
            logging.debug(f"Could not remove '{dirpath}': {e}")
|
||||
|
||||
if __name__ == "__main__":
    # Command-line entry point: parse options, configure logging, run the migration.
    cli = argparse.ArgumentParser(description="Reorganize library to a flat structure.")
    cli.add_argument("path", help="Root directory of the library")
    cli.add_argument("--dry-run", action="store_true")
    cli.add_argument("--verbose", action="store_true")
    cli.add_argument("--split", action="store_true", help="Split folders with multiple independent books")
    cli.add_argument("--log-file", help="Path to a file to write logs to")

    options = cli.parse_args()

    # Always log to stdout; additionally mirror to a file when requested.
    log_handlers = [logging.StreamHandler(sys.stdout)]
    if options.log_file:
        log_handlers.append(logging.FileHandler(options.log_file))

    logging.basicConfig(
        level=logging.DEBUG if options.verbose else logging.INFO,
        format='%(levelname)s: %(message)s',
        handlers=log_handlers,
    )

    migrate(options.path, options.dry_run, options.split)
|
||||
Loading…
Add table
Add a link
Reference in a new issue