blog/lib/rx-utils.ts

66 lines
2.2 KiB
TypeScript

import { Observable, from, map, mergeAll } from "rxjs";
import File from "vinyl";
import { Glob } from "glob";
import { readFile } from "node:fs/promises";
import { join, dirname } from "node:path";
import { existsSync, mkdirSync, writeFileSync } from "node:fs";
export function src(glob: string | string[]): Observable<File> {
return from(new Glob(glob, {})).pipe(
map(async (path) => new File({ path, contents: await readFile(path) })),
map(from),
mergeAll(),
);
}
export function dist(prefix: string) {
return (observable: Observable<File>): Observable<File> =>
new Observable((subscriber) =>
observable.subscribe({
next(value) {
const actualPath = join(prefix, value.path);
if (!existsSync(dirname(actualPath))) {
mkdirSync(dirname(actualPath), { recursive: true });
}
writeFileSync(actualPath, value.contents as Buffer);
console.log("[-] Written", actualPath);
subscriber.next(value);
},
complete() {
subscriber.complete();
},
}),
);
}
export function synchronise<T>() {
return (observable: Observable<Promise<T>>): Observable<T> =>
new Observable<T>((subscriber) => {
const promiseArray = [];
let done = false;
observable.subscribe({
next(value) {
const promise = value.then((v) => {
const i = promiseArray.findIndex((i) => i == promise);
promiseArray.splice(i, 1);
subscriber.next(v);
if (promiseArray.length === 0 && done) {
subscriber.complete();
}
});
promiseArray.push(promise);
},
complete() {
done = true;
if (promiseArray.length === 0) {
console.log("[synchronise] complete (from complete)");
subscriber.complete();
}
},
});
});
}