/**
 * Builds a transformer that validates WeChat payment results.
 * <p>
 * A {@link WxPayResult} whose {@code isSucceed()} is true is passed through unchanged;
 * otherwise the mapping function throws a {@link PayFailedException} built from
 * {@code getErrInfo()}.
 *
 * @return transformer that lets only successful payment results through
 */
public static ObservableTransformer<WxPayResult, WxPayResult> checkWechatResult() {
    return payResults -> payResults.map(result -> {
        if (result.isSucceed()) {
            return result;
        }
        throw new PayFailedException(result.getErrInfo());
    });
}
public static <T> ObservableTransformer<T, T> compose() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> observable) { return observable .subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { if (!Utils.isNetworkAvailable()) { //disposable.dispose(); //throw new NetworkNoAvailableException("no network,please check and retry"); //Utils.MakeShortToast("no network,please check and retry"); //throw new Exception(); } } }) .observeOn(AndroidSchedulers.mainThread()); } }; }
/**
 * Turns {@link EmailLoginActions.LoginAction}s into results.
 * <p>
 * The email is checked against {@code Patterns.EMAIL_ADDRESS} and the password against
 * {@code [0-9a-zA-Z]{6,}}; a failed check yields a single error result carrying a
 * {@link FormValidationException}. Valid actions are handed to the use case, prefixed with a
 * loading result, and use-case errors are mapped to error results.
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return upstream -> upstream
            .ofType(EmailLoginActions.LoginAction.class)
            .flatMap(login -> {
                final boolean emailValid =
                        Patterns.EMAIL_ADDRESS.matcher(login.getEmail()).matches();
                if (!emailValid) {
                    return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(
                            login,
                            new FormValidationException("Must enter a valid email address!")));
                }

                // The password is only inspected once the email has been accepted.
                final boolean passwordValid =
                        Pattern.matches("[0-9a-zA-Z]{6,}", login.getPassword());
                if (!passwordValid) {
                    return Observable.just(Result.<Boolean, EmailLoginActions.LoginAction>error(
                            login,
                            new FormValidationException("Password must be at least 6 characters long!")));
                }

                return useCase.performAction(login)
                        .onErrorReturn(failure -> Result.error(login, failure))
                        .startWith(Result.<Boolean, EmailLoginActions.LoginAction>loading(login));
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
}
/**
 * Creates a transformer that logs the selected events of an observable's lifecycle.
 * <p>
 * Each {@code LOG_*} flag set in {@code bitMask} appends the matching logging transformer.
 * {@code LOG_NEXT_DATA} takes precedence over {@code LOG_NEXT_EVENT}: the latter is only
 * applied when the former is not set.
 *
 * @param msg     message passed to every logging transformer
 * @param bitMask bitmask of the events to log
 * @param <T>     item type
 * @return transformer that applies the selected logging steps in a fixed order
 */
public static <T> ObservableTransformer<T, T> logObservable(final String msg, final int bitMask) {
    return upstream -> {
        // Flags are tested with "!= 0" rather than "> 0" so that a flag occupying the
        // sign bit is still recognised.
        if ((bitMask & LOG_SUBSCRIBE) != 0) {
            upstream = upstream.compose(oLogSubscribe(msg));
        }
        if ((bitMask & LOG_TERMINATE) != 0) {
            upstream = upstream.compose(oLogTerminate(msg));
        }
        if ((bitMask & LOG_ERROR) != 0) {
            upstream = upstream.compose(oLogError(msg));
        }
        if ((bitMask & LOG_COMPLETE) != 0) {
            upstream = upstream.compose(oLogComplete(msg));
        }
        if ((bitMask & LOG_NEXT_DATA) != 0) {
            upstream = upstream.compose(oLogNext(msg));
        } else if ((bitMask & LOG_NEXT_EVENT) != 0) {
            upstream = upstream.compose(oLogNextEvent(msg));
        }
        if ((bitMask & LOG_DISPOSE) != 0) {
            upstream = upstream.compose(oLogDispose(msg));
        }
        return upstream;
    };
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { view.showLoading();//显示进度条 } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doAfterTerminate(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }).compose(RxUtils.bindToLifecycle(view)); } }; }
public static <T> ObservableTransformer<BaseResponse<T>, T> handleResult() { try { return baseResponseObservable -> baseResponseObservable .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(baseResponse -> { if (baseResponse.getErrorCode() == 0) { if (baseResponse.getData() != null) { return Observable.just(baseResponse.getData()); } else { //这种情况是没有data的情况下需要走onComplete来进行处理 return Observable.empty(); } } else { return Observable.error(new DlException(baseResponse.getErrorCode(), baseResponse.getMsg())); } }); } catch (Exception e) { e.printStackTrace(); return baseResponseObservable -> baseResponseObservable .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(baseResponse -> Observable.error(new Throwable("服务器错误"))); } }
/**
 * Prepares a fresh {@link BriteDatabase} for each test.
 * <p>
 * The database is backed by a new file from {@code dbFolder}, logs into {@code logs}, uses one
 * {@link PublishSubject} as both trigger source and sink, and ends every query stream when
 * {@code killSwitch} emits.
 */
@Before
public void setUp() throws IOException {
    // Collaborators that do not depend on the database file.
    PublishSubject<Set<String>> triggerSubject = PublishSubject.create();
    SqlBrite.Logger recordingLogger = new SqlBrite.Logger() {
        @Override
        public void log(String message) {
            logs.add(message);
        }
    };
    ObservableTransformer<Query, Query> stopOnKillSwitch = new ObservableTransformer<Query, Query>() {
        @Override
        public ObservableSource<Query> apply(Observable<Query> upstream) {
            return upstream.takeUntil(killSwitch);
        }
    };

    helper = new TestDb(InstrumentationRegistry.getContext(), dbFolder.newFile().getPath());
    real = helper.getWritableDatabase();
    db = new BriteDatabase(
            helper, recordingLogger, triggerSubject, triggerSubject, scheduler, stopOnKillSwitch);
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(@NonNull Disposable disposable) throws Exception { view.showLoading();//显示进度条 } }) .subscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()) .doFinally(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }).compose(RxLifecycleUtils.bindToLifecycle(view)); } }; }
/**
 * Builds a transformer that recovers from resolvable {@link StatusException}s.
 * <p>
 * When the upstream fails with a {@link StatusException} whose status reports
 * {@code hasResolution()}, the exception's result is emitted instead (cast to {@code R}).
 * Every other error is re-signalled unchanged.
 *
 * @param <R> result type
 * @return the recovering transformer
 */
public static <R extends Result> ObservableTransformer<R, R> forObservable() {
    return upstream -> upstream.onErrorResumeNext(error -> {
        if (error instanceof StatusException
                && ((StatusException) error).getStatus().hasResolution()) {
            return Observable.just((R) ((StatusException) error).getResult());
        }
        return Observable.error(error);
    });
}
/**
 * Routes refresh, load-more and delete actions to their use cases and merges the results.
 * <p>
 * Each action produces a loading result first; a use-case error is mapped to an error result
 * for that action rather than terminating the merged stream.
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return actions -> Observable.merge(
            actions.ofType(TimezonesUiActions.RefreshAction.class)
                    .flatMap(refresh -> listUseCase.performAction(refresh)
                            .onErrorReturn(error -> Result.error(refresh, error))
                            .startWith(Result.loading(refresh))),
            actions.ofType(TimezonesUiActions.LoadMoreAction.class)
                    .flatMap(loadMore -> listUseCase.performAction(loadMore)
                            .onErrorReturn(error -> Result.error(loadMore, error))
                            .startWith(Result.loading(loadMore))),
            actions.ofType(DeleteTimezoneAction.class)
                    .flatMap(delete -> deleteUseCase.performAction(delete)
                            .onErrorReturn(error -> Result.error(delete, error))
                            .startWith(Result.loading(delete)))
    );
}
/**
 * Builds a transformer that subscribes and unsubscribes on {@code Schedulers.io()} and
 * observes on the Android main thread, logging through {@code HttpLog} on subscription and
 * when the stream finishes.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> io_main() {
    return upstream -> upstream
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .doOnSubscribe(disposable ->
                    HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed()))
            .doFinally(() -> HttpLog.i("+++doFinally+++"))
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Builds a transformer that validates Alipay payment results.
 * <p>
 * A {@link PayResult} whose {@code isSucceed()} is false makes the mapping function throw a
 * {@link PayFailedException} built from {@code getErrInfo()}; successful results pass through
 * unchanged.
 *
 * @return transformer that lets only successful payment results through
 */
public static ObservableTransformer<PayResult, PayResult> checkAliPayResult() {
    return upstream -> upstream.map(outcome -> {
        final boolean failed = !outcome.isSucceed();
        if (failed) {
            throw new PayFailedException(outcome.getErrInfo());
        }
        return outcome;
    });
}
public static <T> ObservableTransformer<T, T> applySchedulersWithLifeCycle(IView view) { return new ObservableTransformer<T, T>() { @Override public Observable<T> apply(Observable<T> observable) { return observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doAfterTerminate(new Action() { @Override public void run() { view.hideLoading();//隐藏进度条 } }) .compose(RxLifecycleUtils.bindToLifecycle(view)); } }; }
public static <T> ObservableTransformer<T, T> compose() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> observable) { return observable .subscribeOn(Schedulers.io()) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { // if (!NetworkUtils.isConnected()) { // Toast.makeText(Bing.getApplicationContext(), R.string.toast_network_error, Toast.LENGTH_SHORT).show(); // } } }) .observeOn(AndroidSchedulers.mainThread()); } }; }
/**
 * Maps UI events to results.
 * <p>
 * Each {@link LoadConcertEvent} loads the concert identified by {@code concertId} from the
 * repository: {@code LoadConcertResult.IN_PROGRESS} is emitted first, followed by a success
 * or error result. Events that already are {@link Result}s are forwarded as they are.
 */
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
    ObservableTransformer<LoadConcertEvent, Result> concertLoader = loadEvents ->
            loadEvents.flatMap(event -> getRepository().getConcert(concertId)
                    .map(LoadConcertResult::success)
                    .onErrorReturn(LoadConcertResult::error)
                    .toObservable()
                    .startWith(LoadConcertResult.IN_PROGRESS));

    // publish() shares one upstream subscription between both branches.
    return events.publish(sharedEvents -> Observable.merge(
            handleEventsOfClass(sharedEvents, LoadConcertEvent.class, concertLoader),
            sharedEvents.ofType(Result.class)
    ));
}
/**
 * Maps every UI event to a user-check action chosen from the last state.
 * <p>
 * When the last state has no data, or its phase is {@code CheckPhases.STATE_START}, an
 * "exists" action for the current user id is produced; otherwise a "create record" action
 * with the user's id, name, email and avatar.
 */
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
    return upstream -> upstream.map(event -> {
        if (getLastState().getData() == null
                || getLastState().getData().getPhase() == CheckPhases.STATE_START) {
            return CheckUserActions.exists(userManager.getUserId());
        }
        return CheckUserActions.createRecord(
                userManager.getUserId(),
                userManager.getUserName(),
                userManager.getUserEmail(),
                userManager.getAvatar());
    });
}
/**
 * Builds a transformer that subscribes on {@code Schedulers.io()} and observes on the
 * Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableAsync() {
    return source -> source
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Builds a transformer that subscribes on {@code Schedulers.computation()} and observes on
 * the Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public <T> ObservableTransformer<T, T> applyObservableCompute() {
    return source -> source
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Creates the database wrapper.
 * <p>
 * {@code SQLiteDatabase.loadLibs(context)} is invoked first; every other argument is stored
 * as given. The {@code password} array is kept by reference, not copied.
 *
 * @param context          context handed to {@code SQLiteDatabase.loadLibs}
 * @param helper           open helper for the underlying database
 * @param password         database password
 * @param logger           logger to use
 * @param triggerSource    stream of table-change triggers
 * @param triggerSink      observer that receives table-change triggers
 * @param scheduler        scheduler to use
 * @param queryTransformer transformer to use for query streams
 */
BriteDatabase(Context context,
        SQLiteOpenHelper helper,
        @NonNull char[] password,
        Logger logger,
        Observable<Set<String>> triggerSource,
        Observer<Set<String>> triggerSink,
        Scheduler scheduler,
        ObservableTransformer<Query, Query> queryTransformer) {
    SQLiteDatabase.loadLibs(context);

    // Database access.
    this.helper = helper;
    this.password = password;

    // Change notification plumbing.
    this.triggerSource = triggerSource;
    this.triggerSink = triggerSink;

    // Query execution and diagnostics.
    this.scheduler = scheduler;
    this.queryTransformer = queryTransformer;
    this.logger = logger;
}
/**
 * Builds a transformer that resubscribes to the upstream on error for as long as the
 * four-argument {@code retry(hint, retryCount, attempt, error)} overload returns true.
 *
 * @param hint       passed through to the retry decision
 * @param retryCount passed through to the retry decision
 * @param <U>        item type
 * @return the retrying transformer
 */
public static <U> ObservableTransformer<U, U> retry(final String hint, final int retryCount) {
    return upstream -> upstream.retry(
            (attempt, error) -> retry(hint, retryCount, attempt, error));
}
/**
 * Routes get, save and delete timezone actions to their use cases and merges the results.
 * <p>
 * Save actions are validated first (non-empty name, non-empty city, difference within
 * -12..+12); a failed check yields a single error result with a
 * {@link FormValidationException}. A valid save goes to {@code saveUseCase} when the action
 * has no id and to {@code updateUseCase} otherwise. Every use-case call is prefixed with a
 * loading result and has its errors mapped to error results.
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return actions -> Observable.merge(
            actions.ofType(TimezoneEditUiActions.GetTimezoneAction.class)
                    .flatMap(get -> getUseCase.performAction(get)
                            .onErrorReturn(error -> Result.error(get, error))
                            .startWith(Result.loading(get))),
            actions.ofType(TimezoneEditUiActions.SaveTimezoneAction.class)
                    .flatMap(save -> {
                        // Validation guards: the first failing rule wins.
                        if (save.getName() == null || save.getName().isEmpty()) {
                            return Observable.just(Result.error(save,
                                    new FormValidationException("Name cannot be empty!")));
                        }
                        if (save.getCity() == null || save.getCity().isEmpty()) {
                            return Observable.just(Result.error(save,
                                    new FormValidationException("City cannot be empty!")));
                        }
                        if (save.getDifference() < -12 || save.getDifference() > 12) {
                            return Observable.just(Result.error(save,
                                    new FormValidationException("Difference must be an integer between -12 and +12!")));
                        }

                        // No id means a new record; otherwise update the existing one.
                        final Observable<Result> pending = save.getId() == null
                                ? saveUseCase.performAction(save)
                                : updateUseCase.performAction(save);
                        return pending
                                .onErrorReturn(error -> Result.error(save, error))
                                .startWith(Result.loading(save));
                    }),
            actions.ofType(DeleteTimezoneAction.class)
                    .flatMap(delete -> deleteUseCase.performAction(delete)
                            .onErrorReturn(error -> Result.error(delete, error))
                            .startWith(Result.loading(delete)))
    );
}
/**
 * Returns a transformer that replaces the source observable's items with one
 * {@link Permission} object per requested permission.
 * <p>
 * The permission names are validated by {@code checkPermissions} before the transformer is
 * created. If one or several permissions have never been requested, the related framework
 * method is invoked to ask the user whether he allows them.
 *
 * @param permissions names of the permissions to ensure
 * @param <T>         type of the source items
 * @return transformer delegating to {@code request(source, permissions)}
 */
@NonNull
@CheckReturnValue
private <T> ObservableTransformer<T, Permission> ensureEach(@NonNull final String... permissions) {
    checkPermissions(permissions);
    return source -> request(source, permissions);
}
/**
 * Creates the use case and sets up {@code schedulersTransformer}, which subscribes on the
 * executor's scheduler and observes on the post-execution thread's scheduler.
 *
 * @param useCaseExecutor     supplies the scheduler to subscribe on
 * @param postExecutionThread supplies the scheduler to observe on
 */
public ObservableUseCase(final UseCaseExecutor useCaseExecutor,
        final PostExecutionThread postExecutionThread) {
    super(useCaseExecutor, postExecutionThread);
    schedulersTransformer = new ObservableTransformer<R, R>() {
        @Override
        public Observable<R> apply(Observable<R> rObservable) {
            // Both schedulers are looked up each time the transformer is applied.
            final Observable<R> onExecutor =
                    rObservable.subscribeOn(useCaseExecutor.getScheduler());
            return onExecutor.observeOn(postExecutionThread.getScheduler());
        }
    };
}
/**
 * Builds a transformer that applies the given {@code retryWhen} handler, resumes errors
 * through a {@link ServerResultErrorFunc}, subscribes on {@code Schedulers.io()} and observes
 * on the Android main thread.
 *
 * @param retryWhenHandler handler passed to {@code retryWhen}
 * @param <T>              item type
 * @return the retry, error-mapping and scheduling transformer
 */
public static <T> ObservableTransformer<T, T> getObservableScheduler(
        final Function<? super Observable<Throwable>, ? extends ObservableSource<?>> retryWhenHandler) {
    return upstream -> upstream
            .retryWhen(retryWhenHandler)
            .onErrorResumeNext(new ServerResultErrorFunc<T>())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Returns a transformer that both subscribes and observes on {@code Schedulers.trampoline()}.
 * <p>
 * Written as a typed lambda, so no raw types, unchecked cast or warning suppression are
 * needed.
 *
 * @param <T> item type
 * @return the trampoline scheduling transformer
 */
@Override
public <T> ObservableTransformer<T, T> transformer() {
    return upstream -> upstream
            .subscribeOn(Schedulers.trampoline())
            .observeOn(Schedulers.trampoline());
}
public <T> ObservableTransformer<T, CacheResult<T>> transformer(CacheMode cacheMode, final Type type) { final ICacheStrategy strategy = loadStrategy(cacheMode);//获取缓存策略 return new ObservableTransformer<T, CacheResult<T>>() { @Override public ObservableSource<CacheResult<T>> apply(Observable<T> apiResultObservable) { return strategy.execute(ApiCache.this, ApiCache.this.cacheKey, apiResultObservable, type); } }; }
/**
 * Maps {@link EmailLoginUiEvents.LoginClicked} events to login actions carrying the entered
 * email and password, delivered on the Android main thread. Other events are ignored.
 */
@Override
protected ObservableTransformer<UiEvent, Action> eventsToActions() {
    return upstream -> upstream
            .ofType(EmailLoginUiEvents.LoginClicked.class)
            // The explicit type argument widens the mapped type to Action.
            .<Action>map(clicked ->
                    EmailLoginActions.login(clicked.getEmail(), clicked.getPassword()))
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Builds a transformer that both observes and subscribes on {@code Schedulers.io()}.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> all_io() {
    return source -> source
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.io());
}
/**
 * Maps UI events to results.
 * <ul>
 *   <li>{@link LoadArtistsEvent}: emits {@code LoadArtistsResult.IN_PROGRESS}, then the
 *       repository's artists wrapped as success, or an error result.</li>
 *   <li>{@link DeleteArtistsEvent}: emits {@code DeleteArtistsResult.IN_PROGRESS}, then
 *       {@code DeleteArtistsResult.SUCCESS} or an error result, after deleting the artists
 *       selected in the event's UI model.</li>
 *   <li>Events that already are {@link Result}s are forwarded as they are.</li>
 * </ul>
 */
@Override
protected Observable<Result> handleEvents(Observable<UiEvent> events) {
    ObservableTransformer<LoadArtistsEvent, Result> artistLoader = loadEvents ->
            loadEvents.flatMap(event -> {
                return getRepository().getArtists()
                        .map(LoadArtistsResult::success)
                        .onErrorReturn(LoadArtistsResult::error)
                        .startWith(LoadArtistsResult.IN_PROGRESS);
            });

    ObservableTransformer<DeleteArtistsEvent, Result> artistDeleter = deleteEvents ->
            deleteEvents.flatMap(event -> {
                List<Artist> selected = event.getUiModel().getSelectedArtists();
                return getRepository().deleteArtists(selected)
                        .toSingleDefault(DeleteArtistsResult.SUCCESS)
                        .onErrorReturn(DeleteArtistsResult::error)
                        .toObservable()
                        .startWith(DeleteArtistsResult.IN_PROGRESS);
            });

    // publish() shares one upstream subscription between all branches.
    return events.publish(sharedEvents -> Observable.merge(
            handleEventsOfClass(sharedEvents, LoadArtistsEvent.class, artistLoader),
            handleEventsOfClass(sharedEvents, DeleteArtistsEvent.class, artistDeleter),
            sharedEvents.ofType(Result.class)));
}
/**
 * Turns {@link ForgotPwActions.ForgotPwSubmit} actions into results.
 * <p>
 * An email that does not match {@code Patterns.EMAIL_ADDRESS} yields a single error result
 * with a {@link FormValidationException}. Otherwise the use case is run, prefixed with a
 * loading result, and its errors are mapped to error results.
 */
@Override
protected ObservableTransformer<Action, Result> actionsToResults() {
    return upstream -> upstream
            .ofType(ForgotPwActions.ForgotPwSubmit.class)
            .flatMap(submit -> {
                final boolean emailValid =
                        Patterns.EMAIL_ADDRESS.matcher(submit.getEmail()).matches();
                if (emailValid) {
                    return useCase.performAction(submit)
                            .onErrorReturn(failure -> Result.error(submit, failure))
                            .startWith(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>loading(submit));
                }
                return Observable.just(Result.<Boolean, ForgotPwActions.ForgotPwSubmit>error(
                        submit,
                        new FormValidationException("Must enter a valid email address!")));
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io());
}
/**
 * Thread switching: wraps the {@code switchSchedulers(Observable)} overload in a transformer
 * so it can be used with {@code compose}.
 *
 * @param <T> item type
 * @return transformer delegating to {@code switchSchedulers(upstream)}
 */
public static <T> ObservableTransformer<T, T> switchSchedulers() {
    return upstream -> switchSchedulers(upstream);
}
/**
 * Builds a transformer that shows a progress dialog before subscribing to the source and
 * dismisses it when the stream ends.
 * <p>
 * The dialog is shown on the Android main thread (it was previously shown from
 * {@code Schedulers.io()}, i.e. UI work on a background thread). Dismissal happens in
 * {@code doFinally}, so the dialog is also closed when the subscription is disposed, not only
 * on completion or error.
 *
 * @param activity   activity used to show the dialog
 * @param messageRes string resource id of the dialog message
 * @param <T>        item type
 * @return transformer that wraps the source with the progress dialog
 */
public static <T> ObservableTransformer<T, T> showDialog(final Activity activity, int messageRes) {
    return observable -> Observable.fromCallable(() -> {
                showProgressDialog(activity, messageRes);
                return true;
            })
            // Dialogs must be created on the UI thread.
            .subscribeOn(AndroidSchedulers.mainThread())
            // Only subscribe to the real source once the dialog is up.
            .flatMap(shown -> observable)
            .doFinally(() -> dismissProgressDialog());
}
/**
 * Builds a transformer that returns the upstream unchanged.
 * <p>
 * When the upstream's runtime class name equals {@code CLASS_NAME1} or {@code CLASS_NAME2},
 * {@code observable(upstream)} is called first; its return value is not used here.
 *
 * @param <T> item type
 * @return the pass-through transformer
 */
public static <T> ObservableTransformer<T, T> emptyTransformer() {
    return upstream -> {
        final String className = upstream.getClass().getName();
        final boolean matchesKnownClass =
                className.equals(CLASS_NAME1) || className.equals(CLASS_NAME2);
        if (matchesKnownClass) {
            observable(upstream);
        }
        return upstream;
    };
}
/**
 * Builds a transformer that signals a {@link NoSuchElementException} through {@code onError}
 * when the upstream completes without emitting any item. Non-empty streams pass through
 * unchanged.
 * <p>
 * The fallback uses {@code Observable.error}, which calls {@code onSubscribe} before
 * {@code onError}; the previous hand-written {@code Observable} skipped {@code onSubscribe}.
 * A new exception is still created for each subscription that reaches the fallback.
 *
 * @param <T> item type
 * @return transformer that turns an empty stream into an error
 */
public static <T> ObservableTransformer<T, T> notEmptyOrError() {
    return upstream -> upstream.switchIfEmpty(
            Observable.<T>error(() -> new NoSuchElementException()));
}
/**
 * Builds a transformer that unwraps {@link HttpResponseResult} envelopes with
 * {@code ResultTransformer.flatMap()} and then applies
 * {@code SchedulerTransformer.transformer()}.
 *
 * @param <T> payload type
 * @return transformer from response envelopes to payloads
 */
public static <T> ObservableTransformer<HttpResponseResult<T>, T> transformer() {
    return responses -> responses
            .flatMap(ResultTransformer.<T>flatMap())
            .compose(SchedulerTransformer.<T>transformer());
}
/**
 * Builds a transformer that subscribes on {@code Schedulers.io()} and observes on the
 * Android main thread.
 *
 * @param <T> item type
 * @return the scheduling transformer
 */
public static <T> ObservableTransformer<T, T> transformer() {
    return source -> source
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Builds a transformer that shows a {@link ProgressDialog} while the stream is active.
 * <p>
 * The dialog is shown on subscription using this object's {@code activity}, {@code msg} and
 * {@code cancelable} values. When {@code cancelable} is true, cancelling the dialog disposes
 * the subscription. The dialog is closed in {@code doFinally}, so it goes away on completion,
 * on error and on disposal (the previous {@code doOnTerminate} did not cover disposal).
 * <p>
 * NOTE(review): the dialog reference lives in the transformer instance, so a single returned
 * transformer should not be applied to several concurrently active streams.
 *
 * @param <T> item type
 * @return transformer that wraps the stream with a progress dialog
 */
public <T> ObservableTransformer<T, T> transformer() {
    return new ObservableTransformer<T, T>() {
        private ProgressDialog progressDialog;

        @Override
        public ObservableSource<T> apply(final Observable<T> upstream) {
            return upstream.doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull final Disposable disposable) throws Exception {
                    progressDialog = ProgressDialog.show(activity, null, msg, true, cancelable);
                    if (cancelable) {
                        // Cancelling the dialog cancels the work.
                        progressDialog.setOnCancelListener(new DialogInterface.OnCancelListener() {
                            @Override
                            public void onCancel(DialogInterface dialog) {
                                disposable.dispose();
                            }
                        });
                    }
                }
            }).doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    // The dialog is null if showing it failed during subscription.
                    if (progressDialog != null && progressDialog.isShowing()) {
                        progressDialog.cancel();
                    }
                }
            });
        }
    };
}