private void setMockMode(boolean toggle) { if (toggle) { mockLocationSubscription = Observable.zip(locationProvider.mockLocation(mockLocationObservable), mockLocationObservable, new Func2<Status, Location, String>() { int count = 0; @Override public String call(Status result, Location location) { return new LocationToStringFunc().call(location) + " " + count++; } }) .subscribe(new DisplayTextOnViewAction(mockLocationView), new ErrorHandler()); } else { mockLocationSubscription.unsubscribe(); } }
@Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() { @Override public Wrapper call(Throwable throwable, Integer integer) { //压缩规则 合并后的结果是一个Observable<Wrapper> return new Wrapper(throwable, integer); } }).flatMap(new Func1<Wrapper, Observable<?>>() { @Override public Observable<?> call(Wrapper wrapper) { //转换规则 if ((wrapper.throwable instanceof ConnectException || wrapper.throwable instanceof SocketTimeoutException || wrapper.throwable instanceof TimeoutException) && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS); } return Observable.error(wrapper.throwable); } }); }
@Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() { @Override public Wrapper call(Throwable throwable, Integer integer) { return new Wrapper(throwable, integer); } }).flatMap(new Func1<Wrapper, Observable<?>>() { @Override public Observable<?> call(Wrapper wrapper) { if ((wrapper.throwable instanceof ConnectException || wrapper.throwable instanceof SocketTimeoutException || wrapper.throwable instanceof TimeoutException) && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS); } return Observable.error(wrapper.throwable); } }); }
@Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable.zipWith(Observable.range(1, retryCount + 1), new Func2<Throwable, Integer, ExceptionWrapper>() { @Override public ExceptionWrapper call(Throwable throwable, Integer integer) { return new ExceptionWrapper(integer, throwable); } }).flatMap(new Func1<ExceptionWrapper, Observable<?>>() { @Override public Observable<?> call(ExceptionWrapper exceptionWrapper) { if ((exceptionWrapper.throwable instanceof ConnectException || exceptionWrapper.throwable instanceof SocketException || exceptionWrapper.throwable instanceof TimeoutException) && exceptionWrapper.index < retryCount + 1) { return Observable.timer(delayTime + (exceptionWrapper.index - 1) * delayTime, java.util.concurrent.TimeUnit.MILLISECONDS); } return Observable.error(exceptionWrapper.throwable); } }); }
@Override public Observable<Map<String, AppSetting>> getAppSettingsAsync() { return Observable.zip(listAppSettings(), listSlotConfigurations(), new Func2<StringDictionaryInner, SlotConfigNamesResourceInner, Map<String, AppSetting>>() { @Override public Map<String, AppSetting> call(final StringDictionaryInner appSettingsInner, final SlotConfigNamesResourceInner slotConfigs) { if (appSettingsInner == null || appSettingsInner.properties() == null) { return null; } return Maps.asMap(appSettingsInner.properties().keySet(), new Function<String, AppSetting>() { @Override public AppSetting apply(String input) { return new AppSettingImpl(input, appSettingsInner.properties().get(input), slotConfigs != null && slotConfigs.appSettingNames() != null && slotConfigs.appSettingNames().contains(input)); } }); } }); }
@Override public Observable<Map<String, ConnectionString>> getConnectionStringsAsync() { return Observable.zip(listConnectionStrings(), listSlotConfigurations(), new Func2<ConnectionStringDictionaryInner, SlotConfigNamesResourceInner, Map<String, ConnectionString>>() { @Override public Map<String, ConnectionString> call(final ConnectionStringDictionaryInner connectionStringsInner, final SlotConfigNamesResourceInner slotConfigs) { if (connectionStringsInner == null || connectionStringsInner.properties() == null) { return null; } return Maps.asMap(connectionStringsInner.properties().keySet(), new Function<String, ConnectionString>() { @Override public ConnectionString apply(String input) { return new ConnectionStringImpl(input, connectionStringsInner.properties().get(input), slotConfigs != null && slotConfigs.connectionStringNames() != null && slotConfigs.connectionStringNames().contains(input)); } }); } }); }
public Func2<AppReport, AppReport, Integer> getSortingFunction(SortBy sortBy) { switch (sortBy) { case NAME: return new SortByName(); case LIBRARY_COUNT: return new SortByLibraryCount(); case ANALYZED_AT: return new SortByAnalyzedAt(); case PERMISSION_COUNT: return new SortByPermissionCount(); default: throw new IllegalArgumentException("no sorting function for %s " + sortBy); } }
private void setupHooks() { RxJavaHooks.setOnObservableStart(new Func2<Observable, OnSubscribe, OnSubscribe>() { @Override public OnSubscribe call(Observable observable, OnSubscribe onSubscribe) { incrementActiveSubscriptionsCount(); return onSubscribe; } }); RxJavaHooks.setOnObservableSubscribeError(new Func1<Throwable, Throwable>() { @Override public Throwable call(Throwable throwable) { decrementActiveSubscriptionsCount(); return throwable; } }); RxJavaHooks.setOnObservableReturn(new Func1<Subscription, Subscription>() { @Override public Subscription call(Subscription subscription) { decrementActiveSubscriptionsCount(); return subscription; } }); }
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, final Func2<? super T, ? super T, Boolean> equality) { return Observable.zip(materializeLite(first), materializeLite(second), new Func2<Object, Object, Boolean>() { public Boolean call(Object t1, Object t2) { boolean c1; if (t1 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) { c1 = true; } else { c1 = false; } boolean c2; if (t2 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) { c2 = true; } else { c2 = false; } if (c1 && c2) { return Boolean.valueOf(true); } if (c1 || c2) { return Boolean.valueOf(false); } return (Boolean) equality.call(t1, t2); } }).all(UtilityFunctions.identity()); }
private Func2<ChanCatalog, List<HiddenThread>, ChanCatalog> hideThreads() { return new Func2<ChanCatalog, List<HiddenThread>, ChanCatalog>() { @Override public ChanCatalog call(ChanCatalog chanCatalog, List<HiddenThread> hiddenThreads) { if (chanCatalog != null) { List<ChanPost> posts = new ArrayList<>(); for (ChanPost post : chanCatalog.getPosts()) { boolean found = false; for (HiddenThread hiddenThread : hiddenThreads) { if (hiddenThread.threadId == post.getNo()) { found = true; } } if (!found) { posts.add(post); } } chanCatalog.setPosts(posts); } return chanCatalog; } }; }
@Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) { return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount), new Func2<Throwable, Integer, InnerThrowable>() { @Override public InnerThrowable call(Throwable throwable, Integer i) { if (throwable instanceof IOException) return new InnerThrowable(throwable, i); return new InnerThrowable(throwable, i); } }).concatMap(new Func1<InnerThrowable, Observable<Long>>() { @Override public Observable<Long> call(InnerThrowable innerThrowable) { Integer currentCount = innerThrowable.getCurrentRetryCount(); if (RetryWhenFunc.this.maxConnectCount.equals(currentCount)) { return Observable.error(innerThrowable.getThrowable()); } /*use Schedulers#immediate() to keep on same thread */ return Observable.timer((long) Math.pow(2, currentCount), TimeUnit.SECONDS, Schedulers.immediate()); } }); }
@Override public void fetchDataByNetWork(final int newsID) { Observable<Comments> short_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).shortComment(newsID); Observable<Comments> long_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).longComment(newsID); Observable.zip(short_comments_service, long_comments_service, new Func2<Comments, Comments, Comments1>(){ @Override public Comments1 call(Comments comments, Comments comments2) { Comments1 comments1 = new Comments1(); comments1.long_comments = comments.comments; comments1.short_comments = comments2.comments; return comments1; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); }
@Override public Observable<List<Gank>> getObservable() { if (resId == -1) return null; return getDataSupports().getGankData(type, mCurrentPage, PAGE_SIZE) .map(new Func1<GankData, List<Gank>>() { @Override public List<Gank> call(GankData gankData) { return gankData.getResults(); } }) .flatMap(new Func1<List<Gank>, Observable<Gank>>() { @Override public Observable<Gank> call(List<Gank> ganks) { return Observable.from(ganks); } }) .toSortedList(new Func2<Gank, Gank, Integer>() { @Override public Integer call(Gank gank, Gank gank2) { return gank2.getPublishedAt().compareTo(gank.getPublishedAt()); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); }
public Single<AccountData> getAccountDetails() { return getAccounts() .flatMap(new Func1<AccountsResponse, Single<? extends AccountData>>() { @Override public Single<? extends AccountData> call(AccountsResponse accountsResponse) { if (!accountsResponse.accounts.isEmpty()) { String accountId = accountsResponse.accounts.get(0).id; return Single.zip(getBalance(accountId), getTransactions(accountId), new Func2<Balance, List<Transaction>, AccountData>() { @Override public AccountData call(Balance balance, List<Transaction> transactions) { return new AccountData(balance, transactions); } }); } return Single.just(new AccountData()); } }); }
private ApiPaginator( final @NonNull Observable<Void> nextPage, final @NonNull Observable<Params> startOverWith, final @NonNull Func1<Envelope, List<Data>> envelopeToListOfData, final @NonNull Func1<Params, Observable<Envelope>> loadWithParams, final @NonNull Func1<String, Observable<Envelope>> loadWithPaginationPath, final @NonNull Func1<Envelope, String> envelopeToMoreUrl, final @NonNull Func1<List<Data>, List<Data>> pageTransformation, final boolean clearWhenStartingOver, final @NonNull Func2<List<Data>, List<Data>, List<Data>> concater, final boolean distinctUntilChanged ) { this.nextPage = nextPage; this.startOverWith = startOverWith; this.envelopeToListOfData = envelopeToListOfData; this.loadWithParams = loadWithParams; this.envelopeToMoreUrl = envelopeToMoreUrl; this.pageTransformation = pageTransformation; this.loadWithPaginationPath = loadWithPaginationPath; this.clearWhenStartingOver = clearWhenStartingOver; this.concater = concater; this.distinctUntilChanged = distinctUntilChanged; this.paginatedData = this.startOverWith.switchMap(this::dataWithPagination); this.loadingPage = this.startOverWith.switchMap(__ -> nextPage.scan(1, (accum, ___) -> accum + 1)); }
private Observable<SearchResult[]> getQueryObservable(String query) { return mSearch.search(query) .flatMap(new Func1<Response, Observable<SearchResult[]>>() { @Override public Observable<SearchResult[]> call(Response response) { if (response.responseData == null) return Observable.error(new SearchException(response.responseDetails)); return Observable.just(response.responseData.results); } }) .retry(new Func2<Integer, Throwable, Boolean>() { @Override public Boolean call(Integer integer, Throwable throwable) { return throwable instanceof InterruptedIOException; } }); }
@Override public void loadData() { Subscription subscription = Observable.zip(V2exService.getInstance().getV2exApi().getTopicHot(), V2exService.getInstance().getV2exApi().getTopicLatest(), new Func2<List<Topics>, List<Topics>, TopicsData>() { @Override public TopicsData call(List<Topics> hotTopics, List<Topics> latestTopics) { setImagData(hotTopics); setImagData(latestTopics); TopicsData topicsData = new TopicsData(); topicsData.hotTopics = hotTopics; latestTopics.addAll(0 , hotTopics); topicsData.allTopics = latestTopics; return topicsData; } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer); addSubscription(subscription); }
private Observable example2() { // flatMapIterable(Func1, Func2) return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .flatMapIterable( new Func1<Integer, Iterable<Integer>>() { @Override public Iterable<Integer> call(Integer integer) { ArrayList<Integer> s = new ArrayList<>(); for (int i = 0; i < integer; i++) { s.add(i); } return s; } }, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer o, Integer o2) { return o + o2; } } ); }
public static Func2<List<Node>, Node, List<Node>> merge() { return (mergedNodes, node) -> { // TODO: we should be able to enrich nodes in a general way before merging, eg. convert all service names to lowercase // Aggregator specific logic should not come here, eg. removing the Eureka description Optional<Integer> nodeIndex = mergedNodes.stream() .filter(n -> n.getId().equalsIgnoreCase(node.getId())) .map(mergedNodes::indexOf) .findFirst(); if (nodeIndex.isPresent()) { logger.info("Node with id '{}' previously added, merging", node.getId()); mergedNodes.get(nodeIndex.get()).mergeWith(node); } else { logger.info("Node with id '{}' was not merged before, adding it to the list", node.getId()); mergedNodes.add(node); } return mergedNodes; }; }
@Override protected Observable<List<Airport>> createObservable() { copyLocalFileIfNone(); return AirportPersister.INSTANCE.getAirports() .lift(new FlattenOperator<Airport>()) .toSortedList(new Func2<Airport, Airport, Integer>() { @Override public Integer call(Airport airport1, Airport airport2) { return airport1.code.compareTo(airport2.code); } }); }
@OnClick(R.id.zipLoadBt) void loadData(){ mSwipeRefreshLayout.setRefreshing(true); unsubscribe(); mSubscription = Observable.zip(NetWorkService.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()), NetWorkService.getZbApi().search("装逼"), new Func2<List<Image>, List<Image>, List<Image>>() { @Override public List<Image> call(List<Image> images1, List<Image> images2) { List<Image> images = new ArrayList<Image>(); for(int i = 0; i < images1.size()/2 && i < images2.size(); i++){ images.add(images1.get(i * 2)); images.add(images1.get(i * 2 + 1)); images.add(images2.get(i)); } return images; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(mObserver); }
public static <T, Resource> Observable<T> sort(Observable<T> source, final Comparator<T> comparator, final Func2<Observable<T>, Resource, Observable<Resource>> writer, final Func1<Resource, Observable<T>> reader, final Func0<Resource> resourceFactory, final Action1<Resource> resourceDisposer, int maxToSortInMemoryPerThread, final int maxTempResources, Scheduler scheduler) { Preconditions.checkArgument(maxToSortInMemoryPerThread > 0, "maxToSortInMemoryPerThread must be greater than 0"); Preconditions.checkArgument(maxTempResources >= 2, "maxTempResources must be at least 2"); return source // buffer into groups small enough to sort in memory .buffer(maxToSortInMemoryPerThread) // sort each buffer to a resource .flatMap(sortInMemoryAndWriteToAResource(comparator, writer, resourceFactory, scheduler)) // reduce by merging groups of resources to a single resource // once the resource count is maxTempResources .lift(new OperatorResourceMerger<Resource, T>(comparator, writer, reader, resourceFactory, resourceDisposer, maxTempResources)) // help out backpressure because ResourceMerger doesn't support // yet .onBackpressureBuffer() // emit the contents of the last file in the reduction process .flatMap(reader); }
@Test public void testCompareOperatorInitialValue() throws Exception { @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class); Observable<Integer> observable = Observable.just(1, 2, 3); OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(10, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); observable.lift(operatorCompare).subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(11); verify(observer, times(1)).onNext(3); verify(observer, times(1)).onNext(5); verify(observer, times(1)).onCompleted(); }
@Test public void testCompareOperatorNoInitialValue() throws Exception { @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class); Observable<Integer> observable = Observable.just(1, 2, 3); OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); observable.lift(operatorCompare).subscribe(observer); verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onNext(3); verify(observer, times(1)).onNext(5); verify(observer, times(1)).onCompleted(); }
private Func1<TemperatureModel, Observable<TemperatureModel>> requestTemperature() { final Func2<String, Heating.TemperatureScale, Heating> request = Heating::requestTemperature; final Func2<OpenSession, TemperatureModel, TemperatureModel> handler = (openSession, temperature) -> { Heating.handleTemperature( value -> temperature.setValue(String.valueOf(value)), () -> temperature.setValue(null)) .call(openSession); return temperature; }; return temperature -> commonService.findClient(temperature.getGatewayUuid()) .send(request.call(temperature.getWhere(), preferenceService.getDefaultTemperatureScale())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(openSession -> handler.call(openSession, temperature)) .onErrorReturn(throwable -> { log.warn("temperature={} | failing request={}", temperature.getUuid(), Heating.requestTemperature(temperature.getWhere())); // unreadable temperature return temperature; }); }
private Func1<SoundModel, Observable<SoundModel>> requestSound( Func3<String, SoundSystem.Type, SoundSystem.Source, SoundSystem> request, Func2<OpenSession, SoundModel, SoundModel> handler) { return sound -> commonService.findClient(sound.getGatewayUuid()) .send(request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource())) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(openSession -> handler.call(openSession, sound)) .onErrorReturn(throwable -> { log.warn("sound={} | failing request={}", sound.getUuid(), request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource()).getValue()); // unreadable status return sound; }); }
public static <T> boolean elementsEqual(List<T> list1, List<T> list2, Func2<T, T, Boolean> comparator) { if (list1 == null || list2 == null) { return false; } if (list1.size() != list2.size()) { return false; } for (int i = 0; i < list1.size(); i++) { if (!comparator.call(list1.get(i), list2.get(i))) { return false; } } return true; }
@Provides @PerActivity Observable<Integer> provideNotificationObservable(GameApi mGameApi, ForumApi mForumApi) { return Observable.zip(mGameApi.queryPmList(""), mForumApi.getMessageList("", 1), new Func2<PmData, MessageData, Integer>() { @Override public Integer call(PmData pmData, MessageData messageData) { int size = 0; if (pmData != null) { if (pmData.is_login == 0) { return null; } for (Pm pm : pmData.result.data) { if (!TextUtils.isEmpty(pm.unread) && pm.unread.equals("1")) { size++; } } } if (messageData != null && messageData.status == 200) { size += messageData.result.list.size(); } return size; } }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); }
public Observable<List<Program>> syncMetaData() { Set<ProgramType> programTypes = new HashSet<>(); programTypes.add(ProgramType.WITHOUT_REGISTRATION); return Observable.zip(userOrganisationUnitInteractor.pull(), userProgramInteractor.pull(ProgramFields.DESCENDANTS, programTypes), new Func2<List<OrganisationUnit>, List<Program>, List<Program>>() { @Override public List<Program> call(List<OrganisationUnit> units, List<Program> programs) { if (syncDateWrapper != null) { syncDateWrapper.setLastSyncedNow(); } return programs; } }); }
private void findUsersWhoLovesBoth() { // here we are using zip operator to combine both request Observable.zip(getCricketFansObservable(), getFootballFansObservable(), new Func2<List<User>, List<User>, List<User>>() { @Override public List<User> call(List<User> cricketFans, List<User> footballFans) { List<User> userWhoLovesBoth = filterUserWhoLovesBoth(cricketFans, footballFans); return userWhoLovesBoth; } } ).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<User>>() { @Override public void onCompleted() { // do anything onComplete } @Override public void onError(Throwable e) { // handle error } @Override public void onNext(List<User> users) { // do anything with user who loves both Log.d(TAG, "userList size : " + users.size()); for (User user : users) { Log.d(TAG, "id : " + user.id); Log.d(TAG, "firstname : " + user.firstname); Log.d(TAG, "lastname : " + user.lastname); } } }); }
public void runCode() { Observable obs1 = Observable.interval(1, TimeUnit.SECONDS); Observable obs2 = Observable.range(0,3); Plan0 plan0 = JoinObservable.from(obs1).and(obs2).then(new Func2() { @Override public Object call(Object o1, Object o2) { return o2; } }); JoinObservable.when(plan0).toObservable() .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { println("onCompleted"); } @Override public void onError(Throwable e) { println("onError"); } @Override public void onNext(Integer integer) { println("onNext:"+integer); } }); }
public void runCode() { Observable.just(1,2,3,4).reduce(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1*i2; } }) .subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { println("onCompleted"); } @Override public void onError(Throwable e) { } @Override public void onNext(Integer integer) { println("onNext:"+integer); } }); }
@Override public void start() { Utils.debugLog(TAG, "Preparing swap webserver."); sendBroadcast(SwapService.EXTRA_STARTING); if (FDroidApp.ipAddressString == null) { Log.e(TAG, "Not starting swap webserver, because we don't seem to be connected to a network."); setConnected(false); } Single.zip( Single.create(getWebServerTask()), Single.create(getBonjourTask()), new Func2<Boolean, Boolean, Boolean>() { @Override public Boolean call(Boolean webServerTask, Boolean bonjourServiceTask) { return bonjourServiceTask && webServerTask; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.newThread()) .subscribe(new Action1<Boolean>() { @Override public void call(Boolean success) { setConnected(success); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { setConnected(false); } }); }