@Override @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) public <T> Observable<T> queue(final Operation<T> operation) { return Observable.create(new Action1<Emitter<T>>() { @Override public void call(Emitter<T> tEmitter) { final FIFORunnableEntry entry = new FIFORunnableEntry<>(operation, tEmitter); tEmitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { if (queue.remove(entry)) { logOperationRemoved(operation); } } }); logOperationQueued(operation); queue.add(entry); } }, Emitter.BackpressureMode.NONE); }
@Override @RestrictTo(RestrictTo.Scope.LIBRARY_GROUP) public synchronized <T> Observable<T> queue(final Operation<T> operation) { if (!shouldRun) { return Observable.error(disconnectionException); } return Observable.create(new Action1<Emitter<T>>() { @Override public void call(Emitter<T> tEmitter) { final FIFORunnableEntry entry = new FIFORunnableEntry<>(operation, tEmitter); tEmitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { if (queue.remove(entry)) { logOperationRemoved(operation); } } }); logOperationQueued(operation); queue.add(entry); } }, Emitter.BackpressureMode.NONE); }
@Override final protected void protectedRun(final Emitter<SCAN_RESULT_TYPE> emitter, QueueReleaseInterface queueReleaseInterface) { final SCAN_CALLBACK_TYPE scanCallback = createScanCallback(emitter); try { emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { RxBleLog.i("Scan operation is requested to stop."); stopScan(rxBleAdapterWrapper, scanCallback); } }); RxBleLog.i("Scan operation is requested to start."); boolean startLeScanStatus = startScan(rxBleAdapterWrapper, scanCallback); if (!startLeScanStatus) { emitter.onError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START)); } } catch (Throwable throwable) { RxBleLog.e(throwable, "Error while calling the start scan function"); emitter.onError(new BleScanException(BleScanException.BLUETOOTH_CANNOT_START)); } finally { queueReleaseInterface.release(); } }
public static Observable<Visit> visitStream(final Ahoy ahoy) { return Observable.create(new Action1<Emitter<Visit>>() { @Override public void call(final Emitter<Visit> emitter) { final VisitListener listener = new VisitListener() { @Override public void onVisitUpdated(Visit visit) { emitter.onNext(visit); } }; ahoy.addVisitListener(listener); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { ahoy.removeVisitListener(listener); } }); } }, BackpressureMode.LATEST); }
@Override public void call(final Emitter<AppState> appStateEmitter) { final AppStateListener appStateListener = new AppStateListener() { @Override public void onAppDidEnterForeground() { appStateEmitter.onNext(FOREGROUND); } @Override public void onAppDidEnterBackground() { appStateEmitter.onNext(BACKGROUND); } }; appStateEmitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { recognizer.removeListener(appStateListener); recognizer.stop(); } }); recognizer.addListener(appStateListener); recognizer.start(); }
public static <T> Observable<T> toObservable(final ObservableField<T> observableField, final boolean emitCurrent) { return Observable.create(new Action1<Emitter<T>>() { @Override public void call(final Emitter<T> emitter) { if (emitCurrent && observableField.get() != null) { emitter.onNext(observableField.get()); } final OnPropertyChangedCallback callback = new OnPropertyChangedCallback() { @Override public void onPropertyChanged(android.databinding.Observable dataBindingObservable, int propertyId) { if (dataBindingObservable == observableField) { emitter.onNext(observableField.get()); } } }; observableField.addOnPropertyChangedCallback(callback); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { observableField.removeOnPropertyChangedCallback(callback); } }); } }, Emitter.BackpressureMode.BUFFER); }
/** * Converts an {@link ApolloQueryWatcher} into an Observable. * * @param watcher the ApolloQueryWatcher to convert * @param backpressureMode the back pressure strategy to apply to the observable source. * @param <T> the value type * @return the converted Observable */ @Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloQueryWatcher<T> watcher, @Nonnull Emitter.BackpressureMode backpressureMode) { checkNotNull(backpressureMode, "backpressureMode == null"); checkNotNull(watcher, "watcher == null"); return Observable.create(new Action1<Emitter<Response<T>>>() { @Override public void call(final Emitter<Response<T>> emitter) { final AtomicBoolean canceled = new AtomicBoolean(); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { canceled.set(true); watcher.cancel(); } }); watcher.enqueueAndWatch(new ApolloCall.Callback<T>() { @Override public void onResponse(@Nonnull Response<T> response) { if (!canceled.get()) { emitter.onNext(response); } } @Override public void onFailure(@Nonnull ApolloException e) { Exceptions.throwIfFatal(e); if (!canceled.get()) { emitter.onError(e); } } }); } }, backpressureMode); }
/** * Converts an {@link ApolloCall} to a Observable. The number of emissions this Observable will have is based on the * {@link ResponseFetcher} used with the call. * * @param call the ApolloCall to convert * @param <T> the value type * @param backpressureMode The {@link rx.Emitter.BackpressureMode} to use. * @return the converted Observable */ @Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloCall<T> call, Emitter.BackpressureMode backpressureMode) { checkNotNull(call, "call == null"); return Observable.create(new Action1<Emitter<Response<T>>>() { @Override public void call(final Emitter<Response<T>> emitter) { final AtomicBoolean canceled = new AtomicBoolean(); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { canceled.set(true); call.cancel(); } }); call.enqueue(new ApolloCall.Callback<T>() { @Override public void onResponse(@Nonnull Response<T> response) { if (!canceled.get()) { emitter.onNext(response); } } @Override public void onFailure(@Nonnull ApolloException e) { Exceptions.throwIfFatal(e); if (!canceled.get()) { emitter.onError(e); } } @Override public void onStatusEvent(@Nonnull ApolloCall.StatusEvent event) { if (!canceled.get()) { if (event == ApolloCall.StatusEvent.COMPLETED) { emitter.onCompleted(); } } } }); } }, backpressureMode); }
@Nonnull public static <T> Observable<Response<T>> from(@Nonnull final ApolloSubscriptionCall<T> call, Emitter.BackpressureMode backpressureMode) { checkNotNull(call, "call == null"); return Observable.create(new Action1<Emitter<Response<T>>>() { @Override public void call(final Emitter<Response<T>> emitter) { final AtomicBoolean canceled = new AtomicBoolean(); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { canceled.set(true); call.cancel(); } }); call.execute(new ApolloSubscriptionCall.Callback<T>() { @Override public void onResponse(@Nonnull Response<T> response) { if (!canceled.get()) { emitter.onNext(response); } } @Override public void onFailure(@Nonnull ApolloException e) { Exceptions.throwIfFatal(e); if (!canceled.get()) { emitter.onError(e); } } @Override public void onCompleted() { if (!canceled.get()) { emitter.onCompleted(); } } }); } }, backpressureMode); }
private Cancellable createCancellable() { return new Cancellable() { @Override public void cancel() throws Exception { merlin.unbind(); } }; }
/** * Emits BluetoothGatt and completes after connection is established. * * @return BluetoothGatt after connection reaches {@link com.polidea.rxandroidble.RxBleConnection.RxBleConnectionState#CONNECTED} * state. * @throws com.polidea.rxandroidble.exceptions.BleDisconnectedException if connection was disconnected/failed before it was established. */ @NonNull private Observable<BluetoothGatt> getConnectedBluetoothGatt() { // start connecting the BluetoothGatt // note: Due to different Android BLE stack implementations it is not certain whether `connectGatt()` or `BluetoothGattCallback` // will emit BluetoothGatt first return Observable.create( new Action1<Emitter<BluetoothGatt>>() { @Override public void call(Emitter<BluetoothGatt> emitter) { final Subscription connectedBluetoothGattSubscription = Observable.fromCallable(new Func0<BluetoothGatt>() { @Override public BluetoothGatt call() { connectionStateChangedAction.onConnectionStateChange(CONNECTED); return bluetoothGattProvider.getBluetoothGatt(); } }) // when the connected state will be emitted bluetoothGattProvider should contain valid Gatt .delaySubscription( rxBleGattCallback .getOnConnectionStateChange() .takeFirst(new Func1<RxBleConnection.RxBleConnectionState, Boolean>() { @Override public Boolean call(RxBleConnection.RxBleConnectionState rxBleConnectionState) { return rxBleConnectionState == CONNECTED; } }) ) // disconnect may happen even if the connection was not established yet .mergeWith(rxBleGattCallback.<BluetoothGatt>observeDisconnect()) .take(1) .subscribe(emitter); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { connectedBluetoothGattSubscription.unsubscribe(); } }); connectionStateChangedAction.onConnectionStateChange(CONNECTING); /* * Apparently the connection may be established fast enough to introduce a race condition so the subscription * must be established first before starting the connection. * https://github.com/Polidea/RxAndroidBle/issues/178 * */ final BluetoothGatt bluetoothGatt = connectionCompat .connectGatt(bluetoothDevice, autoConnect, rxBleGattCallback.getBluetoothGattCallback()); /* * Update BluetoothGatt when connection is initiated. It is not certain * if this or RxBleGattCallback.onConnectionStateChange will be first. * */ bluetoothGattProvider.updateBluetoothGatt(bluetoothGatt); } }, Emitter.BackpressureMode.NONE ); }
@Inject protected ClientStateObservable( final RxBleAdapterWrapper rxBleAdapterWrapper, final Observable<RxBleAdapterStateObservable.BleAdapterState> bleAdapterStateObservable, @Named(ClientComponent.NamedBooleanObservables.LOCATION_SERVICES_OK) final Observable<Boolean> locationServicesOkObservable, final LocationServicesStatus locationServicesStatus, @Named(ClientComponent.NamedSchedulers.TIMEOUT) final Scheduler timerScheduler ) { super(new OnSubscribeCreate<>( new Action1<Emitter<RxBleClient.State>>() { @Override public void call(Emitter<RxBleClient.State> emitter) { if (!rxBleAdapterWrapper.hasBluetoothAdapter()) { emitter.onCompleted(); return; } final Subscription changingStateSubscription = checkPermissionUntilGranted(locationServicesStatus, timerScheduler) .flatMapObservable(new Func1<Boolean, Observable<RxBleClient.State>>() { @Override public Observable<RxBleClient.State> call(Boolean permissionWasInitiallyGranted) { return checkAdapterAndServicesState( permissionWasInitiallyGranted, rxBleAdapterWrapper, bleAdapterStateObservable, locationServicesOkObservable ); } }) .distinctUntilChanged() .subscribe(emitter); emitter.setCancellation(new Cancellable() { @Override public void cancel() throws Exception { changingStateSubscription.unsubscribe(); } }); } }, Emitter.BackpressureMode.LATEST )); }
@Test public void setsCancellation() { verify(mockEmitter).setCancellation(any(Cancellable.class)); }
@Override public Observable<T> get(String id) { LoggerHelper.logDebug("local:" + this.getClass().toString() + " get:" + id); return database .get() .createQuery(getTableName(), "SELECT * FROM " + getTableName() + " WHERE id = ?", id) .take(1) .switchMap( new Func1<SqlBrite.Query, Observable<? extends T>>() { @Override public Observable<? extends T> call(final SqlBrite.Query map) { return Observable.create( new Action1<Emitter<T>>() { @Override public void call(Emitter<T> emitter) { final Cursor cursor = map.run(); emitter.setCancellation( new Cancellable() { @Override public void cancel() throws Exception { cursor.close(); } }); if (!cursor.isClosed() && cursor.moveToFirst()) { try { emitter.onNext( JsonMapper.INSTANCE.fromJson( cursor.getString(DATA_COLUMN), LocalStorageRepoImpl.this.getType())); } catch (IOException e) { emitter.onError(e); return; } } emitter.onCompleted(); } }, Emitter.BackpressureMode.LATEST); } }); }
@Override public Observable<T> getAll() { LoggerHelper.logDebug("local:" + this.getClass().toString() + " getAll"); return database .get() .createQuery(getTableName(), "SELECT * FROM " + getTableName()) .take(1) .switchMap( new Func1<SqlBrite.Query, Observable<? extends T>>() { @Override public Observable<? extends T> call(final SqlBrite.Query map) { return Observable.create( new Action1<Emitter<T>>() { @Override public void call(Emitter<T> emitter) { final Cursor cursor = map.run(); emitter.setCancellation( new Cancellable() { @Override public void cancel() throws Exception { cursor.close(); } }); while (!cursor.isClosed() && cursor.moveToNext()) { try { emitter.onNext( JsonMapper.INSTANCE.fromJson( cursor.getString(DATA_COLUMN), LocalStorageRepoImpl.this.getType())); } catch (IOException e) { emitter.onError(e); return; } } emitter.onCompleted(); } }, Emitter.BackpressureMode.BUFFER); } }); }