Java 类rx.functions.Func2 实例源码

项目:GitHub    文件:MockLocationsActivity.java   
private void setMockMode(boolean toggle) {
    if (toggle) {
        mockLocationSubscription =
                Observable.zip(locationProvider.mockLocation(mockLocationObservable),
                        mockLocationObservable, new Func2<Status, Location, String>() {
                            int count = 0;

                            @Override
                            public String call(Status result, Location location) {
                                return new LocationToStringFunc().call(location) + " " + count++;
                            }
                        })
                        .subscribe(new DisplayTextOnViewAction(mockLocationView), new ErrorHandler());
    } else {
        mockLocationSubscription.unsubscribe();
    }
}
项目:Bailan    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    //压缩规则 合并后的结果是一个Observable<Wrapper>
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    //转换规则
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:RxRetrofit-tokean    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:TestChat    文件:RetryWhenNetWorkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {

        return observable.zipWith(Observable.range(1, retryCount + 1), new Func2<Throwable, Integer, ExceptionWrapper>() {
                @Override
                public ExceptionWrapper call(Throwable throwable, Integer integer) {
                        return new ExceptionWrapper(integer, throwable);
                }
        }).flatMap(new Func1<ExceptionWrapper, Observable<?>>() {
                @Override
                public Observable<?> call(ExceptionWrapper exceptionWrapper) {
                        if ((exceptionWrapper.throwable instanceof ConnectException ||
                                exceptionWrapper.throwable instanceof SocketException ||
                                exceptionWrapper.throwable instanceof TimeoutException) && exceptionWrapper.index < retryCount + 1) {
                                return Observable.timer(delayTime + (exceptionWrapper.index - 1) * delayTime, java.util.concurrent.TimeUnit.MILLISECONDS);
                        }
                        return Observable.error(exceptionWrapper.throwable);
                }
        });
}
项目:azure-libraries-for-java    文件:WebAppBaseImpl.java   
@Override
public Observable<Map<String, AppSetting>> getAppSettingsAsync() {
    return Observable.zip(listAppSettings(), listSlotConfigurations(), new Func2<StringDictionaryInner, SlotConfigNamesResourceInner, Map<String, AppSetting>>() {
        @Override
        public Map<String, AppSetting> call(final StringDictionaryInner appSettingsInner, final SlotConfigNamesResourceInner slotConfigs) {
            if (appSettingsInner == null || appSettingsInner.properties() == null) {
                return null;
            }
            return Maps.asMap(appSettingsInner.properties().keySet(), new Function<String, AppSetting>() {
                @Override
                public AppSetting apply(String input) {
                    return new AppSettingImpl(input, appSettingsInner.properties().get(input),
                            slotConfigs != null && slotConfigs.appSettingNames() != null && slotConfigs.appSettingNames().contains(input));
                }
            });
        }
    });
}
项目:azure-libraries-for-java    文件:WebAppBaseImpl.java   
@Override
public Observable<Map<String, ConnectionString>> getConnectionStringsAsync() {
    return Observable.zip(listConnectionStrings(), listSlotConfigurations(), new Func2<ConnectionStringDictionaryInner, SlotConfigNamesResourceInner, Map<String, ConnectionString>>() {
        @Override
        public Map<String, ConnectionString> call(final ConnectionStringDictionaryInner connectionStringsInner, final SlotConfigNamesResourceInner slotConfigs) {
            if (connectionStringsInner == null || connectionStringsInner.properties() == null) {
                return null;
            }
            return Maps.asMap(connectionStringsInner.properties().keySet(), new Function<String, ConnectionString>() {
                @Override
                public ConnectionString apply(String input) {
                    return new ConnectionStringImpl(input, connectionStringsInner.properties().get(input),
                            slotConfigs != null && slotConfigs.connectionStringNames() != null && slotConfigs.connectionStringNames().contains(input));
                }
            });
        }
    });
}
项目:disclosure-android-app    文件:AppService.java   
public Func2<AppReport, AppReport, Integer> getSortingFunction(SortBy sortBy) {
  switch (sortBy) {
    case NAME:
      return new SortByName();
    case LIBRARY_COUNT:
      return new SortByLibraryCount();
    case ANALYZED_AT:
      return new SortByAnalyzedAt();
    case PERMISSION_COUNT:
      return new SortByPermissionCount();
    default:
      throw new IllegalArgumentException("no sorting function for %s " + sortBy);
  }
}
项目:GongXianSheng    文件:RxIdlingResource.java   
private void setupHooks() {
    RxJavaHooks.setOnObservableStart(new Func2<Observable, OnSubscribe, OnSubscribe>() {
        @Override
        public OnSubscribe call(Observable observable, OnSubscribe onSubscribe) {
            incrementActiveSubscriptionsCount();
            return onSubscribe;
        }
    });

    RxJavaHooks.setOnObservableSubscribeError(new Func1<Throwable, Throwable>() {
        @Override
        public Throwable call(Throwable throwable) {
            decrementActiveSubscriptionsCount();
            return throwable;
        }
    });

    RxJavaHooks.setOnObservableReturn(new Func1<Subscription, Subscription>() {
        @Override
        public Subscription call(Subscription subscription) {
            decrementActiveSubscriptionsCount();
            return subscription;
        }
    });
}
项目:boohee_v5.6    文件:OperatorSequenceEqual.java   
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, final Func2<? super T, ? super T, Boolean> equality) {
    return Observable.zip(materializeLite(first), materializeLite(second), new Func2<Object, Object, Boolean>() {
        public Boolean call(Object t1, Object t2) {
            boolean c1;
            if (t1 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) {
                c1 = true;
            } else {
                c1 = false;
            }
            boolean c2;
            if (t2 == OperatorSequenceEqual.LOCAL_ONCOMPLETED) {
                c2 = true;
            } else {
                c2 = false;
            }
            if (c1 && c2) {
                return Boolean.valueOf(true);
            }
            if (c1 || c2) {
                return Boolean.valueOf(false);
            }
            return (Boolean) equality.call(t1, t2);
        }
    }).all(UtilityFunctions.identity());
}
项目:mimi-reader    文件:PostItemsListFragment.java   
private Func2<ChanCatalog, List<HiddenThread>, ChanCatalog> hideThreads() {
    return new Func2<ChanCatalog, List<HiddenThread>, ChanCatalog>() {
        @Override
        public ChanCatalog call(ChanCatalog chanCatalog, List<HiddenThread> hiddenThreads) {
            if (chanCatalog != null) {
                List<ChanPost> posts = new ArrayList<>();
                for (ChanPost post : chanCatalog.getPosts()) {
                    boolean found = false;
                    for (HiddenThread hiddenThread : hiddenThreads) {
                        if (hiddenThread.threadId == post.getNo()) {
                            found = true;
                        }
                    }

                    if (!found) {
                        posts.add(post);
                    }
                }

                chanCatalog.setPosts(posts);

            }
            return chanCatalog;
        }
    };
}
项目:HttpService    文件:RxJavaCallAdapterFactory.java   
@Override public Observable<Long> call(Observable<? extends Throwable> errorObservable) {
  return errorObservable.zipWith(Observable.range(INITIAL, maxConnectCount),
      new Func2<Throwable, Integer, InnerThrowable>() {

        @Override public InnerThrowable call(Throwable throwable, Integer i) {
          if (throwable instanceof IOException) return new InnerThrowable(throwable, i);
          return new InnerThrowable(throwable, i);
        }
      }).concatMap(new Func1<InnerThrowable, Observable<Long>>() {
    @Override public Observable<Long> call(InnerThrowable innerThrowable) {

      Integer currentCount = innerThrowable.getCurrentRetryCount();
      if (RetryWhenFunc.this.maxConnectCount.equals(currentCount)) {
        return Observable.error(innerThrowable.getThrowable());
      }

      /*use Schedulers#immediate() to keep on same thread */
      return Observable.timer((long) Math.pow(2, currentCount), TimeUnit.SECONDS,
          Schedulers.immediate());
    }
  });
}
项目:zhihudailysoap    文件:CommentPresenter.java   
@Override
public void fetchDataByNetWork(final int newsID) {
    Observable<Comments> short_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).shortComment(newsID);
    Observable<Comments> long_comments_service = RetrofitSingleton.getApiService((Activity) mCommentView).longComment(newsID);
    Observable.zip(short_comments_service, long_comments_service, new Func2<Comments, Comments, Comments1>(){

        @Override
        public Comments1 call(Comments comments, Comments comments2) {
            Comments1 comments1 = new Comments1();
            comments1.long_comments = comments.comments;
            comments1.short_comments = comments2.comments;
            return comments1;
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);
}
项目:yApp    文件:GankPresenter.java   
@Override
public Observable<List<Gank>> getObservable() {
    if (resId == -1) return null;

    return getDataSupports().getGankData(type, mCurrentPage, PAGE_SIZE)
            .map(new Func1<GankData, List<Gank>>() {
                @Override
                public List<Gank> call(GankData gankData) {
                    return gankData.getResults();
                }
            })
            .flatMap(new Func1<List<Gank>, Observable<Gank>>() {
                @Override
                public Observable<Gank> call(List<Gank> ganks) {
                    return Observable.from(ganks);
                }
            })
            .toSortedList(new Func2<Gank, Gank, Integer>() {
                @Override
                public Integer call(Gank gank, Gank gank2) {
                    return gank2.getPublishedAt().compareTo(gank.getPublishedAt());
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
项目:Mondroid    文件:DataManager.java   
public Single<AccountData> getAccountDetails() {
    return getAccounts()
            .flatMap(new Func1<AccountsResponse, Single<? extends AccountData>>() {
                @Override
                public Single<? extends AccountData> call(AccountsResponse accountsResponse) {
                    if (!accountsResponse.accounts.isEmpty()) {
                        String accountId = accountsResponse.accounts.get(0).id;
                        return Single.zip(getBalance(accountId), getTransactions(accountId),
                                new Func2<Balance, List<Transaction>, AccountData>() {
                            @Override
                            public AccountData call(Balance balance,
                                                    List<Transaction> transactions) {
                                return new AccountData(balance, transactions);
                            }
                        });
                    }
                    return Single.just(new AccountData());
                }
            });
}
项目:android-oss    文件:ApiPaginator.java   
private ApiPaginator(
  final @NonNull Observable<Void> nextPage,
  final @NonNull Observable<Params> startOverWith,
  final @NonNull Func1<Envelope, List<Data>> envelopeToListOfData,
  final @NonNull Func1<Params, Observable<Envelope>> loadWithParams,
  final @NonNull Func1<String, Observable<Envelope>> loadWithPaginationPath,
  final @NonNull Func1<Envelope, String> envelopeToMoreUrl,
  final @NonNull Func1<List<Data>, List<Data>> pageTransformation,
  final boolean clearWhenStartingOver,
  final @NonNull Func2<List<Data>, List<Data>, List<Data>> concater,
  final boolean distinctUntilChanged
) {
  this.nextPage = nextPage;
  this.startOverWith = startOverWith;
  this.envelopeToListOfData = envelopeToListOfData;
  this.loadWithParams = loadWithParams;
  this.envelopeToMoreUrl = envelopeToMoreUrl;
  this.pageTransformation = pageTransformation;
  this.loadWithPaginationPath = loadWithPaginationPath;
  this.clearWhenStartingOver = clearWhenStartingOver;
  this.concater = concater;
  this.distinctUntilChanged = distinctUntilChanged;

  this.paginatedData = this.startOverWith.switchMap(this::dataWithPagination);
  this.loadingPage = this.startOverWith.switchMap(__ -> nextPage.scan(1, (accum, ___) -> accum + 1));
}
项目:FloatingSearchView    文件:GoogleSearchController.java   
private Observable<SearchResult[]> getQueryObservable(String query) {
    return mSearch.search(query)
            .flatMap(new Func1<Response, Observable<SearchResult[]>>() {
                @Override
                public Observable<SearchResult[]> call(Response response) {
                    if (response.responseData == null)
                        return Observable.error(new SearchException(response.responseDetails));
                    return Observable.just(response.responseData.results);
                }
            })
            .retry(new Func2<Integer, Throwable, Boolean>() {
                @Override
                public Boolean call(Integer integer, Throwable throwable) {
                    return throwable instanceof InterruptedIOException;
                }
            });
}
项目:RxjavaRetrofitDemo-string-master    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:RxjavaRetrofitDemo-master    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:collapselrecycler    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:Rx-Retrofit    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:RxRetrofit-mvp    文件:RetryWhenNetworkException.java   
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
    return observable
            .zipWith(Observable.range(1, count + 1), new Func2<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper call(Throwable throwable, Integer integer) {
                    return new Wrapper(throwable, integer);
                }
            }).flatMap(new Func1<Wrapper, Observable<?>>() {
                @Override
                public Observable<?> call(Wrapper wrapper) {
                    if ((wrapper.throwable instanceof ConnectException
                            || wrapper.throwable instanceof SocketTimeoutException
                            || wrapper.throwable instanceof TimeoutException)
                            && wrapper.index < count + 1) { //如果超出重试次数也抛出错误,否则默认是会进入onCompleted
                        return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);

                    }
                    return Observable.error(wrapper.throwable);
                }
            });
}
项目:V2EX-Android    文件:NewestTopicsFragment.java   
@Override
public void loadData() {
    Subscription subscription = Observable.zip(V2exService.getInstance().getV2exApi().getTopicHot(), V2exService.getInstance().getV2exApi().getTopicLatest(), new Func2<List<Topics>, List<Topics>, TopicsData>() {
        @Override
        public TopicsData call(List<Topics> hotTopics, List<Topics> latestTopics) {
            setImagData(hotTopics);
            setImagData(latestTopics);
            TopicsData topicsData = new TopicsData();
            topicsData.hotTopics = hotTopics;
            latestTopics.addAll(0 , hotTopics);
            topicsData.allTopics = latestTopics;
            return topicsData;
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
    addSubscription(subscription);
}
项目:RxJavaDemo    文件:MainListWithExample_Observable_flatMapIterable.java   
private Observable example2() {
    // flatMapIterable(Func1, Func2)
    return Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
            .flatMapIterable(
                    new Func1<Integer, Iterable<Integer>>() {
                        @Override
                        public Iterable<Integer> call(Integer integer) {
                            ArrayList<Integer> s = new ArrayList<>();
                            for (int i = 0; i < integer; i++) {
                                s.add(i);
                            }
                            return s;
                        }
                    }, new Func2<Integer, Integer, Integer>() {

                        @Override
                        public Integer call(Integer o, Integer o2) {
                            return o + o2;
                        }
                    }
            );
}
项目:microservices-dashboard-server    文件:NodeMerger.java   
public static Func2<List<Node>, Node, List<Node>> merge() {
    return (mergedNodes, node) -> {

        // TODO: we should be able to enrich nodes in a general way before merging, eg. convert all service names to lowercase
        // Aggregator specific logic should not come here, eg. removing the Eureka description

        Optional<Integer> nodeIndex = mergedNodes.stream()
                .filter(n -> n.getId().equalsIgnoreCase(node.getId()))
                .map(mergedNodes::indexOf)
                .findFirst();

        if (nodeIndex.isPresent()) {
            logger.info("Node with id '{}' previously added, merging", node.getId());
            mergedNodes.get(nodeIndex.get()).mergeWith(node);
        } else {
            logger.info("Node with id '{}' was not merged before, adding it to the list", node.getId());
            mergedNodes.add(node);
        }

        return mergedNodes;
    };
}
项目:AirportCodes-Android    文件:AirportsFragment.java   
@Override
protected Observable<List<Airport>> createObservable()
{
    copyLocalFileIfNone();

    return AirportPersister.INSTANCE.getAirports()
            .lift(new FlattenOperator<Airport>())
            .toSortedList(new Func2<Airport, Airport, Integer>()
            {
                @Override
                public Integer call(Airport airport1, Airport airport2)
                {
                    return airport1.code.compareTo(airport2.code);
                }
            });
}
项目:FMTech    文件:ZipFragment.java   
@OnClick(R.id.zipLoadBt)
void loadData(){
    mSwipeRefreshLayout.setRefreshing(true);
    unsubscribe();
    mSubscription = Observable.zip(NetWorkService.getGankApi().getBeauties(200, 1).map(GankBeautyResultToItemsMapper.getInstance()),
            NetWorkService.getZbApi().search("装逼"), new Func2<List<Image>, List<Image>, List<Image>>() {
                @Override
                public List<Image> call(List<Image> images1, List<Image> images2) {
                    List<Image> images = new ArrayList<Image>();
                    for(int i = 0; i < images1.size()/2 && i < images2.size(); i++){
                        images.add(images1.get(i * 2));
                        images.add(images1.get(i * 2 + 1));
                        images.add(images2.get(i));
                    }
                    return images;
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(mObserver);
}
项目:bigsort    文件:BigSort.java   
public static <T, Resource> Observable<T> sort(Observable<T> source,
        final Comparator<T> comparator,
        final Func2<Observable<T>, Resource, Observable<Resource>> writer,
        final Func1<Resource, Observable<T>> reader, final Func0<Resource> resourceFactory,
        final Action1<Resource> resourceDisposer, int maxToSortInMemoryPerThread,
        final int maxTempResources, Scheduler scheduler) {
    Preconditions.checkArgument(maxToSortInMemoryPerThread > 0,
            "maxToSortInMemoryPerThread must be greater than 0");
    Preconditions.checkArgument(maxTempResources >= 2, "maxTempResources must be at least 2");
    return source
            // buffer into groups small enough to sort in memory
            .buffer(maxToSortInMemoryPerThread)
            // sort each buffer to a resource
            .flatMap(sortInMemoryAndWriteToAResource(comparator, writer, resourceFactory,
                    scheduler))
            // reduce by merging groups of resources to a single resource
            // once the resource count is maxTempResources
            .lift(new OperatorResourceMerger<Resource, T>(comparator, writer, reader,
                    resourceFactory, resourceDisposer, maxTempResources))
            // help out backpressure because ResourceMerger doesn't support
            // yet
            .onBackpressureBuffer()
            // emit the contents of the last file in the reduction process
            .flatMap(reader);

}
项目:FloatingSearchView    文件:GoogleSearchController.java   
private Observable<SearchResult[]> getQueryObservable(String query) {
    return mSearch.search(query)
            .flatMap(new Func1<Response, Observable<SearchResult[]>>() {
                @Override
                public Observable<SearchResult[]> call(Response response) {
                    if (response.responseData == null)
                        return Observable.error(new SearchException(response.responseDetails));
                    return Observable.just(response.responseData.results);
                }
            })
            .retry(new Func2<Integer, Throwable, Boolean>() {
                @Override
                public Boolean call(Integer integer, Throwable throwable) {
                    return throwable instanceof InterruptedIOException;
                }
            });
}
项目:rx-extended    文件:OperatorCompareTest.java   
@Test
public void testCompareOperatorInitialValue() throws Exception {
    @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);

    Observable<Integer> observable = Observable.just(1, 2, 3);

    OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(10, new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
        }
    });

    observable.lift(operatorCompare).subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onNext(11);
    verify(observer, times(1)).onNext(3);
    verify(observer, times(1)).onNext(5);
    verify(observer, times(1)).onCompleted();
}
项目:rx-extended    文件:OperatorCompareTest.java   
@Test
public void testCompareOperatorNoInitialValue() throws Exception {
    @SuppressWarnings("unchecked") rx.Observer<Integer> observer = mock(rx.Observer.class);

    Observable<Integer> observable = Observable.just(1, 2, 3);

    OperatorCompare<Integer, Integer> operatorCompare = new OperatorCompare<Integer, Integer>(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
        }
    });

    observable.lift(operatorCompare).subscribe(observer);

    verify(observer, never()).onError(any(Throwable.class));
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(3);
    verify(observer, times(1)).onNext(5);
    verify(observer, times(1)).onCompleted();
}
项目:openwebnet-android    文件:TemperatureServiceImpl.java   
private Func1<TemperatureModel, Observable<TemperatureModel>> requestTemperature() {

        final Func2<String, Heating.TemperatureScale, Heating> request = Heating::requestTemperature;

        final Func2<OpenSession, TemperatureModel, TemperatureModel> handler = (openSession, temperature) -> {
            Heating.handleTemperature(
                value -> temperature.setValue(String.valueOf(value)),
                () -> temperature.setValue(null))
            .call(openSession);
            return temperature;
        };

        return temperature -> commonService.findClient(temperature.getGatewayUuid())
            .send(request.call(temperature.getWhere(), preferenceService.getDefaultTemperatureScale()))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .map(openSession -> handler.call(openSession, temperature))
            .onErrorReturn(throwable -> {
                log.warn("temperature={} | failing request={}", temperature.getUuid(), Heating.requestTemperature(temperature.getWhere()));
                // unreadable temperature
                return temperature;
            });

    }
项目:openwebnet-android    文件:SoundServiceImpl.java   
private Func1<SoundModel, Observable<SoundModel>> requestSound(
    Func3<String, SoundSystem.Type, SoundSystem.Source, SoundSystem> request,
    Func2<OpenSession, SoundModel, SoundModel> handler) {

    return sound -> commonService.findClient(sound.getGatewayUuid())
        .send(request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource()))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .map(openSession -> handler.call(openSession, sound))
        .onErrorReturn(throwable -> {
            log.warn("sound={} | failing request={}", sound.getUuid(),
                request.call(sound.getWhere(), sound.getSoundSystemType(), sound.getSoundSystemSource()).getValue());
            // unreadable status
            return sound;
        });
}
项目:scoop    文件:Lists.java   
public static <T> boolean elementsEqual(List<T> list1, List<T> list2, Func2<T, T, Boolean> comparator) {
    if (list1 == null || list2 == null) {
        return false;
    }

    if (list1.size() != list2.size()) {
        return false;
    }

    for (int i = 0; i < list1.size(); i++) {
        if (!comparator.call(list1.get(i), list2.get(i))) {
            return false;
        }
    }

    return true;
}
项目:TLint    文件:MainModule.java   
@Provides
@PerActivity
Observable<Integer> provideNotificationObservable(GameApi mGameApi,
                                                  ForumApi mForumApi) {
    return Observable.zip(mGameApi.queryPmList(""), mForumApi.getMessageList("", 1),
            new Func2<PmData, MessageData, Integer>() {
                @Override
                public Integer call(PmData pmData, MessageData messageData) {
                    int size = 0;
                    if (pmData != null) {
                        if (pmData.is_login == 0) {
                            return null;
                        }
                        for (Pm pm : pmData.result.data) {
                            if (!TextUtils.isEmpty(pm.unread) && pm.unread.equals("1")) {
                                size++;
                            }
                        }
                    }
                    if (messageData != null && messageData.status == 200) {
                        size += messageData.result.list.size();
                    }
                    return size;
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
项目:dhis2-android-eventcapture    文件:SyncWrapper.java   
public Observable<List<Program>> syncMetaData() {
    Set<ProgramType> programTypes = new HashSet<>();
    programTypes.add(ProgramType.WITHOUT_REGISTRATION);

    return Observable.zip(userOrganisationUnitInteractor.pull(),
            userProgramInteractor.pull(ProgramFields.DESCENDANTS, programTypes),
            new Func2<List<OrganisationUnit>, List<Program>, List<Program>>() {
                @Override
                public List<Program> call(List<OrganisationUnit> units, List<Program> programs) {
                    if (syncDateWrapper != null) {
                        syncDateWrapper.setLastSyncedNow();
                    }
                    return programs;
                }
            });
}
项目:GitHub    文件:RxOperatorExampleActivity.java   
private void findUsersWhoLovesBoth() {
    // here we are using zip operator to combine both request
    Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
            new Func2<List<User>, List<User>, List<User>>() {
                @Override
                public List<User> call(List<User> cricketFans,
                                       List<User> footballFans) {
                    List<User> userWhoLovesBoth =
                            filterUserWhoLovesBoth(cricketFans, footballFans);
                    return userWhoLovesBoth;
                }
            }
    ).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<User>>() {
                @Override
                public void onCompleted() {
                    // do anything onComplete
                }

                @Override
                public void onError(Throwable e) {
                    // handle error
                }

                @Override
                public void onNext(List<User> users) {
                    // do anything with user who loves both
                    Log.d(TAG, "userList size : " + users.size());
                    for (User user : users) {
                        Log.d(TAG, "id : " + user.id);
                        Log.d(TAG, "firstname : " + user.firstname);
                        Log.d(TAG, "lastname : " + user.lastname);
                    }
                }
            });
}
项目:GitHub    文件:RxOperatorExampleActivity.java   
private void findUsersWhoLovesBoth() {
    // here we are using zip operator to combine both request
    Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
            new Func2<List<User>, List<User>, List<User>>() {
                @Override
                public List<User> call(List<User> cricketFans,
                                       List<User> footballFans) {
                    List<User> userWhoLovesBoth =
                            filterUserWhoLovesBoth(cricketFans, footballFans);
                    return userWhoLovesBoth;
                }
            }
    ).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<List<User>>() {
                @Override
                public void onCompleted() {
                    // do anything onComplete
                }

                @Override
                public void onError(Throwable e) {
                    // handle error
                }

                @Override
                public void onNext(List<User> users) {
                    // do anything with user who loves both
                    Log.d(TAG, "userList size : " + users.size());
                    for (User user : users) {
                        Log.d(TAG, "id : " + user.id);
                        Log.d(TAG, "firstname : " + user.firstname);
                        Log.d(TAG, "lastname : " + user.lastname);
                    }
                }
            });
}
项目:Go-RxJava    文件:Fragment_And_Then_Where.java   
public void runCode() {

        Observable obs1 = Observable.interval(1, TimeUnit.SECONDS);
        Observable obs2 = Observable.range(0,3);
        Plan0 plan0 = JoinObservable.from(obs1).and(obs2).then(new Func2() {
            @Override
            public Object call(Object o1, Object o2) {
                return o2;
            }
        });
        JoinObservable.when(plan0).toObservable()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        println("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        println("onError");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        println("onNext:"+integer);
                    }

                });


    }
项目:Go-RxJava    文件:Fragment_Reduce.java   
public void runCode() {

        Observable.just(1,2,3,4).reduce(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1*i2;
            }
        }) .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                println("onNext:"+integer);
            }
        });



    }
项目:mobile-store    文件:WifiSwap.java   
@Override
public void start() {
    Utils.debugLog(TAG, "Preparing swap webserver.");
    sendBroadcast(SwapService.EXTRA_STARTING);

    if (FDroidApp.ipAddressString == null) {
        Log.e(TAG, "Not starting swap webserver, because we don't seem to be connected to a network.");
        setConnected(false);
    }

    Single.zip(
            Single.create(getWebServerTask()),
            Single.create(getBonjourTask()),
            new Func2<Boolean, Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean webServerTask, Boolean bonjourServiceTask) {
                    return bonjourServiceTask && webServerTask;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Action1<Boolean>() {
                    @Override
                    public void call(Boolean success) {
                        setConnected(success);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        setConnected(false);
                    }
                });
}