Skip to content Skip to sidebar Skip to footer

How To Unsubscribe Or Dispose An Interval Observable On Condition In Angular2 Or RxJS?

I'm new to the whole Rx thing and reactive programming, however I have to deal with such a situation. I want a interval observable to check a hardware's status by a POST request to

Solution 1:

So you want to make a POST request every 500 ms and then check its response. I assume your method intervalCheckingStatus evaluates the POST response and determines whether it's different?

First, I wouldn't use IntervalObservable. Have you imported the RxJS module? It's the third party library endorsed by Angular and the one they use in all their developer guide samples. If not, install it and import it. https://github.com/Reactive-Extensions/RxJS

import * as Rx from 'rxjs/Rx';

I assume you've already imported Http, ResponseOptions etc but here it is in case others are curious:

import { Http, Response, ResponseOptions } from '@angular/http';

EDIT 1: Forgot to include the dependency injection. Inject Http into your constructor. I've called it http, hence how I'm calling this.http.post

constructor(private http: Http) {

Then, I would do the following:

EDIT 2: This would be inside your loop where the post arguments are relevant to the item in your array.

    // Every 500 ms, make a POST request
    Rx.Observable.interval(500)
                  // Add your POST arguments here
                 .map(_ => this.http.post(yourUrl, yourBody))
                 // This filter here is so that it will only emit when intervalCheckingStatus returns true
                 // You need to get the property you need from the Response: resp
                 // Is it the status code you're interested in? That's what I put as an example here but whatever it is, pass it to your method
                .filter(resp => this.intervalCheckingStatus(resp.status))
                // Take(1) takes only the first emitted value and once it does that, the observable completes. So you do NOT need to unsubscribe explicitly.
                .take(1);

If you need to do something once the response has the status (or whatever property) you're looking for, then chain a .subscribe to the end and perform the action you need in there. Again, due to take(1), as soon as the first element is pumped, the observable stream completes and you do not need to unsubscribe.

Also, here is a very helpful website: http://rxmarbles.com/#take You can see that in their example, the resulting observable is complete (vertical line) after 2 elements are taken.


Solution 2:

You could use Observable.from and concatMap to iterate through all the items and then use a filter in combination with take(1) to stop an interval as soon as the validation passes the filter:

myLoop(item_array:number[], otheroption : string) {
    return Observable.from(item_array)
        .concatMap(item => this.configHardware(item, otheroption)
            .switchMap(resultId => Observable.interval(500)
                .switchMapTo(this.intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4], "fooBar")
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );

Here is a live-example with mock-data

const Observable = Rx.Observable;

function myLoop(item_array) {
    return Observable.from(item_array)
        // if you don't mind the execution-order you can use "mergeMap" instead of "concatMap"
        .concatMap(item => configHardwareMock(item)
            .switchMap(resultId => Observable.interval(500)
                .do(() => console.info("Checking status of: " + resultId))
                .switchMap(() => intervalCheckingStatus(resultId))
                .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast
                .take(1) // and complete after 1 value was valid
                .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not)
            )
        );
}

// usage:
myLoop([1,2,3,4])
    .subscribe(
        item => console.log(`Item ${item} is now valid`),
        error => console.error("Some error occured", error),
        () => console.log("All items are valid now")
    );

// mock helpers
function configHardwareMock(id) {
  return Observable.of(id);  
}

function intervalCheckingStatus(resultId) {
  if (Math.random() < .4) {
    return Observable.of(false);
  }
  
  return Observable.of(true);
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

Post a Comment for "How To Unsubscribe Or Dispose An Interval Observable On Condition In Angular2 Or RxJS?"