/**
 * Builds an observable driven by a stateful {@code AsyncOnSubscribe}.
 *
 * <p>The state starts at 1 and is incremented by one on every invocation of the
 * step function. In state 1 the step hands out an observable of 1..4 delayed by
 * 300 ms, in state 2 an observable of 5..8, and in any other state it signals
 * completion.
 *
 * @return the created observable (raw type, as declared by the original signature)
 */
private Observable example2() {
    Func0<Integer> initialState = new Func0<Integer>() {
        @Override
        public Integer call() {
            return 1;
        }
    };

    Func3<Integer, Long, Observer<Observable<? extends Integer>>, Integer> step =
            new Func3<Integer, Long, Observer<Observable<? extends Integer>>, Integer>() {
                @Override
                public Integer call(Integer state, Long requested,
                        Observer<Observable<? extends Integer>> observer) {
                    switch (state) {
                        case 1:
                            // first batch is delivered with a delay
                            observer.onNext(Observable.just(1, 2, 3, 4)
                                    .delay(300, TimeUnit.MILLISECONDS));
                            break;
                        case 2:
                            observer.onNext(Observable.just(5, 6, 7, 8));
                            break;
                        default:
                            observer.onCompleted();
                            break;
                    }
                    return state + 1;
                }
            };

    return Observable.create(AsyncOnSubscribe.createStateful(initialState, step));
}
private Func1<SoundModel, Observable<SoundModel>> requestSound( Func3<String, SoundSystem.Type, SoundSystem.Source, SoundSystem> request, Func2<OpenSession, SoundModel, SoundModel> handler) { return sound -> commonService.findClient(sound.getGatewayUuid()) .send(request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(openSession -> handler.call(openSession, sound)) .onErrorReturn(throwable -> { log.warn("sound={} | failing request={}", sound.getUuid(), request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource()).getValue()); // unreadable status return sound; }); }
/**
 * Combines a two-element String source, a single Integer and a single int[] with
 * combineLatest and checks that "two2[4, 5, 6]" is delivered once, the stream
 * completes once, and no error is signalled.
 */
@SuppressWarnings("unchecked") /* mock calls don't do generics */
@Test
public void testCombineLatest3TypesA() {
    Func3<String, Integer, int[], String> combiner =
            getConcatStringIntegerIntArrayCombineLatestFunction();

    /* define an Observer to receive aggregated events */
    Observer<String> observer = mock(Observer.class);

    Observable<String> words = Observable.just("one", "two");
    Observable<Integer> number = Observable.just(2);
    Observable<int[]> array = Observable.just(new int[] { 4, 5, 6 });

    Observable<String> combined = Observable.combineLatest(words, number, array, combiner);
    combined.subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
    verify(observer, times(1)).onNext("two2[4, 5, 6]");
}
/**
 * Combines a single String, a single Integer and a two-element int[] source with
 * combineLatest and checks that one combined value per array is delivered, the stream
 * completes once, and no error is signalled.
 */
@SuppressWarnings("unchecked") /* mock calls don't do generics */
@Test
public void testCombineLatest3TypesB() {
    Func3<String, Integer, int[], String> combiner =
            getConcatStringIntegerIntArrayCombineLatestFunction();

    /* define an Observer to receive aggregated events */
    Observer<String> observer = mock(Observer.class);

    Observable<String> word = Observable.just("one");
    Observable<Integer> number = Observable.just(2);
    Observable<int[]> arrays = Observable.just(new int[] { 4, 5, 6 }, new int[] { 7, 8 });

    Observable<String> combined = Observable.combineLatest(word, number, arrays, combiner);
    combined.subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
    verify(observer, times(1)).onNext("one2[4, 5, 6]");
    verify(observer, times(1)).onNext("one2[7, 8]");
}
/**
 * Creates a combine function that concatenates its three String arguments in order,
 * substituting the empty string for any argument that is {@code null}.
 *
 * @return a new concatenating {@code Func3}
 */
private Func3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
    return new Func3<String, String, String, String>() {
        @Override
        public String call(String a1, String a2, String a3) {
            String first = (a1 != null) ? a1 : "";
            String second = (a2 != null) ? a2 : "";
            String third = (a3 != null) ? a3 : "";
            return first + second + third;
        }
    };
}
/**
 * Exercises the three-source combineLatest overload: three single-value sources are
 * combined into a list and the test expects exactly that list, followed by completion
 * and no error.
 */
@Test
public void test3SourcesOverload() {
    Observable<Integer> first = Observable.just(1);
    Observable<Integer> second = Observable.just(2);
    Observable<Integer> third = Observable.just(3);

    Func3<Integer, Integer, Integer, List<Integer>> toList =
            new Func3<Integer, Integer, Integer, List<Integer>>() {
                @Override
                public List<Integer> call(Integer t1, Integer t2, Integer t3) {
                    return Arrays.asList(t1, t2, t3);
                }
            };

    Observable<List<Integer>> result = Observable.combineLatest(first, second, third, toList);

    @SuppressWarnings("unchecked")
    Observer<Object> observer = mock(Observer.class);

    result.subscribe(observer);

    verify(observer).onNext(Arrays.asList(1, 2, 3));
    verify(observer).onComplete();
    verify(observer, never()).onError(any(Throwable.class));
}
/**
 * Zips a two-element String source with a single Integer and a single int[] and
 * checks that "one2[4, 5, 6]" is delivered once, the stream completes once, no error
 * is signalled and "two" is never delivered.
 */
@SuppressWarnings("unchecked") /* mock calls don't do generics */
@Test
public void testStart3Types() {
    Func3<String, Integer, int[], String> zipFunction = getConcatStringIntegerIntArrayZipr();

    /* define an Observer to receive aggregated events */
    Observer<String> observer = mock(Observer.class);

    Observable<String> words = Observable.just("one", "two");
    Observable<Integer> number = Observable.just(2);
    Observable<int[]> array = Observable.just(new int[] { 4, 5, 6 });

    Observable<String> zipped = Observable.zip(words, number, array, zipFunction);
    zipped.subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onComplete();
    verify(observer, times(1)).onNext("one2[4, 5, 6]");
    verify(observer, never()).onNext("two");
}
/**
 * Creates a zip function that joins its three String arguments in order; a
 * {@code null} argument contributes the empty string.
 *
 * @return a new concatenating {@code Func3}
 */
private Func3<String, String, String, String> getConcat3StringsZipr() {
    return new Func3<String, String, String, String>() {
        @Override
        public String call(String a1, String a2, String a3) {
            StringBuilder joined = new StringBuilder();
            if (a1 != null) {
                joined.append(a1);
            }
            if (a2 != null) {
                joined.append(a2);
            }
            if (a3 != null) {
                joined.append(a3);
            }
            return joined.toString();
        }
    };
}
/**
 * Runs a state machine whose transition always throws and asserts that the same
 * exception instance is reported to the subscriber as the error.
 */
@Test
public void testStateTransitionThrowsError() {
    final RuntimeException failure = new RuntimeException("boo");

    Func0<Integer> initialState = Functions.constant0(1);
    Func3<Integer, Integer, Observer<Integer>, Integer> throwingTransition =
            new Func3<Integer, Integer, Observer<Integer>, Integer>() {
                @Override
                public Integer call(Integer collection, Integer t, Observer<Integer> observer) {
                    throw failure;
                }
            };
    Func2<Integer, Observer<Integer>, Boolean> completion = Functions.alwaysTrue2();

    Transformer<Integer, Integer> machine =
            Transformers.stateMachine(initialState, throwingTransition, completion);

    TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
    Observable<Integer> source = Observable.just(1, 1, 1);
    source.compose(machine).subscribe(subscriber);

    subscriber.awaitTerminalEvent();
    subscriber.assertError(failure);
}
/**
 * Runs a state machine over a repeating source where the transition emits 123 and
 * then unsubscribes; asserts the single value, completion and unsubscription on the
 * test subscriber.
 */
@Test
public void testUnsubscriptionFromTransition() {
    Func0<Integer> initialState = Functions.constant0(1);
    Func3<Integer, Integer, Subscriber<Integer>, Integer> emitThenUnsubscribe =
            new Func3<Integer, Integer, Subscriber<Integer>, Integer>() {
                @Override
                public Integer call(Integer collection, Integer t, Subscriber<Integer> subscriber) {
                    subscriber.onNext(123);
                    subscriber.unsubscribe();
                    return 1;
                }
            };
    Func2<Integer, Observer<Integer>, Boolean> completion = Functions.alwaysTrue2();

    Transformer<Integer, Integer> machine =
            Transformers.stateMachine(initialState, emitThenUnsubscribe, completion);

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
    Observable<Integer> endlessSource = Observable.just(1, 1, 1).repeat();
    endlessSource.compose(machine).subscribe(testSubscriber);

    testSubscriber.assertValue(123);
    testSubscriber.assertCompleted();
    testSubscriber.assertUnsubscribed();
}
@Test public void testForCompletionWithinStateMachine() { Func0<Integer> initialState = Functions.constant0(1); Func3<Integer, Integer, Subscriber<Integer>, Integer> transition = new Func3<Integer, Integer, Subscriber<Integer>, Integer>() { @Override public Integer call(Integer collection, Integer t, Subscriber<Integer> subscriber) { subscriber.onNext(123); // complete from within transition subscriber.onCompleted(); return 1; } }; Func2<? super Integer, ? super Subscriber<Integer>, Boolean> completion = Functions .alwaysTrue2(); Transformer<Integer, Integer> transformer = Transformers.stateMachine(initialState, transition, completion); TestSubscriber<Integer> ts = new TestSubscriber<Integer>(); final AtomicInteger count = new AtomicInteger(0); Observable.just(1, 2, 3).doOnNext(Actions.increment1(count)).compose(transformer) .subscribe(ts); ts.assertValues(123); ts.assertCompleted(); assertEquals(1, count.get()); }
/**
 * Creates an {@code OnSubscribe} from a state generator and a {@code next} action
 * that does not produce a new state.
 *
 * <p>The action is adapted to a {@code Func3} that invokes it and then returns the
 * same state object it was given.
 *
 * @param generator supplies the state
 * @param next      invoked with (state, requested amount, observer)
 * @return the resulting {@code AsyncOnSubscribeImpl}
 */
@Experimental
public static <S, T> Observable$OnSubscribe<T> createSingleState(Func0<? extends S> generator,
        final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next) {
    Func3<S, Long, Observer<Observable<? extends T>>, S> sameStateNext =
            new Func3<S, Long, Observer<Observable<? extends T>>, S>() {
                public S call(S currentState, Long requestedAmount,
                        Observer<Observable<? extends T>> emitter) {
                    next.call(currentState, requestedAmount, emitter);
                    // the state reference is handed back unchanged
                    return currentState;
                }
            };
    return new AsyncOnSubscribeImpl((Func0) generator, sameStateNext);
}
/**
 * Creates an {@code OnSubscribe} from a state generator, a {@code next} action that
 * does not produce a new state, and an unsubscribe callback.
 *
 * <p>The action is adapted to a {@code Func3} that invokes it and then returns the
 * same state object it was given.
 *
 * @param generator     supplies the state
 * @param next          invoked with (state, requested amount, observer)
 * @param onUnsubscribe callback receiving the state; passed through to
 *                      {@code AsyncOnSubscribeImpl}
 * @return the resulting {@code AsyncOnSubscribeImpl}
 */
@Experimental
public static <S, T> Observable$OnSubscribe<T> createSingleState(Func0<? extends S> generator,
        final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next,
        Action1<? super S> onUnsubscribe) {
    Func3<S, Long, Observer<Observable<? extends T>>, S> sameStateNext =
            new Func3<S, Long, Observer<Observable<? extends T>>, S>() {
                public S call(S currentState, Long requestedAmount,
                        Observer<Observable<? extends T>> emitter) {
                    next.call(currentState, requestedAmount, emitter);
                    // the state reference is handed back unchanged
                    return currentState;
                }
            };
    return new AsyncOnSubscribeImpl(generator, sameStateNext, onUnsubscribe);
}
/**
 * Creates an {@code OnSubscribe} from a {@code next} action that takes no state.
 *
 * <p>The action is adapted to a {@code Func3} whose state slot is typed {@code Void};
 * the adapter ignores the state when calling {@code next} and returns the state value
 * it received.
 *
 * @param next invoked with (requested amount, observer)
 * @return the resulting {@code AsyncOnSubscribeImpl}
 */
@Experimental
public static <T> Observable$OnSubscribe<T> createStateless(
        final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
    Func3<Void, Long, Observer<Observable<? extends T>>, Void> statelessNext =
            new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
                public Void call(Void unusedState, Long requestedAmount,
                        Observer<Observable<? extends T>> emitter) {
                    next.call(requestedAmount, emitter);
                    return unusedState;
                }
            };
    return new AsyncOnSubscribeImpl(statelessNext);
}
/**
 * Creates an {@code OnSubscribe} from a {@code next} action that takes no state and a
 * parameterless unsubscribe callback.
 *
 * <p>Both callbacks are adapted to the stateful shapes expected by
 * {@code AsyncOnSubscribeImpl}: the {@code Func3} adapter ignores its {@code Void}
 * state and returns {@code null}; the {@code Action1} adapter ignores its argument and
 * calls {@code onUnsubscribe}.
 *
 * @param next          invoked with (requested amount, observer)
 * @param onUnsubscribe invoked by the unsubscribe adapter
 * @return the resulting {@code AsyncOnSubscribeImpl}
 */
@Experimental
public static <T> Observable$OnSubscribe<T> createStateless(
        final Action2<Long, ? super Observer<Observable<? extends T>>> next,
        final Action0 onUnsubscribe) {
    Func3<Void, Long, Observer<Observable<? extends T>>, Void> statelessNext =
            new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
                public Void call(Void unusedState, Long requestedAmount,
                        Observer<Observable<? extends T>> emitter) {
                    next.call(requestedAmount, emitter);
                    return null;
                }
            };
    Action1<Void> statelessUnsubscribe = new Action1<Void>() {
        public void call(Void unusedState) {
            onUnsubscribe.call();
        }
    };
    return new AsyncOnSubscribeImpl(statelessNext, statelessUnsubscribe);
}
/**
 * Searches the area's text for {@code text} starting at {@code offset} and, when a
 * match is found, selects it.
 *
 * <p>The search uses {@code String.indexOf} when the {@code caseSensitive} control is
 * selected and {@code StringUtil.indexOfIgnoreCase} otherwise.
 *
 * @param area   the code area to search and select in
 * @param text   the text to look for
 * @param offset index at which the search starts
 * @return {@code true} if a match was found and selected, {@code false} otherwise
 */
private boolean simpleFind(CodeArea area, String text, int offset) {
    Func3<String, String, Integer, Integer> indexFinder;
    if (caseSensitive.isSelected()) {
        indexFinder = String::indexOf;
    } else {
        indexFinder = StringUtil::indexOfIgnoreCase;
    }

    int matchStart = indexFinder.call(area.getText(), text, offset);
    if (matchStart == -1) {
        return false;
    }

    area.selectRange(matchStart, matchStart + text.length());
    return true;
}
/**
 * Builds an observable that executes the drop queries of the servers, users and
 * devices tables and zips their results into a single {@code null} emission.
 *
 * <p>Any error from the zipped stream is replaced by a {@code null} item via
 * {@code onErrorReturn}.
 *
 * @return observable emitting {@code null} once all three executions have emitted,
 *         or {@code null} on error
 */
@RxLogObservable
public Observable<Void> drop() {
    Observable<Object> dropServers = storIOSQLite
            .executeSQL()
            .withQuery(ServersTable.QUERY_DROP)
            .prepare()
            .asRxObservable();

    Observable<Object> dropUsers = storIOSQLite
            .executeSQL()
            .withQuery(UsersTable.QUERY_DROP)
            .prepare()
            .asRxObservable();

    Observable<Object> dropDevices = storIOSQLite
            .executeSQL()
            .withQuery(DevicesTable.QUERY_DROP)
            .prepare()
            .asRxObservable();

    // The individual results carry no information we need; collapse them to null.
    Func3<Object, Object, Object, Void> discardResults = (servers, users, devices) -> null;

    return Observable.zip(dropServers, dropUsers, dropDevices, discardResults)
            .onErrorReturn(throwable -> null);
}
/**
 * Loads the personal-center home data for a user and reports the outcome to the
 * fragment.
 *
 * <p>Three sources (invite count, accepted-invite count, newest invitations) are
 * zipped into a {@code PersonalCenterHomeBean}. Work is subscribed on the IO scheduler
 * and results are observed on the Android main thread. On success the bean is passed
 * to {@code mFragment.getDataSuccess}; on error the stack trace is printed and
 * {@code mFragment.getDataFail} is called with a failure message.
 *
 * @param user the user whose home data is requested
 */
public void getHomeData(final User user) {
    Action1<PersonalCenterHomeBean> onSuccess = new Action1<PersonalCenterHomeBean>() {
        @Override
        public void call(PersonalCenterHomeBean personalCenterHomeBean) {
            mFragment.getDataSuccess(personalCenterHomeBean);
        }
    };
    Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            throwable.printStackTrace();
            mFragment.getDataFail("数据请求失败");
        }
    };

    Observable<PersonalCenterHomeBean> homeData = Observable.zip(
            mUserBll.getInviteCount(user),
            mUserBll.getAcceptInvitedCount(user),
            mUserBll.getNewlyDynamic(user, DYNAMIC_COUNT),
            new Func3<Integer, Integer, List<Invitation>, PersonalCenterHomeBean>() {
                @Override
                public PersonalCenterHomeBean call(Integer inviteCount, Integer acceptInviteCount,
                        List<Invitation> invitations) {
                    PersonalCenterHomeBean bean = new PersonalCenterHomeBean();
                    bean.setUser(user);
                    bean.setBeInvitedCount(acceptInviteCount);
                    bean.setInviteCount(inviteCount);
                    bean.setNewlyDynamic(invitations);
                    return bean;
                }
            });

    homeData
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onSuccess, onFailure);
}
/**
 * Subscribes to a zip of the three workout queries (all, mine, histories) and
 * registers the resulting subscription with {@code subscription}.
 *
 * <p>The zip function returns only the first list; the other two results are ignored
 * and the final emission is not consumed by any callback.
 *
 * @param intent the intent that started this work (not read here)
 */
@Override
protected void onHandleIntent(Intent intent) {
    Func3<List<Workout>, List<Workout>, List<Workout>, List<Workout>> keepFirstList =
            new Func3<List<Workout>, List<Workout>, List<Workout>, List<Workout>>() {
                @Override
                public List<Workout> call(List<Workout> all, List<Workout> mine,
                        List<Workout> histories) {
                    return all;
                }
            };

    subscription.add(
            Observable.zip(
                    Injection.provideWorkoutsRepository().findAll(),
                    Injection.provideWorkoutsRepository().findMyWorkouts(),
                    Injection.provideWorkoutsRepository().findHistoriesWorkouts(),
                    keepFirstList)
            .subscribe());
}
/**
 * Stores the collaborators this transformer is built from.
 *
 * @param transformerRxJson the underlying JSON transformer
 * @param operatorJsonGson  the JSON-to-Gson operator
 * @param gson              the Gson instance handed to the converter
 * @param converter         function taking (path, element, gson) and producing a {@code T}
 */
private TransformerRxJsonGson(
        TransformerRxJson transformerRxJson,
        OperatorJsonGson operatorJsonGson,
        Gson gson,
        Func3<JsonPath, JsonElement, Gson, T> converter) {
    this.converter = converter;
    this.gson = gson;
    this.operatorJsonGson = operatorJsonGson;
    this.transformerRxJson = transformerRxJson;
}
/**
 * Gets a single comm link
 *
 * <p>Builds three observables (the comm link itself, its wrappers, and the matching
 * favorite) and zips them. The zip function trims the first and last header image of
 * every slideshow content block, marks the model as favorite when a favorite was
 * found, attaches the wrappers, stores the model in the action data and posts the
 * action.
 *
 * <p>Nothing is done when an equal action is already registered.
 *
 * @param id the comm link id. Note this is not the SQLite id
 */
@Override
public void getCommLink(Long id) {
    final RxAction action = newRxAction(GET_COMM_LINK, Keys.COMM_LINK_ID, id);
    if (hasRxAction(action)) {
        return;
    }

    Observable<CommLinkModel> commLinkObservable =
            CommLinkApiService.Factory.getInstance().getCommLink(Secrets.GT_API_SECRET, id);
    Observable<List<Wrapper>> wrappersObservable =
            CommLinkWrapperApiService.Factory.getInstance().getCommLinkWrappers(id);
    Observable<Favorite> favoriteObservable =
            getFavoritesInternal(Favorite.TYPE_COMM_LINK, String.valueOf(id));

    // NOTE(review): the zipped observable below is built but not subscribed to in this
    // method, so the combining function only runs if something else subscribes - confirm
    // whether a subscribe/addRxAction call is missing here.
    Observable.zip(commLinkObservable, wrappersObservable, favoriteObservable,
            (Func3<CommLinkModel, List<Wrapper>, Favorite, Object>) (commLinkModel, wrappers1, favorite) -> {
                for (Wrapper wrapper : wrappers1) {
                    if (wrapper.getContentBlock2() != null
                            && wrapper.getContentBlock2().headerImageType == ContentBlock2.TYPE_SLIDESHOW) {
                        int size = wrapper.getContentBlock2().getHeaderImages().size();
                        // Trimming both ends needs at least two images; with fewer, the
                        // unguarded removals would throw IndexOutOfBoundsException.
                        if (size >= 2) {
                            wrapper.getContentBlock2().getHeaderImages().remove(size - 1);
                            wrapper.getContentBlock2().getHeaderImages().remove(0);
                        }
                    }
                }

                if (favorite != null) {
                    commLinkModel.favorite = true;
                }

                commLinkModel.wrappers = wrappers1;
                action.getData().put(Keys.COMM_LINK, commLinkModel);
                postRxAction(action);
                return commLinkModel;
            });
}
/**
 * Builds an observable that ticks every 500 ms (10 items) and combines each tick with
 * the latest values of a 300 ms interval (5 items) and a fixed list of four strings,
 * producing a descriptive result string.
 *
 * @return the combined observable (raw type, as declared by the original signature)
 */
private Observable example2() {
    Observable<Long> primaryTicks = Observable.interval(500, TimeUnit.MILLISECONDS).take(10);
    Observable<Long> secondaryTicks = Observable.interval(300, TimeUnit.MILLISECONDS).take(5);
    Observable<String> words = Observable.just("One", "Two", "Three", "Four");

    Func3<Long, Long, String, String> describe = new Func3<Long, Long, String, String>() {
        @Override
        public String call(Long aLong1, Long aLong2, String string) {
            return "result : along1 = " + aLong1 + ", aLong2 = " + aLong2 + ", string = " + string;
        }
    };

    return primaryTicks.withLatestFrom(secondaryTicks, words, describe);
}
/**
 * Creates an OperatorScan instance which will replace the first element of a bivalue with the
 * scan accumulator value.
 *
 * @param seed the initial accumulator value
 * @param func computes the next accumulator from (current accumulator, first element,
 *            second element)
 * @return an operator emitting (accumulator, second element) for every incoming bivalue
 */
public static <T0, T1, R0> BiOperator<R0, T1, T0, T1> bi1Operator(final R0 seed,
        final Func3<R0, ? super T0, ? super T1, R0> func) {
    BaseBiSubscriber<R0, T1, T0, T1> scanner = new BaseBiSubscriber<R0, T1, T0, T1>() {
        R0 accum = seed;

        @Override
        protected void _onNext(T0 t0, T1 t1) {
            R0 updated = func.call(accum, t0, t1);
            accum = updated;
            childOnNext(updated, t1);
        }
    };
    return new OperatorScan<R0, T1, T0, T1>(scanner);
}
/**
 * Creates an OperatorScan instance which will replace the first element of a bivalue with the
 * scan accumulator value.
 *
 * <p>No seed is used: the first element of the first bivalue becomes the initial
 * accumulator and is emitted as-is.
 *
 * @param func computes the next accumulator from (current accumulator, first element,
 *            second element); not called for the first bivalue
 * @return an operator emitting (accumulator, second element) for every incoming bivalue
 */
public static <T0, T1> BiOperator<T0, T1, T0, T1> bi1Operator(
        final Func3<T0, ? super T0, ? super T1, T0> func) {
    BaseBiSubscriber<T0, T1, T0, T1> scanner = new BaseBiSubscriber<T0, T1, T0, T1>() {
        T0 accum;
        boolean isFirst = true;

        @Override
        protected void _onNext(T0 t0, T1 t1) {
            if (isFirst) {
                // first bivalue seeds the accumulator
                accum = t0;
                isFirst = false;
            } else {
                accum = func.call(accum, t0, t1);
            }
            childOnNext(accum, t1);
        }
    };
    return new OperatorScan<T0, T1, T0, T1>(scanner);
}
/**
 * Creates an OperatorScan instance which will replace the second element of a bivalue with the
 * scan accumulator value.
 *
 * @param seed the initial accumulator value
 * @param func computes the next accumulator from (current accumulator, first element,
 *            second element)
 * @return an operator emitting (first element, accumulator) for every incoming bivalue
 */
public static <T0, T1, R1> BiOperator<T0, R1, T0, T1> bi2Operator(final R1 seed,
        final Func3<R1, ? super T0, ? super T1, R1> func) {
    BaseBiSubscriber<T0, R1, T0, T1> scanner = new BaseBiSubscriber<T0, R1, T0, T1>() {
        R1 accum = seed;

        @Override
        protected void _onNext(T0 t0, T1 t1) {
            R1 updated = func.call(accum, t0, t1);
            accum = updated;
            childOnNext(t0, updated);
        }
    };
    return new OperatorScan<T0, R1, T0, T1>(scanner);
}
/**
 * Creates an OperatorScan instance which will replace the second element of a bivalue with the
 * scan accumulator value.
 *
 * <p>No seed is used: the second element of the first bivalue becomes the initial
 * accumulator and is emitted as-is.
 *
 * @param func computes the next accumulator from (current accumulator, first element,
 *            second element); not called for the first bivalue
 * @return an operator emitting (first element, accumulator) for every incoming bivalue
 */
public static <T0, T1> BiOperator<T0, T1, T0, T1> bi2Operator(
        final Func3<T1, ? super T0, ? super T1, T1> func) {
    BaseBiSubscriber<T0, T1, T0, T1> scanner = new BaseBiSubscriber<T0, T1, T0, T1>() {
        T1 accum;
        boolean isFirst = true;

        @Override
        protected void _onNext(T0 t0, T1 t1) {
            if (isFirst) {
                // first bivalue seeds the accumulator
                accum = t1;
                isFirst = false;
            } else {
                accum = func.call(accum, t0, t1);
            }
            childOnNext(t0, accum);
        }
    };
    return new OperatorScan<T0, T1, T0, T1>(scanner);
}
private void tryZipObservables() { items.clear(); Observable<MockData> observable1 = ObservableCreator.createFromListObservable(); Observable<MockData> observable2 = Observable.just(new MockData("a"), new MockData("b"), new MockData("c")); Observable<MockData> observable3 = Observable.just(new MockData(","), new MockData("."), new MockData("?"), new MockData(":"), new MockData("+")); Observable.zip(observable1, observable2, observable3, new Func3<MockData, MockData, MockData, String>() { @Override public String call(MockData mockData, MockData mockData2, MockData mockData3) { return mockData.getName() + mockData2.getName() + mockData3.getName(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { gankAdapter.notifyDataSetChanged(); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { items.add(new MockData(s)); } }); // result will show: "1a,", "2b.", "3c?", Only three values because least observable only has 3 values. }
/**
 * Creates a combine function that concatenates the {@code getStringValue} renderings
 * of a String, an Integer and an int[] in that order.
 *
 * @return a new concatenating {@code Func3}
 */
private Func3<String, Integer, int[], String> getConcatStringIntegerIntArrayCombineLatestFunction() {
    return new Func3<String, Integer, int[], String>() {
        @Override
        public String call(String s, Integer i, int[] iArray) {
            return getStringValue(s)
                    + getStringValue(i)
                    + getStringValue(iArray);
        }
    };
}
/**
 * Creates a zip function that concatenates the {@code getStringValue} renderings of a
 * String, an Integer and an int[] in that order.
 *
 * @return a new concatenating {@code Func3}
 */
private Func3<String, Integer, int[], String> getConcatStringIntegerIntArrayZipr() {
    return new Func3<String, Integer, int[], String>() {
        @Override
        public String call(String s, Integer i, int[] iArray) {
            return getStringValue(s)
                    + getStringValue(i)
                    + getStringValue(iArray);
        }
    };
}
/**
 * Creates a state-machine transformer by delegating to
 * {@code TransformerStateMachine.create} with the same arguments.
 *
 * @param initialStateFactory  supplies the initial state
 * @param transition           called with (state, input, subscriber) and returning the
 *                             next state
 * @param completion           called with (state, subscriber) and returning a Boolean
 * @param backpressureStrategy the backpressure strategy to pass through
 * @param initialRequest       the initial request amount to pass through
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> Transformer<In, Out> stateMachine(
        Func0<State> initialStateFactory,
        Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
        Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
        BackpressureStrategy backpressureStrategy,
        int initialRequest) {
    Transformer<In, Out> machine = TransformerStateMachine.<State, In, Out> create(
            initialStateFactory, transition, completion, backpressureStrategy, initialRequest);
    return machine;
}
/**
 * Validates and stores the state-machine configuration.
 *
 * <p>Each reference argument is checked with {@code Preconditions.checkNotNull} and
 * {@code initialRequest} must be greater than zero; checks run in parameter order.
 *
 * @param initialState         supplies the initial state
 * @param transition           called with (state, input, subscriber) and returning the
 *                             next state
 * @param completion           called with (state, subscriber) and returning a Boolean
 * @param backpressureStrategy the backpressure strategy
 * @param initialRequest       the initial request amount; must be &gt; 0
 */
private TransformerStateMachine(
        Func0<? extends State> initialState,
        Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
        Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
        BackpressureStrategy backpressureStrategy,
        int initialRequest) {
    Preconditions.checkNotNull(initialState);
    this.initialState = initialState;

    Preconditions.checkNotNull(transition);
    this.transition = transition;

    Preconditions.checkNotNull(completion);
    this.completion = completion;

    Preconditions.checkNotNull(backpressureStrategy);
    this.backpressureStrategy = backpressureStrategy;

    Preconditions.checkArgument(initialRequest > 0, "initialRequest must be greater than zero");
    this.initialRequest = initialRequest;
}
/**
 * Static factory that constructs a {@code TransformerStateMachine} from the given
 * configuration.
 *
 * @param initialState         supplies the initial state
 * @param transition           called with (state, input, subscriber) and returning the
 *                             next state
 * @param completion           called with (state, subscriber) and returning a Boolean
 * @param backpressureStrategy the backpressure strategy
 * @param initialRequest       the initial request amount
 * @return the newly constructed transformer
 */
public static <State, In, Out> Transformer<In, Out> create(
        Func0<? extends State> initialState,
        Func3<? super State, ? super In, ? super Subscriber<Out>, ? extends State> transition,
        Func2<? super State, ? super Subscriber<Out>, Boolean> completion,
        BackpressureStrategy backpressureStrategy,
        int initialRequest) {
    TransformerStateMachine<State, In, Out> machine = new TransformerStateMachine<State, In, Out>(
            initialState, transition, completion, backpressureStrategy, initialRequest);
    return machine;
}
/**
 * Combines three source observables by delegating to the list-based
 * {@code combineLatest} overload, with the {@code Func3} adapted through
 * {@code Functions.fromFunc}.
 *
 * @param o1              the first source
 * @param o2              the second source
 * @param o3              the third source
 * @param combineFunction combines one value from each source into a result
 * @return the observable returned by the list-based overload
 */
public static <T1, T2, T3, R> Observable<R> combineLatest(
        Observable<? extends T1> o1,
        Observable<? extends T2> o2,
        Observable<? extends T3> o3,
        Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
    // raw array: the sources have unrelated element types
    Observable[] sources = new Observable[]{o1, o2, o3};
    return combineLatest(Arrays.asList(sources), Functions.fromFunc(combineFunction));
}
/**
 * Zips three source observables by emitting them as a single array and lifting an
 * {@code OperatorZip} built from the zip function over that emission.
 *
 * @param o1          the first source
 * @param o2          the second source
 * @param o3          the third source
 * @param zipFunction combines one value from each source into a result
 * @return the lifted observable
 */
public static <T1, T2, T3, R> Observable<R> zip(
        Observable<? extends T1> o1,
        Observable<? extends T2> o2,
        Observable<? extends T3> o3,
        Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
    // raw array: the sources have unrelated element types
    Observable[] sources = new Observable[]{o1, o2, o3};
    return just(sources).lift(new OperatorZip(zipFunction));
}
/**
 * Zips three singles: each is converted with {@code asObservable}, the converted
 * sources are emitted as a single array, and an {@code OperatorZip} built from the
 * zip function is lifted over that emission.
 *
 * @param o1          the first source
 * @param o2          the second source
 * @param o3          the third source
 * @param zipFunction combines the value of each source into a result
 * @return the lifted single
 */
public static final <T1, T2, T3, R> Single<R> zip(
        Single<? extends T1> o1,
        Single<? extends T2> o2,
        Single<? extends T3> o3,
        Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
    // raw array: the sources have unrelated element types
    Observable[] sources = new Observable[]{asObservable(o1), asObservable(o2), asObservable(o3)};
    return just(sources).lift(new OperatorZip((Func3) zipFunction));
}
/**
 * Creates a zip operator from a three-argument function.
 *
 * <p>The function is adapted with {@code Functions.fromFunc} and stored as this
 * operator's zip function.
 *
 * @param f the three-argument zip function (raw type)
 */
public OperatorZip(Func3 f) { this.zipFunction = Functions.fromFunc(f); }
/**
 * Combines three source observables by delegating to the list-based
 * {@code combineLatest} overload, with the {@code Func3} adapted through
 * {@code Functions.fromFunc}.
 *
 * @param o1              the first source
 * @param o2              the second source
 * @param o3              the third source
 * @param combineFunction combines one value from each source into a result
 * @return the observable returned by the list-based overload
 */
public static final <T1, T2, T3, R> Observable<R> combineLatest(
        Observable<? extends T1> o1,
        Observable<? extends T2> o2,
        Observable<? extends T3> o3,
        Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
    // raw array: the sources have unrelated element types
    Observable[] sources = new Observable[]{o1, o2, o3};
    return combineLatest(Arrays.asList(sources), Functions.fromFunc(combineFunction));
}
/**
 * Zips three source observables by emitting them as a single array and lifting an
 * {@code OperatorZip} built from the zip function over that emission.
 *
 * @param o1          the first source
 * @param o2          the second source
 * @param o3          the third source
 * @param zipFunction combines one value from each source into a result
 * @return the lifted observable
 */
public static final <T1, T2, T3, R> Observable<R> zip(
        Observable<? extends T1> o1,
        Observable<? extends T2> o2,
        Observable<? extends T3> o3,
        Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
    // raw array: the sources have unrelated element types
    Observable[] sources = new Observable[]{o1, o2, o3};
    return just(sources).lift(new OperatorZip(zipFunction));
}
/**
 * Stores the three callbacks this implementation is built from.
 *
 * @param generator     supplies the state
 * @param next          called with (state, requested amount, observer) and returning
 *                      a state
 * @param onUnsubscribe callback receiving the state
 */
private AsyncOnSubscribeImpl(
        Func0<? extends S> generator,
        Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next,
        Action1<? super S> onUnsubscribe) {
    this.onUnsubscribe = onUnsubscribe;
    this.next = next;
    this.generator = generator;
}