Update:Scanner adjustable number of parallel audio probes to use less CPU

This commit is contained in:
advplyr 2022-07-16 18:54:34 -05:00
parent 277a5fa37c
commit 86ee4dcff2
10 changed files with 372 additions and 71 deletions

View file

@ -3,6 +3,8 @@ const Path = require('path')
const AudioFile = require('../objects/files/AudioFile')
const VideoFile = require('../objects/files/VideoFile')
const MediaProbePool = require('./MediaProbePool')
const prober = require('../utils/prober')
const Logger = require('../Logger')
const { LogLevel } = require('../utils/constants')
@ -100,19 +102,38 @@ class MediaFileScanner {
}
// Returns array of { MediaFile, elapsed, averageScanDuration } from audio file scan objects
async executeMediaFileScans(mediaType, mediaLibraryFiles, scanData) {
var mediaMetadataFromScan = scanData.media.metadata || null
var proms = []
for (let i = 0; i < mediaLibraryFiles.length; i++) {
proms.push(this.scan(mediaType, mediaLibraryFiles[i], mediaMetadataFromScan))
}
var scanStart = Date.now()
var results = await Promise.all(proms).then((scanResults) => scanResults.filter(sr => sr))
return {
audioFiles: results.filter(r => r.audioFile).map(r => r.audioFile),
videoFiles: results.filter(r => r.videoFile).map(r => r.videoFile),
elapsed: Date.now() - scanStart,
averageScanDuration: this.getAverageScanDurationMs(results)
async executeMediaFileScans(libraryItem, mediaLibraryFiles, scanData) {
const mediaType = libraryItem.mediaType
if (!global.ServerSettings.scannerUseSingleThreadedProber) { // New multi-threaded scanner
var scanStart = Date.now()
const probeResults = await new Promise((resolve) => {
// const probePool = new MediaProbePool(mediaType, mediaLibraryFiles, scanData, global.ServerSettings.scannerMaxThreads)
const itemBatch = MediaProbePool.initBatch(libraryItem, mediaLibraryFiles, scanData)
itemBatch.on('done', resolve)
MediaProbePool.runBatch(itemBatch)
})
return {
audioFiles: probeResults.audioFiles || [],
videoFiles: probeResults.videoFiles || [],
elapsed: Date.now() - scanStart,
averageScanDuration: probeResults.averageTimePerMb
}
} else { // Old single threaded scanner
var scanStart = Date.now()
var mediaMetadataFromScan = scanData.media.metadata || null
var proms = []
for (let i = 0; i < mediaLibraryFiles.length; i++) {
proms.push(this.scan(mediaType, mediaLibraryFiles[i], mediaMetadataFromScan))
}
var results = await Promise.all(proms).then((scanResults) => scanResults.filter(sr => sr))
return {
audioFiles: results.filter(r => r.audioFile).map(r => r.audioFile),
videoFiles: results.filter(r => r.videoFile).map(r => r.videoFile),
elapsed: Date.now() - scanStart,
averageScanDuration: this.getAverageScanDurationMs(results)
}
}
}
@ -149,7 +170,6 @@ class MediaFileScanner {
if (af.discNumFromMeta !== null) discsFromMeta.push(af.discNumFromMeta)
if (af.trackNumFromFilename !== null) tracksFromFilename.push(af.trackNumFromFilename)
if (af.trackNumFromMeta !== null) tracksFromMeta.push(af.trackNumFromMeta)
af.validateTrackIndex() // Sets error if no valid track number
})
discsFromFilename.sort((a, b) => a - b)
discsFromMeta.sort((a, b) => a - b)
@ -198,7 +218,8 @@ class MediaFileScanner {
async scanMediaFiles(mediaLibraryFiles, scanData, libraryItem, preferAudioMetadata, preferOverdriveMediaMarker, libraryScan = null) {
var hasUpdated = false
var mediaScanResult = await this.executeMediaFileScans(libraryItem.mediaType, mediaLibraryFiles, scanData)
var mediaScanResult = await this.executeMediaFileScans(libraryItem, mediaLibraryFiles, scanData)
if (libraryItem.mediaType === 'video') {
if (mediaScanResult.videoFiles.length) {
// TODO: Check for updates etc
@ -207,9 +228,9 @@ class MediaFileScanner {
}
} else if (mediaScanResult.audioFiles.length) {
if (libraryScan) {
libraryScan.addLog(LogLevel.DEBUG, `Library Item "${scanData.path}" Audio file scan took ${mediaScanResult.elapsed}ms for ${mediaScanResult.audioFiles.length} with average time of ${mediaScanResult.averageScanDuration}ms`)
Logger.debug(`Library Item "${scanData.path}" Audio file scan took ${mediaScanResult.elapsed}ms for ${mediaScanResult.audioFiles.length} with average time of ${mediaScanResult.averageScanDuration}ms`)
libraryScan.addLog(LogLevel.DEBUG, `Library Item "${scanData.path}" Media file scan took ${mediaScanResult.elapsed}ms for ${mediaScanResult.audioFiles.length} with average time of ${mediaScanResult.averageScanDuration}ms per MB`)
}
Logger.debug(`Library Item "${scanData.path}" Media file scan took ${mediaScanResult.elapsed}ms with ${mediaScanResult.audioFiles.length} audio files averaging ${mediaScanResult.averageScanDuration}ms per MB`)
var newAudioFiles = mediaScanResult.audioFiles.filter(af => {
return !libraryItem.media.findFileWithInode(af.ino)

View file

@ -1,7 +1,7 @@
const AudioFileMetadata = require('../objects/metadata/AudioMetaTags')
class MediaProbeData {
constructor() {
constructor(probeData) {
this.embeddedCoverArt = null
this.format = null
this.duration = null
@ -26,6 +26,20 @@ class MediaProbeData {
this.discNumber = null
this.discTotal = null
if (probeData) {
this.construct(probeData)
}
}
construct(probeData) {
for (const key in probeData) {
if (key === 'audioFileMetadata' && probeData[key]) {
this[key] = new AudioFileMetadata(probeData[key])
} else if (this[key] !== undefined) {
this[key] = probeData[key]
}
}
}
getEmbeddedCoverArt(videoStream) {

View file

@ -0,0 +1,209 @@
const os = require('os')
const Path = require('path')
const { EventEmitter } = require('events')
const { Worker } = require("worker_threads")
const Logger = require('../Logger')
const AudioFile = require('../objects/files/AudioFile')
const VideoFile = require('../objects/files/VideoFile')
const MediaProbeData = require('./MediaProbeData')
/**
 * Tracks the probe work and results for the media files of one library item.
 * Emits a single 'done' event carrying the collected results once `done()` is called.
 */
class LibraryItemBatch extends EventEmitter {
  /**
   * @param {Object} libraryItem - item being scanned; `id` and `mediaType` are read
   * @param {Object[]} libraryFiles - library files still waiting to be probed
   * @param {Object} scanData - scan data; `scanData.media.metadata` is kept for filename parsing
   */
  constructor(libraryItem, libraryFiles, scanData) {
    super()

    this.id = libraryItem.id
    this.mediaType = libraryItem.mediaType
    this.mediaMetadataFromScan = scanData.media.metadata || null
    this.libraryFilesToScan = libraryFiles

    // Results
    this.totalElapsed = 0 // sum of per-file "ms per MB" values
    this.totalProbed = 0 // number of files that contributed to totalElapsed
    this.audioFiles = []
    this.videoFiles = []
  }

  /**
   * Emit 'done' with `{ videoFiles, audioFiles, averageTimePerMb }`.
   * `averageTimePerMb` is 0 when no file was probed successfully, rather than
   * the NaN that 0 / 0 would produce.
   */
  done() {
    const averageTimePerMb = this.totalProbed > 0 ? Math.round(this.totalElapsed / this.totalProbed) : 0

    this.emit('done', {
      videoFiles: this.videoFiles,
      audioFiles: this.audioFiles,
      averageTimePerMb
    })
  }
}
/**
 * Runs media probes in worker threads while limiting how many run at once.
 *
 * Files are grouped per library item in a LibraryItemBatch. Each file becomes a
 * task; tasks beyond `MaxThreads` wait in `probeQueue` and are started one at a
 * time as running tasks finish. A batch's `done()` is called once none of its
 * files are left to scan.
 */
class MediaProbePool {
  constructor() {
    this.MaxThreads = 0 // concurrency limit, set in initBatch
    this.probeWorkerScript = null // path of the worker script, set in initBatch

    this.itemBatchMap = {} // batch id -> LibraryItemBatch

    this.probesRunning = []
    this.probeQueue = []
  }

  /**
   * Start the next queued task if a slot is free.
   */
  tick() {
    if (this.probesRunning.length >= this.MaxThreads) return
    if (this.probeQueue.length > 0) {
      this.startTask(this.probeQueue.shift())
    }
  }

  /**
   * Run a single probe task and record its result on the owning batch.
   * A failed probe is logged and counted as finished so the batch can still complete.
   *
   * @param {{batchId: string, mediaPath: string, start: function(): Promise<Object>}} task
   */
  async startTask(task) {
    this.probesRunning.push(task)

    const itemBatch = this.itemBatchMap[task.batchId]

    try {
      const taskResult = await task.start()
      this.processProbeResult(itemBatch, taskResult)
    } catch (error) {
      Logger.error('[MediaProbePool] Task failed', error)
    }

    // Success or failure, this file is no longer pending. The task's media path
    // identifies the file because no task result exists when the probe failed.
    itemBatch.libraryFilesToScan = itemBatch.libraryFilesToScan.filter(lf => lf.metadata.path !== task.mediaPath)
    this.probesRunning = this.probesRunning.filter(tq => tq.mediaPath !== task.mediaPath)
    this.tick()

    if (!itemBatch.libraryFilesToScan.length) {
      itemBatch.done()
      delete this.itemBatchMap[itemBatch.id]
    }
  }

  /**
   * Turn a successful probe result into an AudioFile/VideoFile on the batch
   * and update the batch timing totals.
   *
   * @param {Object} itemBatch - batch that owns the probed file
   * @param {{data: Object, elapsed: number, libraryFile: Object}} taskResult
   */
  processProbeResult(itemBatch, taskResult) {
    const { libraryFile, elapsed, data } = taskResult

    const fileSizeMb = libraryFile.metadata.size / (1024 * 1024)
    const elapsedPerMb = Math.round(elapsed / fileSizeMb)

    const probeData = new MediaProbeData(data)

    if (itemBatch.mediaType === 'video') {
      if (!probeData.videoStream) {
        Logger.error('[MediaProbePool] Invalid video file no video stream')
        return
      }
      itemBatch.totalElapsed += elapsedPerMb
      itemBatch.totalProbed++

      const videoFile = new VideoFile()
      videoFile.setDataFromProbe(libraryFile, probeData)
      itemBatch.videoFiles.push(videoFile)
      return
    }

    if (!probeData.audioStream) {
      Logger.error('[MediaProbePool] Invalid audio file no audio stream')
      return
    }
    itemBatch.totalElapsed += elapsedPerMb
    itemBatch.totalProbed++

    const audioFile = new AudioFile()
    audioFile.trackNumFromMeta = probeData.trackNumber
    audioFile.discNumFromMeta = probeData.discNumber
    if (itemBatch.mediaType === 'book') {
      const { trackNumber, discNumber } = this.getTrackAndDiscNumberFromFilename(itemBatch.mediaMetadataFromScan, libraryFile)
      audioFile.trackNumFromFilename = trackNumber
      audioFile.discNumFromFilename = discNumber
    }
    audioFile.setDataFromProbe(libraryFile, probeData)
    itemBatch.audioFiles.push(audioFile)
  }

  /**
   * Build a lazily started probe task for one library file.
   *
   * @param {Object} libraryFile - file to probe; `metadata.path` is sent to the worker
   * @param {string} batchId - id of the batch the file belongs to
   * @returns {{batchId: string, mediaPath: string, start: function(): Promise<{data: Object, elapsed: number, libraryFile: Object}>}}
   */
  buildTask(libraryFile, batchId) {
    const mediaPath = libraryFile.metadata.path

    return {
      batchId,
      mediaPath,
      start: () => {
        return new Promise((resolve, reject) => {
          const startTime = Date.now()

          const worker = new Worker(this.probeWorkerScript)
          // NOTE(review): assumes the worker script posts `{ data }` where `data`
          // is either the probe data or an object with an `error` property - confirm in probeWorker.js
          worker.on('message', (message) => {
            const data = message ? message.data : null
            if (!data) {
              reject(new Error(`Probe worker returned no data for "${mediaPath}"`))
            } else if (data.error) {
              reject(data.error)
            } else {
              resolve({
                data,
                elapsed: Date.now() - startTime,
                libraryFile
              })
            }
          })
          // Without these a crashed worker would leave the promise pending and
          // its slot in probesRunning occupied. Rejecting after the promise has
          // already settled has no effect.
          worker.on('error', reject)
          worker.on('exit', (code) => {
            reject(new Error(`Probe worker exited with code ${code} before returning a result for "${mediaPath}"`))
          })

          worker.postMessage({
            mediaPath
          })
        })
      }
    }
  }

  /**
   * Create and register a batch for a library item. Also refreshes the
   * concurrency limit and worker script path from the current globals.
   *
   * @param {Object} libraryItem
   * @param {Object[]} libraryFiles
   * @param {Object} scanData
   * @returns {LibraryItemBatch}
   */
  initBatch(libraryItem, libraryFiles, scanData) {
    this.MaxThreads = global.ServerSettings.scannerMaxThreads || (os.cpus().length * 2)
    this.probeWorkerScript = Path.join(global.appRoot, 'server/utils/probeWorker.js')

    Logger.debug(`[MediaProbePool] Run item batch ${libraryItem.id} with`, libraryFiles.length, 'files and max concurrent of', this.MaxThreads)

    const itemBatch = new LibraryItemBatch(libraryItem, libraryFiles, scanData)
    this.itemBatchMap[itemBatch.id] = itemBatch
    return itemBatch
  }

  /**
   * Start (or queue) a probe task for every file in the batch.
   * A batch without files is completed immediately, since no task would ever
   * finish to trigger its `done()`.
   *
   * @param {LibraryItemBatch} itemBatch
   */
  runBatch(itemBatch) {
    if (!itemBatch.libraryFilesToScan.length) {
      itemBatch.done()
      delete this.itemBatchMap[itemBatch.id]
      return
    }

    for (const libraryFile of itemBatch.libraryFilesToScan) {
      const probeTask = this.buildTask(libraryFile, itemBatch.id)

      if (this.probesRunning.length < this.MaxThreads) {
        this.startTask(probeTask)
      } else {
        this.probeQueue.push(probeTask)
      }
    }
  }

  /**
   * Guess track and disc numbers from an audio file's name and parent folder.
   *
   * @param {Object|null} mediaMetadataFromScan - may hold title, author, series, publishedYear; null is allowed
   * @param {Object} audioLibraryFile - `metadata.filename` and `metadata.path` are read
   * @returns {{trackNumber: number|null, discNumber: number|null}}
   */
  getTrackAndDiscNumberFromFilename(mediaMetadataFromScan, audioLibraryFile) {
    // The batch stores null when the scan produced no metadata
    const { title, author, series, publishedYear } = mediaMetadataFromScan || {}
    const { filename, path } = audioLibraryFile.metadata
    let partbasename = Path.basename(filename, Path.extname(filename))

    // Remove title, author, series, and publishedYear from filename if there,
    // so digits inside them are not mistaken for a track number
    if (title) partbasename = partbasename.replace(title, '')
    if (author) partbasename = partbasename.replace(author, '')
    if (series) partbasename = partbasename.replace(series, '')
    if (publishedYear) partbasename = partbasename.replace(publishedYear, '')

    // Look for disc number
    const discPattern = /\b(disc|cd) ?(\d\d?)\b/i
    let discNumber = null
    const discMatch = partbasename.match(discPattern)
    if (discMatch) {
      discNumber = Number(discMatch[2])

      // Remove disc number from filename
      partbasename = partbasename.replace(discPattern, '')
    }

    // Look for disc number in folder path e.g. /Book Title/CD01/audiofile.mp3
    // A disc folder takes precedence over a disc number in the filename
    const pathdir = Path.dirname(path).split('/').pop()
    if (pathdir && /^cd\d{1,3}$/i.test(pathdir)) {
      discNumber = Number(pathdir.replace(/cd/i, ''))
    }

    // First remaining run of digits is taken as the track number
    const numbersinpath = partbasename.match(/\d{1,4}/g)
    const trackNumber = numbersinpath && numbersinpath.length ? parseInt(numbersinpath[0], 10) : null

    return {
      trackNumber,
      discNumber
    }
  }
}
// The module's exported value is one MediaProbePool instance, not the class
module.exports = new MediaProbePool()

View file

@ -205,20 +205,27 @@ class Scanner {
checkRes.libraryItem = libraryItem
checkRes.scanData = dataFound
// If this item will go over max size then push current chunk
if (libraryItem.audioFileTotalSize + itemDataToRescanSize > MaxSizePerChunk && itemDataToRescan.length > 0) {
itemDataToRescanChunks.push(itemDataToRescan)
itemDataToRescanSize = 0
itemDataToRescan = []
console.log('Has New Library Files', libraryItem.media.metadata.title, 'num new', checkRes.newLibraryFiles.length)
if (global.ServerSettings.scannerUseSingleThreadedProber) {
// If this item will go over max size then push current chunk
if (libraryItem.audioFileTotalSize + itemDataToRescanSize > MaxSizePerChunk && itemDataToRescan.length > 0) {
itemDataToRescanChunks.push(itemDataToRescan)
itemDataToRescanSize = 0
itemDataToRescan = []
}
itemDataToRescan.push(checkRes)
itemDataToRescanSize += libraryItem.audioFileTotalSize
if (itemDataToRescanSize >= MaxSizePerChunk) {
itemDataToRescanChunks.push(itemDataToRescan)
itemDataToRescanSize = 0
itemDataToRescan = []
}
} else {
itemDataToRescan.push(checkRes)
}
itemDataToRescan.push(checkRes)
itemDataToRescanSize += libraryItem.audioFileTotalSize
if (itemDataToRescanSize >= MaxSizePerChunk) {
itemDataToRescanChunks.push(itemDataToRescan)
itemDataToRescanSize = 0
itemDataToRescan = []
}
} else if (libraryScan.findCovers && libraryItem.media.shouldSearchForCover) { // Search cover
libraryScan.resultsUpdated++
itemsToFindCovers.push(libraryItem)
@ -235,27 +242,31 @@ class Scanner {
// Potential NEW Library Items
for (let i = 0; i < libraryItemDataFound.length; i++) {
var dataFound = libraryItemDataFound[i]
console.log('Potential new library item data')
var hasMediaFile = dataFound.libraryFiles.some(lf => lf.isMediaFile)
if (!hasMediaFile) {
// Log the path of the item being inspected (the array itself has no `path`)
libraryScan.addLog(LogLevel.WARN, `Item found "${dataFound.path}" has no media files`)
} else {
var mediaFileSize = 0
dataFound.libraryFiles.filter(lf => lf.fileType === 'audio' || lf.fileType === 'video').forEach(lf => mediaFileSize += lf.metadata.size)
if (global.ServerSettings.scannerUseSingleThreadedProber) {
// If this item will go over max size then push current chunk
var mediaFileSize = 0
dataFound.libraryFiles.filter(lf => lf.fileType === 'audio' || lf.fileType === 'video').forEach(lf => mediaFileSize += lf.metadata.size)
if (mediaFileSize + newItemDataToScanSize > MaxSizePerChunk && newItemDataToScan.length > 0) {
newItemDataToScanChunks.push(newItemDataToScan)
newItemDataToScanSize = 0
newItemDataToScan = []
}
// If this item will go over max size then push current chunk
if (mediaFileSize + newItemDataToScanSize > MaxSizePerChunk && newItemDataToScan.length > 0) {
newItemDataToScanChunks.push(newItemDataToScan)
newItemDataToScanSize = 0
newItemDataToScan = []
}
newItemDataToScan.push(dataFound)
newItemDataToScanSize += mediaFileSize
newItemDataToScan.push(dataFound)
newItemDataToScanSize += mediaFileSize
if (newItemDataToScanSize >= MaxSizePerChunk) {
newItemDataToScanChunks.push(newItemDataToScan)
newItemDataToScanSize = 0
newItemDataToScan = []
if (newItemDataToScanSize >= MaxSizePerChunk) {
newItemDataToScanChunks.push(newItemDataToScan)
newItemDataToScanSize = 0
newItemDataToScan = []
}
} else { // Chunking is not necessary for new scanner
newItemDataToScan.push(dataFound)
}
}
}
@ -272,14 +283,14 @@ class Scanner {
await this.updateLibraryItemChunk(itemsToUpdate)
if (this.cancelLibraryScan[libraryScan.libraryId]) return true
}
// Chunking will be removed when legacy single threaded scanner is removed
for (let i = 0; i < itemDataToRescanChunks.length; i++) {
await this.rescanLibraryItemDataChunk(itemDataToRescanChunks[i], libraryScan)
if (this.cancelLibraryScan[libraryScan.libraryId]) return true
// console.log('Rescan chunk done', i, 'of', itemDataToRescanChunks.length)
}
for (let i = 0; i < newItemDataToScanChunks.length; i++) {
await this.scanNewLibraryItemDataChunk(newItemDataToScanChunks[i], libraryScan)
// console.log('New scan chunk done', i, 'of', newItemDataToScanChunks.length)
if (this.cancelLibraryScan[libraryScan.libraryId]) return true
}
}