blog/lib/rx-utils.ts

43 lines
1.4 KiB
TypeScript
Raw Normal View History

2024-06-26 02:21:31 +00:00
import { Observable, from, map, mergeAll } from "rxjs";
import File from "vinyl";
import { Glob } from "glob";
import { readFile } from "node:fs/promises";
export function src(glob: string | string[]): Observable<File> {
return from(new Glob(glob, {})).pipe(
map(async (path) => new File({ path, contents: await readFile(path) })),
map(from),
mergeAll(),
);
}
export function synchronise<T>() {
return (observable: Observable<Promise<T>>): Observable<T> =>
new Observable<T>((subscriber) => {
const promiseArray = [];
let done = false;
observable.subscribe({
next(value) {
const promise = value.then((v) => {
const i = promiseArray.findIndex((i) => i == promise);
promiseArray.splice(i, 1);
subscriber.next(v);
if (promiseArray.length === 0 && done) {
subscriber.complete();
}
});
promiseArray.push(promise);
},
complete() {
done = true;
if (promiseArray.length === 0) {
console.log("[synchronise] complete (from complete)");
subscriber.complete();
}
},
});
});
}