/**
 * Turns a stream of terminal notifications into a bounded stream of signals.
 *
 * <p>When {@code count} is zero every terminal notification is passed through
 * unchanged. Otherwise the first {@code count} notifications are replaced by an
 * {@code onNext} notification carrying the running counter, and every later
 * notification is passed through as-is. The mapped notifications are then
 * dematerialized.
 *
 * @param ts the stream of terminal notifications
 * @return the dematerialized stream of mapped notifications
 */
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
    Function<Notification<?>, Notification<?>> limiter = new Function<Notification<?>, Notification<?>>() {
        // How many notifications have been counted so far (only advanced when count != 0).
        int seen = 0;

        @Override
        public Notification<?> call(Notification<?> terminalNotification) {
            if (count == 0) {
                return terminalNotification;
            }
            seen++;
            if (seen > count) {
                // Budget exhausted: let the terminal notification through.
                return terminalNotification;
            }
            return Notification.createOnNext(seen);
        }
    };
    return ts.map(limiter).dematerialize();
}
/**
 * A mapper applied after {@code take(2)} throws on every element; the observer
 * must see exactly one {@code onError} with that exception and nothing else.
 */
@Test
public void testTakeWithErrorHappeningInOnNext() {
    Function<Integer, Integer> alwaysThrows = new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer t1) {
            throw new IllegalArgumentException("some error");
        }
    };
    Observable<Integer> firstTwo = Observable.from(Arrays.asList(1, 2, 3)).take(2);
    Observable<Integer> w = firstTwo.map(alwaysThrows);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    w.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onError(any(IllegalArgumentException.class));
    ordered.verifyNoMoreInteractions();
}
@Test public void testIssue1451Case1() { // https://github.com/Netflix/RxJava/issues/1451 int[] nums = { 1, 2, 3 }; final AtomicInteger count = new AtomicInteger(); for (final int n : nums) { Observable .just(Boolean.TRUE, Boolean.FALSE) .takeWhile(new Function<Boolean, Boolean>() { @Override public Boolean call(Boolean value) { return value; } }) .toList() .doOnNext(new Action1<List<Boolean>>() { @Override public void call(List<Boolean> booleans) { count.incrementAndGet(); } }) .subscribe(); } assertEquals(nums.length, count.get()); }
/**
 * When the debounce selector function itself throws, the observer must receive
 * that error and neither a value nor a completion.
 */
@Test
public void debounceSelectorFuncThrows() {
    Function<Integer, Observable<Integer>> throwingSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            throw new TestException();
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    Observable<Integer> debounced = source.debounce(throwingSelector);
    debounced.subscribe(o);

    source.onNext(1);

    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
    verify(o).onError(any(TestException.class));
}
@Test public void testIssue1451Case2() { // https://github.com/Netflix/RxJava/issues/1451 int[] nums = { 1, 2, 3 }; final AtomicInteger count = new AtomicInteger(); for (final int n : nums) { Observable .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE) .takeWhile(new Function<Boolean, Boolean>() { @Override public Boolean call(Boolean value) { return value; } }) .toList() .doOnNext(new Action1<List<Boolean>>() { @Override public void call(List<Boolean> booleans) { count.incrementAndGet(); } }) .subscribe(); } assertEquals(nums.length, count.get()); }
/**
 * {@code exists} over (1, 2, 3) with the predicate "less than 2" must emit
 * {@code true} once, never {@code false}, and then complete without error.
 */
@Test
public void testAnyWithPredicate1() {
    Function<Integer, Boolean> lessThanTwo = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 < 2;
        }
    };
    Observable<Integer> w = Observable.just(1, 2, 3);
    Observable<Boolean> observable = w.exists(lessThanTwo);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    observable.subscribe(observer);

    verify(observer, never()).onNext(false);
    verify(observer, times(1)).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
/**
 * The value selector throws for the element "b"; {@code toMultimap} must signal
 * an error and must neither emit the expected map nor complete.
 */
@Test
public void testToMultimapWithErrorInValueSelector() {
    Function<String, String> duplicateErr = new Function<String, String>() {
        @Override
        public String call(String t1) {
            if ("b".equals(t1)) {
                throw new RuntimeException("Forced failure");
            }
            return t1 + t1;
        }
    };

    Observable<String> source = Observable.just("a", "b", "cc", "dd");
    Observable<Map<Integer, Collection<String>>> mapped = source.toMultimap(lengthFunc, duplicateErr);

    // The map that would have been produced had the selector not failed.
    Map<Integer, Collection<String>> expected = new HashMap<Integer, Collection<String>>();
    expected.put(1, Arrays.asList("aa", "bb"));
    expected.put(2, Arrays.asList("cccc", "dddd"));

    mapped.subscribe(objectObserver);

    verify(objectObserver, times(1)).onError(any(Throwable.class));
    verify(objectObserver, never()).onNext(expected);
    verify(objectObserver, never()).onComplete();
}
@Test public void testTakeUnsubscribesOnGroupBy() { Observable.merge( EventStream.getEventStream("HTTP-ClusterA", 50), EventStream.getEventStream("HTTP-ClusterB", 20)) // group by type (2 clusters) .groupBy(new Function<Event, String>() { @Override public String call(Event event) { return event.type; } }).take(1) .toBlocking().forEach(new Action1<GroupedObservable<String, Event>>() { @Override public void call(GroupedObservable<String, Event> g) { System.out.println(g); } }); System.out.println("**** finished"); }
private void performTestUsingWithObservableFactoryError(boolean disposeEagerly) { final Action0 unsubscribe = mock(Action0.class); Supplier<Subscription> resourceFactory = new Supplier<Subscription>() { @Override public Subscription call() { return Subscriptions.create(unsubscribe); } }; Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() { @Override public Observable<Integer> call(Subscription subscription) { throw new TestException(); } }; try { Observable.using(resourceFactory, observableFactory, disposeSubscription).toBlocking() .last(); fail("Should throw a TestException when the observableFactory throws it"); } catch (TestException e) { // Make sure that unsubscribe is called so that users can close // the resource if some error happens. verify(unsubscribe, times(1)).call(); } }
/**
 * {@code exists} over (1, 2, 3) with the predicate "less than 2" must emit
 * {@code true} once, never {@code false}, and then complete without error.
 */
@Test
public void testExists1() {
    Function<Integer, Boolean> lessThanTwo = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 < 2;
        }
    };
    Observable<Integer> w = Observable.just(1, 2, 3);
    Observable<Boolean> observable = w.exists(lessThanTwo);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    observable.subscribe(observer);

    verify(observer, never()).onNext(false);
    verify(observer, times(1)).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
/**
 * When the observable returned by the debounce selector signals an error, the
 * observer must receive that error and neither a value nor a completion.
 */
@Test
public void debounceSelectorObservableThrows() {
    Function<Integer, Observable<Integer>> erroringSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            return Observable.error(new TestException());
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    Observable<Integer> debounced = source.debounce(erroringSelector);
    debounced.subscribe(o);

    source.onNext(1);

    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
    verify(o).onError(any(TestException.class));
}
/**
 * Every string has length three, so {@code all} must emit {@code true} and
 * complete, with no further interaction.
 */
@Test
@SuppressWarnings("unchecked")
public void testAll() {
    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s.length() == 3;
        }
    };
    Observer<Boolean> observer = mock(Observer.class);
    Observable<String> obs = Observable.just("one", "two", "six");

    obs.all(hasLengthThree).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
/**
 * One string ("three") does not have length three, so {@code all} must emit
 * {@code false} and complete, with no further interaction.
 */
@Test
@SuppressWarnings("unchecked")
public void testNotAll() {
    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s.length() == 3;
        }
    };
    Observer<Boolean> observer = mock(Observer.class);
    Observable<String> obs = Observable.just("one", "two", "three", "six");

    obs.all(hasLengthThree).subscribe(observer);

    verify(observer).onNext(false);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
/**
 * {@code all} over an empty source must emit {@code true} and complete, with
 * no further interaction.
 */
@Test
@SuppressWarnings("unchecked")
public void testEmpty() {
    Function<String, Boolean> hasLengthThree = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s.length() == 3;
        }
    };
    Observer<Boolean> observer = mock(Observer.class);
    Observable<String> obs = Observable.empty();

    obs.all(hasLengthThree).subscribe(observer);

    verify(observer).onNext(true);
    verify(observer).onComplete();
    verifyNoMoreInteractions(observer);
}
/**
 * With a per-item delay observable that is empty, the item and the completion
 * must reach the observer in that order, without any error.
 */
@Test
public void testDelayWithObservableEmptyDelayer() {
    Function<Integer, Observable<Integer>> emptyDelay = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            return Observable.empty();
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    source.delay(emptyDelay).subscribe(o);

    source.onNext(1);
    source.onComplete();

    ordered.verify(o).onNext(1);
    ordered.verify(o).onComplete();
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onError(any(Throwable.class));
}
/**
 * A left duration selector that throws must make {@code join} signal exactly
 * one error, with no value and no completion.
 */
@Test
public void leftDurationSelectorThrows() {
    Function<Integer, Observable<Integer>> failingSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            throw new RuntimeException("Forced failure");
        }
    };
    PublishSubject<Integer> source1 = PublishSubject.create();
    PublishSubject<Integer> source2 = PublishSubject.create();

    Observable<Integer> joined = source1.join(source2, failingSelector, just(Observable.never()), add);
    joined.subscribe(observer);

    // An item on the left side triggers the failing selector.
    source1.onNext(1);

    verify(observer, times(1)).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
    verify(observer, never()).onNext(any());
}
@Test public void testAnyWithEmptyAndPredicate() { // If the source is empty, always output false. Observable<Integer> w = Observable.empty(); Observable<Boolean> observable = w.exists( new Function<Integer, Boolean>() { @Override public Boolean call(Integer t1) { return true; } }); @SuppressWarnings("unchecked") Observer<Boolean> observer = mock(Observer.class); observable.subscribe(observer); verify(observer, times(1)).onNext(false); verify(observer, never()).onNext(true); verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(observer, times(1)).onComplete(); }
/**
 * Checks that a fast time-based (hot) producer feeding a slow consumer ends
 * with exactly one error, and that the error is a
 * {@code MissingBackpressureException}.
 */
@Test
public void testHotOperatorBackpressure() {
    Function<Long, String> slowMapper = new Function<Long, String>() {
        @Override
        public String call(Long t1) {
            System.out.println(t1);
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the mapper only needs to be slow.
            }
            return t1 + " slow value";
        }
    };
    TestSubscriber<String> ts = new TestSubscriber<String>();

    Observable.timer(0, 1, TimeUnit.MICROSECONDS)
            .observeOn(Schedulers.computation())
            .map(slowMapper)
            .subscribe(ts);

    ts.awaitTerminalEvent();
    System.out.println("Errors: " + ts.getOnErrorEvents());

    assertEquals(1, ts.getOnErrorEvents().size());
    assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
}
/**
 * {@code exists} over (1, 2, 3) with the predicate "less than 1" must emit
 * {@code false} once, never {@code true}, and then complete without error.
 */
@Test
public void testAnyWithPredicate2() {
    Function<Integer, Boolean> lessThanOne = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 < 1;
        }
    };
    Observable<Integer> w = Observable.just(1, 2, 3);
    Observable<Boolean> observable = w.exists(lessThanOne);

    @SuppressWarnings("unchecked")
    Observer<Boolean> observer = mock(Observer.class);
    observable.subscribe(observer);

    verify(observer, times(1)).onNext(false);
    verify(observer, never()).onNext(true);
    verify(observer, never()).onError(org.mockito.Matchers.any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
/**
 * With {@code disposeEagerly == false}, a source that ends in an error must
 * record the events in the order "error", "unsub", "disposed".
 */
@Test
public void testUsingDoesNotDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Supplier<Resource> resourceFactory = createResourceFactory(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    final Action0 unsub = createUnsubAction(events);

    Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            // The resource's words, followed by a terminal error.
            Observable<String> words = Observable.from(resource.getTextFromWeb().split(" "));
            Observable<String> failure = Observable.<String>error(new RuntimeException());
            return words.concatWith(failure);
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);

    Observable<String> observable = Observable
            .using(resourceFactory, observableFactory, new DisposeAction(), false)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    observable.subscribe(observer);

    assertEquals(Arrays.asList("error", "unsub", "disposed"), events);
}
/**
 * No element of the single-item source (1) is even, so {@code firstOrDefault}
 * must emit the default value 2 and then complete, in that order.
 */
@Test
public void testFirstOrDefaultWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1).firstOrDefault(2, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(2);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
/**
 * Runs {@code Observable.using} with a resource factory that throws a
 * {@code TestException} and blocks on the result.
 *
 * <p>This method asserts nothing itself; presumably callers expect the
 * exception to propagate (verify against the calling tests).
 *
 * <p>The {@code disposeEagerly} flag is forwarded to {@code using}; previously
 * it was ignored, so the eager and non-eager variants exercised the same path.
 *
 * @param disposeEagerly whether {@code using} should dispose the resource eagerly
 */
private void performTestUsingWithResourceFactoryError(boolean disposeEagerly) {
    Supplier<Subscription> resourceFactory = new Supplier<Subscription>() {
        @Override
        public Subscription call() {
            throw new TestException();
        }
    };

    Function<Subscription, Observable<Integer>> observableFactory = new Function<Subscription, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Subscription subscription) {
            return Observable.empty();
        }
    };

    Observable.using(resourceFactory, observableFactory, disposeSubscription, disposeEagerly)
            .toBlocking()
            .last();
}
/**
 * When the function that supplies the per-item delay throws, the observer
 * must receive only that error: no value and no completion.
 */
@Test
public void testDelayWithObservableDelayFunctionThrows() {
    Function<Integer, Observable<Integer>> throwingDelay = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            throw new TestException();
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    source.delay(throwingDelay).subscribe(o);

    source.onNext(1);

    ordered.verify(o).onError(any(TestException.class));
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
/**
 * A stateful predicate that accepts only its first two invocations must let
 * "one" and "two" through, drop "three", and complete without error.
 */
@Test
public void testTakeWhile2() {
    Function<String, Boolean> firstTwoCalls = new Function<String, Boolean>() {
        // Number of times the predicate has been invoked so far.
        int index = 0;

        @Override
        public Boolean call(String input) {
            boolean keep = index < 2;
            index++;
            return keep;
        }
    };
    Observable<String> w = Observable.just("one", "two", "three");
    Observable<String> take = w.takeWhile(firstTwoCalls);

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);
    take.subscribe(observer);

    verify(observer, times(1)).onNext("one");
    verify(observer, times(1)).onNext("two");
    verify(observer, never()).onNext("three");
    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
}
/**
 * The source emits one item and then an error, while the {@code takeWhile}
 * predicate rejects everything. The test has no assertions; it passes when the
 * blocking call returns without throwing.
 */
@Test
public void testTakeWhileDoesntLeakErrors() {
    OnSubscribe<String> itemThenError = new OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> observer) {
            observer.onNext("one");
            observer.onError(new Throwable("test failed"));
        }
    };
    Function<String, Boolean> rejectAll = new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return false;
        }
    };

    Observable<String> source = Observable.create(itemThenError);
    source.takeWhile(rejectAll).toBlocking().lastOrDefault("");
}
@Test public void testTakeWhileProtectsPredicateCall() { TestObservable source = new TestObservable(mock(Subscription.class), "one"); final RuntimeException testException = new RuntimeException("test exception"); @SuppressWarnings("unchecked") Observer<String> observer = mock(Observer.class); Observable<String> take = Observable.create(source).takeWhile(new Function<String, Boolean>() { @Override public Boolean call(String s) { throw testException; } }); take.subscribe(observer); // wait for the Observable to complete try { source.t.join(); } catch (Throwable e) { e.printStackTrace(); fail(e.getMessage()); } verify(observer, never()).onNext(any(String.class)); verify(observer, times(1)).onError(testException); }
/**
 * With {@code disposeEagerly == true}, a source that ends in an error must
 * record the events in the order "disposed", "error", "unsub".
 */
@Test
public void testUsingDisposesEagerlyBeforeError() {
    final List<String> events = new ArrayList<String>();
    Supplier<Resource> resourceFactory = createResourceFactory(events);
    final Action1<Throwable> onError = createOnErrorAction(events);
    final Action0 unsub = createUnsubAction(events);

    Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
        @Override
        public Observable<String> call(Resource resource) {
            // The resource's words, followed by a terminal error.
            Observable<String> words = Observable.from(resource.getTextFromWeb().split(" "));
            Observable<String> failure = Observable.<String>error(new RuntimeException());
            return words.concatWith(failure);
        }
    };

    @SuppressWarnings("unchecked")
    Observer<String> observer = mock(Observer.class);

    Observable<String> observable = Observable
            .using(resourceFactory, observableFactory, new DisposeAction(), true)
            .doOnUnsubscribe(unsub)
            .doOnError(onError);
    observable.subscribe(observer);

    assertEquals(Arrays.asList("disposed", "error", "unsub"), events);
}
/**
 * {@code last} with an "is even" predicate over 1..6 must emit 6 and then
 * complete, in that order, with nothing else.
 */
@Test
public void testLastWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6).last(isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(6);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
/**
 * No element of the single-item source (1) is even, so {@code last} with the
 * predicate must signal a {@code NoSuchElementException} and nothing else.
 */
@Test
public void testLastWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1).last(isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onError(isA(NoSuchElementException.class));
    ordered.verifyNoMoreInteractions();
}
/**
 * {@code lastOrDefault} with an "is even" predicate over 1..6 must emit 6
 * (not the default 8) and then complete, in that order.
 */
@Test
public void testLastOrDefaultWithPredicate() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5, 6).lastOrDefault(8, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(6);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
/**
 * When the per-item delay observable signals an error, the observer must
 * receive only that error: no value and no completion.
 */
@Test
public void testDelayWithObservableDelayThrows() {
    final PublishSubject<Integer> delay = PublishSubject.create();
    Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            return delay;
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    source.delay(delayFunc).subscribe(o);

    source.onNext(1);
    // Fail the delay observable instead of letting it release the item.
    delay.onError(new TestException());

    ordered.verify(o).onError(any(TestException.class));
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
/**
 * No element of the single-item source (1) is even, so {@code lastOrDefault}
 * must emit the default value 2 and then complete, in that order.
 */
@Test
public void testLastOrDefaultWithPredicateAndEmpty() {
    Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer t1) {
            return t1 % 2 == 0;
        }
    };
    Observable<Integer> observable = Observable.just(1).lastOrDefault(2, isEven);

    @SuppressWarnings("unchecked")
    Observer<Integer> observer = mock(Observer.class);
    observable.subscribe(observer);

    InOrder ordered = inOrder(observer);
    ordered.verify(observer, times(1)).onNext(2);
    ordered.verify(observer, times(1)).onComplete();
    ordered.verifyNoMoreInteractions();
}
/**
 * When the source signals an error while an item is still being delayed, the
 * observer must receive only that error; a later emission from the delay
 * observable must not deliver the item or a completion.
 */
@Test
public void testDelayWithObservableSourceThrows() {
    final PublishSubject<Integer> delay = PublishSubject.create();
    Function<Integer, Observable<Integer>> delayFunc = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            return delay;
        }
    };
    PublishSubject<Integer> source = PublishSubject.create();

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);
    InOrder ordered = inOrder(o);

    source.delay(delayFunc).subscribe(o);

    source.onNext(1);
    source.onError(new TestException());
    // The delay fires only after the source has already failed.
    delay.onNext(1);

    ordered.verify(o).onError(any(TestException.class));
    ordered.verifyNoMoreInteractions();
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
/**
 * A left duration selector that throws must make {@code groupJoin} signal
 * exactly one error, with no value and no completion.
 */
@Test
public void leftDurationSelectorThrows() {
    Function<Integer, Observable<Integer>> failingSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            throw new RuntimeException("Forced failure");
        }
    };
    PublishSubject<Integer> source1 = PublishSubject.create();
    PublishSubject<Integer> source2 = PublishSubject.create();

    Observable<Observable<Integer>> joined =
            source1.groupJoin(source2, failingSelector, just(Observable.never()), add2);
    joined.subscribe(observer);

    // An item on the left side triggers the failing selector.
    source1.onNext(1);

    verify(observer, times(1)).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
    verify(observer, never()).onNext(any());
}
/**
 * A right duration selector that throws must make {@code groupJoin} signal
 * exactly one error, with no value and no completion.
 */
@Test
public void rightDurationSelectorThrows() {
    Function<Integer, Observable<Integer>> failingSelector = new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer t1) {
            throw new RuntimeException("Forced failure");
        }
    };
    PublishSubject<Integer> source1 = PublishSubject.create();
    PublishSubject<Integer> source2 = PublishSubject.create();

    Observable<Observable<Integer>> joined =
            source1.groupJoin(source2, just(Observable.never()), failingSelector, add2);
    joined.subscribe(observer);

    // An item on the right side triggers the failing selector.
    source2.onNext(1);

    verify(observer, times(1)).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
    verify(observer, never()).onNext(any());
}
/**
 * Replaces every incoming terminal notification with an {@code onNext}
 * notification carrying {@code null}.
 *
 * @param ts the stream of terminal notifications
 * @return a stream with one {@code onNext(null)} notification per input notification
 */
@Override
public Observable<?> call(Observable<? extends Notification<?>> ts) {
    Function<Notification<?>, Notification<?>> toNullSignal = new Function<Notification<?>, Notification<?>>() {
        @Override
        public Notification<?> call(Notification<?> terminal) {
            // The incoming notification is ignored.
            return Notification.createOnNext(null);
        }
    };
    return ts.map(toNullSignal);
}
/**
 * A map factory that throws must make {@code toMap} signal exactly one error,
 * without emitting the expected map or completing.
 */
@Test
public void testToMapWithErrorThrowingFactory() {
    Function<String, Integer> lengthFunc = new Function<String, Integer>() {
        @Override
        public Integer call(String t1) {
            return t1.length();
        }
    };
    Supplier<Map<Integer, String>> mapFactory = new Supplier<Map<Integer, String>>() {
        @Override
        public Map<Integer, String> call() {
            throw new RuntimeException("Forced failure");
        }
    };

    Observable<String> source = Observable.just("a", "bb", "ccc", "dddd");
    Observable<Map<Integer, String>> mapped =
            source.toMap(lengthFunc, UtilityFunctions.<String>identity(), mapFactory);

    // A map that must never be delivered, since the factory fails.
    Map<Integer, String> expected = new LinkedHashMap<Integer, String>();
    expected.put(2, "bb");
    expected.put(3, "ccc");
    expected.put(4, "dddd");

    mapped.subscribe(objectObserver);

    verify(objectObserver, never()).onNext(expected);
    verify(objectObserver, never()).onComplete();
    verify(objectObserver, times(1)).onError(any(Throwable.class));
}
/**
 * Stores the given collaborators and flags in the corresponding fields.
 *
 * @param source         value for the {@code source} field
 * @param f              value for the {@code controlHandlerFunction} field
 * @param stopOnComplete value for the {@code stopOnComplete} field
 * @param stopOnError    value for the {@code stopOnError} field
 * @param scheduler      value for the {@code scheduler} field
 */
private OnSubscribeRedo(
        Observable<T> source,
        Function<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f,
        boolean stopOnComplete,
        boolean stopOnError,
        Scheduler scheduler) {
    // Collaborators.
    this.source = source;
    this.controlHandlerFunction = f;
    this.scheduler = scheduler;
    // Flags.
    this.stopOnComplete = stopOnComplete;
    this.stopOnError = stopOnError;
}
/**
 * The collection factory throws for key 2; {@code toMultimap} must signal an
 * error and must neither emit the expected map nor complete.
 */
@Test
public void testToMultimapWithThrowingCollectionFactory() {
    Function<Integer, Collection<String>> collectionFactory = new Function<Integer, Collection<String>>() {
        @Override
        public Collection<String> call(Integer t1) {
            if (t1 == 2) {
                throw new RuntimeException("Forced failure");
            }
            return new HashSet<String>();
        }
    };

    Observable<String> source = Observable.just("cc", "cc", "eee", "eee");
    Observable<Map<Integer, Collection<String>>> mapped = source.toMultimap(
            lengthFunc,
            UtilityFunctions.<String>identity(),
            new DefaultToMultimapFactory<Integer, String>(),
            collectionFactory);

    // A map that must never be delivered, since the factory fails.
    Map<Integer, Collection<String>> expected = new HashMap<Integer, Collection<String>>();
    expected.put(2, Arrays.asList("cc", "dd"));
    expected.put(3, Collections.singleton("eee"));

    mapped.subscribe(objectObserver);

    verify(objectObserver, times(1)).onError(any(Throwable.class));
    verify(objectObserver, never()).onNext(expected);
    verify(objectObserver, never()).onComplete();
}
/**
 * Stores the two sources, their duration selectors and the result selector in
 * the corresponding fields.
 *
 * @param left                  value for the {@code left} field
 * @param right                 value for the {@code right} field
 * @param leftDurationSelector  value for the {@code leftDurationSelector} field
 * @param rightDurationSelector value for the {@code rightDurationSelector} field
 * @param resultSelector        value for the {@code resultSelector} field
 */
public OnSubscribeJoin(
        Observable<TLeft> left,
        Observable<TRight> right,
        Function<TLeft, Observable<TLeftDuration>> leftDurationSelector,
        Function<TRight, Observable<TRightDuration>> rightDurationSelector,
        BiFunction<TLeft, TRight, R> resultSelector) {
    // Left side.
    this.left = left;
    this.leftDurationSelector = leftDurationSelector;
    // Right side.
    this.right = right;
    this.rightDurationSelector = rightDurationSelector;
    // Combiner.
    this.resultSelector = resultSelector;
}