"""Audiobook library maintenance helpers.

This module carries the contents of two scripts that were introduced
together in one patch:

* ``scripts/merge_book_parts.py`` - merge sibling folders that are parts of
  the same book ("Title Part 1", "Title CD2", ...) into a single folder.
* ``scripts/reorganize_library.py`` - flatten a nested library into
  top-level "Author - Title" folders.

When executed as a script, the command line that is used depends on the file
name it is invoked under (see the ``__main__`` block at the bottom).
"""

import argparse
import logging
import os
import re
import shutil
import sys


# ---------------------------------------------------------------------------
# scripts/merge_book_parts.py
# ---------------------------------------------------------------------------

# Trailing run of separators/punctuation.  Unicode-aware on purpose so that a
# title ending in a non-ASCII letter ("Café") is not truncated; closing
# brackets are kept because they usually belong to the title.
_TRAILING_JUNK_RE = re.compile(r'(?:(?![\]\)])[\W_])+$')

# Suffixes that identify one part of a split book.
_PART_SUFFIX_RES = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\s+Part\s?\d+$',
        r'\s+CD\s?\d+$',
        r'\s+Disc\s?\d+$',
        r'\s+Disk\s?\d+$',
        r'\s+\d+(?:-\d+)?$',
        r'\s+-\s+\d+(?:-\d+)?$',
    )
)


def _collapse_repeated_title(base):
    """Return *base* cut after the first segment that equals its last segment.

    Segments are separated by " - ".  Returns ``None`` when the last segment
    does not repeat an earlier one.
    """
    parts = [p.strip() for p in base.split(' - ') if p.strip()]
    if len(parts) > 1:
        last = parts[-1].lower()
        for index, part in enumerate(parts[:-1]):
            if part.lower() == last:
                return " - ".join(parts[:index + 1])
    return None


def get_base_name(name):
    """Return the 'base' name of a folder.

    Trailing part/track identifiers ("Part 1", "CD2", "03", "- 1-3") are
    stripped, and a redundant title repetition ("Title - Title") is collapsed.

    Args:
        name: Folder name (not a path).

    Returns:
        The base name; may be an empty string if nothing meaningful is left.
    """
    # 1. Clean trailing garbage and separators.
    base = _TRAILING_JUNK_RE.sub('', name).strip()

    # 2. A repeated segment wins over suffix stripping; otherwise a numeric
    #    title such as "... - 1356 - 1356" would be mistaken for a part number.
    collapsed = _collapse_repeated_title(base)
    if collapsed is not None:
        return collapsed

    # 3. Strip part suffixes until stable.  The separator cleanup is repeated
    #    after every strip so "Book - Part 1" yields "Book", not "Book -".
    while True:
        previous = base
        for pattern in _PART_SUFFIX_RES:
            base = _TRAILING_JUNK_RE.sub('', pattern.sub('', base)).strip()
        if base == previous:
            break

    # 4. Stripping may have exposed a repetition ("Title - Title Part 1").
    collapsed = _collapse_repeated_title(base)
    return base if collapsed is None else collapsed


def _unique_destination(target_dir, item, part_id):
    """Return a path for *item* inside *target_dir* that does not exist yet.

    On a name collision the part identifier is appended to the file stem; a
    counter is added if that name is taken as well, so nothing is overwritten.
    """
    candidate = os.path.join(target_dir, item)
    if not os.path.lexists(candidate):
        return candidate

    stem, ext = os.path.splitext(item)
    candidate = os.path.join(target_dir, f"{stem} ({part_id}){ext}")
    counter = 2
    while os.path.lexists(candidate):
        candidate = os.path.join(target_dir, f"{stem} ({part_id} {counter}){ext}")
        counter += 1
    return candidate


def _merge_folder_into(src_path, target_dir, part_id):
    """Move every entry of *src_path* into *target_dir*, then remove *src_path*.

    Raises:
        OSError: If listing, moving or removing fails.
    """
    os.makedirs(target_dir, exist_ok=True)
    for item in sorted(os.listdir(src_path)):
        dst_item = _unique_destination(target_dir, item, part_id)
        new_name = os.path.basename(dst_item)
        if new_name != item:
            logging.debug(f"    Collision: Renaming '{item}' to '{new_name}'")
        shutil.move(os.path.join(src_path, item), dst_item)

    # Cleanup the now empty source folder.
    os.rmdir(src_path)


def merge_folders(root, dry_run=False):
    """Merge sibling folders under *root* that share the same base name.

    Args:
        root: Directory whose immediate sub-folders are examined.
        dry_run: When true, only log the planned merges.

    Returns:
        None.  Problems are reported through ``logging``.
    """
    root = os.path.abspath(root)
    if not os.path.exists(root):
        logging.error(f"Root path does not exist: {root}")
        return

    try:
        folders = sorted(
            f for f in os.listdir(root) if os.path.isdir(os.path.join(root, f))
        )
    except OSError as e:
        logging.error(f"Failed to list directory {root}: {e}")
        return

    groups = {}  # base_name -> list of original folder names
    for folder in folders:
        base = get_base_name(folder)
        if base:
            groups.setdefault(base, []).append(folder)

    # Only keep groups that actually have more than one folder to merge.
    merge_groups = {base: members for base, members in groups.items() if len(members) > 1}

    if not merge_groups:
        logging.info("No split folder groups detected.")
        return

    logging.info(f"Found {len(merge_groups)} groups to merge.")

    failures = 0
    for base, originals in merge_groups.items():
        target_dir = os.path.join(root, base)
        logging.info(f"GROUP: '{base}' ({len(originals)} parts)")

        for folder in sorted(originals):
            src_path = os.path.join(root, folder)

            # A folder that already carries the base name IS the target;
            # merging it into itself would rename all of its files.
            if src_path == target_dir:
                logging.info(f"  - KEEP: '{folder}' (already the merge target)")
                continue

            logging.info(f"  - MERGE: '{folder}' -> '{base}'")
            if dry_run:
                continue

            # Part identifier used to disambiguate colliding file names.
            part_id = folder[len(base):].strip(' -_') if folder.startswith(base) else ''
            if not part_id:
                part_id = folder

            try:
                _merge_folder_into(src_path, target_dir, part_id)
            except OSError as e:
                failures += 1
                logging.error(f"    Operation failed for '{folder}': {e}")

    if dry_run:
        logging.info("Dry run complete. No files were moved.")
    elif failures:
        logging.warning(f"Merge operations completed with {failures} failed folder(s).")
    else:
        logging.info("Merge operations completed successfully.")


def _merge_book_parts_cli(argv=None):
    """Command line of ``merge_book_parts.py``."""
    parser = argparse.ArgumentParser(description="Merge split audiobook part-folders into single folders.")
    parser.add_argument("path", help="Root directory containing the split folders")
    parser.add_argument("--dry-run", action="store_true", help="Show planned merges without executing")
    parser.add_argument("--verbose", action="store_true", help="Show detailed collision info")

    args = parser.parse_args(argv)

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s', handlers=[logging.StreamHandler(sys.stdout)])

    merge_folders(args.path, args.dry_run)


# ---------------------------------------------------------------------------
# scripts/reorganize_library.py
# ---------------------------------------------------------------------------

# Supported audio extensions in Audiobookshelf
AUDIO_EXTENSIONS = {
    '.m4b', '.mp3', '.m4a', '.flac', '.opus', '.ogg', '.oga',
    '.mp4', '.aac', '.wma', '.aiff', '.aif', '.wav', '.webm',
    '.webma', '.mka', '.awb', '.caf', '.mpg', '.mpeg'
}

_AUTHOR_SUFFIXES = (
    " Collection", " Anthology", " Series", " Books", " Works", " Complete", " (All Chaptered)",
)

# Sub-folder name prefixes that mark a disc of a multi-disc book.
_DISC_PREFIXES = ('cd', 'disc', 'disk')

_BRACED_RE = re.compile(r'\{.*?\}')
_PARENS_RE = re.compile(r'\(.*?\)')
_BITRATE_RE = re.compile(r'\b\d{1,3}\s?k(bps)?\b', re.IGNORECASE)
_DURATION_RE = re.compile(r'\b\d{1,2}[\.:]\d{2}([\.:]\d{2})?\b')
_SPACED_DASH_RE = re.compile(r'\s+-\s+')
_WHITESPACE_RE = re.compile(r'\s+')

# File names that look like a track of one book: "01 ...", "Part 2", "CD1",
# "Chapter 03" ...  A four-digit year such as "1997 - ..." does not match.
_TRACK_PREFIX_RE = re.compile(
    r'^(?:\d{1,3}|(?:Part|Disc|Disk|CD|Track|Chapter)\s?\d+)\b', re.IGNORECASE
)


def clean_author_name(name):
    """Strip common suffixes from an author folder name.

    Suffixes are removed repeatedly so that stacked ones
    ("X Collection (All Chaptered)") are all dropped.
    """
    clean = name.strip()
    changed = True
    while changed:
        changed = False
        for suffix in _AUTHOR_SUFFIXES:
            if clean.lower().endswith(suffix.lower()):
                clean = clean[:-len(suffix)].strip()
                changed = True
    return clean


def clean_title(title):
    """Clean a title/filename by removing common metadata suffixes.

    e.g. "1997 - A Spy in Europa (Hauenstein) 32k 00.51.22 {11.8mb}"
         -> "1997 - A Spy in Europa"
    """
    # Remove the extension only when it is a known audio extension; a plain
    # splitext() would treat ".8mb}" in "{11.8mb}" as an extension.
    stem, ext = os.path.splitext(title)
    base = stem if ext.lower() in AUDIO_EXTENSIONS else title

    # Underscores first: "_" is a word character and would otherwise hide the
    # word boundaries the bitrate/duration patterns rely on.
    base = base.replace('_', ' ')

    base = _BRACED_RE.sub('', base)      # metadata in curly braces {11.8mb}
    base = _PARENS_RE.sub('', base)      # metadata in parentheses (Hauenstein)
    base = _BITRATE_RE.sub('', base)     # bitrates like 32k, 128kbps
    base = _DURATION_RE.sub('', base)    # durations like 00.51.22 or 12.34

    # Clean up extra spaces and dashes.
    base = _SPACED_DASH_RE.sub(' - ', base.strip())
    base = _WHITESPACE_RE.sub(' ', base)
    return base.strip(' -')


def _strip_leading_words(text, prefix):
    """Drop *prefix* from the start of *text* (case-insensitive).

    The prefix is only removed when it ends on a word boundary, so the author
    "King" is not cut out of "Kingdom of Ash".
    """
    head, tail = text[:len(prefix)], text[len(prefix):]
    if prefix and head.lower() == prefix.lower() and not tail[:1].isalnum():
        return tail.strip(' -_')
    return text


def _starts_with_words(text, prefix):
    """True if *text* starts with *prefix* and the prefix ends on a word boundary."""
    return text.startswith(prefix) and not text[len(prefix):len(prefix) + 1].isalnum()


def _ends_with_words(text, suffix):
    """True if *text* ends with *suffix* and the suffix starts on a word boundary."""
    return text.endswith(suffix) and not text[:len(text) - len(suffix)][-1:].isalnum()


def get_target_name(rel_path, author_override=None, item_name=None):
    """Generate a flat "Author - ... - Title" name.

    Args:
        rel_path: Path relative to the library root; its first component is
            taken as the author folder.
        author_override: Author to use instead of the first path component.
        item_name: Optional file name to append as the last segment (used
            when a multi-work folder is split).

    Returns:
        The segments joined with " - ".
    """
    # 1. Determine author.
    path_parts = [p.strip() for p in rel_path.replace('\\', '/').split('/') if p.strip()]
    author = author_override or (clean_author_name(path_parts[0]) if path_parts else '')

    # 2. Collect segments, removing a redundant leading author from each.
    segments = [author]
    for part in path_parts[1:]:
        segment = _strip_leading_words(part, author)
        if segment:
            segments.append(segment)

    if item_name:
        segment = _strip_leading_words(clean_title(item_name), author)
        if segment:
            segments.append(segment)

    # 3. Final cleanup and de-duplication against the previous segment.
    final_segments = []
    for raw in segments:
        current = raw.strip(' -_')
        if not current:
            continue

        if final_segments:
            prev = final_segments[-1].lower()
            curr = current.lower()
            extends_prev = _starts_with_words(curr, prev)
            if extends_prev or _ends_with_words(prev, curr):
                # A longer segment that repeats the previous one replaces it.
                if extends_prev and len(curr) > len(prev):
                    final_segments[-1] = current
                continue

        final_segments.append(current)

    return " - ".join(final_segments)


def _is_audio_name(filename):
    """True if *filename* has a supported audio extension."""
    return os.path.splitext(filename)[1].lower() in AUDIO_EXTENSIONS


def _audio_files_in(path, items):
    """Return the entries of *items* that are audio files directly in *path*."""
    return [i for i in items if _is_audio_name(i) and os.path.isfile(os.path.join(path, i))]


def _tree_has_audio(path):
    """True if any file below *path* (recursively) is an audio file."""
    return any(
        _is_audio_name(f)
        for _dirpath, _dirnames, filenames in os.walk(path)
        for f in filenames
    )


def is_multi_work_dir(path):
    """Heuristic: does the folder contain multiple independent books?

    A folder with at least two audio files counts as multi-work when fewer
    than half of them start with a track-like prefix.
    """
    try:
        items = os.listdir(path)
    except OSError:
        return False

    audio_files = _audio_files_in(path, items)
    if len(audio_files) < 2:
        return False

    # If most files start with a small number, it's likely a single book.
    numbered_count = sum(1 for f in audio_files if _TRACK_PREFIX_RE.match(f))
    return numbered_count / len(audio_files) < 0.5


def is_book_dir(path):
    """Check whether a directory is a single book folder."""
    try:
        items = os.listdir(path)
    except OSError:
        return False

    audio_files = _audio_files_in(path, items)
    subdirs = [i for i in items if os.path.isdir(os.path.join(path, i))]

    if audio_files:
        # A folder with audio is a book if it's NOT a multi-work container...
        if is_multi_work_dir(path):
            return False

        # ...and no non-disc subdirectory holds audio of its own (in which
        # case this folder might just be a container).
        return not any(
            _tree_has_audio(os.path.join(path, s))
            for s in subdirs
            if not s.lower().startswith(_DISC_PREFIXES)
        )

    # No loose audio: a multi-disc book has audio only in CD/Disc subfolders.
    audio_subdirs = [s for s in subdirs if _tree_has_audio(os.path.join(path, s))]
    return bool(audio_subdirs) and all(
        s.lower().startswith(_DISC_PREFIXES) for s in audio_subdirs
    )


def _scan_library(root, split):
    """Walk *root* and return ``(book_dirs, multi_work_dirs)``."""
    book_dirs = []
    multi_work_dirs = []

    for dirpath, dirnames, _filenames in os.walk(root):
        dirnames.sort()  # deterministic traversal order
        if dirpath == root:
            continue

        rel_path = os.path.relpath(dirpath, root)

        if split and is_multi_work_dir(dirpath):
            logging.debug(f"Found multi-work directory: {rel_path}")
            multi_work_dirs.append(dirpath)
            dirnames[:] = []  # Stop recursion
        elif is_book_dir(dirpath):
            logging.debug(f"Found book directory: {rel_path}")
            book_dirs.append(dirpath)
            dirnames[:] = []  # Stop recursion

    return book_dirs, multi_work_dirs


def _plan_moves(root, book_dirs, multi_work_dirs):
    """Return the planned moves as a list of ``(src, dst, is_file)`` tuples."""
    moves = []

    # 1. Every audio file of a multi-work folder gets a folder of its own.
    for mw_path in multi_work_dirs:
        rel_path = os.path.relpath(mw_path, root)
        for f in _audio_files_in(mw_path, sorted(os.listdir(mw_path))):
            target_name = get_target_name(rel_path, item_name=f)
            moves.append((os.path.join(mw_path, f), os.path.join(root, target_name, f), True))

    # 2. Book folders are moved as a whole.
    for book_path in book_dirs:
        target_name = get_target_name(os.path.relpath(book_path, root))
        target_path = os.path.join(root, target_name)
        if os.path.abspath(book_path) != os.path.abspath(target_path):
            moves.append((book_path, target_path, False))

    return moves


def _remove_empty_dirs(root):
    """Remove empty directories below *root*, deepest first."""
    for dirpath, _dirnames, _filenames in os.walk(root, topdown=False):
        if dirpath == root:
            continue
        try:
            if not os.listdir(dirpath):
                os.rmdir(dirpath)
        except OSError as e:
            # Best effort: a folder that cannot be removed is simply left.
            logging.debug(f"Could not remove '{dirpath}': {e}")


def migrate(root, dry_run=False, split=False):
    """Reorganize the library at *root* into a flat folder structure.

    Args:
        root: Library root directory.
        dry_run: When true, only log the planned operations.
        split: When true, folders holding several independent books are
            split into one folder per audio file.

    Returns:
        None.  Progress and problems are reported through ``logging``.
    """
    root = os.path.abspath(root)
    if not os.path.exists(root):
        logging.error(f"Root path does not exist: {root}")
        return

    logging.info(f"Scanning library at: {root}")

    book_dirs, multi_work_dirs = _scan_library(root, split)
    moves = _plan_moves(root, book_dirs, multi_work_dirs)

    logging.info(f"Planned operations: {len(moves)}")

    # Destinations claimed by earlier operations of this run.  Needed so a
    # dry run reports the same conflicts a real run would hit.
    claimed = set()
    planned = 0
    successful_moves = 0
    failed_moves = 0

    for src, dst, is_file_move in moves:
        src_rel = os.path.relpath(src, root)
        dst_rel = os.path.relpath(dst, root)

        if os.path.abspath(src) == os.path.abspath(dst):
            continue
        if os.path.exists(dst) or dst in claimed:
            logging.warning(f"Conflict: Target already exists: {dst_rel}. Skipping {src_rel}")
            continue

        claimed.add(dst)
        if is_file_move:
            # The file's folder will exist afterwards; a later whole-folder
            # move onto it must be reported as a conflict.
            claimed.add(os.path.dirname(dst))
        planned += 1

        logging.info(f"PLAN: '{src_rel}' -> '{dst_rel}'")
        if dry_run:
            continue

        try:
            if is_file_move:
                os.makedirs(os.path.dirname(dst), exist_ok=True)
            shutil.move(src, dst)
            successful_moves += 1
        except OSError as e:
            failed_moves += 1
            logging.error(f"Failed to move '{src_rel}': {e}")

    if dry_run:
        logging.info(f"Dry run complete. {planned} operations would be performed.")
        return

    logging.info(f"Moved {successful_moves} item(s), {failed_moves} failed.")
    logging.info("Cleaning up empty directories...")
    _remove_empty_dirs(root)


def _reorganize_library_cli(argv=None):
    """Command line of ``reorganize_library.py``."""
    parser = argparse.ArgumentParser(description="Reorganize library to a flat structure.")
    parser.add_argument("path", help="Root directory of the library")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--split", action="store_true", help="Split folders with multiple independent books")
    parser.add_argument("--log-file", help="Path to a file to write logs to")

    args = parser.parse_args(argv)

    log_level = logging.DEBUG if args.verbose else logging.INFO
    handlers = [logging.StreamHandler(sys.stdout)]
    if args.log_file:
        handlers.append(logging.FileHandler(args.log_file))
    logging.basicConfig(level=log_level, format='%(levelname)s: %(message)s', handlers=handlers)

    migrate(args.path, args.dry_run, args.split)


# Script name (without extension) -> command line to run.
_CLI_ENTRY_POINTS = {
    "merge_book_parts": _merge_book_parts_cli,
    "reorganize_library": _reorganize_library_cli,
}


if __name__ == "__main__":
    # The patch ships two scripts with their own command lines; run the one
    # matching the name this file was invoked under.
    _script_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    _entry_point = _CLI_ENTRY_POINTS.get(_script_name)
    if _entry_point is None:
        sys.exit(
            "Unknown script name '%s'; run this file as merge_book_parts.py "
            "or reorganize_library.py." % _script_name
        )
    _entry_point()