blog/lib/rx-utils.ts

48 lines
1.5 KiB
TypeScript

import { from, mergeMap, Observable, Subscriber } from "rxjs";
import { dirname, join } from "node:path";
import { existsSync } from "node:fs";
import { mkdir, writeFile } from "node:fs/promises";
import { Glob } from "glob";
import { VFile } from "vfile";
import { read } from "to-vfile";
export function dest(prefix: string) {
return async (file: VFile) => {
const actualPath = join(prefix, file.path);
if (!existsSync(dirname(actualPath))) {
await mkdir(dirname(actualPath), { recursive: true });
}
await writeFile(actualPath, file.value);
console.log("[-] Written", actualPath);
return file;
};
}
export function onComplete<T>(f: (sink: Subscriber<T>) => Promise<void>) {
return (observable: Observable<T>) =>
new Observable<T>((subscriber) =>
observable.subscribe({
next(value) {
subscriber.next(value);
},
error(err) {
subscriber.error(err);
},
complete() {
f(subscriber);
subscriber.complete();
},
})
);
}
const loadFile = (path: string): Promise<VFile> =>
read(join(Deno.cwd(), path)).then((vfile) => {
vfile.path = path;
return vfile;
});
export const fromGlob = (paths: string | string[]): Observable<VFile> =>
from(new Glob(paths, {})).pipe(mergeMap(loadFile));