big update - fully functional
This commit is contained in:
@@ -12,4 +12,8 @@ export interface ISOSnapshot extends Snapshot {
|
||||
|
||||
export interface UnixTimeSnapshot extends Snapshot {
|
||||
time: number,
|
||||
}
|
||||
}
|
||||
|
||||
export type SnapshotAttrTimeseries = Int32Array;
|
||||
|
||||
export type ClimateDataType = "temp" | "humidity" | "co2";
|
||||
@@ -1,5 +1,5 @@
|
||||
import {Connection, ResultSetHeader, RowDataPacket} from "mysql2/promise";
|
||||
import {Snapshot, ISOSnapshot} from "./Snapshot";
|
||||
import {Snapshot, ISOSnapshot, SnapshotAttrTimeseries, ClimateDataType} from "./Snapshot";
|
||||
import {isValidDatetime, toISOTime, toMySQLDatetime, toUnixTime} from "./utils";
|
||||
import {DatabaseConnection, tryQuery} from "./database";
|
||||
|
||||
@@ -52,6 +52,25 @@ class SnapshotCollection {
|
||||
});
|
||||
}
|
||||
|
||||
async getTimeseriesBytestreamSince(dataType: ClimateDataType, timeSince: number | string): Promise<SnapshotAttrTimeseries> {
|
||||
timeSince = toMySQLDatetime(timeSince);
|
||||
return tryQuery(async () => {
|
||||
const query = `SELECT \`id\`, DATE_FORMAT(\`time\`, '%Y-%m-%dT%TZ') \`time\`, \`${dataType}\` FROM \`snapshots\` WHERE TIMESTAMPDIFF(SECOND, \`time\`, ?) < 0 ORDER BY \`id\` ASC;`;
|
||||
const result = await this.db.query(query, [timeSince]);
|
||||
return SnapshotCollection.rowsToTimeseries(dataType, ...result[0] as RowDataPacket[]);
|
||||
});
|
||||
}
|
||||
|
||||
async getTimeseriesBytestreamInRange(dataType: ClimateDataType, start: number | string, stop: number | string): Promise<SnapshotAttrTimeseries> {
|
||||
start = toMySQLDatetime(start);
|
||||
stop = toMySQLDatetime(stop);
|
||||
return tryQuery(async () => {
|
||||
const query = `SELECT \`id\`, DATE_FORMAT(\`time\`, '%Y-%m-%dT%TZ') \`time\`, \`${dataType}\` FROM \`snapshots\` WHERE \`time\` BETWEEN ? AND ? ORDER BY \`id\` ASC;`;
|
||||
const result = await this.db.query(query, [start, stop]);
|
||||
return SnapshotCollection.rowsToTimeseries(dataType, ...result[0] as RowDataPacket[]);
|
||||
});
|
||||
}
|
||||
|
||||
static toUnixTime<T extends {time: string | number}>(...snapshots: T[]): (T & {time: number})[] {
|
||||
return snapshots.map(s => ({...s, time: toUnixTime(s.time)}));
|
||||
}
|
||||
@@ -61,7 +80,7 @@ class SnapshotCollection {
|
||||
}
|
||||
|
||||
private static toMySQLRows(...snapshots: Omit<Snapshot, "id">[]): (number | string | Date)[][] {
|
||||
return snapshots.map(s => [new Date(s.time), s.co2, s.humidity, s.temp]);
|
||||
return snapshots.map(s => [toMySQLDatetime(s.time), s.co2, s.humidity, s.temp]);
|
||||
}
|
||||
|
||||
static isSubmissibleSnapshot(potentialSnapshot: Record<string, unknown>): potentialSnapshot is Omit<Snapshot, "id"> {
|
||||
@@ -72,6 +91,15 @@ class SnapshotCollection {
|
||||
|| typeof potentialSnapshot.time === "string" && isValidDatetime(potentialSnapshot.time));
|
||||
}
|
||||
|
||||
private static rowsToTimeseries(dataType: ClimateDataType, ...rows: RowDataPacket[]): SnapshotAttrTimeseries {
|
||||
const timeseries = new Int32Array(rows.length * 2);
|
||||
for (let i = 0; i < rows.length; i++) {
|
||||
timeseries[i * 2] = Number(rows[i][dataType]);
|
||||
timeseries[i * 2 + 1] = toUnixTime(rows[i].time) / 1000;
|
||||
}
|
||||
return timeseries;
|
||||
}
|
||||
|
||||
private static rowsToSnapshots(...rows: RowDataPacket[]): ISOSnapshot[] {
|
||||
return rows.map(row => ({
|
||||
id: row.id,
|
||||
|
||||
52
server/src/byteSeriesRouter.ts
Normal file
52
server/src/byteSeriesRouter.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import {Router} from "express";
|
||||
import {CollectionRegistry} from "./Collections";
|
||||
import {ClayPIError} from "./errors";
|
||||
import {ClimateDataType, SnapshotAttrTimeseries} from "./Snapshot";
|
||||
|
||||
function newByteSeriesRouter(collections: CollectionRegistry) {
|
||||
const router = Router();
|
||||
|
||||
router.get("/:dataType", async (req, res) => {
|
||||
const query = req.query as Record<string, string>;
|
||||
const isMinutesQuery = typeof query["last-minutes"] !== "undefined" && !query.from && !query.to;
|
||||
const isFromToQuery = typeof query.from !== "undefined";
|
||||
const dataType = req.params.dataType;
|
||||
if (!isValidDataType(dataType)) {
|
||||
throw new ClayPIError(`The parameter 'data-type' must be one of the following: 'temp', 'humidity', 'co2'. Got: ${dataType}`);
|
||||
}
|
||||
let timeseries: SnapshotAttrTimeseries;
|
||||
if (!isMinutesQuery && !isFromToQuery) {
|
||||
if (query.to) {
|
||||
throw new ClayPIError("The parameter 'to' must always be accompanied by a 'from'.");
|
||||
}
|
||||
timeseries = await collections.snapshots.getTimeseriesBytestreamSince(dataType, new Date().getTime() - 60 * 60000);
|
||||
} else if (isMinutesQuery) {
|
||||
const lastMinutes = Math.floor(Number(query["last-minutes"]));
|
||||
if (isNaN(lastMinutes)) {
|
||||
throw new ClayPIError("The parameter 'last-minutes' must be a number.");
|
||||
} else {
|
||||
timeseries = await collections.snapshots.getTimeseriesBytestreamSince(dataType, new Date().getTime() - lastMinutes * 60000);
|
||||
}
|
||||
} else if (isFromToQuery) {
|
||||
const timeFrom = isNaN(Number(query.from)) ? query.from : Number(query.from);
|
||||
const timeTo = isNaN(Number(query.to)) ? query.to : Number(query.to);
|
||||
if (timeTo) {
|
||||
timeseries = await collections.snapshots.getTimeseriesBytestreamInRange(dataType, timeFrom, timeTo);
|
||||
} else {
|
||||
timeseries = await collections.snapshots.getTimeseriesBytestreamSince(dataType, timeFrom);
|
||||
}
|
||||
} else {
|
||||
throw new ClayPIError("Malformed request.");
|
||||
}
|
||||
res.type("application/octet-stream");
|
||||
res.end(Buffer.from(timeseries.buffer), "binary");
|
||||
});
|
||||
|
||||
return router;
|
||||
}
|
||||
|
||||
function isValidDataType(dataType: string | undefined): dataType is ClimateDataType {
|
||||
return typeof dataType !== "undefined" && (dataType === "temp" || dataType === "humidity" || dataType === "co2");
|
||||
}
|
||||
|
||||
export default newByteSeriesRouter;
|
||||
@@ -2,15 +2,18 @@ import express from "express";
|
||||
import {ClayPIError, GenericPersistenceError} from "./errors";
|
||||
import newSnapshotRouter from "./snapshotRouter";
|
||||
import {CollectionRegistry} from "./Collections";
|
||||
import newByteSeriesRouter from "./byteSeriesRouter";
|
||||
|
||||
export function newMainRouter(collections: CollectionRegistry) {
|
||||
const router = express.Router();
|
||||
const snapshotRouter = newSnapshotRouter(collections);
|
||||
const byteSeriesRouter = newByteSeriesRouter(collections);
|
||||
|
||||
router.get("/dashboard", (req, res) => {
|
||||
res.render("index.ejs", { rootUrl: req.app.locals.rootUrl });
|
||||
});
|
||||
router.use("/api/snapshots", snapshotRouter);
|
||||
router.use("/api/timeseries", byteSeriesRouter);
|
||||
router.use(topLevelErrorHandler);
|
||||
|
||||
return router;
|
||||
|
||||
@@ -6,7 +6,7 @@ import {ClayPIError} from "./errors";
|
||||
|
||||
async function pingSensors(): Promise<Omit<ISOSnapshot, "id">> {
|
||||
try {
|
||||
const process = await exec(`python3 ${path.resolve(__dirname + "/../scripts/pinger-test.py")}`);
|
||||
const process = await exec(`python3 ${path.resolve(__dirname + "/../scripts/climate-pinger.py")}`);
|
||||
const result = process.stdout;
|
||||
const snapshotArray = result.split("\t").map(piece => piece.trim());
|
||||
return {
|
||||
|
||||
@@ -3,7 +3,7 @@ import SnapshotCollection from "./SnapshotCollection";
|
||||
import express, {Router} from "express";
|
||||
import {CollectionRegistry} from "./Collections";
|
||||
import {ClayPIError} from "./errors";
|
||||
import {toMySQLDatetime} from "./utils";
|
||||
import {unixTimeParamMiddleware} from "./utils";
|
||||
|
||||
function newSnapshotRouter(collections: CollectionRegistry) {
|
||||
const router = Router();
|
||||
@@ -80,14 +80,4 @@ function newSnapshotRouter(collections: CollectionRegistry) {
|
||||
return router;
|
||||
}
|
||||
|
||||
const unixTimeParamMiddleware: express.Handler = (req, res, next) => {
|
||||
const timeFormat = req.query.timeFormat;
|
||||
if (typeof timeFormat !== "undefined" && timeFormat !== "iso" && timeFormat !== "unix") {
|
||||
throw new ClayPIError("Parameter 'timeFormat' must be either 'iso' or 'unix'");
|
||||
} else {
|
||||
res.locals.timeFormat = timeFormat;
|
||||
next();
|
||||
}
|
||||
};
|
||||
|
||||
export default newSnapshotRouter;
|
||||
@@ -1,4 +1,5 @@
|
||||
import {DataValidationError} from "./errors";
|
||||
import {ClayPIError, DataValidationError} from "./errors";
|
||||
import express from "express";
|
||||
|
||||
export function toMySQLDatetime(datetime: number | string) {
|
||||
try {
|
||||
@@ -23,4 +24,14 @@ export function toUnixTime(datetime: string | number) {
|
||||
|
||||
export function toISOTime(datetime: string | number) {
|
||||
return new Date(datetime).toISOString();
|
||||
}
|
||||
}
|
||||
|
||||
export const unixTimeParamMiddleware: express.Handler = (req, res, next) => {
|
||||
const timeFormat = req.query.timeFormat;
|
||||
if (typeof timeFormat !== "undefined" && timeFormat !== "iso" && timeFormat !== "unix") {
|
||||
throw new ClayPIError("Parameter 'timeFormat' must be either 'iso' or 'unix'");
|
||||
} else {
|
||||
res.locals.timeFormat = timeFormat;
|
||||
next();
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user