Article: Gulp to RxJS

This commit is contained in:
Thibault “Adædra” Hamel 2024-06-27 19:40:12 +02:00
parent 8ad05aae80
commit 66c4e1ce4b
5 changed files with 295 additions and 10 deletions

View File

@ -0,0 +1,287 @@
= RxJS as a Gulp replacement
:docdate: 2024-06-27
:description: Gulp's file streams are nice, but we can reproduce them rather easily using RxJS streams.
:keywords: javascript, typescript, rxjs, deno
When writing the tooling to generate this blog, I picked Gulp as a task executor/file processor. However, as development went on, I ran into some issues, including incompatibility with Deno, which I wanted to move to. Gulp streams reminded me of another library I used in the past, RxJS. I decided to see how I could replace Gulp with it and a bit of custom code, and it turned out to be rather easy.
It was a nice practical exercise, and it taught me more about how RxJS works. I focus here on the file processing/stream aspect rather than on the task runner aspect of Gulp, as RxJS is not relevant for the latter, but everything shown here is valid for RxJS in other workflows too.
[.note]
As this was done to move to Deno, this article is entirely based on Deno. Deno is a JavaScript runtime that bundles a lot of nice features, like built-in TypeScript support. While it should be mostly compatible with the classical NodeJS environment, some adjustments will be needed to follow along there, including using a `+package.json+` instead of URLs in `+import+` statements, and mapping some `+@std+` libraries to their Node equivalents.
== The basics
Gulp is a JavaScript tool that lets you write and execute tasks that transform files into other files by applying a series of stream transformations. Tasks basically look like this:
[source,ts]
----
export const transform = () =>
  src("**/*.js").pipe(someTransformer()).pipe(dest("dist"));
----
Gulp leverages NodeJS object streams, passing instances of `+vinyl+` objects (virtual files) through the different transformers. Gulp has a wide array of "`plugins`" available that can be used as transformers. If you need something custom, you can simply write your own `+Transform+` stream.
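To give an idea of what such a custom transformer looks like, here is a minimal sketch in a classic NodeJS/Gulp setup (the `+uppercase+` and `+shout+` names and the uppercasing logic are made up for illustration):
[source,ts]
----
import { Transform } from "node:stream";
import { dest, src } from "gulp";
// A toy transformer: every chunk flowing through a Gulp pipeline is a vinyl file.
const uppercase = () =>
  new Transform({
    objectMode: true,
    transform(file, _encoding, callback) {
      if (file.isBuffer()) {
        // Uppercase the file contents, purely as an illustration.
        file.contents = Buffer.from(file.contents.toString().toUpperCase());
      }
      callback(null, file);
    },
  });
export const shout = () =>
  src("**/*.txt").pipe(uppercase()).pipe(dest("dist"));
----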
RxJS is a library that supports a programming style called "`Reactive programming`". Basically, it provides data streams and a wide array of transformer functions to work with these streams. The stream format and intermediate transforms seemed to replicate Gulp streams well enough to be a suitable replacement.
== Sourcing files
The first thing we need to replicate is the `+gulp.src+` function. It takes a glob pattern, matches files and creates a stream of virtual files from them. For the files, it leverages `+vinyl+` to provide virtual files. These objects contain a path, the file contents, and a history of paths. Transformers work on these files, altering the contents and paths at will.
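As a rough sketch of that data structure (the paths and contents here are made up), a `+vinyl+` file can be built and inspected like this in a NodeJS project:
[source,ts]
----
import Vinyl from "vinyl";
// A virtual file: where it lives, what it contains, and where it has been.
const file = new Vinyl({
  cwd: "/project",
  path: "/project/src/index.ts",
  contents: Buffer.from("console.log('hello');"),
});
console.log(file.relative); // "src/index.ts"
file.extname = ".js"; // changing the path records the old one in file.history
console.log(file.history); // ["/project/src/index.ts", "/project/src/index.js"]
----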
To do the glob step, we can simply rely on the `+glob+` library. Its `+Glob+` class is an async iterable, which makes it directly pluggable into RxJS streams:
[source,ts]
----
import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from } from "https://esm.sh/rxjs@^7.8.1";
from(new Glob("**/*.ts", {})).subscribe((file) => console.log(file));
----
`+vinyl+` is perfectly usable outside of the Gulp environment, but here I will favour the `+vfile+` library, notably because it stores file contents as `+Uint8Array+` instead of `+Buffer+`, which plays better with modern JavaScript and especially Deno.
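As a point of comparison, here is a small sketch (with made-up values) of a `+VFile+` carrying the same kind of information:
[source,ts]
----
import { VFile } from "https://esm.sh/vfile@^6.0.1";
// Same idea as vinyl, but the contents are a string or a Uint8Array.
const file = new VFile({
  path: "src/index.ts",
  value: new TextEncoder().encode("console.log('hello');"),
});
console.log(file.basename); // "index.ts"
file.extname = ".js"; // updates file.path, and the change is recorded in file.history
----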
So now that we have an observable of paths, we need to actually load those files into `+VFile+` objects. This can simply be done by inserting a `+map+` operation in our stream:
[source,ts]
----
import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from, map } from "https://esm.sh/rxjs@^7.8.1";
import { readSync } from "https://esm.sh/to-vfile@^8.0.0";
import { resolve } from "jsr:@std/path@^0.225.2";
from(new Glob("**/*.ts", {}))
.pipe(map((file) => readSync(resolve(file))))
.subscribe((file) => console.log(file));
----
I leveraged the `+to-vfile+` library, which provides helpers to bridge virtual files to the actual filesystem. Its `+readSync+` function takes an absolute path (which we get through `+resolve+`) and returns a `+VFile+`.
== Going async
Modern JavaScript leverages promises a lot, and RxJS is fully compatible with them. You can actually see a promise as a stream with a single element, and that is exactly how RxJS handles them. So RxJS helpers that accept observables can also be called transparently with promises.
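As a quick illustration (a sketch with a dummy promise, not part of our pipeline), `+from+` turns a promise into an observable that emits one value and completes:
[source,ts]
----
import { from } from "https://esm.sh/rxjs@^7.8.1";
// A promise behaves like an observable with a single value.
from(Promise.resolve("hello")).subscribe({
  next: (value) => console.log("value:", value),
  complete: () => console.log("completed"),
});
----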
In our previous example, I used `+readSync+`, which does a blocking operation, but we can totally make it async using the `+read+` function. However, `+map+` is not fit for this: using it with a function that returns a promise creates a stream of promises (`+Observable<Promise<VFile>>+`), while we want to keep working with virtual files directly. For this, there is the `+mergeMap+` operator, which takes a function returning a stream and merges the results into a single stream. Since promises are valid streams, we can use it directly.
[source,ts]
----
import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from, mergeMap } from "https://esm.sh/rxjs@^7.8.1";
import { read } from "https://esm.sh/to-vfile@^8.0.0";
import { resolve } from "jsr:@std/path@^0.225.2";
from(new Glob("**/*.ts", {}))
.pipe(mergeMap((file) => read(resolve(file))))
.subscribe((file) => console.log(file));
----
Not much has changed, but we are now using async code, which can provide a performance boost in some workflows.
`+map+` and `+mergeMap+` are going to be the primary operators for transformations, depending on whether your transformation is `+async+` or not.
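To make the difference concrete, here is a small sketch using plain numbers instead of files:
[source,ts]
----
import { from, map, mergeMap } from "https://esm.sh/rxjs@^7.8.1";
// map: the callback returns a plain value, so the stream stays an Observable<number>.
from([1, 2, 3])
  .pipe(map((n) => n * 2))
  .subscribe((n) => console.log("sync:", n));
// mergeMap: the callback returns a promise, which is flattened back into plain values.
from([1, 2, 3])
  .pipe(mergeMap(async (n) => n * 2))
  .subscribe((n) => console.log("async:", n));
----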
We can go one step further and make our code a little more reusable. The combination of `+Glob+` and `+read+` can be abstracted away in a simple function. Observables are fully composable, and `+.pipe+` can be called several times if needed, each call yielding a new observable. So let us write a function that creates our source observable, mimicking the original `+src+` from Gulp:
[source,ts]
----
const src = (globs: string | string[]) =>
from(new Glob(globs, {})).pipe(
mergeMap((file) => read(resolve(file))),
);
----
Now that we have a `+src+` function, we can use it to simplify our pipeline:
[source,ts]
----
src("**/*.ts").subscribe((file) => console.log(file));
----
== Outputting files
We now need the counterpart to `+src+`, `+dest+`, in order to write our files. This time it requires a bit more code, but nothing too difficult: we simply need to write each file to the right location.
[source,ts]
----
const dest = (prefix: string) => async (file: VFile) => {
----
We declare a simple `+dest+` function that takes a prefix, which is where all files will be written. It returns a new function with the right signature to be fed into our pipeline through `+mergeMap+`.
[source,ts]
----
const relativePath = relative(file.cwd, file.path);
const finalPath = resolve(prefix, relativePath);
----
We then resolve the final path where to write the file: we take the absolute path, turn it into a path relative to the working directory, then back into an absolute path with our prefix injected, to obtain the final filename. Note however that a `+VFile+` can also carry a relative path, but for this simple example this is enough.
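To make the path juggling concrete, here is a small sketch with made-up paths:
[source,ts]
----
import { relative, resolve } from "jsr:@std/path@^0.225.2";
// Say the process runs in /project and the file was read from /project/pages/index.ts.
const relativePath = relative("/project", "/project/pages/index.ts");
console.log(relativePath); // "pages/index.ts"
// resolve() anchors the relative prefix against the current working directory,
// so with a cwd of /project this prints "/dist/pages/index.ts".
console.log(resolve("../dist", relativePath));
----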
[source,ts]
----
await Deno.mkdir(dirname(finalPath), { recursive: true });
await Deno.writeFile(finalPath, file.value as Uint8Array);
----
The next part of the function simply ensures the parent directory exists, then uses `+writeFile+` to output the file data. Once again, I take a small shortcut for the demonstration by using `+as Uint8Array+`, as `+file.value+` can also be a `+string+`, but I am ignoring that case for the moment.
[source,ts]
----
console.log("Written", finalPath);
return file;
};
----
The end of the function simply logs that we wrote the file, then returns it in case we want to do more work further down the pipeline.
We also need to update our `+src+` function a bit: by default, `+VFile+` will set its `+cwd+` to `+/+`, while we want it set to the actual current working directory. This can be done with a simple `+tap+` operation:
[source,ts]
----
const src = (globs: string | string[]) =>
from(new Glob(globs, {})).pipe(
mergeMap((file) => read(resolve(file))),
tap((file) => file.cwd = Deno.cwd()),
);
----
You can then use `+mergeMap+` to inject this `+dest+` function into our pipeline. Here is the complete file:
[source,ts]
----
import { Glob } from "https://esm.sh/glob@^10.4.2";
import { from, mergeMap, tap } from "https://esm.sh/rxjs@^7.8.1";
import { read } from "https://esm.sh/to-vfile@^8.0.0";
import { VFile } from "https://esm.sh/vfile@^6.0.1";
import { dirname, relative, resolve } from "jsr:@std/path@^0.225.2";
const src = (globs: string | string[]) =>
  from(new Glob(globs, {})).pipe(
    mergeMap((file) => read(resolve(file))),
    tap((file) => file.cwd = Deno.cwd()),
  );
const dest = (prefix: string) => async (file: VFile) => {
  const relativePath = relative(file.cwd, file.path);
  const finalPath = resolve(prefix, relativePath);
  await Deno.mkdir(dirname(finalPath), { recursive: true });
  await Deno.writeFile(finalPath, file.value as Uint8Array);
  console.log("Written", finalPath);
  return file;
};
src("**/*.ts")
  .pipe(mergeMap(dest("../dist")))
  .subscribe({});
----
Note that I call `+.subscribe({})+` to make the pipeline actually run, as observables are lazy by default.
== Make it a promise
If we want tasks that we can chain, we need to know when a task is finished. By default, subscribing to a stream just has it running in the background, without a way of tracking it. So we are going to turn the pipeline into a promise.
RxJS provides a `+.toPromise()+` method on observables, but it is deprecated in favour of the `+lastValueFrom+` function, which gives you a promise that resolves to the last `+VFile+` processed.
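If that route works for you, it is a one-liner (a sketch reusing our `+src+` and `+dest+`; note that `+lastValueFrom+` rejects if the stream completes without emitting anything, unless you pass it a `+defaultValue+`):
[source,ts]
----
import { lastValueFrom, mergeMap } from "https://esm.sh/rxjs@^7.8.1";
// Resolves with the last VFile once the whole pipeline has completed.
const lastFile = await lastValueFrom(
  src("**/*.ts").pipe(mergeMap(dest("../dist"))),
);
console.log("Last file written:", lastFile.path);
----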
I am going to use a slightly longer but, to my taste, cleaner approach that returns a proper `+Promise<void>+`. We are going to pair the `+Promise+` constructor with the `+subscribe+` method.
[source,ts]
----
const runPipeline = (observable: Observable<VFile>): Promise<void> =>
new Promise((complete, error) => observable.subscribe({ complete, error }));
runPipeline(
src("**/*.ts").pipe(mergeMap(dest("../dist"))),
).then(() => console.log("Done."));
----
The `+runPipeline+` function we define is rather simple: it creates a promise and binds the `+resolve+` and `+reject+` functions passed by the promise constructor (here named `+complete+` and `+error+` to match RxJS naming) to a subscription on the stream, then returns the new promise.
Using `+runPipeline+`, you can turn any RxJS pipeline into an awaitable task.
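For example, hypothetical build tasks can then be chained by awaiting each pipeline in turn (a sketch with made-up globs):
[source,ts]
----
const buildScripts = () =>
  runPipeline(src("**/*.ts").pipe(mergeMap(dest("../dist"))));
const buildStyles = () =>
  runPipeline(src("**/*.css").pipe(mergeMap(dest("../dist"))));
await buildScripts();
await buildStyles();
console.log("All tasks done.");
----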
== Inject files
So far, we have 1:1 transformations, but sometimes we want to inject new files into the pipeline, for example a file-name mapping manifest, or assets referenced by a file.
We already have the tools for this. Until now, we used `+mergeMap+` with functions returning a `+Promise<VFile>+`, but it handles functions returning an `+Observable<VFile>+` just as easily.
As a first example, we are going to emit a copy of every file we are passed:
[source,ts]
----
const copyFile = (file: VFile) =>
new Observable<VFile>((subscriber) => {
subscriber.next(file);
const copy = new VFile(file);
copy.stem = copy.stem + "-copy";
subscriber.next(copy);
subscriber.complete();
});
----
This function creates a new observable through which it first passes the given file unmodified, then creates a copy, changes its path, and passes that copy to the subscriber. It ends by completing the observable.
You can insert `+mergeMap(copyFile)+` into the `+pipe+` call, before `+dest+`, to create these clones; the resulting pipeline is sketched below.
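Here is that pipeline, reusing the `+src+` and `+dest+` defined earlier (a sketch):
[source,ts]
----
src("**/*.ts")
  .pipe(
    mergeMap(copyFile),
    mergeMap(dest("../dist")),
  )
  .subscribe({});
----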
For a slightly more complicated example, we are going to emit a manifest file listing all the processed files. For this, we need to write a custom RxJS operator, as `+mergeMap+` alone will not allow us to do this.
[source,ts]
----
const manifest = (path: string) => (observable: Observable<VFile>) =>
new Observable<VFile>((subscriber) => {
const files = new Set<string>();
observable.subscribe({
next(file) {
files.add(file.path);
subscriber.next(file);
},
complete() {
subscriber.next(
new VFile({
path,
value: JSON.stringify(Array.from(files)),
}),
);
subscriber.complete();
},
error(err) {
subscriber.error(err);
},
});
});
----
RxJS operators, like the one returned by calling `+mergeMap+`, are functions that take an `+Observable+` and return another `+Observable+`, which is exactly what the `+manifest+` function written here returns. We simply create a new observable, and `+subscribe+` to the source observable to wire in our own handlers. `+next+`, which is called for each item, adds the file path to a local `+Set+` and passes the file along unchanged. `+complete+` is called on completion (after the last element); there we inject a new `+VFile+` holding the manifest into the `+subscriber+` before completing it.
You do not need `+mergeMap+` to use this operator; it goes directly into `+pipe+`:
[source,ts]
----
src("**/*.ts")
.pipe(
manifest("manifest.json"),
mergeMap(dest("../dist")),
);
----
If you run this with the previous code, it will not work, as we hit both limitations noted earlier in our `+dest+` function: the manifest file has a relative path, and its contents are a `+string+`. Here is an updated version:
[source,ts]
----
import { VFile } from "https://esm.sh/vfile@^6.0.1";
import { dirname, isAbsolute, relative, resolve } from "jsr:@std/path@^0.225.2";
const dest = (prefix: string) => async (file: VFile) => {
  const relativePath = isAbsolute(file.path)
    ? relative(file.cwd, file.path)
    : file.path;
  const finalPath = resolve(prefix, relativePath);
  await Deno.mkdir(dirname(finalPath), { recursive: true });
  if (typeof file.value === "string") {
    const encoder = new TextEncoder();
    await Deno.writeFile(finalPath, encoder.encode(file.value));
  } else {
    await Deno.writeFile(finalPath, file.value);
  }
  console.log("Written", finalPath);
  return file;
};
----
It has been modified to support relative paths, as well as `+VFile+` objects backed by a `+string+` instead of a `+Uint8Array+`, using `+TextEncoder+` to convert the string.
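Putting the pieces together, a complete task combining `+manifest+`, the updated `+dest+` and `+runPipeline+` could look like this sketch:
[source,ts]
----
await runPipeline(
  src("**/*.ts").pipe(
    manifest("manifest.json"),
    mergeMap(dest("../dist")),
  ),
);
console.log("Done.");
----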
And that's it! This reduced set of tools should be enough to go through files and apply all sorts of transformations. Prefer existing RxJS operators when you can, and keep custom operators for the most complex use cases.
== Closing notes
Replacing Gulp's streams with RxJS ones was not completely straightforward, which is expected: Gulp is specialised in the exact task of file transformation, while RxJS is a more generic stream tool. However, it is still rather easy to do. RxJS is incredibly versatile, and this article barely scratches the surface of what it can do with streams, especially since we work with a very short execution time, while RxJS is also made for long-running applications that handle, transform and re-dispatch events.

View File

@ -7,7 +7,7 @@
"chokidar": "npm:chokidar@^3.6.0",
"cssnano": "npm:cssnano@^7.0.2",
"glob": "npm:glob@^10.4.2",
"luxon": "npm:luxon@^3.4.4",
"luxon": "https://esm.sh/luxon@^3.4.4",
"postcss": "npm:postcss@^8.4.38",
"postcss-import": "npm:postcss-import@^16.1.0",
"postcss-nesting": "npm:postcss-nesting@^12.1.5",

View File

@ -11,7 +11,6 @@
"npm:chokidar@^3.6.0": "npm:chokidar@3.6.0",
"npm:cssnano@^7.0.2": "npm:cssnano@7.0.3_postcss@8.4.38",
"npm:glob@^10.4.2": "npm:glob@10.4.2",
"npm:luxon@^3.4.4": "npm:luxon@3.4.4",
"npm:postcss-import@^16.1.0": "npm:postcss-import@16.1.0_postcss@8.4.38",
"npm:postcss-nesting@^12.1.5": "npm:postcss-nesting@12.1.5_postcss@8.4.38_postcss-selector-parser@6.1.0",
"npm:postcss@^8.4.38": "npm:postcss@8.4.38",
@ -39,7 +38,6 @@
"@std/log@0.224.3": {
"integrity": "601af539ff0c80d117fcb6cab7d9339242872d7f7f5fe4862aaf32152d86b9bf",
"dependencies": [
"jsr:@std/fmt@^0.225.4",
"jsr:@std/fs@^1.0.0-rc.1",
"jsr:@std/io@^0.224.2"
]
@ -809,10 +807,6 @@
"integrity": "sha512-9hp3Vp2/hFQUiIwKo8XCeFVnrg8Pk3TYNPIR7tJADKi5YfcF7vEaK7avFHTlSy3kOKYaJQaalfEo6YuXdceBOQ==",
"dependencies": {}
},
"luxon@3.4.4": {
"integrity": "sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==",
"dependencies": {}
},
"mdast-util-to-hast@13.2.0": {
"integrity": "sha512-QGYKEuUsYT9ykKBCMOEDLsU5JRObWQusAolFMeko/tYPufNkRffBAQjIE+99jbA87xv6FgmjLtwjh9wBWajwAA==",
"dependencies": {
@ -1749,14 +1743,18 @@
},
"redirects": {
"https://esm.sh/asciidoctor@^3.0.4": "https://esm.sh/asciidoctor@3.0.4",
"https://esm.sh/luxon@^3.4.4": "https://esm.sh/luxon@3.4.4",
"https://esm.sh/to-vfile@^8.0.0": "https://esm.sh/to-vfile@8.0.0",
"https://esm.sh/v135/@types/luxon@~3.4/index.d.ts": "https://esm.sh/v135/@types/luxon@3.4.2/index.d.ts",
"https://esm.sh/vfile@^6.0.1": "https://esm.sh/vfile@6.0.1"
},
"remote": {
"https://esm.sh/asciidoctor@3.0.4": "23f6b6ab844b5295074b6dd139b0153f4b6424df7f41aaa2fcec5004359bb094",
"https://esm.sh/luxon@3.4.4": "d0ab977827047c813736ac318a7ed3494d5d2f5cea20a305405f3134f5702df7",
"https://esm.sh/to-vfile@8.0.0": "05aa989433514d267833c5057b53935c837c1373824ed8b60ed38cae67655af5",
"https://esm.sh/v135/@asciidoctor/core@3.0.4/denonext/core.mjs": "fbb624c9375ac40a70e586d84f6986ff7e8c1ff572e006284a08709e5118bbf7",
"https://esm.sh/v135/asciidoctor@3.0.4/denonext/asciidoctor.mjs": "00c21f42422684a4bc5e35647d9495a6631632fdc4c97ddb045a1a3b46d58dba",
"https://esm.sh/v135/luxon@3.4.4/denonext/luxon.mjs": "033dc975621243fc8a447b2268b5823944fe1fa120242543394eceb1d6c42961",
"https://esm.sh/v135/to-vfile@8.0.0/denonext/to-vfile.mjs": "e9e39c7791eb0ea7055105ffd3eb6ccf3d36a1a4ddf6c5a3824bc3838544ad40",
"https://esm.sh/v135/unist-util-stringify-position@4.0.0/denonext/unist-util-stringify-position.mjs": "dabd32cb2b590bbb077fc6f6591a2e065cffd6c55646ba383455926a27ea64d7",
"https://esm.sh/v135/vfile-message@4.0.2/denonext/vfile-message.mjs": "efc85b18bedda337fb1c20cdc452fac3addac32ee55948cebf2845396ae641ac",
@ -1775,7 +1773,6 @@
"npm:chokidar@^3.6.0",
"npm:cssnano@^7.0.2",
"npm:glob@^10.4.2",
"npm:luxon@^3.4.4",
"npm:postcss-import@^16.1.0",
"npm:postcss-nesting@^12.1.5",
"npm:postcss@^8.4.38",

View File

@ -1,5 +1,5 @@
import { env } from "node:process";
import { DateTime } from "luxon";
export const PRODUCTION = env.NODE_ENV === "production";
export const DEFAULT_DATE = DateTime.fromSeconds(Number(env.SOURCE_DATE_EPOCH)).toUTC();
export const PRODUCTION: boolean = env.NODE_ENV === "production";
export const DEFAULT_DATE: DateTime = DateTime.fromSeconds(0).toUTC();

View File

@ -70,6 +70,7 @@ const transformArticle = (sink: Subscriber<VFile>, articles: Article[]) => async
const slug = basename(file.path, ".asciidoc");
const document = Asciidoctor.load(decoder.decode(file.value as Uint8Array), {
extension_registry: EXTENSION_REGISTRY,
attributes: { "docdate": `${DEFAULT_DATE.toISODate()}@` },
});
const date = DateTime.fromISO(document.getAttribute("docdate", { zone: "UTC" }));
const article = { path: file.path, slug, date, document };