@Test public void testCompletedRestartableIsUnsubscribed() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.create(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(true); presenter.restartable(1, restartable); assertTrue(presenter.isUnsubscribed(1)); presenter.start(1); assertTrue(presenter.isUnsubscribed(1)); }
public static <T, U, R> Observable<R> multicastSelector(final Func0<? extends ConnectableObservable<U>> connectableFactory, final Func1<? super Observable<U>, ? extends Observable<R>> selector) { return Observable.create(new Observable$OnSubscribe<R>() { public void call(final Subscriber<? super R> child) { try { ConnectableObservable<U> co = (ConnectableObservable) connectableFactory.call(); ((Observable) selector.call(co)).subscribe(child); co.connect(new Action1<Subscription>() { public void call(Subscription t) { child.add(t); } }); } catch (Throwable e) { Exceptions.throwOrReport(e, child); } } }); }
/** As of RxJava 1.1.7, Observable.fromCallable is still @Beta, so just in case... */ @Internal static <T> Observable<T> fromCallable(final Callable<T> callable) { return Observable.defer(new Func0<Observable<T>>() { @Override public Observable<T> call() { T result; try { result = callable.call(); } catch (Exception e) { return Observable.error(e); } return Observable.just(result); } }); }
@Test public void testCompletedRestartable() throws Exception { Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); RxPresenter presenter = new RxPresenter(); presenter.create(null); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(true); presenter.restartable(1, restartable); verifyNoMoreInteractions(restartable); presenter.start(1); }
private <T extends Resource> Observable<ResourceResponse<T>> createResourceResponseObservable( final ImplFunc<ResourceResponse<T>> impl) { logger.trace("Creating Observable<ResourceResponse<T>>"); return Observable.defer(new Func0<Observable<ResourceResponse<T>>>() { @Override public Observable<ResourceResponse<T>> call() { try { ResourceResponse<T> rr = impl.invoke(); return Observable.just(rr); } catch (Exception e) { return Observable.error(flatten(e)); } } }).subscribeOn(scheduler); }
@Override public Observable<T> adapt(final Call<T> call) { return Observable.<T>create(SyncOnSubscribe.createSingleState(new Func0<Call<T>>() { @Override public Call<T> call() { return call.clone(); } }, new Action2<Call<T>, Observer<? super T>>() { @Override public void call(Call<T> callClone, Observer<? super T> observer) { try { observer.onNext(callClone.execute()); observer.onCompleted(); } catch (Throwable throwable) { observer.onError(throwable); } } }, new Action1<Call<T>>() { @Override public void call(Call<T> tCall) { tCall.cancel(); } })); }
/** * Invokes tasks in the group. * * @param context group level shared context that need be passed to invokeAsync(cxt) * method of each task item in the group when it is selected for invocation. * * @return an observable that emits the result of tasks in the order they finishes. */ public Observable<Indexable> invokeAsync(final InvocationContext context) { return Observable.defer(new Func0<Observable<Indexable>>() { @Override public Observable<Indexable> call() { if (proxyTaskGroupWrapper.isActive()) { return proxyTaskGroupWrapper.taskGroup() .invokeInternAsync(context, true, null); } else { Set<String> processedKeys = runBeforeGroupInvoke(null); if (proxyTaskGroupWrapper.isActive()) { // If proxy got activated after 'runBeforeGroupInvoke()' stage due to the addition of direct // 'postRunDependent's then delegate group invocation to proxy group. // return proxyTaskGroupWrapper.taskGroup() .invokeInternAsync(context, true, processedKeys); } else { return invokeInternAsync(context, false, null); } } } }); }
private void setImplicitDataDisks(Func0<Integer> nextLun) { VirtualMachineScaleSetStorageProfile storageProfile = this.vmss .inner() .virtualMachineProfile() .storageProfile(); List<VirtualMachineScaleSetDataDisk> dataDisks = storageProfile.dataDisks(); for (VirtualMachineScaleSetDataDisk dataDisk : this.implicitDisksToAssociate) { dataDisk.withCreateOption(DiskCreateOptionTypes.EMPTY); if (dataDisk.lun() == -1) { dataDisk.withLun(nextLun.call()); } if (dataDisk.managedDisk() == null) { dataDisk.withManagedDisk(new VirtualMachineScaleSetManagedDiskParameters()); } if (dataDisk.caching() == null) { dataDisk.withCaching(getDefaultCachingType()); } if (dataDisk.managedDisk().storageAccountType() == null) { dataDisk.managedDisk().withStorageAccountType(getDefaultStorageAccountType()); } dataDisk.withName(null); dataDisks.add(dataDisk); } }
@Override public Observable<Indexable> invokeTaskAsync(TaskGroup.InvocationContext context) { return Observable.defer(new Func0<Observable<Indexable>>() { @Override public Observable<Indexable> call() { return getMSIExtensionAsync(); } }).flatMap(new Func1<Indexable, Observable<Indexable>>() { @Override public Observable<Indexable> call(Indexable extension) { return updateMSIExtensionAsync((VirtualMachineExtension) extension); } }) .switchIfEmpty(Observable.defer(new Func0<Observable<Indexable>>() { @Override public Observable<Indexable> call() { return installMSIExtensionAsync(); } })); }
private void setAttachableNewDataDisks(Func0<Integer> nextLun) { List<DataDisk> dataDisks = vm.inner().storageProfile().dataDisks(); for (Map.Entry<String, DataDisk> entry : this.newDisksToAttach.entrySet()) { Disk managedDisk = vm.<Disk>taskResult(entry.getKey()); DataDisk dataDisk = entry.getValue(); dataDisk.withCreateOption(DiskCreateOptionTypes.ATTACH); if (dataDisk.lun() == -1) { dataDisk.withLun(nextLun.call()); } dataDisk.withManagedDisk(new ManagedDiskParametersInner()); dataDisk.managedDisk().withId(managedDisk.id()); if (dataDisk.caching() == null) { dataDisk.withCaching(getDefaultCachingType()); } // Don't set default storage account type for the attachable managed disks, it is already // defined in the managed disk and not allowed to change. dataDisk.withName(null); dataDisks.add(dataDisk); } }
private void setAttachableExistingDataDisks(Func0<Integer> nextLun) { List<DataDisk> dataDisks = vm.inner().storageProfile().dataDisks(); for (DataDisk dataDisk : this.existingDisksToAttach) { dataDisk.withCreateOption(DiskCreateOptionTypes.ATTACH); if (dataDisk.lun() == -1) { dataDisk.withLun(nextLun.call()); } if (dataDisk.caching() == null) { dataDisk.withCaching(getDefaultCachingType()); } // Don't set default storage account type for the attachable managed disks, it is already // defined in the managed disk and not allowed to change. dataDisk.withName(null); dataDisks.add(dataDisk); } }
private void setImplicitDataDisks(Func0<Integer> nextLun) { List<DataDisk> dataDisks = vm.inner().storageProfile().dataDisks(); for (DataDisk dataDisk : this.implicitDisksToAssociate) { dataDisk.withCreateOption(DiskCreateOptionTypes.EMPTY); if (dataDisk.lun() == -1) { dataDisk.withLun(nextLun.call()); } if (dataDisk.caching() == null) { dataDisk.withCaching(getDefaultCachingType()); } if (dataDisk.managedDisk() == null) { dataDisk.withManagedDisk(new ManagedDiskParametersInner()); } if (dataDisk.managedDisk().storageAccountType() == null) { dataDisk.managedDisk().withStorageAccountType(getDefaultStorageAccountType()); } dataDisk.withName(null); dataDisks.add(dataDisk); } }
@Override public Observable<DnsZone> createResourceAsync() { final DnsZoneImpl self = this; Func0<Observable<ZoneInner>> createOrUpdateAsync = new Func0<Observable<ZoneInner>>() { @Override public Observable<ZoneInner> call() { if (self.isInCreateMode()) { return self.manager().inner().zones().createOrUpdateAsync(self.resourceGroupName(), self.name(), self.inner(), null/*IfMatch*/, self.dnsZoneETag/*IfNoneMatch*/); } else { return self.manager().inner().zones().createOrUpdateAsync(self.resourceGroupName(), self.name(), self.inner(), self.dnsZoneETag/*IfMatch*/, null/*IfNoneMatch*/); } } }; return createOrUpdateAsync.call() .map(innerToFluentMap(this)) .map(new Func1<DnsZone, DnsZone>() { @Override public DnsZone call(DnsZone dnsZone) { self.dnsZoneETag = null; return dnsZone; } }); }
public Observable<MailStatistics> getStatistics() { return Observable.defer(new Func0<Observable<MailStatistics>>() { @Override public Observable<MailStatistics> call() { delay(); Observable o = checkExceptions(); if (o != null) { return o; } Map<String, MailsCount> mailsCountMap = new HashMap<String, MailsCount>(); for (Mail m : mails) { MailsCount count = mailsCountMap.get(m.getLabel()); if (count == null) { count = new MailsCount(m.getLabel(), 0); mailsCountMap.put(m.getLabel(), count); } count.incrementCount(); } return Observable.just( new MailStatistics(new ArrayList<MailsCount>(mailsCountMap.values()))); } }); }
public Observable<List<ProfileScreen>> getProfileScreens(Person person) { // TODO throw error from time to time return Observable.defer(new Func0<Observable<List<ProfileScreen>>>() { @Override public Observable<List<ProfileScreen>> call() { List<ProfileScreen> screens = new ArrayList<ProfileScreen>(); screens.add (new ProfileScreen(ProfileScreen.TYPE_MAILS, "Mails")); screens.add(new ProfileScreen(ProfileScreen.TYPE_ABOUT, "About")); return Observable.just(screens); } }).delay(2, TimeUnit.SECONDS); }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverFirst()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableFirst(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverFirst()) .subscribe(split(onNext, onError)); } }); }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverLatestCache()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableLatestCache(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverLatestCache()) .subscribe(split(onNext, onError)); } }); }
/** * This is a shortcut that can be used instead of combining together * {@link #restartable(int, Func0)}, * {@link #deliverReplay()}, * {@link #split(Action2, Action2)}. * * @param restartableId an id of the restartable. * @param observableFactory a factory that should return an Observable when the restartable should run. * @param onNext a callback that will be called when received data should be delivered to view. * @param onError a callback that will be called if the source observable emits onError. * @param <T> the type of the observable. */ public <T> void restartableReplay(int restartableId, final Func0<Observable<T>> observableFactory, final Action2<View, T> onNext, @Nullable final Action2<View, Throwable> onError) { restartable(restartableId, new Func0<Subscription>() { @Override public Subscription call() { return observableFactory.call() .compose(RxPresenter.this.<T>deliverReplay()) .subscribe(split(onNext, onError)); } }); }
@Test public void testRestartable() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.create(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(false); presenter.restartable(1, restartable); verifyNoMoreInteractions(restartable); presenter.start(1); verify(restartable, times(1)).call(); verifyNoMoreInteractions(restartable); Bundle bundle = BundleMock.mock(); presenter.onSave(bundle); presenter = new RxPresenter(); presenter.create(bundle); presenter.restartable(1, restartable); verify(restartable, times(2)).call(); verifyNoMoreInteractions(restartable); }
@Test public void testStopRestartable() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.onCreate(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(false); presenter.restartable(1, restartable); verifyNoMoreInteractions(restartable); presenter.start(1); verify(restartable, times(1)).call(); verifyNoMoreInteractions(restartable); presenter.stop(1); Bundle bundle = BundleMock.mock(); presenter.onSave(bundle); presenter = new RxPresenter(); presenter.onCreate(bundle); presenter.restartable(1, restartable); verify(restartable, times(1)).call(); verifyNoMoreInteractions(restartable); }
@Experimental public static <S, T> Observable$OnSubscribe<T> createSingleState(Func0<? extends S> generator, final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next) { return new AsyncOnSubscribeImpl((Func0) generator, new Func3<S, Long, Observer<Observable<? extends T>>, S>() { public S call(S state, Long requested, Observer<Observable<? extends T>> subscriber) { next.call(state, requested, subscriber); return state; } }); }
@Test public void testCompletedRestartableDoesNoRestart() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.onCreate(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(false); presenter.restartable(1, restartable); verifyNoMoreInteractions(restartable); presenter.start(1); verify(restartable, times(1)).call(); verifyNoMoreInteractions(restartable); when(subscription.isUnsubscribed()).thenReturn(true); Bundle bundle = BundleMock.mock(); presenter.onSave(bundle); presenter = new RxPresenter(); presenter.onCreate(bundle); presenter.restartable(1, restartable); verifyNoMoreInteractions(restartable); }
@Test public void testRestartableIsUnsubscribed() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.create(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(false); presenter.restartable(1, restartable); assertTrue(presenter.isUnsubscribed(1)); }
@Test public void testStartedRestartableIsNotUnsubscribed() throws Exception { RxPresenter presenter = new RxPresenter(); presenter.create(null); Func0<Subscription> restartable = mock(Func0.class); Subscription subscription = mock(Subscription.class); when(restartable.call()).thenReturn(subscription); when(subscription.isUnsubscribed()).thenReturn(false); presenter.restartable(1, restartable); assertTrue(presenter.isUnsubscribed(1)); presenter.start(1); assertFalse(presenter.isUnsubscribed(1)); }
public Observable<Mail> addMail(final Mail mail) { return Observable.defer(new Func0<Observable<Mail>>() { @Override public Observable<Mail> call() { mail.id(lastId.incrementAndGet()); mails.add(mail); return Observable.just(mail); } }); }
/** * Get the total requests and failed requests for instance level. * * @param metricsName Name of the metrics * @param metricToEvaluate observable method to be called for preparation of metrics. * @return Guage metrics */ protected Monitor<Number> getRequestValuesGaugeMonitor(final String metricsName, final Func0<Number> metricToEvaluate) { return new GaugeMetric(MonitorConfig.builder(metricsName).build()) { @Override public Number getValue() { return metricToEvaluate.call(); } }; }
/** * Get the total requests and failed requests for each producer. * * @param name Name of the metrics * @param metricToEvaluate observable method to be called for preparation of metrics. * @return Guage metrics */ protected Monitor<String> getInfoMetricsOperationalAndInstance(final String name, final Func0<String> metricToEvaluate) { return new InformationalMetric(MonitorConfig.builder(name).build()) { @Override public String getValue() { return metricToEvaluate.call(); } }; }
private OperatorMulticast(Object guard, AtomicReference<Subject<? super T, ? extends R>> connectedSubject, List<Subscriber<? super R>> waitingForConnect, Observable<? extends T> source, Func0<? extends Subject<? super T, ? extends R>> subjectFactory) { super(new AnonymousClass1(guard, connectedSubject, waitingForConnect)); this.guard = guard; this.connectedSubject = connectedSubject; this.waitingForConnect = waitingForConnect; this.source = source; this.subjectFactory = subjectFactory; }
private <T> Observable<T> createDeferObservable(final ImplFunc<T> impl) { return Observable.defer(new Func0<Observable<T>>() { @Override public Observable<T> call() { try { T rr = impl.invoke(); return Observable.just(rr); } catch (Exception e) { return Observable.error(e); } } }).subscribeOn(scheduler); }
public static <T> RxBusSub<T> create(final Class<T> tClass) { return RxBusSub.create(new Func0<Observable<T>>() { @Override public Observable<T> call() { return RxBus.getDefault().toObservable(tClass); } }); }
public final RxBusSub<T> observeOn(final Scheduler scheduler) { return RxBusSub.create(new Func0<Observable<T>>() { @Override public Observable<T> call() { return observableFunc0.call().observeOn(scheduler); } }); }
public final <R> RxBusSub<R> compose(final Observable.Transformer<? super T, ? extends R> transformer) { return RxBusSub.create(new Func0<Observable<R>>() { @Override public Observable<R> call() { return observableFunc0.call().compose(transformer); } }); }
private Observable<Response> getDriversObserver() { return Observable.defer(new Func0<Observable<Response>>() { @Override public Observable<Response> call() { return mApiService.getDrivers(); } }); }