fireactions/agent/index.js
2023-06-04 13:08:11 -03:00

185 lines
5.3 KiB
JavaScript

import { nanoid } from "nanoid";
// import { WebSocketServer } from "ws";
import { execFile as _execFile, spawn } from "node:child_process";
import { createHash, timingSafeEqual } from "node:crypto";
import { open, readFile, rm } from "node:fs/promises";
import { createServer } from "node:http";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { promisify } from "node:util";
// Promise-returning variant of child_process.execFile.
const execFile = promisify(_execFile);
// const spawn = promisify(_spawn);
// Shared secret every request must present. It is read once at startup, so the
// module fails to load when parseSecret() rejects.
const secret = await parseSecret();
// const wsServer = new WebSocketServer({ noServer: true });
/**
 * HTTP server that delegates every request to `handler` and makes sure a
 * failing request never leaves its connection hanging.
 */
const server = createServer(async (req, res) => {
  try {
    await handler(req, res);
  } catch (error) {
    console.error("Error manejando", req.url, error);
    // Some failures are reported to the client before the error is thrown
    // (auth() ends the response with a 401 and then throws). Destroying the
    // socket in that case could cut the reply off, so only tear the
    // connection down when no complete response has been issued.
    if (!res.writableEnded) {
      res.destroy();
    }
  }
});
/**
 * In-memory registry of runs, keyed by run id.
 *
 * - `listeners`: callbacks invoked with every new output chunk, and with
 *   `null` once the run is closed.
 * - `prev`: all output received so far, concatenated.
 * - `statusCode`: `null` until the process closes, then its status code.
 *
 * The binding is never reassigned (entries are replaced through `runs.set`),
 * hence `const`.
 *
 * @type {Map<string, {listeners: ((data: Buffer | null) => void)[], prev: Buffer, statusCode: null | number}>}
 */
const runs = new Map();
// server.on("upgrade", (req, socket, head) => {
// const url = getUrl(req)
// let matches;
// if ((matches = url.pathname.match(runWebSocketRegexp))) {
// const runId = matches[1];
// if (!listeners.has(runId)) {
// return
// }
// wsServer.handleUpgrade(req, socket,head,(client,request)=>{
// })
// }
// });
/**
 * Appends an output chunk to a run's accumulated output and forwards the
 * chunk to every listener currently attached to that run.
 *
 * @param {string} runId id of a run registered in `runs`
 * @param {Buffer} chunk newly produced output
 * @throws {Error} when no run is registered under `runId`
 */
function writeToRun(runId, chunk) {
  const entry = runs.get(runId);
  if (!entry) {
    throw new Error(`runId ${runId} doesn't exist`);
  }
  // Keep the full history so listeners attached later can replay it.
  entry.prev = Buffer.concat([entry.prev, chunk]);
  entry.listeners.forEach((notify) => {
    notify(chunk);
  });
}
/**
 * Tells every listener of a run that no more output will arrive by calling
 * each of them with `null`.
 *
 * @param {string} runId id of a run registered in `runs`
 * @throws {Error} when no run is registered under `runId`
 */
function closeRun(runId) {
  const entry = runs.get(runId);
  if (!entry) {
    throw new Error(`runId ${runId} doesn't exist`);
  }
  entry.listeners.forEach((notify) => {
    notify(null);
  });
}
/** Matches `/run/<runId>` paths; the capture group is the run id. */
const runWebSocketRegexp = /^\/run\/(.+)$/;

/**
 * Streams the request body into a new file at `execPath`, created with mode
 * 0o700 so it can be executed afterwards.
 *
 * Resolves only once the write stream has emitted "close", i.e. after the
 * file descriptor has been released: executing a file that is still open for
 * writing can fail with ETXTBSY. Rejects (and removes the partial file) when
 * writing fails or the client disconnects before the whole body arrived.
 *
 * @param {import("http").IncomingMessage} req
 * @param {string} execPath
 * @returns {Promise<void>}
 */
async function saveUpload(req, execPath) {
  const fileHandle = await open(execPath, "w", 0o700);
  try {
    await new Promise((resolve, reject) => {
      // The stream closes the file handle itself when it ends or fails.
      const sink = fileHandle.createWriteStream();
      sink.once("error", reject);
      sink.once("close", () => resolve(undefined));
      req.once("close", () => {
        // pipe() does not end the destination when the source is aborted.
        if (!req.complete) {
          sink.destroy(new Error("request aborted before the upload finished"));
        }
      });
      req.pipe(sink);
    });
  } catch (error) {
    await rm(execPath, { force: true });
    throw error;
  }
}

/**
 * Stores the uploaded executable, starts it and registers a run that collects
 * its stdout and stderr.
 *
 * @param {import("http").IncomingMessage} req request whose body is the executable
 * @returns {Promise<string>} id of the newly registered run
 */
async function startRun(req) {
  const execPath = join(tmpdir(), `fireactions-agent-exec-${nanoid()}`);
  await saveUpload(req, execPath);

  let proc;
  try {
    proc = spawn(execPath, [], {});
  } catch (error) {
    // Some spawn failures are thrown synchronously; don't leak the upload.
    await rm(execPath, { force: true });
    throw error;
  }

  // Registered only after the process exists, so a failed upload or spawn
  // never leaves an orphan run behind. Events are delivered asynchronously,
  // therefore the run is in place before any output can arrive.
  const runId = nanoid();
  runs.set(runId, {
    listeners: [],
    prev: Buffer.from([]),
    statusCode: null,
  });

  const forward = (/** @type {Buffer} */ chunk) => writeToRun(runId, chunk);
  // The stdio streams can be missing when the process could not be created.
  proc.stdout?.on("data", forward);
  proc.stderr?.on("data", forward);

  // Without an "error" listener an asynchronous spawn failure would be thrown
  // as an uncaught exception and take the whole agent down.
  proc.on("error", (error) => {
    console.error("run", runId, "process error", error);
  });
  proc.on("close", (code) => {
    const run = runs.get(runId);
    if (!run) return panic();
    // `code` is null when the process was killed by a signal. Store -1 in
    // that case so that a non-null statusCode always means "finished".
    runs.set(runId, { ...run, statusCode: code ?? -1 });
    closeRun(runId);
    rm(execPath, { force: true }).catch((error) => {
      console.error("could not remove", execPath, error);
    });
  });

  return runId;
}

/**
 * Answers with a server-sent-events stream of a run's output: first everything
 * produced so far, then every new chunk until the run is closed.
 *
 * @param {string} runId
 * @param {import("http").ServerResponse<import("http").IncomingMessage> & { req: import("http").IncomingMessage; }} res
 */
function streamRun(runId, res) {
  const run = runs.get(runId);
  if (!run) {
    return res.writeHead(404).end("run not found");
  }
  res.writeHead(200, {
    "content-type": "text/event-stream",
    Connection: "keep-alive",
    "Cache-Control": "no-cache",
    "Access-Control-Allow-Origin": "*",
  });
  const listener = (/** @type {Buffer | null} */ data) => {
    if (data === null) return res.end();
    res.write(
      "event: stdsmth\n" + `data: ${JSON.stringify(data.toString())}\n\n`
    );
  };
  // Replay the output produced before this client connected.
  listener(run.prev);
  if (run.statusCode !== null) {
    // The run already finished, so no closing notification will ever come.
    return res.end();
  }
  runs.set(runId, { ...run, listeners: [...run.listeners, listener] });
  // Detach once the response is done or the client went away, so finished
  // connections are neither written to nor kept alive by the registry.
  res.once("close", () => {
    const current = runs.get(runId);
    if (!current) return;
    runs.set(runId, {
      ...current,
      listeners: current.listeners.filter((other) => other !== listener),
    });
  });
}

/**
 * Authenticates the request and routes it:
 *
 * - `GET /hello`: liveness reply.
 * - `POST /run`: the body is stored as an executable and started; replies
 *   with `{"runId": ...}`.
 * - `/run/<runId>`: server-sent-events stream of that run's output.
 *
 * Anything else gets a 404, and a wrong method on a known path a 405.
 *
 * @param {import("http").IncomingMessage} req
 * @param {import("http").ServerResponse<import("http").IncomingMessage> & { req: import("http").IncomingMessage; }} res
 */
async function handler(req, res) {
  const url = getUrl(req);
  // Replies 401 and throws when the credentials are wrong.
  auth(secret, req, res);

  if (url.pathname === "/hello") {
    if (req.method !== "GET") {
      return res.writeHead(405).end("method not allowed");
    }
    return res.writeHead(200).end("Ey!");
  }

  if (url.pathname === "/run") {
    if (req.method !== "POST") {
      return res.writeHead(405).end("method not allowed");
    }
    const runId = await startRun(req);
    return res.writeHead(200).end(JSON.stringify({ runId }));
  }

  const matches = url.pathname.match(runWebSocketRegexp);
  if (matches) {
    return streamRun(matches[1], res);
  }

  return res.writeHead(404).end("not found");
}
/**
 * Turns the request target into a full `URL`, using the request's `Host`
 * header as the authority of the base.
 *
 * @param {import("http").IncomingMessage} req
 * @returns {URL}
 */
function getUrl(req) {
  const base = `http://${req.headers.host}`;
  const target = /** @type {string} */ (req.url);
  return new URL(target, base);
}
/**
 * Reads the shared secret from the `fireactions.secret=<value>` option of the
 * kernel command line.
 *
 * @param {string} [cmdlinePath] file to read the command line from
 * @returns {Promise<string>} the secret, at least 5 characters long
 * @throws {Error} "valor muy corto" when the value is missing or shorter than
 *   5 characters, "no hay secreto" when the option is absent
 */
async function parseSecret(cmdlinePath = "/proc/cmdline") {
  const cmdline = await readFile(cmdlinePath, "utf-8");
  // The file ends with a newline; splitting on any whitespace keeps it from
  // being glued to the last option's value.
  for (const opt of cmdline.trim().split(/\s+/)) {
    // Split on the first "=" only, so the value itself may contain "=".
    const separatorIndex = opt.indexOf("=");
    const key = separatorIndex === -1 ? opt : opt.slice(0, separatorIndex);
    if (key !== "fireactions.secret") continue;
    const value = separatorIndex === -1 ? "" : opt.slice(separatorIndex + 1);
    if (value.length < 5) throw new Error("valor muy corto");
    return value;
  }
  throw new Error("no hay secreto");
}
/**
 * Checks that the request carries the shared secret, either as
 * `Authorization: Bearer <secret>` or, when that header is absent, as the
 * `token` query parameter.
 *
 * On failure the response is ended with a 401 and an error is thrown so the
 * caller stops processing the request.
 *
 * @param {string} secret
 * @param {import("http").IncomingMessage} req
 * @param {import("http").ServerResponse<import("http").IncomingMessage> & { req: import("http").IncomingMessage; }} res
 * @throws {Error} "unauthorized" when the token is missing or wrong
 */
function auth(secret, req, res) {
  const header = req.headers.authorization;
  /** @type {string | null} */
  let token;
  if (header) {
    // Only accept the Bearer scheme (its name is case-insensitive) instead of
    // blindly dropping the first characters of whatever header was sent.
    const bearer = /^Bearer (.*)$/i.exec(header);
    token = bearer ? bearer[1] : null;
  } else {
    token = getUrl(req).searchParams.get("token");
  }

  // Compare fixed-length digests in constant time: timingSafeEqual requires
  // equally sized inputs, and a plain `===` leaks how much of the secret matched.
  const digest = (/** @type {string} */ text) =>
    createHash("sha256").update(text).digest();
  const authorized =
    token !== null && timingSafeEqual(digest(token), digest(secret));

  if (!authorized) {
    res.writeHead(401).end("wrong secret");
    throw new Error("unauthorized");
  }
}
// Start accepting connections; "8080" is the hard-coded listening port.
server.listen("8080");
/**
 * Aborts the current operation by throwing. Used for states that are not
 * expected to be reachable.
 *
 * @returns {never}
 * @throws {Error} always
 */
function panic() {
  const failure = new Error("Function not implemented.");
  throw failure;
}