RxJS Operators: buffer

RxJS is a JavaScript library for writing asynchronous and event-based programs. Its main type is the Observable, and it provides a suite of functions and operators that make it easier to work with the data coming through. This series details those operators (and a few stand-alone functions) and provides examples of their use.

In this post, we’re going to cover the buffer operator.

What does it do?

The buffer operator lets us collect values from an observable and process them together. We pass buffer a second observable that controls when the collected values are output. Each time the second observable emits, all the values collected so far are pushed through the rest of the pipeline as a single array, and buffer starts collecting again.
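
To make the collect-then-release behaviour concrete, here is a minimal sketch in plain TypeScript with no RxJS involved. It is a conceptual model only, not how the library implements buffer, and ManualBuffer, push and flush are hypothetical names chosen for illustration.

```typescript
// Conceptual model only: this is NOT RxJS's implementation of buffer.
// ManualBuffer, push and flush are hypothetical names for illustration.
class ManualBuffer<T> {
    private collected: T[] = [];
    private readonly onFlush: (values: T[]) => void;

    constructor(onFlush: (values: T[]) => void) {
        this.onFlush = onFlush;
    }

    // Stands in for a value arriving from the source: store it, output nothing.
    push(value: T): void {
        this.collected.push(value);
    }

    // Stands in for the trigger firing: hand over the batch and start a new one.
    flush(): void {
        const batch = this.collected;
        this.collected = [];
        this.onFlush(batch);
    }
}

const batches: number[][] = [];
const model = new ManualBuffer<number>(batch => {
    batches.push(batch);
});

model.push(0);
model.push(1);
model.flush(); // batches is now [[0, 1]]
model.push(2);
model.flush(); // batches is now [[0, 1], [2]]
model.flush(); // nothing was collected since the last flush, so [] is added
```

Values only ever leave the model when flush is called, which is the role the second observable plays for buffer.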

Example

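// Importing operators directly from 'rxjs' requires RxJS 7.2 or later.
import { Subject, buffer, interval } from 'rxjs';

// Each emission from this subject tells buffer to release what it has collected.
const bufferTrigger$ = new Subject<void>();

// Emit an incrementing number every second and collect the numbers in buffer.
interval(1000)
    .pipe(buffer(bufferTrigger$))
    .subscribe(values => {
        console.log(values);
    });

// Fire the trigger every 5 seconds.
setInterval(() => {
    bufferTrigger$.next();
}, 5000);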

We’re using interval to give us an incrementing number every second. Every 5 seconds bufferTrigger$ fires and forces buffer to pass whatever values it has collected to the rest of the pipeline. If we run this, we see:

[ 0, 1, 2, 3 ]
[ 4, 5, 6, 7, 8 ]
[ 9, 10, 11, 12, 13 ]
[ 14, 15, 16, 17, 18 ]

We can see that buffer spends the 5 seconds between triggers collecting values. Each time the trigger fires, buffer emits them as an array and starts a new one. The first array holds only four values because the first trigger fired before interval had emitted 4. Just make sure the rate of values coming from the original observable and the timing of the trigger are matched. buffer keeps every value in memory until the trigger fires, so a fast source paired with a slow trigger, or with one that never fires, will collect far more values than you expect. We don’t want to run out of memory because the buffer was never cleared.
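
As a rough check on whether the two rates are matched, we can estimate how many values each array will hold. The sketch below assumes both the source and the trigger emit at fixed periods. estimateBufferSize is a hypothetical helper, not part of RxJS, and the real count can be off by one when the two timers land on the same tick, as the first array in the output above shows.

```typescript
// Hypothetical helper, not part of RxJS: a back-of-the-envelope estimate only.
// Assumes the source and the trigger both emit at fixed periods.
function estimateBufferSize(sourcePeriodMs: number, triggerPeriodMs: number): number {
    return Math.floor(triggerPeriodMs / sourcePeriodMs);
}

// The example above: one value per second, trigger every 5 seconds.
const perBatch = estimateBufferSize(1000, 5000); // 5

// A source emitting every millisecond with a trigger once a minute.
const slowTrigger = estimateBufferSize(1, 60000); // 60000
```

If the estimate is larger than you are comfortable holding in memory, either the trigger needs to fire more often or the source needs to be slowed down.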

The source code for this example is available on GitHub: