Replace cover search with streaming version

This commit is contained in:
mikiher 2025-10-02 13:30:03 +03:00
parent a164c17d38
commit 7630dbdcb7
3 changed files with 487 additions and 14 deletions

View file

@ -2,6 +2,7 @@ const SocketIO = require('socket.io')
const Logger = require('./Logger')
const Database = require('./Database')
const TokenManager = require('./auth/TokenManager')
const CoverSearchManager = require('./managers/CoverSearchManager')
/**
* @typedef SocketClient
@ -180,6 +181,10 @@ class SocketAuthority {
// Scanning
socket.on('cancel_scan', (libraryId) => this.cancelScan(libraryId))
// Cover search streaming
socket.on('search_covers', (payload) => this.handleCoverSearch(socket, payload))
socket.on('cancel_cover_search', (requestId) => this.handleCancelCoverSearch(socket, requestId))
// Logs
socket.on('set_log_listener', (level) => Logger.addSocketListener(socket, level))
socket.on('remove_log_listener', () => Logger.removeSocketListener(socket.id))
@ -200,6 +205,10 @@ class SocketAuthority {
const disconnectTime = Date.now() - _client.connected_at
Logger.info(`[SocketAuthority] Socket ${socket.id} disconnected from client "${_client.user.username}" after ${disconnectTime}ms (Reason: ${reason})`)
// Cancel any active cover searches for this socket
this.cancelSocketCoverSearches(socket.id)
delete this.clients[socket.id]
}
})
@ -300,5 +309,100 @@ class SocketAuthority {
Logger.debug('[SocketAuthority] Cancel scan', id)
this.Server.cancelLibraryScan(id)
}
/**
* Handle cover search request via WebSocket
* @param {SocketIO.Socket} socket
* @param {Object} payload
*/
async handleCoverSearch(socket, payload) {
const client = this.clients[socket.id]
if (!client?.user) {
Logger.error('[SocketAuthority] Unauthorized cover search request')
socket.emit('cover_search_error', {
requestId: payload.requestId,
error: 'Unauthorized'
})
return
}
const { requestId, title, author, provider, podcast } = payload
if (!requestId || !title) {
Logger.error('[SocketAuthority] Invalid cover search request')
socket.emit('cover_search_error', {
requestId,
error: 'Invalid request parameters'
})
return
}
Logger.info(`[SocketAuthority] User ${client.user.username} initiated cover search ${requestId}`)
// Callback for streaming results to client
const onResult = (result) => {
socket.emit('cover_search_result', {
requestId,
provider: result.provider,
covers: result.covers,
total: result.total
})
}
// Callback when search completes
const onComplete = () => {
Logger.info(`[SocketAuthority] Cover search ${requestId} completed`)
socket.emit('cover_search_complete', { requestId })
}
// Callback for provider errors
const onError = (provider, errorMessage) => {
socket.emit('cover_search_provider_error', {
requestId,
provider,
error: errorMessage
})
}
// Start the search
CoverSearchManager.startSearch(requestId, { title, author, provider, podcast }, onResult, onComplete, onError).catch((error) => {
Logger.error(`[SocketAuthority] Cover search ${requestId} failed:`, error)
socket.emit('cover_search_error', {
requestId,
error: error.message
})
})
}
/**
* Handle cancel cover search request
* @param {SocketIO.Socket} socket
* @param {string} requestId
*/
handleCancelCoverSearch(socket, requestId) {
const client = this.clients[socket.id]
if (!client?.user) {
Logger.error('[SocketAuthority] Unauthorized cancel cover search request')
return
}
Logger.info(`[SocketAuthority] User ${client.user.username} cancelled cover search ${requestId}`)
const cancelled = CoverSearchManager.cancelSearch(requestId)
if (cancelled) {
socket.emit('cover_search_cancelled', { requestId })
}
}
/**
* Cancel all cover searches associated with a socket (called on disconnect)
* @param {string} socketId
*/
cancelSocketCoverSearches(socketId) {
// Get all active search request IDs and cancel those that might belong to this socket
// Since we don't track socket-to-request mapping, we log this for debugging
// The client will handle reconnection gracefully
Logger.debug(`[SocketAuthority] Socket ${socketId} disconnected, any active searches will timeout`)
}
}
module.exports = new SocketAuthority()

View file

@ -0,0 +1,248 @@
const { setMaxListeners } = require('events')
const Logger = require('../Logger')
const BookFinder = require('../finders/BookFinder')
const PodcastFinder = require('../finders/PodcastFinder')
/**
* Manager for handling streaming cover search across multiple providers
*/
class CoverSearchManager {
constructor() {
/** @type {Map<string, AbortController>} Map of requestId to AbortController */
this.activeSearches = new Map()
// Default timeout for each provider search
this.providerTimeout = 10000 // 10 seconds
// Set to 0 to disable the max listeners limit
// We need one listener per provider (15+) and may have multiple concurrent searches
this.maxListeners = 0
}
/**
* Start a streaming cover search
* @param {string} requestId - Unique identifier for this search request
* @param {Object} searchParams - Search parameters
* @param {string} searchParams.title - Title to search for
* @param {string} searchParams.author - Author to search for (optional)
* @param {string} searchParams.provider - Provider to search (or 'all')
* @param {boolean} searchParams.podcast - Whether this is a podcast search
* @param {Function} onResult - Callback for each result chunk
* @param {Function} onComplete - Callback when search completes
* @param {Function} onError - Callback for errors
*/
async startSearch(requestId, searchParams, onResult, onComplete, onError) {
if (this.activeSearches.has(requestId)) {
Logger.warn(`[CoverSearchManager] Search with requestId ${requestId} already exists`)
return
}
const abortController = new AbortController()
// Increase max listeners on this signal to accommodate parallel provider searches
// AbortSignal is an EventTarget, so we use the events module's setMaxListeners
setMaxListeners(this.maxListeners, abortController.signal)
this.activeSearches.set(requestId, abortController)
Logger.info(`[CoverSearchManager] Starting search ${requestId} with params:`, searchParams)
try {
const { title, author, provider, podcast } = searchParams
if (podcast) {
await this.searchPodcastCovers(requestId, title, abortController.signal, onResult, onError)
} else {
await this.searchBookCovers(requestId, provider, title, author, abortController.signal, onResult, onError)
}
if (!abortController.signal.aborted) {
onComplete()
}
} catch (error) {
if (error.name === 'AbortError') {
Logger.info(`[CoverSearchManager] Search ${requestId} was cancelled`)
} else {
Logger.error(`[CoverSearchManager] Search ${requestId} failed:`, error)
onError(error.message)
}
} finally {
this.activeSearches.delete(requestId)
}
}
/**
* Cancel an active search
* @param {string} requestId - Request ID to cancel
*/
cancelSearch(requestId) {
const abortController = this.activeSearches.get(requestId)
if (abortController) {
Logger.info(`[CoverSearchManager] Cancelling search ${requestId}`)
abortController.abort()
this.activeSearches.delete(requestId)
return true
}
return false
}
/**
* Search for podcast covers
*/
async searchPodcastCovers(requestId, title, signal, onResult, onError) {
try {
const results = await this.executeWithTimeout(() => PodcastFinder.findCovers(title), this.providerTimeout, signal)
if (signal.aborted) return
const covers = this.extractCoversFromResults(results)
if (covers.length > 0) {
onResult({
provider: 'itunes',
covers,
total: covers.length
})
}
} catch (error) {
if (error.name !== 'AbortError') {
Logger.error(`[CoverSearchManager] Podcast search failed:`, error)
onError('itunes', error.message)
}
}
}
/**
* Search for book covers across providers
*/
async searchBookCovers(requestId, provider, title, author, signal, onResult, onError) {
let providers = []
if (provider === 'all') {
providers = [...BookFinder.providers]
} else {
providers = [provider]
}
Logger.debug(`[CoverSearchManager] Searching ${providers.length} providers in parallel`)
// Search all providers in parallel
const searchPromises = providers.map(async (providerName) => {
if (signal.aborted) return
try {
const searchResults = await this.executeWithTimeout(() => BookFinder.search(null, providerName, title, author || ''), this.providerTimeout, signal)
if (signal.aborted) return
const covers = this.extractCoversFromResults(searchResults)
Logger.debug(`[CoverSearchManager] Found ${covers.length} covers from ${providerName}`)
if (covers.length > 0) {
onResult({
provider: providerName,
covers,
total: covers.length
})
}
} catch (error) {
if (error.name !== 'AbortError') {
Logger.warn(`[CoverSearchManager] Provider ${providerName} failed:`, error.message)
onError(providerName, error.message)
}
}
})
await Promise.allSettled(searchPromises)
}
/**
* Execute a promise with timeout and abort signal
*/
async executeWithTimeout(fn, timeout, signal) {
return new Promise(async (resolve, reject) => {
let abortHandler = null
let timeoutId = null
// Cleanup function to ensure we always remove listeners
const cleanup = () => {
if (timeoutId) {
clearTimeout(timeoutId)
timeoutId = null
}
if (abortHandler) {
signal.removeEventListener('abort', abortHandler)
abortHandler = null
}
}
// Set up timeout
timeoutId = setTimeout(() => {
cleanup()
const error = new Error('Provider timeout')
error.name = 'TimeoutError'
reject(error)
}, timeout)
// Check if already aborted
if (signal.aborted) {
cleanup()
const error = new Error('Search cancelled')
error.name = 'AbortError'
reject(error)
return
}
// Set up abort handler
abortHandler = () => {
cleanup()
const error = new Error('Search cancelled')
error.name = 'AbortError'
reject(error)
}
signal.addEventListener('abort', abortHandler)
try {
const result = await fn()
cleanup()
resolve(result)
} catch (error) {
cleanup()
reject(error)
}
})
}
/**
* Extract cover URLs from search results
*/
extractCoversFromResults(results) {
const covers = []
if (!Array.isArray(results)) return covers
results.forEach((result) => {
if (result.covers && Array.isArray(result.covers)) {
covers.push(...result.covers)
}
if (result.cover) {
covers.push(result.cover)
}
})
// Remove duplicates
return [...new Set(covers)]
}
/**
* Cancel all active searches (cleanup on server shutdown)
*/
cancelAllSearches() {
Logger.info(`[CoverSearchManager] Cancelling ${this.activeSearches.size} active searches`)
for (const [requestId, abortController] of this.activeSearches.entries()) {
abortController.abort()
}
this.activeSearches.clear()
}
}
module.exports = new CoverSearchManager()