RxJs is a great set of libraries. But testing it can be a challenge.

Recently I needed to unit test throttle operator and it took me quite a bit to discover solution that works well in our project. At first I’ve tried to apply the solution described Hyphe Blog unfortunately it didn’t work for me but the method for injecting TestScheduler was really helpful.

Testing the throttling

I’ve created a plunk to describe the solution.

var throttleWindowDuration = 2 * 1000; /* 2 seconds */

function throttleTest() {
  var unthrottledStream = new Rx.Subject();
  var source = unthrottledStream.throttle(throttleWindowDuration);
  var result = {
    emitCounter: 0,
    unthrottledStream
  };

  var subscription = source.subscribe(
    function() {
      result.emitCounter++;
    });

  return result;
}

describe('run with test scheduler', function() {
  var testScheduler;
  var throttleSpy;

  beforeAll(function() {
    testScheduler = new Rx.TestScheduler();
    var originalThrottle = Rx.Observable.prototype.throttle;
    throttleSpy = spyOn(Rx.Observable.prototype, 'throttle')
      .and.callFake(function(dueTime) {
        return originalThrottle.call(this, dueTime, testScheduler);
      });
  });

  afterAll(function() {
    throttleSpy.and.callThrough();
  });

  it('advancing testScheduler allows to test throttling synchronously', function() {
    var throttleTestResult = throttleTest();

    //throttled stream will not emit if scheduler clock is at zero

    testScheduler.advanceBy(1);
    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();

    testScheduler.advanceBy(throttleWindowDuration);
    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();

    testScheduler.advanceBy(throttleWindowDuration);
    throttleTestResult.unthrottledStream.onNext();

    testScheduler.advanceBy(throttleWindowDuration);
    throttleTestResult.unthrottledStream.onNext();

    expect(throttleTestResult.emitCounter).toBe(4);
  });
});

describe('run without test scheduler', function() {
  it('without test scheduler the emit counter will stay at 1 '
    + 'as throttle duration is not elapsed', function() {
    var throttleTestResult = throttleTest();

    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();
    throttleTestResult.unthrottledStream.onNext();
    expect(throttleTestResult.emitCounter).toBe(1);
  });
});

As you can see from the example above you can simply use advanceBy method to advance internal clock and test throttling synchronously. RxJs 5 brings many changes I would have to revisit this code after it gets out of beta.