RxJS is a JavaScript library that enables the creation of asynchronous and event-based programs. The main type is the Observable
and a suite of functions and operators are provided to make it easier to work with the data coming through. This series will detail those operators (and a few stand-alone functions) and provide examples of their use.
In this post, we’re going to cover the buffer
operator.
What does it do?
The buffer
operator lets us bundle together values from an observable and process them all together. We pass buffer
a second observable that controls when to output values. Once the second observable emits a message all the collected values will be pushed through the rest of the pipeline as an array.
Example
import { Subject, buffer, interval } from 'rxjs';
const bufferTrigger$ = new Subject<void>();
interval(1000)
.pipe(buffer(bufferTrigger$),)
.subscribe(x => {
console.log(x);
});
setInterval(() => {
bufferTrigger$.next();
}, 5000);
We’re using interval
to provide us with incrementing numbers every second. Every 5-seconds bufferTrigger$
will fire and force buffer
to pass whatever values it has to the rest of the pipeline. If we run this we see:
[ 0, 1, 2, 3 ]
[ 4, 5, 6, 7, 8 ]
[ 9, 10, 11, 12, 13 ]
[ 14, 15, 16, 17, 18 ]
We can see that buffer has spent 5 seconds collecting values. Once the trigger fired it built them into an array and forwarded that on. Just make sure the number of values coming from the original observable and the timing on the trigger are aligned. Otherwise, buffer
will end up collecting a lot more values than you expect, which may cause issues. We don’t want to run out of memory because we’ve not cleared the buffer.
The source code for this example is available on GitHub: