Update:Remove proper-lockfile dependency

This commit is contained in:
advplyr 2022-06-07 20:15:00 -05:00
parent b7e546f2f5
commit e06a015d6e
17 changed files with 1038 additions and 49 deletions

489
server/libs/njodb/index.js Normal file
View file

@ -0,0 +1,489 @@
"use strict";
const {
existsSync,
mkdirSync,
readFileSync,
writeFileSync
} = require("graceful-fs");
const {
join,
resolve
} = require("path");
const {
aggregateStoreData,
aggregateStoreDataSync,
distributeStoreData,
distributeStoreDataSync,
deleteStoreData,
deleteStoreDataSync,
dropEverything,
dropEverythingSync,
getStoreNames,
getStoreNamesSync,
insertStoreData,
insertStoreDataSync,
insertFileData,
selectStoreData,
selectStoreDataSync,
statsStoreData,
statsStoreDataSync,
updateStoreData,
updateStoreDataSync
} = require("./njodb");
const {
Randomizer,
Reducer,
Result
} = require("./objects");
const {
validateArray,
validateFunction,
validateName,
validateObject,
validatePath,
validateSize
} = require("./validators");
const defaults = {
"datadir": "data",
"dataname": "data",
"datastores": 5,
"tempdir": "tmp",
"lockoptions": {
"stale": 5000,
"update": 1000,
"retries": {
"retries": 5000,
"minTimeout": 250,
"maxTimeout": 5000,
"factor": 0.15,
"randomize": false
}
}
};
const mergeProperties = (defaults, userProperties) => {
var target = Object.assign({}, defaults);
for (let key of Object.keys(userProperties)) {
if (Object.prototype.hasOwnProperty.call(target, key)) {
if (typeof userProperties[key] !== 'object' && !Array.isArray(userProperties[key])) {
Object.assign(target, { [key]: userProperties[key] });
} else {
target[key] = mergeProperties(target[key], userProperties[key]);
}
}
}
return target;
}
const saveProperties = (root, properties) => {
properties = {
"datadir": properties.datadir,
"dataname": properties.dataname,
"datastores": properties.datastores,
"tempdir": properties.tempdir,
"lockoptions": properties.lockoptions
};
const propertiesFile = join(root, "njodb.properties");
writeFileSync(propertiesFile, JSON.stringify(properties, null, 4));
return properties;
}
process.on("uncaughtException", error => {
if (error.code === "ECOMPROMISED") {
console.error(Object.assign(new Error("Stale lock or attempt to update it after release"), { code: error.code }));
} else {
throw error;
}
});
class Database {
constructor(root, properties = {}) {
validateObject(properties);
this.properties = {};
if (root !== undefined && root !== null) {
validateName(root);
this.properties.root = root;
} else {
this.properties.root = process.cwd();
}
if (!existsSync(this.properties.root)) mkdirSync(this.properties.root);
const propertiesFile = join(this.properties.root, "njodb.properties");
if (existsSync(propertiesFile)) {
this.setProperties(JSON.parse(readFileSync(propertiesFile)));
} else {
this.setProperties(mergeProperties(defaults, properties));
}
if (!existsSync(this.properties.datapath)) mkdirSync(this.properties.datapath);
if (!existsSync(this.properties.temppath)) mkdirSync(this.properties.temppath);
this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
return this;
}
// Database management methods
getProperties() {
return this.properties;
}
setProperties(properties) {
validateObject(properties);
this.properties.datadir = (validateName(properties.datadir)) ? properties.datadir : defaults.datadir;
this.properties.dataname = (validateName(properties.dataname)) ? properties.dataname : defaults.dataname;
this.properties.datastores = (validateSize(properties.datastores)) ? properties.datastores : defaults.datastores;
this.properties.tempdir = (validateName(properties.tempdir)) ? properties.tempdir : defaults.tempdir;
this.properties.lockoptions = (validateObject(properties.lockoptions)) ? properties.lockoptions : defaults.lockoptions;
this.properties.datapath = join(this.properties.root, this.properties.datadir);
this.properties.temppath = join(this.properties.root, this.properties.tempdir);
saveProperties(this.properties.root, this.properties);
return this.properties;
}
async stats() {
var stats = {
root: resolve(this.properties.root),
data: resolve(this.properties.datapath),
temp: resolve(this.properties.temppath)
};
var promises = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
promises.push(statsStoreData(storepath, this.properties.lockoptions));
}
const results = await Promise.all(promises);
return Object.assign(stats, Reducer("stats", results));
}
statsSync() {
var stats = {
root: resolve(this.properties.root),
data: resolve(this.properties.datapath),
temp: resolve(this.properties.temppath)
};
var results = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
results.push(statsStoreDataSync(storepath));
}
return Object.assign(stats, Reducer("stats", results));
}
async grow() {
this.properties.datastores++;
const results = await distributeStoreData(this.properties);
this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
}
growSync() {
this.properties.datastores++;
const results = distributeStoreDataSync(this.properties);
this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
}
async shrink() {
if (this.properties.datastores > 1) {
this.properties.datastores--;
const results = await distributeStoreData(this.properties);
this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
} else {
throw new Error("Database cannot shrink any further");
}
}
shrinkSync() {
if (this.properties.datastores > 1) {
this.properties.datastores--;
const results = distributeStoreDataSync(this.properties);
this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
} else {
throw new Error("Database cannot shrink any further");
}
}
async resize(size) {
validateSize(size);
this.properties.datastores = size;
const results = await distributeStoreData(this.properties);
this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
}
resizeSync(size) {
validateSize(size);
this.properties.datastores = size;
const results = distributeStoreDataSync(this.properties);
this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
saveProperties(this.properties.root, this.properties);
return results;
}
async drop() {
const results = await dropEverything(this.properties);
return Reducer("drop", results);
}
dropSync() {
const results = dropEverythingSync(this.properties);
return Reducer("drop", results);
}
// Data manipulation methods
async insert(data) {
validateArray(data);
var promises = [];
var records = [];
for (let i = 0; i < this.properties.datastores; i++) {
records[i] = "";
}
for (let i = 0; i < data.length; i++) {
records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n";
}
const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false);
for (var j = 0; j < records.length; j++) {
if (records[j] !== "") {
const storenumber = randomizer.next();
const storename = [this.properties.dataname, storenumber, "json"].join(".");
const storepath = join(this.properties.datapath, storename)
promises.push(insertStoreData(storepath, records[j], this.properties.lockoptions));
}
}
const results = await Promise.all(promises);
this.properties.storenames = await getStoreNames(this.properties.datapath, this.properties.dataname);
return Reducer("insert", results);
}
insertSync(data) {
validateArray(data);
var results = [];
var records = [];
for (let i = 0; i < this.properties.datastores; i++) {
records[i] = "";
}
for (let i = 0; i < data.length; i++) {
records[i % this.properties.datastores] += JSON.stringify(data[i]) + "\n";
}
const randomizer = Randomizer(Array.from(Array(this.properties.datastores).keys()), false);
for (var j = 0; j < records.length; j++) {
if (records[j] !== "") {
const storenumber = randomizer.next();
const storename = [this.properties.dataname, storenumber, "json"].join(".");
const storepath = join(this.properties.datapath, storename)
results.push(insertStoreDataSync(storepath, records[j], this.properties.lockoptions));
}
}
this.properties.storenames = getStoreNamesSync(this.properties.datapath, this.properties.dataname);
return Reducer("insert", results);
}
async insertFile(file) {
validatePath(file);
const results = await insertFileData(file, this.properties.datapath, this.properties.storenames, this.properties.lockoptions);
return results;
}
insertFileSync(file) {
validatePath(file);
const data = readFileSync(file, "utf8").split("\n");
var records = [];
var results = Result("insertFile");
for (var record of data) {
record = record.trim()
results.lines++;
if (record.length > 0) {
try {
records.push(JSON.parse(record));
} catch (error) {
results.errors.push({ error: error.message, line: results.lines, data: record });
}
} else {
results.blanks++;
}
}
return Object.assign(results, this.insertSync(records));
}
async select(match, project) {
validateFunction(match);
if (project) validateFunction(project);
var promises = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
promises.push(selectStoreData(storepath, match, project, this.properties.lockoptions));
}
const results = await Promise.all(promises);
return Reducer("select", results);
}
selectSync(match, project) {
validateFunction(match);
if (project) validateFunction(project);
var results = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
results.push(selectStoreDataSync(storepath, match, project));
}
return Reducer("select", results);
}
async update(match, update) {
validateFunction(match);
validateFunction(update);
var promises = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
const tempstorename = [storename, Date.now(), "tmp"].join(".");
const tempstorepath = join(this.properties.temppath, tempstorename);
promises.push(updateStoreData(storepath, match, update, tempstorepath, this.properties.lockoptions));
}
const results = await Promise.all(promises);
return Reducer("update", results);
}
updateSync(match, update) {
validateFunction(match);
validateFunction(update);
var results = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
const tempstorename = [storename, Date.now(), "tmp"].join(".");
const tempstorepath = join(this.properties.temppath, tempstorename);
results.push(updateStoreDataSync(storepath, match, update, tempstorepath));
}
return Reducer("update", results);
}
async delete(match) {
validateFunction(match);
var promises = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
const tempstorename = [storename, Date.now(), "tmp"].join(".");
const tempstorepath = join(this.properties.temppath, tempstorename);
promises.push(deleteStoreData(storepath, match, tempstorepath, this.properties.lockoptions));
}
const results = await Promise.all(promises);
return Reducer("delete", results);
}
deleteSync(match) {
validateFunction(match);
var results = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
const tempstorename = [storename, Date.now(), "tmp"].join(".");
const tempstorepath = join(this.properties.temppath, tempstorename);
results.push(deleteStoreDataSync(storepath, match, tempstorepath));
}
return Reducer("delete", results);
}
async aggregate(match, index, project) {
validateFunction(match);
validateFunction(index);
if (project) validateFunction(project);
var promises = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
promises.push(aggregateStoreData(storepath, match, index, project, this.properties.lockoptions));
}
const results = await Promise.all(promises);
return Reducer("aggregate", results);
}
aggregateSync(match, index, project) {
validateFunction(match);
validateFunction(index);
if (project) validateFunction(project);
var results = [];
for (const storename of this.properties.storenames) {
const storepath = join(this.properties.datapath, storename);
results.push(aggregateStoreDataSync(storepath, match, index, project));
}
return Reducer("aggregate", results);
}
}
exports.Database = Database;

723
server/libs/njodb/njodb.js Normal file
View file

@ -0,0 +1,723 @@
"use strict";
const {
appendFile,
appendFileSync,
createReadStream,
createWriteStream,
readFileSync,
readdir,
readdirSync,
stat,
statSync,
writeFile
} = require("graceful-fs");
const {
join,
resolve
} = require("path");
const { createInterface } = require("readline");
const { promisify } = require("util");
const {
check,
checkSync,
lock,
lockSync
} = require("../properLockfile");
const {
deleteFile,
deleteFileSync,
deleteDirectory,
deleteDirectorySync,
fileExists,
fileExistsSync,
moveFile,
moveFileSync,
releaseLock,
releaseLockSync,
replaceFile,
replaceFileSync
} = require("./utils");
const {
Handler,
Randomizer,
Result
} = require("./objects");
const filterStoreNames = (files, dataname) => {
var storenames = [];
const re = new RegExp("^" + [dataname, "\\d+", "json"].join(".") + "$");
for (const file of files) {
if (re.test(file)) storenames.push(file);
}
return storenames;
};
const getStoreNames = async (datapath, dataname) => {
const files = await promisify(readdir)(datapath);
return filterStoreNames(files, dataname);
}
const getStoreNamesSync = (datapath, dataname) => {
const files = readdirSync(datapath);
return filterStoreNames(files, dataname);
};
// Database management
const statsStoreData = async (store, lockoptions) => {
var release, stats, results;
release = await lock(store, lockoptions);
const handlerResults = await new Promise((resolve, reject) => {
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
const handler = Handler("stats");
reader.on("line", record => handler.next(record));
reader.on("close", () => resolve(handler.return()));
reader.on("error", error => reject(error));
});
if (await check(store, lockoptions)) await releaseLock(store, release);
results = Object.assign({ store: resolve(store) }, handlerResults)
stats = await promisify(stat)(store);
results.size = stats.size;
results.created = stats.birthtime;
results.modified = stats.mtime;
results.end = Date.now()
return results;
};
const statsStoreDataSync = (store) => {
var file, release, results;
release = lockSync(store);
file = readFileSync(store, "utf8");
if (checkSync(store)) releaseLockSync(store, release);
const data = file.split("\n");
const handler = Handler("stats");
for (var record of data) {
handler.next(record)
}
results = Object.assign({ store: resolve(store) }, handler.return());
const stats = statSync(store);
results.size = stats.size;
results.created = stats.birthtime;
results.modified = stats.mtime;
results.end = Date.now();
return results;
};
const distributeStoreData = async (properties) => {
var results = Result("distribute");
var storepaths = [];
var tempstorepaths = [];
var locks = [];
for (let storename of properties.storenames) {
const storepath = join(properties.datapath, storename);
storepaths.push(storepath);
locks.push(lock(storepath, properties.lockoptions));
}
const releases = await Promise.all(locks);
var writes = [];
var writers = [];
for (let i = 0; i < properties.datastores; i++) {
const tempstorepath = join(properties.temppath, [properties.dataname, i, results.start, "json"].join("."));
tempstorepaths.push(tempstorepath);
await promisify(writeFile)(tempstorepath, "");
writers.push(createWriteStream(tempstorepath, { flags: "r+" }));
}
for (let storename of properties.storenames) {
writes.push(new Promise((resolve, reject) => {
var line = 0;
const store = join(properties.datapath, storename);
const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
reader.on("line", record => {
const storenumber = randomizer.next();
line++;
try {
record = JSON.stringify(JSON.parse(record));
results.records++;
} catch {
results.errors.push({ line: line, data: record });
} finally {
writers[storenumber].write(record + "\n");
}
});
reader.on("close", () => {
resolve(true);
});
reader.on("error", error => {
reject(error);
});
}));
}
await Promise.all(writes);
for (let writer of writers) {
writer.end();
}
var deletes = [];
for (let storepath of storepaths) {
deletes.push(deleteFile(storepath));
}
await Promise.all(deletes);
for (const release of releases) {
release();
}
var moves = [];
for (let i = 0; i < tempstorepaths.length; i++) {
moves.push(moveFile(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join("."))))
}
await Promise.all(moves);
results.stores = tempstorepaths.length,
results.end = Date.now();
results.elapsed = results.end - results.start;
return results;
};
const distributeStoreDataSync = (properties) => {
var results = Result("distribute");
var storepaths = [];
var tempstorepaths = [];
var releases = [];
var data = [];
for (let storename of properties.storenames) {
const storepath = join(properties.datapath, storename);
storepaths.push(storepath);
releases.push(lockSync(storepath));
const file = readFileSync(storepath, "utf8").trimEnd();
if (file.length > 0) data = data.concat(file.split("\n"));
}
var records = [];
for (var i = 0; i < data.length; i++) {
try {
data[i] = JSON.stringify(JSON.parse(data[i]));
results.records++;
} catch (error) {
results.errors.push({ line: i, data: data[i] });
} finally {
if (i === i % properties.datastores) records[i] = [];
records[i % properties.datastores] += data[i] + "\n";
}
}
const randomizer = Randomizer(Array.from(Array(properties.datastores).keys()), false);
for (var j = 0; j < records.length; j++) {
const storenumber = randomizer.next();
const tempstorepath = join(properties.temppath, [properties.dataname, storenumber, results.start, "json"].join("."));
tempstorepaths.push(tempstorepath);
appendFileSync(tempstorepath, records[j]);
}
for (let storepath of storepaths) {
deleteFileSync(storepath);
}
for (const release of releases) {
release();
}
for (let i = 0; i < tempstorepaths.length; i++) {
moveFileSync(tempstorepaths[i], join(properties.datapath, [properties.dataname, i, "json"].join(".")));
}
results.stores = tempstorepaths.length,
results.end = Date.now();
results.elapsed = results.end - results.start;
return results;
};
const dropEverything = async (properties) => {
var locks = [];
for (let storename of properties.storenames) {
locks.push(lock(join(properties.datapath, storename), properties.lockoptions));
}
const releases = await Promise.all(locks);
var deletes = [];
for (let storename of properties.storenames) {
deletes.push(deleteFile(join(properties.datapath, storename)));
}
var results = await Promise.all(deletes);
for (const release of releases) {
release();
}
deletes = [
deleteDirectory(properties.temppath),
deleteDirectory(properties.datapath),
deleteFile(join(properties.root, "njodb.properties"))
];
results = results.concat(await Promise.all(deletes));
return results;
}
const dropEverythingSync = (properties) => {
var results = [];
var releases = [];
for (let storename of properties.storenames) {
releases.push(lockSync(join(properties.datapath, storename)));
}
for (let storename of properties.storenames) {
results.push(deleteFileSync(join(properties.datapath, storename)));
}
for (const release of releases) {
release();
}
results.push(deleteDirectorySync(properties.temppath));
results.push(deleteDirectorySync(properties.datapath));
results.push(deleteFileSync(join(properties.root, "njodb.properties")));
return results;
}
// Data manipulation
const insertStoreData = async (store, data, lockoptions) => {
let release, results;
results = Object.assign({ store: resolve(store) }, Result("insert"));
if (await fileExists(store)) release = await lock(store, lockoptions);
await promisify(appendFile)(store, data, "utf8");
if (await check(store, lockoptions)) await releaseLock(store, release);
results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0;
results.end = Date.now();
return results;
};
const insertStoreDataSync = (store, data) => {
let release, results;
results = Object.assign({ store: resolve(store) }, Result("insert"));
if (fileExistsSync(store)) release = lockSync(store);
appendFileSync(store, data, "utf8");
if (checkSync(store)) releaseLockSync(store, release);
results.inserted = (data.length > 0) ? data.split("\n").length - 1 : 0;
results.end = Date.now();
return results;
};
const insertFileData = async (file, datapath, storenames, lockoptions) => {
let datastores, locks, releases, writers, results;
results = Result("insertFile");
datastores = storenames.length;
locks = [];
writers = [];
for (let storename of storenames) {
const storepath = join(datapath, storename);
locks.push(lock(storepath, lockoptions));
writers.push(createWriteStream(storepath, { flags: "r+" }));
}
releases = await Promise.all(locks);
await new Promise((resolve, reject) => {
const randomizer = Randomizer(Array.from(Array(datastores).keys()), false);
const reader = createInterface({ input: createReadStream(file), crlfDelay: Infinity });
reader.on("line", record => {
record = record.trim();
const storenumber = randomizer.next();
results.lines++;
if (record.length > 0) {
try {
record = JSON.parse(record);
results.inserted++;
} catch (error) {
results.errors.push({ error: error.message, line: results.lines, data: record });
} finally {
writers[storenumber].write(JSON.stringify(record) + "\n");
}
} else {
results.blanks++;
}
});
reader.on("close", () => {
resolve(true);
});
reader.on("error", error => {
reject(error);
});
});
for (const writer of writers) {
writer.end();
}
for (const release of releases) {
release();
}
results.end = Date.now();
results.elapsed = results.end - results.start;
return results;
}
const selectStoreData = async (store, match, project, lockoptions) => {
let release, results;
release = await lock(store, lockoptions);
const handlerResults = await new Promise((resolve, reject) => {
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
const handler = Handler("select", match, project);
reader.on("line", record => handler.next(record));
reader.on("close", () => resolve(handler.return()));
reader.on("error", error => reject(error));
});
if (await check(store, lockoptions)) await releaseLock(store, release);
results = Object.assign({ store: store }, handlerResults);
return results;
};
const selectStoreDataSync = (store, match, project) => {
let file, release, results;
release = lockSync(store);
file = readFileSync(store, "utf8");
if (checkSync(store)) releaseLockSync(store, release);
const records = file.split("\n");
const handler = Handler("select", match, project);
for (var record of records) {
handler.next(record);
}
results = Object.assign({ store: store }, handler.return());
return results;
};
const updateStoreData = async (store, match, update, tempstore, lockoptions) => {
let release, results;
release = await lock(store, lockoptions);
const handlerResults = await new Promise((resolve, reject) => {
const writer = createWriteStream(tempstore);
const handler = Handler("update", match, update);
writer.on("open", () => {
// Reader was opening and closing before writer ever opened
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
reader.on("line", record => {
handler.next(record, writer)
});
reader.on("close", () => {
writer.end();
resolve(handler.return());
});
reader.on("error", error => reject(error));
});
writer.on("error", error => reject(error));
});
results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);
if (results.updated > 0) {
if (!await replaceFile(store, tempstore)) {
results.errors = [...results.records];
results.updated = 0;
}
} else {
await deleteFile(tempstore);
}
if (await check(store, lockoptions)) await releaseLock(store, release);
results.end = Date.now();
delete results.data;
delete results.records;
return results;
};
const updateStoreDataSync = (store, match, update, tempstore) => {
let file, release, results;
release = lockSync(store);
file = readFileSync(store, "utf8").trimEnd();
if (checkSync(store)) releaseLockSync(store, release);
const records = file.split("\n");
const handler = Handler("update", match, update);
for (var record of records) {
handler.next(record);
}
results = Object.assign({ store: store, tempstore: tempstore }, handler.return());
if (results.updated > 0) {
let append, replace;
try {
appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
append = true;
} catch {
append = false;
}
if (append) replace = replaceFileSync(store, tempstore);
if (!(append || replace)) {
results.errors = [...results.records];
results.updated = 0;
}
}
results.end = Date.now();
delete results.data;
delete results.records;
return results;
};
const deleteStoreData = async (store, match, tempstore, lockoptions) => {
let release, results;
release = await lock(store, lockoptions);
const handlerResults = await new Promise((resolve, reject) => {
const writer = createWriteStream(tempstore);
const handler = Handler("delete", match);
writer.on("open", () => {
// Create reader after writer opens otherwise the reader can sometimes close before the writer opens
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
reader.on("line", record => handler.next(record, writer));
reader.on("close", () => {
writer.end();
resolve(handler.return());
});
reader.on("error", error => reject(error));
});
writer.on("error", error => reject(error));
});
results = Object.assign({ store: store, tempstore: tempstore }, handlerResults);
if (results.deleted > 0) {
if (!await replaceFile(store, tempstore)) {
results.errors = [...results.records];
results.deleted = 0;
}
} else {
await deleteFile(tempstore);
}
if (await check(store, lockoptions)) await releaseLock(store, release);
results.end = Date.now();
delete results.data;
delete results.records;
return results;
};
const deleteStoreDataSync = (store, match, tempstore) => {
let file, release, results;
release = lockSync(store);
file = readFileSync(store, "utf8");
if (checkSync(store)) releaseLockSync(store, release);
const records = file.split("\n");
const handler = Handler("delete", match);
for (var record of records) {
handler.next(record)
}
results = Object.assign({ store: store, tempstore: tempstore }, handler.return());
if (results.deleted > 0) {
let append, replace;
try {
appendFileSync(tempstore, results.data.join("\n") + "\n", "utf8");
append = true;
} catch {
append = false;
}
if (append) replace = replaceFileSync(store, tempstore);
if (!(append || replace)) {
results.errors = [...results.records];
results.updated = 0;
}
}
results.end = Date.now();
delete results.data;
delete results.records;
return results;
};
const aggregateStoreData = async (store, match, index, project, lockoptions) => {
let release, results;
release = await lock(store, lockoptions);
const handlerResults = await new Promise((resolve, reject) => {
const reader = createInterface({ input: createReadStream(store), crlfDelay: Infinity });
const handler = Handler("aggregate", match, index, project);
reader.on("line", record => handler.next(record));
reader.on("close", () => resolve(handler.return()));
reader.on("error", error => reject(error));
});
if (await check(store, lockoptions)) releaseLock(store, release);
results = Object.assign({ store: store }, handlerResults);
return results;
}
const aggregateStoreDataSync = (store, match, index, project) => {
let file, release, results;
release = lockSync(store);
file = readFileSync(store, "utf8");
if (checkSync(store)) releaseLockSync(store, release);
const records = file.split("\n");
const handler = Handler("aggregate", match, index, project);
for (var record of records) {
handler.next(record);
}
results = Object.assign({ store: store }, handler.return());
return results;
}
exports.getStoreNames = getStoreNames;
exports.getStoreNamesSync = getStoreNamesSync;
// Database management
exports.statsStoreData = statsStoreData;
exports.statsStoreDataSync = statsStoreDataSync;
exports.distributeStoreData = distributeStoreData;
exports.distributeStoreDataSync = distributeStoreDataSync;
exports.dropEverything = dropEverything;
exports.dropEverythingSync = dropEverythingSync;
// Data manipulation
exports.insertStoreData = insertStoreData;
exports.insertStoreDataSync = insertStoreDataSync;
exports.insertFileData = insertFileData;
exports.selectStoreData = selectStoreData;
exports.selectStoreDataSync = selectStoreDataSync;
exports.updateStoreData = updateStoreData;
exports.updateStoreDataSync = updateStoreDataSync;
exports.deleteStoreData = deleteStoreData;
exports.deleteStoreDataSync = deleteStoreDataSync;
exports.aggregateStoreData = aggregateStoreData;
exports.aggregateStoreDataSync = aggregateStoreDataSync;

View file

@ -0,0 +1,608 @@
"use strict";
const {
convertSize,
max,
min
} = require("./utils");
const Randomizer = (data, replacement) => {
var mutable = [...data];
if (replacement === undefined || typeof replacement !== "boolean") replacement = true;
function _next() {
var selection;
const index = Math.floor(Math.random() * mutable.length);
if (replacement) {
selection = mutable.slice(index, index + 1)[0];
} else {
selection = mutable.splice(index, 1)[0];
if (mutable.length === 0) mutable = [...data];
}
return selection;
}
return {
next: _next
};
};
const Result = (type) => {
var _result;
switch (type) {
case "stats":
_result = {
size: 0,
lines: 0,
records: 0,
errors: [],
blanks: 0,
created: undefined,
modified: undefined,
start: Date.now(),
end: undefined,
elapsed: 0
};
break;
case "distribute":
_result = {
stores: undefined,
records: 0,
errors: [],
start: Date.now(),
end: undefined,
elapsed: undefined
};
break;
case "insert":
_result = {
inserted: 0,
start: Date.now(),
end: undefined,
elapsed: 0
};
break;
case "insertFile":
_result = {
lines: 0,
inserted: 0,
errors: [],
blanks: 0,
start: Date.now(),
end: undefined
};
break;
case "select":
_result = {
lines: 0,
selected: 0,
ignored: 0,
errors: [],
blanks: 0,
start: Date.now(),
end: undefined,
elapsed: 0,
data: [],
};
break;
case "update":
_result = {
lines: 0,
selected: 0,
updated: 0,
unchanged: 0,
errors: [],
blanks: 0,
start: Date.now(),
end: undefined,
elapsed: 0,
data: [],
records: []
};
break;
case "delete":
_result = {
lines: 0,
deleted: 0,
retained: 0,
errors: [],
blanks: 0,
start: Date.now(),
end: undefined,
elapsed: 0,
data: [],
records: []
};
break;
case "aggregate":
_result = {
lines: 0,
aggregates: {},
indexed: 0,
unindexed: 0,
errors: [],
blanks: 0,
start: Date.now(),
end: undefined,
elapsed: 0
};
break;
}
return _result;
}
const Reduce = (type) => {
var _reduce;
switch (type) {
case "stats":
_reduce = Object.assign(Result("stats"), {
stores: 0,
min: undefined,
max: undefined,
mean: undefined,
var: undefined,
std: undefined,
m2: 0
});
break;
case "drop":
_reduce = {
dropped: false,
start: Date.now(),
end: 0,
elapsed: 0
};
break;
case "aggregate":
_reduce = Object.assign(Result("aggregate"), {
data: []
});
break;
default:
_reduce = Result(type);
break;
}
_reduce.details = undefined;
return _reduce;
};
const Handler = (type, ...functions) => {
var _results = Result(type);
const _next = (record, writer) => {
record = new Record(record);
_results.lines++;
if (record.length === 0) {
_results.blanks++;
} else {
if (record.data) {
switch (type) {
case "stats":
statsHandler(record, _results);
break;
case "select":
selectHandler(record, functions[0], functions[1], _results);
break;
case "update":
updateHandler(record, functions[0], functions[1], writer, _results);
break;
case "delete":
deleteHandler(record, functions[0], writer, _results);
break;
case "aggregate":
aggregateHandler(record, functions[0], functions[1], functions[2], _results);
break;
}
} else {
_results.errors.push({ error: record.error, line: _results.lines, data: record.source });
if (type === "update" || type === "delete") {
if (writer) {
writer.write(record.source + "\n");
} else {
_results.data.push(record.source);
}
}
}
}
};
const _return = () => {
_results.end = Date.now();
_results.elapsed = _results.end - _results.start;
return _results;
}
return {
next: _next,
return: _return
};
};
const statsHandler = (record, results) => {
results.records++;
return results;
};
const selectHandler = (record, selecter, projecter, results) => {
if (record.select(selecter)) {
if (projecter) {
results.data.push(record.project(projecter));
} else {
results.data.push(record.data);
}
results.selected++;
} else {
results.ignored++;
}
};
const updateHandler = (record, selecter, updater, writer, results) => {
if (record.select(selecter)) {
results.selected++;
if (record.update(updater)) {
results.updated++;
results.records.push(record.data);
} else {
results.unchanged++;
}
} else {
results.unchanged++;
}
if (writer) {
writer.write(JSON.stringify(record.data) + "\n");
} else {
results.data.push(JSON.stringify(record.data));
}
};
const deleteHandler = (record, selecter, writer, results) => {
if (record.select(selecter)) {
results.deleted++;
results.records.push(record.data);
} else {
results.retained++;
if (writer) {
writer.write(JSON.stringify(record.data) + "\n");
} else {
results.data.push(JSON.stringify(record.data));
}
}
};
const aggregateHandler = (record, selecter, indexer, projecter, results) => {
if (record.select(selecter)) {
const index = record.index(indexer);
if (!index) {
results.unindexed++;
} else {
var projection;
var fields;
if (results.aggregates[index]) {
results.aggregates[index].count++;
} else {
results.aggregates[index] = {
count: 1,
aggregates: {}
};
}
if (projecter) {
projection = record.project(projecter);
fields = Object.keys(projection);
} else {
projection = record.data;
fields = Object.keys(record.data);
}
for (const field of fields) {
if (projection[field] !== undefined) {
if (results.aggregates[index].aggregates[field]) {
accumulateAggregate(results.aggregates[index].aggregates[field], projection[field]);
} else {
results.aggregates[index].aggregates[field] = {
min: projection[field],
max: projection[field],
count: 1
};
if (typeof projection[field] === "number") {
results.aggregates[index].aggregates[field]["sum"] = projection[field];
results.aggregates[index].aggregates[field]["mean"] = projection[field];
results.aggregates[index].aggregates[field]["m2"] = 0;
}
}
}
}
results.indexed++;
}
}
}
const accumulateAggregate = (index, projection) => {
index["min"] = min(index["min"], projection);
index["max"] = max(index["max"], projection);
index["count"]++;
// Welford's algorithm
if (typeof projection === "number") {
const delta1 = projection - index["mean"];
index["sum"] += projection;
index["mean"] += delta1 / index["count"];
const delta2 = projection - index["mean"];
index["m2"] += delta1 * delta2;
}
return index;
};
class Record {
constructor(record) {
this.source = record.trim();
this.length = this.source.length
this.data = {};
this.error = "";
try {
this.data = JSON.parse(this.source)
} catch (e) {
this.data = undefined;
this.error = e.message;
}
}
}
Record.prototype.select = function (selecter) {
var result;
try {
result = selecter(this.data);
} catch {
return false;
}
if (typeof result !== "boolean") {
throw new TypeError("Selecter must return a boolean");
} else {
return result;
}
};
Record.prototype.update = function (updater) {
var result;
try {
result = updater(this.data);
} catch {
return false;
}
if (typeof result !== "object") {
throw new TypeError("Updater must return an object");
} else {
this.data = result;
return true;
}
}
Record.prototype.project = function (projecter) {
var result;
try {
result = projecter(this.data);
} catch {
return undefined;
}
if (Array.isArray(result) || typeof result !== "object") {
throw new TypeError("Projecter must return an object");
} else {
return result;
}
};
Record.prototype.index = function (indexer) {
try {
return indexer(this.data);
} catch {
return undefined;
}
};
const Reducer = (type, results) => {
var _reduce = Reduce(type);
var i = 0;
var aggregates = {};
for (const result of results) {
switch (type) {
case "stats":
statsReducer(_reduce, result, i);
break;
case "insert":
insertReducer(_reduce, result);
break;
case "select":
selectReducer(_reduce, result);
break;
case "update":
updateReducer(_reduce, result);
break;
case "delete":
deleteReducer(_reduce, result);
break;
case "aggregate":
aggregateReducer(_reduce, result, aggregates);
break
}
if (type === "stats") {
_reduce.stores++;
i++;
}
if (type === "drop") {
_reduce.dropped = true;
} else if (type !== "insert") {
_reduce.lines += result.lines;
_reduce.errors = _reduce.errors.concat(result.errors);
_reduce.blanks += result.blanks;
}
_reduce.start = min(_reduce.start, result.start);
_reduce.end = max(_reduce.end, result.end);
}
if (type === "stats") {
_reduce.size = convertSize(_reduce.size);
_reduce.var = _reduce.m2 / (results.length);
_reduce.std = Math.sqrt(_reduce.m2 / (results.length));
delete _reduce.m2;
} else if (type === "aggregate") {
for (const index of Object.keys(aggregates)) {
var aggregate = {
index: index,
count: aggregates[index].count,
aggregates: []
};
for (const field of Object.keys(aggregates[index].aggregates)) {
delete aggregates[index].aggregates[field].m2;
aggregate.aggregates.push({ field: field, data: aggregates[index].aggregates[field] });
}
_reduce.data.push(aggregate);
}
delete _reduce.aggregates;
}
_reduce.elapsed = _reduce.end - _reduce.start;
_reduce.details = results;
return _reduce;
};
const statsReducer = (reduce, result, i) => {
reduce.size += result.size;
reduce.records += result.records;
reduce.min = min(reduce.min, result.records);
reduce.max = max(reduce.max, result.records);
if (reduce.mean === undefined) reduce.mean = result.records;
const delta1 = result.records - reduce.mean;
reduce.mean += delta1 / (i + 2);
const delta2 = result.records - reduce.mean;
reduce.m2 += delta1 * delta2;
reduce.created = min(reduce.created, result.created);
reduce.modified = max(reduce.modified, result.modified);
};
const insertReducer = (reduce, result) => {
reduce.inserted += result.inserted;
};
const selectReducer = (reduce, result) => {
reduce.selected += result.selected;
reduce.ignored += result.ignored;
reduce.data = reduce.data.concat(result.data);
delete result.data;
};
const updateReducer = (reduce, result) => {
reduce.selected += result.selected;
reduce.updated += result.updated;
reduce.unchanged += result.unchanged;
};
const deleteReducer = (reduce, result) => {
reduce.deleted += result.deleted;
reduce.retained += result.retained;
};
const aggregateReducer = (reduce, result, aggregates) => {
reduce.indexed += result.indexed;
reduce.unindexed += result.unindexed;
const indexes = Object.keys(result.aggregates);
for (const index of indexes) {
if (aggregates[index]) {
aggregates[index].count += result.aggregates[index].count;
} else {
aggregates[index] = {
count: result.aggregates[index].count,
aggregates: {}
};
}
const fields = Object.keys(result.aggregates[index].aggregates);
for (const field of fields) {
const aggregateObject = aggregates[index].aggregates[field];
const resultObject = result.aggregates[index].aggregates[field];
if (aggregateObject) {
reduceAggregate(aggregateObject, resultObject);
} else {
aggregates[index].aggregates[field] = {
min: resultObject["min"],
max: resultObject["max"],
count: resultObject["count"]
};
if (resultObject["m2"] !== undefined) {
aggregates[index].aggregates[field]["sum"] = resultObject["sum"];
aggregates[index].aggregates[field]["mean"] = resultObject["mean"];
aggregates[index].aggregates[field]["varp"] = resultObject["m2"] / resultObject["count"];
aggregates[index].aggregates[field]["vars"] = resultObject["m2"] / (resultObject["count"] - 1);
aggregates[index].aggregates[field]["stdp"] = Math.sqrt(resultObject["m2"] / resultObject["count"]);
aggregates[index].aggregates[field]["stds"] = Math.sqrt(resultObject["m2"] / (resultObject["count"] - 1));
aggregates[index].aggregates[field]["m2"] = resultObject["m2"];
}
}
}
}
delete result.aggregates;
};
const reduceAggregate = (aggregate, result) => {
const n = aggregate["count"] + result["count"];
aggregate["min"] = min(aggregate["min"], result["min"]);
aggregate["max"] = max(aggregate["max"], result["max"]);
// Parallel version of Welford's algorithm
if (result["m2"] !== undefined) {
const delta = result["mean"] - aggregate["mean"];
const m2 = aggregate["m2"] + result["m2"] + (Math.pow(delta, 2) * ((aggregate["count"] * result["count"]) / n));
aggregate["m2"] = m2;
aggregate["varp"] = m2 / n;
aggregate["vars"] = m2 / (n - 1);
aggregate["stdp"] = Math.sqrt(m2 / n);
aggregate["stds"] = Math.sqrt(m2 / (n - 1));
}
if (result["sum"] !== undefined) {
aggregate["mean"] = (aggregate["sum"] + result["sum"]) / n;
aggregate["sum"] += result["sum"];
}
aggregate["count"] = n;
};
exports.Randomizer = Randomizer;
exports.Result = Result;
exports.Reduce = Reduce;
exports.Handler = Handler;
exports.Reducer = Reducer;

178
server/libs/njodb/utils.js Normal file
View file

@ -0,0 +1,178 @@
"use strict";
const {
access,
constants,
existsSync,
rename,
renameSync,
rmdir,
rmdirSync,
unlink,
unlinkSync
} = require("graceful-fs");
const { promisify } = require("util");
const min = (a, b) => {
if (b === undefined || a <= b) return a;
return b;
};
const max = (a, b) => {
if (b === undefined || a > b) return a;
return b;
};
const convertSize = (size) => {
const sizes = ["bytes", "KB", "MB", "GB"];
var index = Math.floor(Math.log2(size) / 10);
if (index > 3) index = 3;
return Math.round(((size / Math.pow(1024, index)) + Number.EPSILON) * 100) / 100 + " " + sizes[index];
};
const fileExists = async (a) => {
try {
await promisify(access)(a, constants.F_OK);
return true;
} catch (error) {
// console.error(error); file does not exist no need for error
return false;
}
}
const fileExistsSync = (a) => {
try {
return existsSync(a);
} catch (error) {
console.error(error);
return false;
}
}
const moveFile = async (a, b) => {
try {
await promisify(rename)(a, b);
return true;
} catch (error) {
console.error(error);
return false;
}
};
const moveFileSync = (a, b) => {
try {
renameSync(a, b);
return true;
} catch (error) {
console.error(error);
return false;
}
};
const deleteFile = async (filepath) => {
try {
await promisify(unlink)(filepath);
return true;
} catch (error) {
console.error(error);
return false;
}
};
const deleteFileSync = (filepath) => {
try {
unlinkSync(filepath);
return true;
} catch (error) {
console.error(error);
return false;
}
}
const replaceFile = async (a, b) => {
if (!await moveFile(a, a + ".old")) return false;
if (!await moveFile(b, a)) {
await moveFile(a + ".old", a);
return false;
}
await deleteFile(a + ".old");
return true;
};
const replaceFileSync = (a, b) => {
if (!moveFileSync(a, a + ".old")) return false;
if (!moveFileSync(b, a)) {
moveFile(a + ".old", a);
return false;
}
deleteFileSync(a + ".old");
return true;
};
const deleteDirectory = async (dirpath) => {
try {
await promisify(rmdir)(dirpath);
return true;
} catch {
return false;
}
};
const deleteDirectorySync = (dirpath) => {
try {
rmdirSync(dirpath);
return true;
} catch {
return false;
}
};
const releaseLock = async (store, release) => {
try {
await release();
} catch (error) {
if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) {
error.store = store;
throw error;
}
}
}
const releaseLockSync = (store, release) => {
try {
release();
} catch (error) {
if (!["ERELEASED", "ENOTACQUIRED"].includes(error.code)) {
error.store = store;
throw error;
}
}
}
exports.min = min;
exports.max = max;
exports.convertSize = convertSize;
exports.fileExists = fileExists;
exports.fileExistsSync = fileExistsSync;
exports.moveFile = moveFile;
exports.moveFileSync = moveFileSync;
exports.replaceFile = replaceFile;
exports.replaceFileSync = replaceFileSync;
exports.deleteFile = deleteFile;
exports.deleteFileSync = deleteFileSync;
exports.deleteDirectory = deleteDirectory;
exports.deleteDirectorySync = deleteDirectorySync;
exports.releaseLock = releaseLock;
exports.releaseLockSync = releaseLockSync;

View file

@ -0,0 +1,70 @@
"use strict";
const { existsSync } = require("graceful-fs");
const validateSize = (s) => {
if (typeof s !== "number") {
throw new TypeError("Size must be a number");
} else if (s <= 0) {
throw new RangeError("Size must be greater than zero");
}
return s;
};
const validateName = (n) => {
if (typeof n !== "string") {
throw new TypeError("Name must be a string");
} else if (n.trim().length <= 0) {
throw new Error("Name must be a non-blank string")
}
return n;
};
const validatePath = (p) => {
if (typeof p !== "string") {
throw new TypeError("Path must be a string");
} else if (p.trim().length <= 0) {
throw new Error("Path must be a non-blank string");
} else if (!existsSync(p)) {
throw new Error("Path does not exist");
}
return p;
};
const validateArray = (a) => {
if (!Array.isArray(a)) {
throw new TypeError("Not an array");
}
return a;
};
const validateObject = (o) => {
if (typeof o !== "object") {
throw new TypeError("Not an object");
}
return o;
};
const validateFunction = (f) => {
if (typeof f !== "function") {
throw new TypeError("Not a function")
}
// } else {
// const fString = f.toString();
// if (/\s*function/.test(fString) && !/\W+return\W+/.test(fString)) throw new Error("Function must return a value");
// }
return f;
}
exports.validateSize = validateSize;
exports.validateName = validateName;
exports.validatePath = validatePath;
exports.validateArray = validateArray;
exports.validateObject = validateObject;
exports.validateFunction = validateFunction;

View file

@ -0,0 +1,40 @@
'use strict';
const lockfile = require('./lib/lockfile');
const { toPromise, toSync, toSyncOptions } = require('./lib/adapter');
async function lock(file, options) {
const release = await toPromise(lockfile.lock)(file, options);
return toPromise(release);
}
function lockSync(file, options) {
const release = toSync(lockfile.lock)(file, toSyncOptions(options));
return toSync(release);
}
function unlock(file, options) {
return toPromise(lockfile.unlock)(file, options);
}
function unlockSync(file, options) {
return toSync(lockfile.unlock)(file, toSyncOptions(options));
}
function check(file, options) {
return toPromise(lockfile.check)(file, options);
}
function checkSync(file, options) {
return toSync(lockfile.check)(file, toSyncOptions(options));
}
module.exports = lock;
module.exports.lock = lock;
module.exports.unlock = unlock;
module.exports.lockSync = lockSync;
module.exports.unlockSync = unlockSync;
module.exports.check = check;
module.exports.checkSync = checkSync;

View file

@ -0,0 +1,85 @@
'use strict';
const fs = require('graceful-fs');
function createSyncFs(fs) {
const methods = ['mkdir', 'realpath', 'stat', 'rmdir', 'utimes'];
const newFs = { ...fs };
methods.forEach((method) => {
newFs[method] = (...args) => {
const callback = args.pop();
let ret;
try {
ret = fs[`${method}Sync`](...args);
} catch (err) {
return callback(err);
}
callback(null, ret);
};
});
return newFs;
}
// ----------------------------------------------------------
function toPromise(method) {
return (...args) => new Promise((resolve, reject) => {
args.push((err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
method(...args);
});
}
function toSync(method) {
return (...args) => {
let err;
let result;
args.push((_err, _result) => {
err = _err;
result = _result;
});
method(...args);
if (err) {
throw err;
}
return result;
};
}
function toSyncOptions(options) {
// Shallow clone options because we are oging to mutate them
options = { ...options };
// Transform fs to use the sync methods instead
options.fs = createSyncFs(options.fs || fs);
// Retries are not allowed because it requires the flow to be sync
if (
(typeof options.retries === 'number' && options.retries > 0) ||
(options.retries && typeof options.retries.retries === 'number' && options.retries.retries > 0)
) {
throw Object.assign(new Error('Cannot use retries with the sync api'), { code: 'ESYNC' });
}
return options;
}
module.exports = {
toPromise,
toSync,
toSyncOptions,
};

View file

@ -0,0 +1,342 @@
'use strict';
const path = require('path');
const fs = require('graceful-fs');
const retry = require('../../retry');
const onExit = require('../../signalExit');
const mtimePrecision = require('./mtime-precision');
const locks = {};
function getLockFile(file, options) {
return options.lockfilePath || `${file}.lock`;
}
function resolveCanonicalPath(file, options, callback) {
if (!options.realpath) {
return callback(null, path.resolve(file));
}
// Use realpath to resolve symlinks
// It also resolves relative paths
options.fs.realpath(file, callback);
}
function acquireLock(file, options, callback) {
const lockfilePath = getLockFile(file, options);
// Use mkdir to create the lockfile (atomic operation)
options.fs.mkdir(lockfilePath, (err) => {
if (!err) {
// At this point, we acquired the lock!
// Probe the mtime precision
return mtimePrecision.probe(lockfilePath, options.fs, (err, mtime, mtimePrecision) => {
// If it failed, try to remove the lock..
/* istanbul ignore if */
if (err) {
options.fs.rmdir(lockfilePath, () => { });
return callback(err);
}
callback(null, mtime, mtimePrecision);
});
}
// If error is not EEXIST then some other error occurred while locking
if (err.code !== 'EEXIST') {
return callback(err);
}
// Otherwise, check if lock is stale by analyzing the file mtime
if (options.stale <= 0) {
return callback(Object.assign(new Error('Lock file is already being held'), { code: 'ELOCKED', file }));
}
options.fs.stat(lockfilePath, (err, stat) => {
if (err) {
// Retry if the lockfile has been removed (meanwhile)
// Skip stale check to avoid recursiveness
if (err.code === 'ENOENT') {
return acquireLock(file, { ...options, stale: 0 }, callback);
}
return callback(err);
}
if (!isLockStale(stat, options)) {
return callback(Object.assign(new Error('Lock file is already being held'), { code: 'ELOCKED', file }));
}
// If it's stale, remove it and try again!
// Skip stale check to avoid recursiveness
removeLock(file, options, (err) => {
if (err) {
return callback(err);
}
acquireLock(file, { ...options, stale: 0 }, callback);
});
});
});
}
function isLockStale(stat, options) {
return stat.mtime.getTime() < Date.now() - options.stale;
}
function removeLock(file, options, callback) {
// Remove lockfile, ignoring ENOENT errors
options.fs.rmdir(getLockFile(file, options), (err) => {
if (err && err.code !== 'ENOENT') {
return callback(err);
}
callback();
});
}
function updateLock(file, options) {
const lock = locks[file];
// Just for safety, should never happen
/* istanbul ignore if */
if (lock.updateTimeout) {
return;
}
lock.updateDelay = lock.updateDelay || options.update;
lock.updateTimeout = setTimeout(() => {
lock.updateTimeout = null;
// Stat the file to check if mtime is still ours
// If it is, we can still recover from a system sleep or a busy event loop
options.fs.stat(lock.lockfilePath, (err, stat) => {
const isOverThreshold = lock.lastUpdate + options.stale < Date.now();
// If it failed to update the lockfile, keep trying unless
// the lockfile was deleted or we are over the threshold
if (err) {
if (err.code === 'ENOENT' || isOverThreshold) {
return setLockAsCompromised(file, lock, Object.assign(err, { code: 'ECOMPROMISED' }));
}
lock.updateDelay = 1000;
return updateLock(file, options);
}
const isMtimeOurs = lock.mtime.getTime() === stat.mtime.getTime();
if (!isMtimeOurs) {
return setLockAsCompromised(
file,
lock,
Object.assign(
new Error('Unable to update lock within the stale threshold'),
{ code: 'ECOMPROMISED' }
));
}
const mtime = mtimePrecision.getMtime(lock.mtimePrecision);
options.fs.utimes(lock.lockfilePath, mtime, mtime, (err) => {
const isOverThreshold = lock.lastUpdate + options.stale < Date.now();
// Ignore if the lock was released
if (lock.released) {
return;
}
// If it failed to update the lockfile, keep trying unless
// the lockfile was deleted or we are over the threshold
if (err) {
if (err.code === 'ENOENT' || isOverThreshold) {
return setLockAsCompromised(file, lock, Object.assign(err, { code: 'ECOMPROMISED' }));
}
lock.updateDelay = 1000;
return updateLock(file, options);
}
// All ok, keep updating..
lock.mtime = mtime;
lock.lastUpdate = Date.now();
lock.updateDelay = null;
updateLock(file, options);
});
});
}, lock.updateDelay);
// Unref the timer so that the nodejs process can exit freely
// This is safe because all acquired locks will be automatically released
// on process exit
// We first check that `lock.updateTimeout.unref` exists because some users
// may be using this module outside of NodeJS (e.g., in an electron app),
// and in those cases `setTimeout` return an integer.
/* istanbul ignore else */
if (lock.updateTimeout.unref) {
lock.updateTimeout.unref();
}
}
function setLockAsCompromised(file, lock, err) {
// Signal the lock has been released
lock.released = true;
// Cancel lock mtime update
// Just for safety, at this point updateTimeout should be null
/* istanbul ignore if */
if (lock.updateTimeout) {
clearTimeout(lock.updateTimeout);
}
if (locks[file] === lock) {
delete locks[file];
}
lock.options.onCompromised(err);
}
// ----------------------------------------------------------
function lock(file, options, callback) {
/* istanbul ignore next */
options = {
stale: 10000,
update: null,
realpath: true,
retries: 0,
fs,
onCompromised: (err) => { throw err; },
...options,
};
options.retries = options.retries || 0;
options.retries = typeof options.retries === 'number' ? { retries: options.retries } : options.retries;
options.stale = Math.max(options.stale || 0, 2000);
options.update = options.update == null ? options.stale / 2 : options.update || 0;
options.update = Math.max(Math.min(options.update, options.stale / 2), 1000);
// Resolve to a canonical file path
resolveCanonicalPath(file, options, (err, file) => {
if (err) {
return callback(err);
}
// Attempt to acquire the lock
const operation = retry.operation(options.retries);
operation.attempt(() => {
acquireLock(file, options, (err, mtime, mtimePrecision) => {
if (operation.retry(err)) {
return;
}
if (err) {
return callback(operation.mainError());
}
// We now own the lock
const lock = locks[file] = {
lockfilePath: getLockFile(file, options),
mtime,
mtimePrecision,
options,
lastUpdate: Date.now(),
};
// We must keep the lock fresh to avoid staleness
updateLock(file, options);
callback(null, (releasedCallback) => {
if (lock.released) {
return releasedCallback &&
releasedCallback(Object.assign(new Error('Lock is already released'), { code: 'ERELEASED' }));
}
// Not necessary to use realpath twice when unlocking
unlock(file, { ...options, realpath: false }, releasedCallback);
});
});
});
});
}
function unlock(file, options, callback) {
options = {
fs,
realpath: true,
...options,
};
// Resolve to a canonical file path
resolveCanonicalPath(file, options, (err, file) => {
if (err) {
return callback(err);
}
// Skip if the lock is not acquired
const lock = locks[file];
if (!lock) {
return callback(Object.assign(new Error('Lock is not acquired/owned by you'), { code: 'ENOTACQUIRED' }));
}
lock.updateTimeout && clearTimeout(lock.updateTimeout); // Cancel lock mtime update
lock.released = true; // Signal the lock has been released
delete locks[file]; // Delete from locks
removeLock(file, options, callback);
});
}
function check(file, options, callback) {
options = {
stale: 10000,
realpath: true,
fs,
...options,
};
options.stale = Math.max(options.stale || 0, 2000);
// Resolve to a canonical file path
resolveCanonicalPath(file, options, (err, file) => {
if (err) {
return callback(err);
}
// Check if lockfile exists
options.fs.stat(getLockFile(file, options), (err, stat) => {
if (err) {
// If does not exist, file is not locked. Otherwise, callback with error
return err.code === 'ENOENT' ? callback(null, false) : callback(err);
}
// Otherwise, check if lock is stale by analyzing the file mtime
return callback(null, !isLockStale(stat, options));
});
});
}
function getLocks() {
return locks;
}
// Remove acquired locks on exit
/* istanbul ignore next */
onExit(() => {
for (const file in locks) {
const options = locks[file].options;
try { options.fs.rmdirSync(getLockFile(file, options)); } catch (e) { /* Empty */ }
}
});
module.exports.lock = lock;
module.exports.unlock = unlock;
module.exports.check = check;
module.exports.getLocks = getLocks;

View file

@ -0,0 +1,55 @@
'use strict';
const cacheSymbol = Symbol();
function probe(file, fs, callback) {
const cachedPrecision = fs[cacheSymbol];
if (cachedPrecision) {
return fs.stat(file, (err, stat) => {
/* istanbul ignore if */
if (err) {
return callback(err);
}
callback(null, stat.mtime, cachedPrecision);
});
}
// Set mtime by ceiling Date.now() to seconds + 5ms so that it's "not on the second"
const mtime = new Date((Math.ceil(Date.now() / 1000) * 1000) + 5);
fs.utimes(file, mtime, mtime, (err) => {
/* istanbul ignore if */
if (err) {
return callback(err);
}
fs.stat(file, (err, stat) => {
/* istanbul ignore if */
if (err) {
return callback(err);
}
const precision = stat.mtime.getTime() % 1000 === 0 ? 's' : 'ms';
// Cache the precision in a non-enumerable way
Object.defineProperty(fs, cacheSymbol, { value: precision });
callback(null, stat.mtime, precision);
});
});
}
function getMtime(precision) {
let now = Date.now();
if (precision === 's') {
now = Math.ceil(now / 1000) * 1000;
}
return new Date(now);
}
module.exports.probe = probe;
module.exports.getMtime = getMtime;

100
server/libs/retry/index.js Normal file
View file

@ -0,0 +1,100 @@
var RetryOperation = require('./retry_operation');
exports.operation = function(options) {
var timeouts = exports.timeouts(options);
return new RetryOperation(timeouts, {
forever: options && options.forever,
unref: options && options.unref,
maxRetryTime: options && options.maxRetryTime
});
};
exports.timeouts = function(options) {
if (options instanceof Array) {
return [].concat(options);
}
var opts = {
retries: 10,
factor: 2,
minTimeout: 1 * 1000,
maxTimeout: Infinity,
randomize: false
};
for (var key in options) {
opts[key] = options[key];
}
if (opts.minTimeout > opts.maxTimeout) {
throw new Error('minTimeout is greater than maxTimeout');
}
var timeouts = [];
for (var i = 0; i < opts.retries; i++) {
timeouts.push(this.createTimeout(i, opts));
}
if (options && options.forever && !timeouts.length) {
timeouts.push(this.createTimeout(i, opts));
}
// sort the array numerically ascending
timeouts.sort(function(a,b) {
return a - b;
});
return timeouts;
};
exports.createTimeout = function(attempt, opts) {
var random = (opts.randomize)
? (Math.random() + 1)
: 1;
var timeout = Math.round(random * opts.minTimeout * Math.pow(opts.factor, attempt));
timeout = Math.min(timeout, opts.maxTimeout);
return timeout;
};
exports.wrap = function(obj, options, methods) {
if (options instanceof Array) {
methods = options;
options = null;
}
if (!methods) {
methods = [];
for (var key in obj) {
if (typeof obj[key] === 'function') {
methods.push(key);
}
}
}
for (var i = 0; i < methods.length; i++) {
var method = methods[i];
var original = obj[method];
obj[method] = function retryWrapper(original) {
var op = exports.operation(options);
var args = Array.prototype.slice.call(arguments, 1);
var callback = args.pop();
args.push(function(err) {
if (op.retry(err)) {
return;
}
if (err) {
arguments[0] = op.mainError();
}
callback.apply(this, arguments);
});
op.attempt(function() {
original.apply(obj, args);
});
}.bind(obj, original);
obj[method].options = options;
}
};

View file

@ -0,0 +1,158 @@
function RetryOperation(timeouts, options) {
// Compatibility for the old (timeouts, retryForever) signature
if (typeof options === 'boolean') {
options = { forever: options };
}
this._originalTimeouts = JSON.parse(JSON.stringify(timeouts));
this._timeouts = timeouts;
this._options = options || {};
this._maxRetryTime = options && options.maxRetryTime || Infinity;
this._fn = null;
this._errors = [];
this._attempts = 1;
this._operationTimeout = null;
this._operationTimeoutCb = null;
this._timeout = null;
this._operationStart = null;
if (this._options.forever) {
this._cachedTimeouts = this._timeouts.slice(0);
}
}
module.exports = RetryOperation;
RetryOperation.prototype.reset = function() {
this._attempts = 1;
this._timeouts = this._originalTimeouts;
}
RetryOperation.prototype.stop = function() {
if (this._timeout) {
clearTimeout(this._timeout);
}
this._timeouts = [];
this._cachedTimeouts = null;
};
RetryOperation.prototype.retry = function(err) {
if (this._timeout) {
clearTimeout(this._timeout);
}
if (!err) {
return false;
}
var currentTime = new Date().getTime();
if (err && currentTime - this._operationStart >= this._maxRetryTime) {
this._errors.unshift(new Error('RetryOperation timeout occurred'));
return false;
}
this._errors.push(err);
var timeout = this._timeouts.shift();
if (timeout === undefined) {
if (this._cachedTimeouts) {
// retry forever, only keep last error
this._errors.splice(this._errors.length - 1, this._errors.length);
this._timeouts = this._cachedTimeouts.slice(0);
timeout = this._timeouts.shift();
} else {
return false;
}
}
var self = this;
var timer = setTimeout(function() {
self._attempts++;
if (self._operationTimeoutCb) {
self._timeout = setTimeout(function() {
self._operationTimeoutCb(self._attempts);
}, self._operationTimeout);
if (self._options.unref) {
self._timeout.unref();
}
}
self._fn(self._attempts);
}, timeout);
if (this._options.unref) {
timer.unref();
}
return true;
};
RetryOperation.prototype.attempt = function(fn, timeoutOps) {
this._fn = fn;
if (timeoutOps) {
if (timeoutOps.timeout) {
this._operationTimeout = timeoutOps.timeout;
}
if (timeoutOps.cb) {
this._operationTimeoutCb = timeoutOps.cb;
}
}
var self = this;
if (this._operationTimeoutCb) {
this._timeout = setTimeout(function() {
self._operationTimeoutCb();
}, self._operationTimeout);
}
this._operationStart = new Date().getTime();
this._fn(this._attempts);
};
RetryOperation.prototype.try = function(fn) {
console.log('Using RetryOperation.try() is deprecated');
this.attempt(fn);
};
RetryOperation.prototype.start = function(fn) {
console.log('Using RetryOperation.start() is deprecated');
this.attempt(fn);
};
RetryOperation.prototype.start = RetryOperation.prototype.try;
RetryOperation.prototype.errors = function() {
return this._errors;
};
RetryOperation.prototype.attempts = function() {
return this._attempts;
};
RetryOperation.prototype.mainError = function() {
if (this._errors.length === 0) {
return null;
}
var counts = {};
var mainError = null;
var mainErrorCount = 0;
for (var i = 0; i < this._errors.length; i++) {
var error = this._errors[i];
var message = error.message;
var count = (counts[message] || 0) + 1;
counts[message] = count;
if (count >= mainErrorCount) {
mainError = error;
mainErrorCount = count;
}
}
return mainError;
};

View file

@ -0,0 +1,202 @@
// Note: since nyc uses this module to output coverage, any lines
// that are in the direct sync flow of nyc's outputCoverage are
// ignored, since we can never get coverage for them.
// grab a reference to node's real process object right away
var process = global.process
const processOk = function (process) {
return process &&
typeof process === 'object' &&
typeof process.removeListener === 'function' &&
typeof process.emit === 'function' &&
typeof process.reallyExit === 'function' &&
typeof process.listeners === 'function' &&
typeof process.kill === 'function' &&
typeof process.pid === 'number' &&
typeof process.on === 'function'
}
// some kind of non-node environment, just no-op
/* istanbul ignore if */
if (!processOk(process)) {
module.exports = function () {
return function () {}
}
} else {
var assert = require('assert')
var signals = require('./signals.js')
var isWin = /^win/i.test(process.platform)
var EE = require('events')
/* istanbul ignore if */
if (typeof EE !== 'function') {
EE = EE.EventEmitter
}
var emitter
if (process.__signal_exit_emitter__) {
emitter = process.__signal_exit_emitter__
} else {
emitter = process.__signal_exit_emitter__ = new EE()
emitter.count = 0
emitter.emitted = {}
}
// Because this emitter is a global, we have to check to see if a
// previous version of this library failed to enable infinite listeners.
// I know what you're about to say. But literally everything about
// signal-exit is a compromise with evil. Get used to it.
if (!emitter.infinite) {
emitter.setMaxListeners(Infinity)
emitter.infinite = true
}
module.exports = function (cb, opts) {
/* istanbul ignore if */
if (!processOk(global.process)) {
return function () {}
}
assert.equal(typeof cb, 'function', 'a callback must be provided for exit handler')
if (loaded === false) {
load()
}
var ev = 'exit'
if (opts && opts.alwaysLast) {
ev = 'afterexit'
}
var remove = function () {
emitter.removeListener(ev, cb)
if (emitter.listeners('exit').length === 0 &&
emitter.listeners('afterexit').length === 0) {
unload()
}
}
emitter.on(ev, cb)
return remove
}
var unload = function unload () {
if (!loaded || !processOk(global.process)) {
return
}
loaded = false
signals.forEach(function (sig) {
try {
process.removeListener(sig, sigListeners[sig])
} catch (er) {}
})
process.emit = originalProcessEmit
process.reallyExit = originalProcessReallyExit
emitter.count -= 1
}
module.exports.unload = unload
var emit = function emit (event, code, signal) {
/* istanbul ignore if */
if (emitter.emitted[event]) {
return
}
emitter.emitted[event] = true
emitter.emit(event, code, signal)
}
// { <signal>: <listener fn>, ... }
var sigListeners = {}
signals.forEach(function (sig) {
sigListeners[sig] = function listener () {
/* istanbul ignore if */
if (!processOk(global.process)) {
return
}
// If there are no other listeners, an exit is coming!
// Simplest way: remove us and then re-send the signal.
// We know that this will kill the process, so we can
// safely emit now.
var listeners = process.listeners(sig)
if (listeners.length === emitter.count) {
unload()
emit('exit', null, sig)
/* istanbul ignore next */
emit('afterexit', null, sig)
/* istanbul ignore next */
if (isWin && sig === 'SIGHUP') {
// "SIGHUP" throws an `ENOSYS` error on Windows,
// so use a supported signal instead
sig = 'SIGINT'
}
/* istanbul ignore next */
process.kill(process.pid, sig)
}
}
})
module.exports.signals = function () {
return signals
}
var loaded = false
var load = function load () {
if (loaded || !processOk(global.process)) {
return
}
loaded = true
// This is the number of onSignalExit's that are in play.
// It's important so that we can count the correct number of
// listeners on signals, and don't wait for the other one to
// handle it instead of us.
emitter.count += 1
signals = signals.filter(function (sig) {
try {
process.on(sig, sigListeners[sig])
return true
} catch (er) {
return false
}
})
process.emit = processEmit
process.reallyExit = processReallyExit
}
module.exports.load = load
var originalProcessReallyExit = process.reallyExit
var processReallyExit = function processReallyExit (code) {
/* istanbul ignore if */
if (!processOk(global.process)) {
return
}
process.exitCode = code || /* istanbul ignore next */ 0
emit('exit', process.exitCode, null)
/* istanbul ignore next */
emit('afterexit', process.exitCode, null)
/* istanbul ignore next */
originalProcessReallyExit.call(process, process.exitCode)
}
var originalProcessEmit = process.emit
var processEmit = function processEmit (ev, arg) {
if (ev === 'exit' && processOk(global.process)) {
/* istanbul ignore else */
if (arg !== undefined) {
process.exitCode = arg
}
var ret = originalProcessEmit.apply(this, arguments)
/* istanbul ignore next */
emit('exit', process.exitCode, null)
/* istanbul ignore next */
emit('afterexit', process.exitCode, null)
/* istanbul ignore next */
return ret
} else {
return originalProcessEmit.apply(this, arguments)
}
}
}

View file

@ -0,0 +1,53 @@
// This is not the set of all possible signals.
//
// It IS, however, the set of all signals that trigger
// an exit on either Linux or BSD systems. Linux is a
// superset of the signal names supported on BSD, and
// the unknown signals just fail to register, so we can
// catch that easily enough.
//
// Don't bother with SIGKILL. It's uncatchable, which
// means that we can't fire any callbacks anyway.
//
// If a user does happen to register a handler on a non-
// fatal signal like SIGWINCH or something, and then
// exit, it'll end up firing `process.emit('exit')`, so
// the handler will be fired anyway.
//
// SIGBUS, SIGFPE, SIGSEGV and SIGILL, when not raised
// artificially, inherently leave the process in a
// state from which it is not safe to try and enter JS
// listeners.
module.exports = [
'SIGABRT',
'SIGALRM',
'SIGHUP',
'SIGINT',
'SIGTERM'
]
if (process.platform !== 'win32') {
module.exports.push(
'SIGVTALRM',
'SIGXCPU',
'SIGXFSZ',
'SIGUSR2',
'SIGTRAP',
'SIGSYS',
'SIGQUIT',
'SIGIOT'
// should detect profiler and enable/disable accordingly.
// see #21
// 'SIGPROF'
)
}
if (process.platform === 'linux') {
module.exports.push(
'SIGIO',
'SIGPOLL',
'SIGPWR',
'SIGSTKFLT',
'SIGUNUSED'
)
}