453 lines
18 KiB
JavaScript
453 lines
18 KiB
JavaScript
module.exports = {
|
|
|
|
insert: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbconnector.insert: connection is required.');
|
|
const sql = this.parseSQL(options.sql);
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (!db) throw new Error(`Connection "${connection}" doesn't exist.`);
|
|
if (!sql) throw new Error('dbconnector.insert: sql is required.');
|
|
if (!sql.table) throw new Error('dbconnector.insert: sql.table is required.');
|
|
|
|
sql.type = 'insert';
|
|
|
|
if (db.client == 'couchdb') {
|
|
const doc = {};
|
|
|
|
for (const value of sql.values) {
|
|
doc[value.column] = value.value;
|
|
}
|
|
|
|
const result = await db.insert(doc, sql.table + '/' + Date.now());
|
|
|
|
if (result.ok) {
|
|
return { affected: 1, identity: result.id };
|
|
} else {
|
|
//throw new Error('dbconnector.insert: error inserting document into couchdb.');
|
|
return { affected: 0 };
|
|
}
|
|
}
|
|
|
|
if (options.test) {
|
|
return {
|
|
options: options,
|
|
query: sql.toString()
|
|
};
|
|
}
|
|
|
|
if (sql.sub) {
|
|
return db.transaction(async trx => {
|
|
// TODO: test how identity is returned for each database
|
|
// main insert, returns inserted id
|
|
const [identity] = (await trx.fromJSON(sql)).map(value => value[sql.returning] || value);
|
|
|
|
// loop sub (relation table)
|
|
for (let { table, key, value, values } of Object.values(sql.sub)) {
|
|
if (!Array.isArray(value)) break;
|
|
|
|
for (const current of value) {
|
|
if (typeof current == 'object') {
|
|
current[key] = identity;
|
|
await trx(table).insert(current);
|
|
} else {
|
|
if (values.length != 1) throw new Error('Invalid value mapping');
|
|
await trx(table).insert({
|
|
[key]: identity,
|
|
[values[0].column]: current
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
return { affected: 1, identity };
|
|
});
|
|
}
|
|
|
|
let identity = await db.fromJSON(sql);
|
|
|
|
if (identity) {
|
|
if (Array.isArray(identity)) {
|
|
identity = identity[0];
|
|
}
|
|
|
|
if (typeof identity == 'object') {
|
|
identity = identity[Object.keys(identity)[0]];
|
|
}
|
|
}
|
|
|
|
return { affected: 1, identity };
|
|
},
|
|
|
|
update: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbconnector.update: connection is required.');
|
|
const sql = this.parseSQL(options.sql);
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (!db) throw new Error(`Connection "${connection}" doesn't exist.`);
|
|
if (!sql) throw new Error('dbconnector.update: sql is required.');
|
|
if (!sql.table) throw new Error('dbconnector.update: sql.table is required.');
|
|
|
|
sql.type = 'update';
|
|
|
|
if (db.client == 'couchdb') {
|
|
let { rows } = await db.list({ include_docs: true, startkey: sql.table + '/', endkey: sql.table + '0' });
|
|
|
|
rows = rows.map(row => row.doc);
|
|
|
|
if (sql.wheres) {
|
|
const validate = (row, rule) => {
|
|
if (rule.operator) {
|
|
let a = row[rule.data.column];
|
|
let b = rule.value;
|
|
|
|
switch (rule.operator) {
|
|
case 'equal': return a == b;
|
|
case 'not_equal': return a != b;
|
|
case 'in': return b.includes(a);
|
|
case 'not_in': return !b.includes(a);
|
|
case 'less': return a < b;
|
|
case 'less_or_equal': return a <= b;
|
|
case 'greater': return a > b;
|
|
case 'greater_or_equal': return a >= b;
|
|
case 'between': return b[0] <= a <= b[1];
|
|
case 'not_between': return !(b[0] <= a <= b[1]);
|
|
case 'begins_with': return String(a).startsWith(String(b));
|
|
case 'not_begins_with': return !String(a).startsWith(String(b));
|
|
case 'contains': return String(a).includes(String(b));
|
|
case 'not_contains': return !String(a).includes(String(b));
|
|
case 'ends_with': return String(a).endsWith(String(b));
|
|
case 'not_ends_with': return !String(a).endsWith(String(b));
|
|
case 'is_empty': return a == null || a == '';
|
|
case 'is_not_empty': return a != null && a != '';
|
|
case 'is_null': return a == null;
|
|
case 'is_not_null': return a != null;
|
|
}
|
|
}
|
|
|
|
if (rule.condition && rule.rules.length) {
|
|
for (const _rule of rule.rules) {
|
|
const valid = validate(row, _rule);
|
|
if (!valid && rule.condition == 'AND') return false;
|
|
if (valid && rule.condition == 'OR') return true;
|
|
}
|
|
|
|
return rule.condition == 'OR' ? false : true;
|
|
}
|
|
|
|
return true;
|
|
};
|
|
|
|
rows = rows.filter(row => {
|
|
return validate(row, sql.wheres);
|
|
});
|
|
}
|
|
|
|
let result = []
|
|
if (rows.length) {
|
|
result = await db.bulk({
|
|
docs: rows.map(doc => {
|
|
for (const value of sql.values) {
|
|
doc[value.column] = value.value;
|
|
}
|
|
return doc;
|
|
})
|
|
});
|
|
}
|
|
|
|
return { affected: Array.isArray(result) ? result.filter(result => result.ok).length : rows.length };
|
|
}
|
|
|
|
if (options.test) {
|
|
return {
|
|
options: options,
|
|
query: sql.toString()
|
|
};
|
|
}
|
|
|
|
if (sql.sub) {
|
|
return db.transaction(async trx => {
|
|
let updated = await trx.fromJSON(sql);
|
|
|
|
if (!Array.isArray(updated)) {
|
|
// check if is single update
|
|
const single = (
|
|
sql.wheres &&
|
|
sql.wheres.rules &&
|
|
sql.wheres.rules.length == 1 &&
|
|
sql.wheres.rules[0].field == sql.returning &&
|
|
sql.wheres.rules[0].operation == '='
|
|
);
|
|
|
|
if (single) {
|
|
// get id from where condition
|
|
updated = [sql.wheres.rules[0].value];
|
|
} else {
|
|
// create a select with same where conditions
|
|
updated = await trx.fromJSON({
|
|
...sql,
|
|
type: 'select',
|
|
columns: [sql.returning]
|
|
});
|
|
|
|
updated = updated.map(value => value[sql.returning]);
|
|
}
|
|
} else {
|
|
updated = updated.map(value => value[sql.returning]);
|
|
}
|
|
|
|
// loop sub
|
|
for (let { table, key, value, values } of Object.values(sql.sub)) {
|
|
if (!Array.isArray(value)) continue;
|
|
|
|
// delete old related data first
|
|
await trx(table).whereIn(key, updated).del();
|
|
|
|
// for each updated item
|
|
for (const identity of updated) {
|
|
// insert value
|
|
for (const current of value) {
|
|
if (typeof current == 'object') {
|
|
current[key] = identity;
|
|
await trx(table).insert(current);
|
|
} else {
|
|
if (values.length != 1) throw new Error('Invalid value mapping');
|
|
await trx(table).insert({
|
|
[key]: identity,
|
|
[values[0].column]: current
|
|
});
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return { affected: updated.length };
|
|
});
|
|
}
|
|
|
|
let affected = await db.fromJSON(sql);
|
|
|
|
return { affected };
|
|
},
|
|
|
|
delete: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbconnector.delete: connection is required.');
|
|
const sql = this.parseSQL(options.sql);
|
|
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (!db) throw new Error(`Connection "${connection}" doesn't exist.`);
|
|
if (!sql) throw new Error('dbconnector.delete: sql is required.');
|
|
if (!sql.table) throw new Error('dbconnector.delete: sql.table is required.');
|
|
|
|
sql.type = 'del';
|
|
|
|
if (db.client == 'couchdb') {
|
|
|
|
let { rows } = await db.list({ include_docs: true, startkey: sql.table + '/', endkey: sql.table + '0' });
|
|
|
|
rows = rows.map(row => row.doc);
|
|
|
|
if (sql.wheres) {
|
|
const validate = (row, rule) => {
|
|
if (rule.operator) {
|
|
let a = row[rule.data.column];
|
|
let b = rule.value;
|
|
|
|
switch (rule.operator) {
|
|
case 'equal': return a == b;
|
|
case 'not_equal': return a != b;
|
|
case 'in': return b.includes(a);
|
|
case 'not_in': return !b.includes(a);
|
|
case 'less': return a < b;
|
|
case 'less_or_equal': return a <= b;
|
|
case 'greater': return a > b;
|
|
case 'greater_or_equal': return a >= b;
|
|
case 'between': return b[0] <= a <= b[1];
|
|
case 'not_between': return !(b[0] <= a <= b[1]);
|
|
case 'begins_with': return String(a).startsWith(String(b));
|
|
case 'not_begins_with': return !String(a).startsWith(String(b));
|
|
case 'contains': return String(a).includes(String(b));
|
|
case 'not_contains': return !String(a).includes(String(b));
|
|
case 'ends_with': return String(a).endsWith(String(b));
|
|
case 'not_ends_with': return !String(a).endsWith(String(b));
|
|
case 'is_empty': return a == null || a == '';
|
|
case 'is_not_empty': return a != null && a != '';
|
|
case 'is_null': return a == null;
|
|
case 'is_not_null': return a != null;
|
|
}
|
|
}
|
|
|
|
if (rule.condition && rule.rules.length) {
|
|
for (const _rule of rule.rules) {
|
|
const valid = validate(row, _rule);
|
|
if (!valid && rule.condition == 'AND') return false;
|
|
if (valid && rule.condition == 'OR') return true;
|
|
}
|
|
|
|
return rule.condition == 'OR' ? false : true;
|
|
}
|
|
|
|
return true;
|
|
};
|
|
|
|
rows = rows.filter(row => {
|
|
return validate(row, sql.wheres);
|
|
});
|
|
}
|
|
|
|
let result = []
|
|
if (rows.length) {
|
|
result = await db.bulk({
|
|
docs: rows.map(doc => {
|
|
doc._deleted = true;
|
|
return doc;
|
|
})
|
|
});
|
|
}
|
|
|
|
return { affected: Array.isArray(result) ? result.filter(result => result.ok).length : rows.length };
|
|
}
|
|
|
|
if (options.test) {
|
|
return {
|
|
options: options,
|
|
query: sql.toString()
|
|
};
|
|
}
|
|
|
|
if (sql.sub) {
|
|
return db.transaction(async trx => {
|
|
const deleted = (await trx.fromJSON(sql)).map(value => value[sql.returning] || value);
|
|
|
|
// loop sub
|
|
for (let { table, key } of Object.values(sql.sub)) {
|
|
// delete related data
|
|
await trx(table).whereIn(key, deleted).del();
|
|
}
|
|
|
|
return { affected: deleted.length };
|
|
});
|
|
}
|
|
|
|
let affected = await db.fromJSON(sql);
|
|
|
|
return { affected };
|
|
},
|
|
|
|
custom: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbupdater.custom: connection is required.');
|
|
const sql = this.parseSQL(options.sql);
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (!db) throw new Error(`Connection "${connection}" doesn't exist.`);
|
|
if (!sql) throw new Error('dbconnector.custom: sql is required.');
|
|
if (typeof sql.query != 'string') throw new Error('dbupdater.custom: sql.query is required.');
|
|
if (!Array.isArray(sql.params)) throw new Error('dbupdater.custom: sql.params is required.');
|
|
|
|
if (db.client == 'couchdb') {
|
|
throw new Error('dbupdater.custom: couchdb is not supported.');
|
|
}
|
|
|
|
const params = [];
|
|
const query = sql.query.replace(/([:@][a-zA-Z_]\w*|\?)/g, param => {
|
|
if (param == '?') {
|
|
params.push(sql.params[params.length].value);
|
|
return '?';
|
|
}
|
|
|
|
let p = sql.params.find(p => p.name == param);
|
|
if (p) {
|
|
params.push(p.value);
|
|
return '?';
|
|
}
|
|
|
|
return param;
|
|
});
|
|
|
|
let results = await db.raw(query, params);
|
|
|
|
if (db.client.config.client == 'mysql' || db.client.config.client == 'mysql2') {
|
|
results = results[0];
|
|
} else if (db.client.config.client == 'postgres' || db.client.config.client == 'redshift') {
|
|
results = results.rows;
|
|
}
|
|
|
|
return results;
|
|
},
|
|
|
|
execute: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbupdater.execute: connection is required.');
|
|
const query = this.parseRequired(options.query, 'string', 'dbupdater.execute: query is required.');
|
|
const params = this.parseOptional(options.params, 'object', []);
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (!db) throw new Error(`Connection "${connection}" doesn't exist.`);
|
|
|
|
if (db.client == 'couchdb') {
|
|
throw new Error('dbupdater.execute: couchdb is not supported.');
|
|
}
|
|
|
|
let results = await db.raw(query, params);
|
|
|
|
if (db.client.config.client == 'mysql' || db.client.config.client == 'mysql2') {
|
|
results = results[0];
|
|
} else if (db.client.config.client == 'postgres' || db.client.config.client == 'redshift') {
|
|
results = results.rows;
|
|
}
|
|
|
|
return results;
|
|
},
|
|
|
|
// bulk insert
|
|
bulkinsert: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbupdater.bulkinsert: connection is required.');
|
|
const sql = this.parseSQL(options.sql);
|
|
const source = this.parseRequired(options.source, 'object', 'dbupdater.bulkinsert: source is required.');
|
|
const batchsize = this.parseOptional(options.batchsize, 'number', 100);
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (db.client == 'couchdb') {
|
|
throw new Error('dbupdater.bulkinsert: couchdb is not supported.');
|
|
}
|
|
|
|
return await db.transaction(async transaction => {
|
|
for (let i = 0; i < source.length; i += batchsize) {
|
|
let batch = source.slice(i, i + batchsize).map(data => {
|
|
let values = {};
|
|
|
|
for (let value of sql.values) {
|
|
if (value.type == 'json') {
|
|
values[value.column] = JSON.stringify(data[value.value]);
|
|
} else {
|
|
values[value.column] = data[value.value];
|
|
}
|
|
}
|
|
|
|
return values;
|
|
});
|
|
|
|
await transaction(sql.table.name || sql.table).insert(batch);
|
|
}
|
|
|
|
return { affected: source.length };
|
|
});
|
|
},
|
|
|
|
transaction: async function (options) {
|
|
const connection = this.parseRequired(options.connection, 'string', 'dbupdater.transaction: connection is required.');
|
|
//const exec = this.parseRequired(options.exec, 'object', 'dbupdater.transaction: exec is required.');
|
|
const db = this.getDbConnection(connection);
|
|
|
|
if (db.client == 'couchdb') {
|
|
throw new Error('dbupdater.transaction: couchdb is not supported.');
|
|
}
|
|
|
|
return await db.transaction(async trx => {
|
|
this.trx[connection] = trx;
|
|
return await this.exec(options.exec, true).finally(() => {
|
|
this.trx[connection] = null;
|
|
});
|
|
});
|
|
},
|
|
|
|
}; |