Initialize Cache When It Is Used

Let's say that I have the following events: DoSomething, FetchSomething, and FetchSomethingSuccessful. DoSomething does something that needs some cached data. When I fire off the event I

Solution 1:

A simple shareReplay does what you want, but you need RxJS 5.5.0 or later, because earlier versions had a bug that has since been fixed.

const cached$ = request$.shareReplay(1);

This will trigger the request when the first subscription is made, but subsequent subscriptions will use the cached value.

Errors are passed to the subscriber and then the internal subject is destroyed so that the error itself isn't cached. This makes the observable retryable. So you can attach whatever retry logic you want (such as retrying until successful).
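
The same idea can be sketched without RxJS. The following is a minimal, synchronous stand-in and not what shareReplay does internally; lazyCache, retry and flakyRequest are hypothetical names introduced only for illustration. The loader runs on first use, a successful result is kept, and a thrown error is not stored, so the next call tries again.

```javascript
// Hypothetical helper (not part of RxJS): run `load` on first use and
// keep its result. A throw leaves the cache empty, so errors are never cached.
function lazyCache(load) {
  let cached = null; // becomes { value } after the first successful load
  return function get() {
    if (cached === null) {
      cached = { value: load() }; // if load() throws, `cached` stays null
    }
    return cached.value;
  };
}

// Hypothetical helper: call `fn`, retrying up to `retries` more times on error.
function retry(fn, retries) {
  for (let attempt = 0; ; attempt++) {
    try {
      return fn();
    } catch (err) {
      if (attempt >= retries) throw err; // out of retries, give up
    }
  }
}

// Faked request that fails on its first two calls, then succeeds.
let apiCalls = 0;
function flakyRequest() {
  apiCalls += 1;
  if (apiCalls < 3) throw new Error('API error');
  return 'payload';
}

const getCached = lazyCache(flakyRequest);
const first = retry(getCached, 5); // 3 API calls: two failures, one success
const second = getCached();        // served from the cache, no API call
console.log(first, second, apiCalls); // prints: payload payload 3
```

Because the failure is never stored, the retry logic stays outside the cache, just as cached$.retry(5) sits outside shareReplay in the observable version.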

Lastly, the cache persists even if the refCount drops to 0 at some point, that is, when every subscriber has unsubscribed.

shareReplay also takes a second argument, much like the ReplaySubject constructor, defining a time window for which to keep the cache.

// Faked request which sometimes errors
const request$ = Rx.Observable
  .defer(() => Rx.Observable.of(Math.random()))
  .do(() => console.log('API called'))
  .map(val => {
    if (val <= 0.3) {
      console.log('API error');
      throw val;
    } else {
      return val;
    }
  })
  .delay(250);

const cached$ = request$.shareReplay(1);

Rx.Observable.timer(0, 1000)
  .take(5)
  .switchMap(() => cached$.retry(5))
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.min.js"></script>

Solution 2:

When caching HTTP responses I use something like this:

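// Assumes RxJS 5 with all operators patched in (an assumption; the original omits imports):
// import { Observable, ReplaySubject } from 'rxjs/Rx';
let counter = 1;
// Replay at most 1 value, and only for 1000 ms after it was emitted
const cache$ = new ReplaySubject(1, 1000);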

// Mock HTTP request
const http$ = Observable
  .defer(() => Observable.of(counter++).delay(100).do(val => cache$.next(val)))
  .share();

const source$ = Observable
  .merge(cache$, http$)
  .take(1);

See live demo: https://jsbin.com/rogetem/9/edit?js,console

It still needs one extra variable, cache$. I believe it's possible to implement this without it, but I think I'd have to use materialize() and dematerialize(), which makes it quite confusing.

You know the cache is empty when the callback for defer() is called. Right now it just increments counter.

Note that I still had to use .do(val => cache$.next(val)) to pass only next notifications to the ReplaySubject and avoid error and complete, because these would stop the Subject.

You can verify that it works with, for example, this:

source$.subscribe(val => console.log("Response 0:", val));
setTimeout(() => source$.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => source$.subscribe(val => console.log("Response 60:", val)), 60);
setTimeout(() => source$.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => source$.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => source$.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => source$.subscribe(val => console.log("Response 3500:", val)), 3500);

This prints the following output (each value is cached for 1 second):

Response 0: 1
Response 50: 1
Response 60: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3
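
To see where these numbers come from, the timeline can be modelled without timers or RxJS. This is a simplified sketch, not the operators' real implementation: simulate and its parameters are hypothetical names, and it assumes every request takes exactly latencyMs and never fails.

```javascript
// Hypothetical model of merge(cache$, http$).take(1) over time.
// subscribeTimes must be sorted ascending (milliseconds).
function simulate(subscribeTimes, latencyMs, windowMs) {
  let counter = 1;    // mirrors `counter` in the mock HTTP request
  let pending = null; // request in flight: { value, doneAt }
  let cached = null;  // last stored response: { value, storedAt }
  const results = [];
  for (const t of subscribeTimes) {
    // A request that has finished by time t has already filled the cache.
    if (pending !== null && pending.doneAt <= t) {
      cached = { value: pending.value, storedAt: pending.doneAt };
      pending = null;
    }
    if (cached !== null && t - cached.storedAt < windowMs) {
      results.push(cached.value);  // fresh cache hit
    } else if (pending !== null) {
      results.push(pending.value); // join the shared request in flight
    } else {
      // Cache empty or expired: start a new request.
      pending = { value: counter++, doneAt: t + latencyMs };
      results.push(pending.value);
    }
  }
  return results;
}

const values = simulate([0, 50, 60, 200, 1200, 1500, 3500], 100, 1000);
console.log(values.join(', ')); // prints: 1, 1, 1, 1, 2, 2, 3
```

In this model the subscriptions at 50 and 60 ms join the request that is still in flight, the one at 200 ms hits the cache, and those at 1200 ms and 3500 ms find the cache expired and trigger new requests.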
