/**
 * Runs a one-shot asynchronous job built on a single-item observable.
 *
 * <p>{@code preExecute} is invoked from {@code doOnSubscribe}, {@code doInBackground} is invoked
 * from {@code doOnNext} after switching to {@code Schedulers.io()}, and {@code doOnFinish} is
 * invoked by the subscriber after switching to {@code AndroidSchedulers.mainThread()}.
 *
 * @param preExecute     action to run when the task is subscribed; skipped when {@code null}.
 * @param doInBackground action to run as the background step.
 * @param doOnFinish     action to run once the item reaches the subscriber; skipped when {@code null}.
 * @param onError        error handler; when {@code null}, {@code RxActions.onError()} is used instead.
 * @return the subscription of the task.
 */
public static Subscription asyncTask(final Action0 preExecute,
                                     @NonNull final Action0 doInBackground,
                                     final Action0 doOnFinish,
                                     Action1<Throwable> onError) {
    final Action0 runPreExecute = new Action0() {
        @Override
        public void call() {
            if (preExecute != null) {
                preExecute.call();
            }
        }
    };
    final Action1<String> runOnFinish = new Action1<String>() {
        @Override
        public void call(String ignoredItem) {
            if (doOnFinish != null) {
                doOnFinish.call();
            }
        }
    };

    return Observable.just("Hey nerd! This is an async task.")
            .subscribeOn(Schedulers.io())
            .doOnSubscribe(runPreExecute)
            .observeOn(Schedulers.io())
            .doOnNext(Actions.toAction1(doInBackground))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(runOnFinish, onError != null ? onError : RxActions.onError());
}
/**
 * Builds the observable, optionally transforms it, and subscribes with the given callbacks.
 *
 * <p>Any {@code null} callback is replaced by a default: {@code Actions.empty()} for
 * {@code onNext} and {@code onCompleted}, and
 * {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED} for {@code onError}. When a queuer is set
 * and the queue safety check is enabled, {@code onNext} is wrapped through
 * {@code RXBusUtil.wrapQueueAction}. When a bound object is set, the subscription is registered
 * with {@code RXSubscriptionManager}.
 *
 * @param onNext      item callback, may be {@code null}.
 * @param onError     error callback, may be {@code null}.
 * @param onCompleted completion callback, may be {@code null}.
 * @param transformer optional transformer applied via {@code compose}; skipped when {@code null}.
 * @return the resulting subscription.
 */
public <R> Subscription subscribe(Action1<R> onNext, Action1<Throwable> onError, Action0 onCompleted,
                                  Observable.Transformer<T, R> transformer) {
    Observable observable = build(false);
    if (transformer != null) {
        observable = observable.compose(transformer);
    }

    Action1<R> nextHandler = onNext;
    if (nextHandler == null) {
        nextHandler = Actions.empty();
    }
    Action1<Throwable> errorHandler = onError;
    if (errorHandler == null) {
        errorHandler = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    }
    Action0 completionHandler = onCompleted;
    if (completionHandler == null) {
        completionHandler = Actions.empty();
    }

    Action1<R> actualOnNext = nextHandler;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled) {
        actualOnNext = RXBusUtil.wrapQueueAction(nextHandler, mQueuer);
    }

    observable = applySchedular(observable);
    Subscription subscription = observable.subscribe(actualOnNext, errorHandler, completionHandler);
    if (mBoundObject != null) {
        RXSubscriptionManager.addSubscription(mBoundObject, subscription);
    }
    return subscription;
}
/**
 * Sets up the Goro service and starts two observables on the "test-queue" scheduler: one that
 * emits "ok" and one that fails with a {@link RuntimeException}. Each outcome is stored in a
 * field and signalled through its latch.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    GoroService.setup(this, Goro.create());

    final Scheduler scheduler = new RxGoro(goro).scheduler("test-queue");

    // Success path: record the result and release the waiting side.
    final Action1<String> recordResult = new Action1<String>() {
        @Override
        public void call(String s) {
            result = "ok";
            resultSync.countDown();
        }
    };
    Observable.just("ok")
            .subscribeOn(scheduler)
            .subscribe(recordResult);

    // Failure path: items are ignored, the throwable is captured.
    final Action1<Throwable> recordError = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            error = throwable;
            errorSync.countDown();
        }
    };
    Observable.error(new RuntimeException("test error"))
            .subscribeOn(scheduler)
            .subscribe(Actions.empty(), recordError);
}
protected void bindToInstanceLifecycle(Instance<ConnectionProvider<W, R>> i, final HostConnectionProvider<W, R> hcp, final Subscriber<? super List<HostConnectionProvider<W, R>>> o) { i.getLifecycle() .finallyDo(new Action0() { @Override public void call() { removeHost(hcp, o); } }) .subscribe(Actions.empty(), new Action1<Throwable>() { @Override public void call(Throwable throwable) { // Do nothing as finallyDo takes care of both complete and error. } }); }
@Test public void testNoTimeoutPostSubscription() throws Exception { TestScheduler testScheduler = Schedulers.test(); UnicastAutoReleaseSubject<String> subject = UnicastAutoReleaseSubject.create(1, TimeUnit.DAYS, testScheduler); subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival. final AtomicReference<Throwable> errorOnSubscribe = new AtomicReference<Throwable>(); final CountDownLatch latch = new CountDownLatch(1); subject.subscribe(Actions.empty(), new Action1<Throwable>() { @Override public void call(Throwable throwable) { errorOnSubscribe.set(throwable); latch.countDown(); } }, new Action0() { @Override public void call() { latch.countDown(); } }); testScheduler.advanceTimeBy(1, TimeUnit.DAYS); subject.onCompleted(); latch.await(1, TimeUnit.MINUTES); Assert.assertNull("Subscription got an error.", errorOnSubscribe.get()); }
/**
 * Subscribes with an item callback and an error callback; completion is handled by
 * {@code Actions.empty()}.
 *
 * @param onNext  item callback, must not be {@code null}.
 * @param onError error callback, must not be {@code null}.
 * @return the resulting subscription.
 * @throws IllegalArgumentException if either callback is {@code null}; {@code onNext} is
 *                                  checked first.
 */
public final Subscription subscribe(Action1<? super T> onNext, Action1<Throwable> onError) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    return subscribe(new ActionSubscriber(onNext, onError, Actions.empty()));
}
/**
 * Subscribes with only an item callback. Errors are rethrown wrapped in
 * {@code OnErrorNotImplementedException}; the third argument is {@code Actions.empty()}.
 *
 * @param onNext item callback, forwarded unchanged to the three-argument overload.
 */
@Experimental
public void subscribe(Action1<? super T> onNext) {
    final Action1<Throwable> failOnError = new Action1<Throwable>() {
        public void call(Throwable t) {
            throw new OnErrorNotImplementedException(t);
        }
    };
    subscribe(onNext, failOnError, Actions.empty());
}
/**
 * Fetches shows through a {@code ShowSyncService} and hands the result to {@code updateShows}.
 *
 * <p>Upstream errors are replaced by an empty observable, an empty result is turned into a
 * single {@code null} item, the stream is bound to the activity until
 * {@code ActivityEvent.DESTROY}, and the {@code busyDialog} operator is lifted onto it. Errors
 * reaching the subscriber are ignored.
 *
 * @param activity activity used for the service, the lifecycle binding and the busy dialog.
 */
public static void syncShows(final ActivityBase activity) {
    final ShowSyncService syncService = new ShowSyncService(activity);

    syncService.fetchShows()
            .onErrorResumeNext(Observable.empty())
            .defaultIfEmpty(null)
            .compose(activity.bindUntilEventAsync(ActivityEvent.DESTROY))
            .lift(busyDialog(activity))
            .subscribe(genreMap -> {
                updateShows(activity, genreMap);
            }, Actions.empty());
}
/**
 * Fetches favourites through a {@code ShowSyncService} and hands the result to
 * {@code updateFavs}.
 *
 * <p>Upstream errors are replaced by an empty observable, an empty result is turned into a
 * single {@code null} item, the stream is bound to the activity until
 * {@code ActivityEvent.DESTROY}, and the {@code busyDialog} operator is lifted onto it. Errors
 * reaching the subscriber are ignored.
 *
 * @param activity activity used for the service, the lifecycle binding and the busy dialog.
 */
public static void syncFavs(final ActivityBase activity) {
    final ShowSyncService syncService = new ShowSyncService(activity);

    syncService.fetchFavs()
            .onErrorResumeNext(Observable.empty())
            .defaultIfEmpty(null)
            .compose(activity.bindUntilEventAsync(ActivityEvent.DESTROY))
            .lift(busyDialog(activity))
            .subscribe(genreMap -> {
                updateFavs(activity, genreMap);
            }, Actions.empty());
}
public static void checkForUpdates(final ActivityBase activity, final boolean interactive) { final SharedPreferences shared = PreferenceManager.getDefaultSharedPreferences(activity.getApplicationContext()); if (!interactive && !BuildConfig.DEBUG) { Instant last = new Instant(shared.getLong(KEY_LAST_UPDATE_CHECK, 0)); if (last.isAfter(now().minus(standardHours(1)))) return; } /* Action to store the last check time */ Action0 storeCheckTime = () -> shared.edit() .putLong(KEY_LAST_UPDATE_CHECK, now().getMillis()) .apply(); /* show a busy-dialog or not? */ Observable.Operator<Update, Update> busyOperator = interactive ? busyDialog(activity) : NOOP; // do the check new Updater(activity).check() .onErrorResumeNext(Observable.empty()) .defaultIfEmpty(null) .compose(activity.bindUntilEventAsync(ActivityEvent.DESTROY)) .lift(busyOperator) .doAfterTerminate(storeCheckTime) .subscribe(update -> { if (interactive || update != null) { UpdateDialog dialog = newInstance(update); dialog.show(activity.getSupportFragmentManager(), null); } }, Actions.empty()); }
public static void download(FragmentActivity activity, Update update) { Log.d("BS-Updater", "Trying to download..."); AppComponent appComponent = Dagger.appComponent(activity); Observable<DownloadService.Status> progress = appComponent.downloadService() .downloadUpdate(update.apk()) .subscribeOn(BackgroundScheduler.instance()) .unsubscribeOn(BackgroundScheduler.instance()) .observeOn(AndroidSchedulers.mainThread()) .share(); // install on finish final Context appContext = activity.getApplicationContext(); progress.filter(DownloadService.Status::finished) .flatMap(status -> { try { install(appContext, status.file); return Observable.empty(); } catch (IOException error) { return Observable.error(error); } }) .subscribe(Actions.empty(), defaultOnError()); // show a progress dialog DownloadUpdateDialog dialog = new DownloadUpdateDialog(progress); dialog.show(activity.getSupportFragmentManager(), null); // remove pending upload notification appComponent.notificationService().cancelForUpdate(); }
public void bind(@NonNull IJoystickFragment view) { Log.d(TAG, "bind "); mView = Optional.of(view); mCurrentProgram = Optional.of(Scenography.getModel().currentProgram()); EventBus.getDefault().register(this); // fetch program mCurrentProgram.ifPresent(program -> program.fetch().subscribe(Actions.empty(), e -> Log.e(TAG, "fetch error", e))); mSubscriptions.add(view.bind(mCurrentProgram.get())); EventBus.getDefault().post(new PlenConnectionService.StateNotificationRequest()); }
public void unbind() { Log.d(TAG, "onDestroy "); mSubscriptions.clear(); EventBus.getDefault().unregister(this); // push program mCurrentProgram.ifPresent(program -> program.push().subscribe(Actions.empty(), e -> Log.e(TAG, "push error", e))); mView = Optional.empty(); mCurrentProgram = Optional.empty(); }
public void bind(@NonNull IProgramFragment view) { Log.d(TAG, "bind "); mView = Optional.of(view); mCurrentProgram = Optional.of(Scenography.getModel().currentProgram()); // fetch program mCurrentProgram.ifPresent(program -> program.fetch().subscribe(Actions.empty(), e -> Log.e(TAG, "fetch error", e))); mSubscriptions.add(view.bind(mCurrentProgram.get())); }
public void unbind() { Log.d(TAG, "onDestroy "); mSubscriptions.clear(); // push program mCurrentProgram.ifPresent(program -> program.push().subscribe(Actions.empty(), e -> Log.e(TAG, "push error", e))); mView = Optional.empty(); mCurrentProgram = Optional.empty(); }
/**
 * Queues {@code operationDisconnect} on {@code clientOperationQueue} and subscribes to the result
 * with no-op handlers: emitted items are ignored and errors are swallowed by an empty action
 * adapted to {@code Action1<Throwable>}.
 */
@Override public void onConnectionUnsubscribed() { clientOperationQueue .queue(operationDisconnect) .subscribe( Actions.empty(), Actions.<Throwable>toAction1(Actions.empty()) ); }
/**
 * Removes the entry for {@code characteristicUuid} from {@code notificationObservableMap}, then
 * calls {@code setupCharacteristicNotification} with {@code false} as its third argument
 * (presumably the "enable" flag — confirm against that method) and subscribes with no-op
 * handlers, so both emitted items and errors are ignored.
 *
 * @param characteristicUuid UUID of the characteristic whose notification is dismissed.
 * @param setupMode          setup mode forwarded to {@code setupCharacteristicNotification}.
 * @param isIndication       forwarded unchanged to {@code setupCharacteristicNotification}.
 */
private void dismissCharacteristicNotification(UUID characteristicUuid, NotificationSetupMode setupMode, boolean isIndication) { notificationObservableMap.remove(characteristicUuid); setupCharacteristicNotification(characteristicUuid, setupMode, false, isIndication) .subscribe( Actions.empty(), Actions.<Throwable>toAction1(Actions.empty()) ); }
@Override public boolean shouldOverrideUrlLoading(WebView view, String url) { if (url.startsWith(AppConstants.OAUTH_CALLBACK_URL)) { Uri uri = Uri.parse(url); if (mProgressHolder != null) { mProgressHolder.setVisibility(View.VISIBLE); } subscriptionList.add( accountService.finishOAuthLogin(uri.getQueryParameter(OAUTH_VERIFIER)) .compose(AndroidSchedulerTransformer.get()). subscribe(Subscribers.create( Actions.empty(), throwable -> onLoginFinished(exceptionHandler.handle(throwable)), () -> onLoginFinished(null))) ); return true; } if (!isOAuthUrl(url)) { Timber.d("External URL: " + url); // launch external URLs in a full browser Intent intent = new Intent(Intent.ACTION_VIEW, Uri.parse(url)); intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK); getActivity().startActivity(intent); return true; } return false; }
private void playFile(Optional<Uri> filePath, final Action0 action) { if (filePath.isPresent()) { playFile(filePath.get()) // .subscribeOn(Schedulers.computation()) // .observeOn(AndroidSchedulers.mainThread()) // .subscribe( // Actions.empty(), // throwable -> Timber.e(throwable, "Error playing music"), // action == null ? Actions.empty() : action // ); } else { Toast.makeText(getContext(), "No more songs to play", Toast.LENGTH_LONG).show(); } }
@Test public void shouldNotLeakThreadsWithDefaultConfiguration() throws InterruptedException { int loops = 3; ThreadMXBean mx = ManagementFactory.getThreadMXBean(); LOGGER.info("Initial Threads (will be ignored later):"); Set<String> ignore = dump(threads(mx)); int[] peaks = new int[loops]; for (int i = 0; i < loops; i++) { CoreEnvironment env = DefaultCoreEnvironment.create(); env.scheduler().createWorker().schedule(Actions.empty()); env.scheduler().createWorker().schedule(Actions.empty()); env.scheduler().createWorker().schedule(Actions.empty()); env.scheduler().createWorker().schedule(Actions.empty()); env.scheduler().createWorker().schedule(Actions.empty()); env.scheduler().createWorker().schedule(Actions.empty()); LOGGER.info("===Created threads:"); Set<String> afterCreate = dump(threads(mx, ignore, false)); LOGGER.info("Shutdown result: " + env.shutdown()); //we only consider threads starting with cb- or containing Rx, minus the ones existing at startup Set<String> afterShutdown = threads(mx, ignore, true); peaks[i] = afterShutdown.size(); LOGGER.info("===Shutdown went from " + afterCreate.size() + " to " + afterShutdown.size() + " threads, remaining: "); dump(afterShutdown); } boolean peakGrowing = false; StringBuilder peaksDump = new StringBuilder("========Thread peaks : ").append(peaks[0]); for (int i = 1; i < loops; i++) { peaksDump.append(' ').append(peaks[i]); peakGrowing = peakGrowing || (peaks[i] != peaks[i - 1]); } LOGGER.info(peaksDump.toString()); assertFalse("Number of threads is growing despite shutdown, see console output", peakGrowing); }
/**
 * Clears the cache after verifying it is open, passing an empty action to
 * {@code internalClear}.
 *
 * @throws CacheException wrapping any exception thrown by {@code internalClear}.
 */
@Override
public void clear() {
    checkOpen();
    try {
        internalClear(Actions.empty());
    } catch (Exception cause) {
        throw new CacheException("Unable to clear", cause);
    }
}
/**
 * Adapts the result of {@code show(context, stringRes)} to an {@code Action1} through
 * {@code Actions.toAction1}.
 *
 * @param context   context forwarded to {@code show}.
 * @param stringRes string resource id forwarded to {@code show}.
 * @param <T>       item type accepted by the returned action.
 * @return the adapted action.
 */
public static <T> Action1<T> showAction1(Context context, @StringRes int stringRes) {
    final Action1<T> adapted = Actions.toAction1(show(context, stringRes));
    return adapted;
}
/**
 * Adapts the result of {@code showAction0(context, stringRes)} to an {@code Action1} through
 * {@code Actions.toAction1}.
 *
 * @param context   context forwarded to {@code showAction0}.
 * @param stringRes string resource id forwarded to {@code showAction0}.
 * @param <T>       item type accepted by the returned action.
 * @return the adapted action.
 */
public static <T> Action1<T> showAction1(final Context context, @StringRes final int stringRes) {
    final Action1<T> adapted = Actions.toAction1(showAction0(context, stringRes));
    return adapted;
}
/**
 * Returns the {@code Completable} produced by {@code doOnLifecycle} with {@code onCompleted}
 * passed as the third argument and {@code Actions.empty()} for the other four.
 *
 * @param onCompleted the action forwarded in the third position.
 * @return the result of {@code doOnLifecycle}.
 */
public final Completable doOnCompleted(Action0 onCompleted) { return doOnLifecycle(Actions.empty(), Actions.empty(), onCompleted, Actions.empty(), Actions.empty()); }
/**
 * Returns the {@code Completable} produced by {@code doOnLifecycle} with {@code onUnsubscribe}
 * passed as the fifth argument and {@code Actions.empty()} for the other four.
 *
 * @param onUnsubscribe the action forwarded in the fifth position.
 * @return the result of {@code doOnLifecycle}.
 */
public final Completable doOnUnsubscribe(Action0 onUnsubscribe) { return doOnLifecycle(Actions.empty(), Actions.empty(), Actions.empty(), Actions.empty(), onUnsubscribe); }
/**
 * Returns the {@code Completable} produced by {@code doOnLifecycle} with {@code onError}
 * passed as the second argument and {@code Actions.empty()} for the other four.
 *
 * @param onError the action forwarded in the second position.
 * @return the result of {@code doOnLifecycle}.
 */
public final Completable doOnError(Action1<? super Throwable> onError) { return doOnLifecycle(Actions.empty(), onError, Actions.empty(), Actions.empty(), Actions.empty()); }
/**
 * Returns the {@code Completable} produced by {@code doOnLifecycle} with {@code onSubscribe}
 * passed as the first argument and {@code Actions.empty()} for the other four.
 *
 * @param onSubscribe the action forwarded in the first position.
 * @return the result of {@code doOnLifecycle}.
 */
public final Completable doOnSubscribe(Action1<? super Subscription> onSubscribe) { return doOnLifecycle(onSubscribe, Actions.empty(), Actions.empty(), Actions.empty(), Actions.empty()); }
/**
 * Returns the {@code Completable} produced by {@code doOnLifecycle} with
 * {@code onAfterComplete} passed as the fourth argument and {@code Actions.empty()} for the
 * other four.
 *
 * @param onAfterComplete the action forwarded in the fourth position.
 * @return the result of {@code doOnLifecycle}.
 */
public final Completable doAfterTerminate(Action0 onAfterComplete) { return doOnLifecycle(Actions.empty(), Actions.empty(), Actions.empty(), onAfterComplete, Actions.empty()); }
/**
 * Lifts an {@code OperatorDoOnEach} wrapping an {@code ActionSubscriber} built from
 * {@code Actions.empty()} for the first two constructor arguments and {@code onCompleted} as
 * the third.
 *
 * @param onCompleted the action passed as the third {@code ActionSubscriber} argument.
 * @return the lifted observable.
 */
public final Observable<T> doOnCompleted(Action0 onCompleted) { return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), Actions.empty(), onCompleted))); }
/**
 * Lifts an {@code OperatorDoOnEach} wrapping an {@code ActionSubscriber} built from
 * {@code onError} as the second constructor argument and {@code Actions.empty()} for the first
 * and third.
 *
 * @param onError the action passed as the second {@code ActionSubscriber} argument.
 * @return the lifted observable.
 */
public final Observable<T> doOnError(Action1<Throwable> onError) { return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), onError, Actions.empty()))); }
/**
 * Lifts an {@code OperatorDoOnEach} wrapping an {@code ActionSubscriber} built from
 * {@code onNext} as the first constructor argument and {@code Actions.empty()} for the second
 * and third.
 *
 * @param onNext the action passed as the first {@code ActionSubscriber} argument.
 * @return the lifted observable.
 */
public final Observable<T> doOnNext(Action1<? super T> onNext) { return lift(new OperatorDoOnEach(new ActionSubscriber(onNext, Actions.empty(), Actions.empty()))); }
/**
 * Lifts an {@code OperatorDoOnEach} wrapping an {@code ActionSubscriber} that receives
 * {@code onTerminate} twice: adapted through {@code Actions.toAction1} as the second
 * constructor argument, and directly as the third. The first argument is
 * {@code Actions.empty()}.
 *
 * @param onTerminate the action used for both the second (adapted) and third arguments.
 * @return the lifted observable.
 */
public final Observable<T> doOnTerminate(Action0 onTerminate) { return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), Actions.toAction1(onTerminate), onTerminate))); }
/**
 * Subscribes with an {@code ActionSubscriber} built from {@code Actions.empty()} as the first
 * and third constructor arguments and {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED} as
 * the second.
 *
 * @return the resulting subscription.
 */
public final Subscription subscribe() { return subscribe(new ActionSubscriber(Actions.empty(), InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty())); }
/**
 * Subscribes with only an item callback. The error slot is
 * {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED} and the completion slot is
 * {@code Actions.empty()}.
 *
 * @param onNext item callback, must not be {@code null}.
 * @return the resulting subscription.
 * @throws IllegalArgumentException if {@code onNext} is {@code null}.
 */
public final Subscription subscribe(Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    return subscribe(new ActionSubscriber(onNext, InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty()));
}
/**
 * Delegates to the two-argument {@code autoConnect} overload, passing {@code Actions.empty()}
 * as the second argument.
 *
 * @param numberOfSubscribers forwarded unchanged to the two-argument overload.
 * @return the observable returned by the two-argument overload.
 */
@Beta public Observable<T> autoConnect(int numberOfSubscribers) { return autoConnect(numberOfSubscribers, Actions.empty()); }
/**
 * Delegates to the three-argument {@code subscribe} overload, passing {@code Actions.empty()}
 * as the third argument.
 *
 * @param onNext  forwarded unchanged as the first argument.
 * @param onError forwarded unchanged as the second argument.
 */
@Experimental public void subscribe(Action1<? super T> onNext, Action1<? super Throwable> onError) { subscribe(onNext, onError, Actions.empty()); }
/**
 * Returns a deferred observable that sets up (or reuses) a notification/indication stream for
 * the given characteristic. All work below happens per subscription, inside a block
 * synchronized on {@code activeNotificationObservableMap}.
 *
 * <ol>
 *   <li>A {@code CharacteristicNotificationId} is built from the characteristic's UUID and
 *       instance id and looked up in {@code activeNotificationObservableMap}.</li>
 *   <li>If an entry exists with the same {@code isIndication} value, its
 *       {@code notificationObservable} is returned. If the value differs, an observable failing
 *       with {@code BleConflictingNotificationAlreadySetException(uuid, !isIndication)} is
 *       returned.</li>
 *   <li>Otherwise a new observable is built: {@code setCharacteristicNotification(..., true)}
 *       composed with {@code setupModeTransformer} (using {@code configEnableIndication} or
 *       {@code configEnableNotification} depending on {@code isIndication}), followed by a
 *       single emission of the characteristic-change callbacks observable, which is cut off
 *       by {@code notificationCompletedSubject}.</li>
 *   <li>On unsubscribe, the subject is completed, the map entry is removed (again under the
 *       map's lock), and the notification is torn down via
 *       {@code setCharacteristicNotification(..., false)} with {@code configDisable}; the
 *       teardown is subscribed with no-op handlers, so its items and errors are ignored.</li>
 *   <li>The result is merged with {@code gattCallback.observeDisconnect()}, shared through
 *       {@code replay(1).refCount()}, stored in the map and returned.</li>
 * </ol>
 *
 * @param characteristic characteristic to observe.
 * @param setupMode      setup mode forwarded to {@code setupModeTransformer}.
 * @param isIndication   selects the enable value and is stored with the active entry to detect
 *                       conflicting requests.
 * @return the deferred observable described above.
 */
Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead( @NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication ) { return Observable.defer(new Func0<Observable<Observable<byte[]>>>() { @Override public Observable<Observable<byte[]>> call() { synchronized (activeNotificationObservableMap) { final CharacteristicNotificationId id = new CharacteristicNotificationId(characteristic.getUuid(), characteristic.getInstanceId()); final ActiveCharacteristicNotification activeCharacteristicNotification = activeNotificationObservableMap.get(id); if (activeCharacteristicNotification != null) { if (activeCharacteristicNotification.isIndication == isIndication) { return activeCharacteristicNotification.notificationObservable; } else { return Observable.error( new BleConflictingNotificationAlreadySetException(characteristic.getUuid(), !isIndication) ); } } final byte[] enableNotificationTypeValue = isIndication ? configEnableIndication : configEnableNotification; final PublishSubject<?> notificationCompletedSubject = PublishSubject.create(); final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true) .compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode)) .andThen(ObservableUtil.justOnNext( observeOnCharacteristicChangeCallbacks(gattCallback, id).takeUntil(notificationCompletedSubject) )) .doOnUnsubscribe(new Action0() { @Override public void call() { notificationCompletedSubject.onCompleted(); synchronized (activeNotificationObservableMap) { activeNotificationObservableMap.remove(id); } // teardown the notification setCharacteristicNotification(bluetoothGatt, characteristic, false) .compose(setupModeTransformer(descriptorWriter, characteristic, configDisable, setupMode)) .subscribe( Actions.empty(), Actions.<Throwable>toAction1(Actions.empty()) ); } }) 
.mergeWith(gattCallback.<Observable<byte[]>>observeDisconnect()) .replay(1) .refCount(); activeNotificationObservableMap.put(id, new ActiveCharacteristicNotification(newObservable, isIndication)); return newObservable; } } }); }
/**
 * Delegates to the four-argument {@code subscribe} overload, passing {@code Actions.empty()}
 * as the fourth argument.
 *
 * @param observable forwarded unchanged as the first argument.
 * @param onNext     forwarded unchanged as the second argument.
 * @param onError    forwarded unchanged as the third argument.
 * @return the subscription returned by the four-argument overload.
 */
public <T> Subscription subscribe(Observable<T> observable, Action1<? super T> onNext, Action1<Throwable> onError) { return subscribe(observable, onNext, onError, Actions.empty()); }
/**
 * Delegates to the four-argument {@code subscribe} overload, passing {@code Actions.empty()}
 * as both the third and fourth arguments.
 *
 * @param observable forwarded unchanged as the first argument.
 * @param onNext     forwarded unchanged as the second argument.
 * @return the subscription returned by the four-argument overload.
 */
public <T> Subscription subscribe(Observable<T> observable, Action1<? super T> onNext) { return subscribe(observable, onNext, Actions.empty(), Actions.empty()); }
/**
 * Creates a subscription around the given action, substituting {@code Actions.empty()} when
 * the action is {@code null}.
 *
 * @param action the action to store, may be {@code null}.
 */
public ActionSubscription(Action0 action) {
    if (action == null) {
        this.actual = Actions.empty();
    } else {
        this.actual = action;
    }
}