/** * Create an observable to load the xml file in the background. mXmlInfoRx is subscribed to * it and will complete when the file has been loaded. */ private void startXmlLoading() { mXmlInfoRx = AsyncSubject.create(); Observable.create(new Observable.OnSubscribe<List<HeroInfo>>() { @Override public void call(Subscriber<? super List<HeroInfo>> observer) { XmlResourceParser parser = mMainActivityPresenter.getContext().getResources() .getXml(R.xml.hero_info_from_web); List<HeroInfo> heroInfoList = new ArrayList<>(); LoadHeroXml.Load(parser, heroInfoList); observer.onNext(heroInfoList); observer.onCompleted(); } }) //TODO-beauty: check if the XML parsing should be done on the io thread instead? .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(mXmlInfoRx); }
@Test public void testNeverCompleted() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, Mockito.never()).onError(testException); verify(observer, Mockito.never()).onComplete(); }
@Test public void testCompleted() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); verify(observer, times(1)).onNext("three"); verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, times(1)).onComplete(); }
@Test(expected = NullPointerException.class) public void testNull() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.subscribe(observer); subject.onNext(null); // Reactive-Streams prohibits null value // subject.onComplete(); // // verify(observer, times(1)).onNext(null); // verify(observer, Mockito.never()).onError(any(Throwable.class)); // verify(observer, times(1)).onComplete(); }
@Test public void testSubscribeAfterCompleted() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); TestSubscriber<String> ts = new TestSubscriber<>(observer, 0); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); subject.subscribe(ts); ts.requestMore(1); verify(observer, times(1)).onNext("three"); verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, times(1)).onComplete(); }
@Test public void testSubscribeAfterCompletedRequestBefore() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); TestSubscriber<String> ts = new TestSubscriber<>(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); subject.subscribe(ts); ts.requestMore(1); verify(observer, times(1)).onNext("three"); verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, times(1)).onComplete(); }
@Test public void testSubscribeAfterCompletedNoRequest() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); subject.subscribe(observer); verify(observer, never()).onNext(anyString()); verify(observer, never()).onError(any(Throwable.class)); verify(observer, never()).onComplete(); }
@Test public void testSubscribeAfterError() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); RuntimeException re = new RuntimeException("failed"); subject.onError(re); subject.subscribe(observer); verify(observer, times(1)).onError(re); verify(observer, Mockito.never()).onNext(any(String.class)); verify(observer, Mockito.never()).onComplete(); }
@Test public void testError() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.subscribe(observer); subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onError(testException); subject.onNext("four"); subject.onError(new Throwable()); subject.onComplete(); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, times(1)).onError(testException); verify(observer, Mockito.never()).onComplete(); }
@Test public void testEmptySubjectCompleted() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); subject.subscribe(observer); subject.onComplete(); InOrder inOrder = inOrder(observer); inOrder.verify(observer, never()).onNext(null); inOrder.verify(observer, never()).onNext(any(String.class)); inOrder.verify(observer, times(1)).onComplete(); inOrder.verifyNoMoreInteractions(); assertFalse(subject.hasSubscribers()); }
@Ignore // FIXME Reactive-Streams rules prohibit throwing exceptions @Test public void testOnErrorThrowsDoesntPreventDelivery() { AsyncSubject<String> ps = AsyncSubject.create(); ps.subscribe(); TestSubscriber<String> ts = new TestSubscriber<>(); ps.subscribe(ts); try { ps.onError(new RuntimeException("an exception")); fail("expect OnErrorNotImplementedException"); } catch (OnErrorNotImplementedException e) { // ignore } // even though the onError above throws we should still receive it on the other subscriber ts.assertError(); }
/** * This one has multiple failures so should get a CompositeException */ @Ignore // FIXME Reactive-Streams rules prohibit throwing exceptions @Test public void testOnErrorThrowsDoesntPreventDelivery2() { AsyncSubject<String> ps = AsyncSubject.create(); ps.subscribe(); ps.subscribe(); TestSubscriber<String> ts = new TestSubscriber<>(); ps.subscribe(ts); ps.subscribe(); ps.subscribe(); ps.subscribe(); try { ps.onError(new RuntimeException("an exception")); fail("expect OnErrorNotImplementedException"); } catch (CompositeException e) { // we should have 5 of them assertEquals(5, e.getExceptions().size()); } // even though the onError above throws we should still receive it on the other subscriber ts.assertError(); }
@Test public void testCurrentStateMethodsNormal() { AsyncSubject<Object> as = AsyncSubject.create(); assertFalse(as.hasValue()); assertFalse(as.hasThrowable()); assertFalse(as.hasComplete()); assertNull(as.getValue()); assertNull(as.getThrowable()); as.onNext(1); assertTrue(as.hasValue()); assertFalse(as.hasThrowable()); assertFalse(as.hasComplete()); assertEquals(1, as.getValue()); assertNull(as.getThrowable()); as.onComplete(); assertTrue(as.hasValue()); assertFalse(as.hasThrowable()); assertTrue(as.hasComplete()); assertEquals(1, as.getValue()); assertNull(as.getThrowable()); }
@Test public void testCurrentStateMethodsEmpty() { AsyncSubject<Object> as = AsyncSubject.create(); assertFalse(as.hasValue()); assertFalse(as.hasThrowable()); assertFalse(as.hasComplete()); assertNull(as.getValue()); assertNull(as.getThrowable()); as.onComplete(); assertFalse(as.hasValue()); assertFalse(as.hasThrowable()); assertTrue(as.hasComplete()); assertNull(as.getValue()); assertNull(as.getThrowable()); }
@Test public void testCurrentStateMethodsError() { AsyncSubject<Object> as = AsyncSubject.create(); assertFalse(as.hasValue()); assertFalse(as.hasThrowable()); assertFalse(as.hasComplete()); assertNull(as.getValue()); assertNull(as.getThrowable()); as.onError(new TestException()); assertFalse(as.hasValue()); assertTrue(as.hasThrowable()); assertFalse(as.hasComplete()); assertNull(as.getValue()); assertTrue(as.getThrowable() instanceof TestException); }
/** * Starts immediately the execution of the given Callable with a thread from * {@link Schedulers#io()}. * * The caller can await the execution termination through subscribing to the {@link Single} return value. * It's safe to "share" the {@link Single} return value reference and subscribe to it as many times as you want. * All subscribers get the result value (or the error) individually. * * Cancellation? The execution is automatically cancelled when all subscribers do unsubscribe while * the execution is still running. The given {@link Callable} will be interrupted. * * If there is no subscriber ever to the {@link Single} return value, the callable will be executed unobserved. * Make sure to have some kind of "exception handling" also for that case (like try-catch-logging blocks or * {@link Thread.UncaughtExceptionHandler}) to not "miss" issues. * * @param callable the code to execute * @param <T> type of result * @return Single instance delivering asynchronously the result of the callable */ // experimental public static <T> Single<T> executeAsync(Callable<T> callable) { AsyncSubject<T> resultSubject = AsyncSubject.create(); final Subscription asyncOp = Observable.fromCallable(callable) .subscribeOn(Schedulers.io()) .lift(preserveFullStackTrace()) .subscribe( t -> { resultSubject.onNext(t); resultSubject.onCompleted(); }, throwable -> { resultSubject.onError(throwable); resultSubject.onCompleted(); } ); return resultSubject.doOnUnsubscribe(asyncOp::unsubscribe) .share() .toSingle(); }
/** * Creates a {@link ViewQueryResponse} from its request based on the returned info. * * Note that observables are attached to this response which are completed later in the response cycle. * * @return the initial response. */ private CouchbaseResponse handleViewQueryResponse() { int code = responseHeader.getStatus().code(); String phrase = responseHeader.getStatus().reasonPhrase(); ResponseStatus status = ResponseStatusConverter.fromHttp(responseHeader.getStatus().code()); Scheduler scheduler = env().scheduler(); long ttl = env().autoreleaseAfter(); viewRowObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); viewInfoObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); viewErrorObservable = AsyncSubject.create(); //set up trace ids on all these UnicastAutoReleaseSubjects, so that if they get in a bad state // (multiple subscribers or subscriber coming in too late) we can trace back to here viewRowObservable.withTraceIdentifier("viewRow"); viewInfoObservable.withTraceIdentifier("viewInfo"); return new ViewQueryResponse( viewRowObservable.onBackpressureBuffer().observeOn(scheduler), viewInfoObservable.onBackpressureBuffer().observeOn(scheduler), viewErrorObservable.observeOn(scheduler), code, phrase, status, currentRequest() ); }
/** * Initialize this parser for a response parsing cycle. * * * @param responseContent the raw content to parse from. * @param responseStatus the status of the response. * @param request the original request. */ public void initialize(final ByteBuf responseContent, final ResponseStatus responseStatus, final CouchbaseRequest request) { this.requestID = ""; this.clientContextID = ""; //initialize to empty string instead of null as it is optional on the wire this.sentResponse = false; this.response = null; this.status = responseStatus; this.responseContent = responseContent; this.currentRequest = request; queryRowObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); queryErrorObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); queryStatusObservable = AsyncSubject.create(); queryInfoObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); querySignatureObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); queryProfileInfoObservable = UnicastAutoReleaseSubject.create(ttl, TimeUnit.MILLISECONDS, scheduler); parser.initialize(responseContent); initialized = true; }
@Test(expected = RequestCancelledException.class) public void shouldCancelRequestOnFailFastStrategy() { Endpoint endpoint1 = mock(Endpoint.class); EndpointStates e1s = new EndpointStates(LifecycleState.DISCONNECTED); when(endpoint1.states()).thenReturn(e1s.states()); when(endpoint1.connect()).thenReturn(Observable.just(LifecycleState.DISCONNECTED)); when(endpoint1.disconnect()).thenReturn(Observable.just(LifecycleState.DISCONNECTING)); CoreEnvironment env = mock(CoreEnvironment.class); when(env.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE); int endpoints = 1; SelectionStrategy strategy = mock(SelectionStrategy.class); when(strategy.select(any(CouchbaseRequest.class), any(List.class))).thenReturn(null); InstrumentedService service = new InstrumentedService(host, bucket, password, port, env, endpoints, endpoints, strategy, null, factory); CouchbaseRequest request = mock(CouchbaseRequest.class); when(request.isActive()).thenReturn(true); AsyncSubject<CouchbaseResponse> response = AsyncSubject.create(); when(request.observable()).thenReturn(response); service.send(request); response.toBlocking().single(); }
@Test public void shouldFailObservableIfCouldNotConnect() { InstrumentedService service = new InstrumentedService(host, bucket, password, port, env, null, factory); Endpoint endpoint = mock(Endpoint.class); final EndpointStates endpointStates = new EndpointStates(LifecycleState.DISCONNECTED); when(endpoint.states()).thenReturn(endpointStates.states()); when(endpoint.connect()).thenReturn(Observable.just(LifecycleState.DISCONNECTED)); when(factory.create(host, bucket, bucket, password, port, env, null)).thenReturn(endpoint); CouchbaseRequest req = mock(CouchbaseRequest.class); AsyncSubject<CouchbaseResponse> reqObservable = AsyncSubject.create(); when(req.observable()).thenReturn(reqObservable); try { service.send(req); reqObservable.toBlocking().single(); assertTrue("Should've failed but did not", false); } catch(CouchbaseException ex) { assertEquals("Could not connect endpoint.", ex.getMessage()); } catch(Throwable tr) { assertTrue(tr.getMessage(), false); } }
@Test public void shouldFailObservableIfErrorOnConnect() { InstrumentedService service = new InstrumentedService(host, bucket, password, port, env, null, factory); Endpoint endpoint = mock(Endpoint.class); final EndpointStates endpointStates = new EndpointStates(LifecycleState.DISCONNECTED); when(endpoint.states()).thenReturn(endpointStates.states()); when(endpoint.connect()).thenReturn(Observable.<LifecycleState>error(new AuthenticationException())); when(factory.create(host, bucket, bucket, password, port, env, null)).thenReturn(endpoint); CouchbaseRequest req = mock(CouchbaseRequest.class); AsyncSubject<CouchbaseResponse> reqObservable = AsyncSubject.create(); when(req.observable()).thenReturn(reqObservable); try { service.send(req); reqObservable.toBlocking().single(); assertTrue("Should've failed but did not", false); } catch(AuthenticationException ex) { assertTrue(true); assertEquals(LifecycleState.IDLE, service.state()); } catch(Throwable tr) { assertTrue(tr.getMessage(), false); } }
@Test(expected = RequestCancelledException.class) public void shouldCancelIfServiceCouldNotBeLocated() { ServiceRegistry registryMock = mock(ServiceRegistry.class); Service serviceMock = mock(Service.class); when(registryMock.serviceBy(ServiceType.BINARY, "bucket")).thenReturn(serviceMock); when(serviceMock.states()).thenReturn(Observable.<LifecycleState>empty()); CoreEnvironment env = mock(CoreEnvironment.class); when(env.retryStrategy()).thenReturn(FailFastRetryStrategy.INSTANCE); CouchbaseNode node = new CouchbaseNode(host, registryMock, env, null, ServiceFactory.INSTANCE); CouchbaseRequest request = mock(CouchbaseRequest.class); when(request.isActive()).thenReturn(true); AsyncSubject<CouchbaseResponse> response = AsyncSubject.create(); when(request.observable()).thenReturn(response); node.send(request); response.toBlocking().single(); }
@Test public void shouldFailWhenUsedAgainstMemcacheBucket() { Locator locator = new ViewLocator(0); ClusterConfig config = mock(ClusterConfig.class); when(config.bucketConfig("default")).thenReturn(mock(MemcachedBucketConfig.class)); CouchbaseRequest request = mock(ViewQueryRequest.class); Subject<CouchbaseResponse, CouchbaseResponse> response = AsyncSubject.create(); when(request.bucket()).thenReturn("default"); when(request.observable()).thenReturn(response); TestSubscriber<CouchbaseResponse> subscriber = new TestSubscriber<CouchbaseResponse>(); response.subscribe(subscriber); locator.locateAndDispatch(request, Collections.<Node>emptyList(), config, null, null); subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS); List<Throwable> errors = subscriber.getOnErrorEvents(); assertEquals(1, errors.size()); assertTrue(errors.get(0) instanceof ServiceNotAvailableException); }
@Test public void shouldReleaseStoreRequestContentOnSuccess() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(BinaryMemcacheResponseStatus.SUCCESS); UpsertRequest requestMock = mock(UpsertRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); when(requestMock.content()).thenReturn(requestContent); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(0, requestContent.refCnt()); }
@Test public void shouldNotReleaseStoreRequestContentOnRetry() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(KeyValueStatus.ERR_NOT_MY_VBUCKET.code()); UpsertRequest requestMock = mock(UpsertRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); when(requestMock.content()).thenReturn(requestContent); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); }
@Test public void shouldReleaseAppendRequestContentOnSuccess() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(BinaryMemcacheResponseStatus.SUCCESS); AppendRequest requestMock = mock(AppendRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); when(requestMock.content()).thenReturn(requestContent); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(0, requestContent.refCnt()); }
@Test public void shouldNotReleaseAppendRequestContentOnRetry() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(KeyValueStatus.ERR_NOT_MY_VBUCKET.code()); AppendRequest requestMock = mock(AppendRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); }
@Test public void shouldReleasePrependRequestContentOnSuccess() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(BinaryMemcacheResponseStatus.SUCCESS); PrependRequest requestMock = mock(PrependRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); when(requestMock.content()).thenReturn(requestContent); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(0, requestContent.refCnt()); }
@Test public void shouldNotReleasePrependRequestContentOnRetry() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(KeyValueStatus.ERR_NOT_MY_VBUCKET.code()); PrependRequest requestMock = mock(PrependRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); when(requestMock.observable()).thenReturn(AsyncSubject.<CouchbaseResponse>create()); when(requestMock.content()).thenReturn(requestContent); requestQueue.add(requestMock); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); channel.writeInbound(response); assertEquals(1, content.refCnt()); assertEquals(1, requestContent.refCnt()); }
@Test(expected = CouchbaseException.class) public void shouldFailWhenOpaqueDoesNotMatch() throws Exception { ByteBuf content = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); FullBinaryMemcacheResponse response = new DefaultFullBinaryMemcacheResponse(KEY, Unpooled.EMPTY_BUFFER, content); response.setStatus(BinaryMemcacheResponseStatus.SUCCESS); response.setOpaque(1); PrependRequest requestMock = mock(PrependRequest.class); ByteBuf requestContent = Unpooled.copiedBuffer("content", CharsetUtil.UTF_8); when(requestMock.bucket()).thenReturn("bucket"); AsyncSubject<CouchbaseResponse> responseSubject = AsyncSubject.<CouchbaseResponse>create(); when(requestMock.observable()).thenReturn(responseSubject); when(requestMock.content()).thenReturn(requestContent); when(requestMock.opaque()).thenReturn(3); requestQueue.add(requestMock); channel.writeInbound(response); assertEquals(0, content.refCnt()); responseSubject.toBlocking().single(); }
/** * Returns an observable that will be completed with a single value once the client terminates.<br> * This can be used to asynchronously wait for completion after {@link #close() close} was called. */ public Observable<Void> getTerminationObservable() { final AsyncSubject<Void> termSubject = AsyncSubject.create(); stateController.statusObservable().subscribe(new Observer<State>() { @Override public void onCompleted() { termSubject.onNext(null); termSubject.onCompleted(); } @Override public void onError(Throwable e) { termSubject.onNext(null); termSubject.onCompleted(); } @Override public void onNext(State t) { } }); return termSubject; }
public void performCall(final String procedure, final EnumSet<CallFlags> flags, final ArrayNode arguments, final ObjectNode argumentsKw, final AsyncSubject<Reply> resultSubject) { final long requestId = IdGenerator.newLinearId(lastRequestId, requestMap); lastRequestId = requestId; ObjectNode options = stateController.clientConfig().objectMapper().createObjectNode(); boolean discloseMe = flags != null && flags.contains(CallFlags.DiscloseMe) ? true : false; if (discloseMe) { options.put("disclose_me", discloseMe); } final CallMessage callMsg = new CallMessage(requestId, options, procedure, arguments, argumentsKw); requestMap.put(requestId, new RequestMapEntry(CallMessage.ID, resultSubject)); connectionController.sendMessage(callMsg, IWampConnectionPromise.Empty); }
private void initSubject(Observable<Data<List<Repo>>> source) { reposSubject = AsyncSubject.create(); subscription = source .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(reposSubject); }
private static void test04() { AsyncSubject<String> subject = AsyncSubject.create(); // ReplaySubject<String> subject = ReplaySubject.create(); // BehaviorSubject<String> subject = BehaviorSubject.create(); // PublishSubject<String> subject = PublishSubject.create(); Subscription subscription = subject.subscribe(new MySubscriber<String>()); subject.onNext("test01"); subject.onNext("test02"); }
@Test public void should_only_emits_the_last_item_and_call_onComplete_without_throwing_exception_when_AsyncSubject_without_error() { Subject<String, String> asyncSubjectSubject = AsyncSubject.<String>create(); asyncSubjectSubject.subscribe(mTestSubscriber); asyncSubjectSubject.onNext("One"); asyncSubjectSubject.onNext("Two"); asyncSubjectSubject.onCompleted(); mTestSubscriber.assertValues("Two"); mTestSubscriber.assertNoErrors(); mTestSubscriber.assertCompleted(); }
@Test public void should_only_throw_exception_when_AsyncSubject_whit_error() { Subject<String, String> asyncSubjectSubject = AsyncSubject.<String>create(); asyncSubjectSubject.subscribe(mTestSubscriber); asyncSubjectSubject.onNext("One"); asyncSubjectSubject.onError(new RuntimeException("Error occurs")); asyncSubjectSubject.onNext("Two"); mTestSubscriber.assertNoValues(); mTestSubscriber.assertError(RuntimeException.class); mTestSubscriber.assertNotCompleted(); }
/** * Create an observable to load SimilarityTest (basically this loads a picture of each hero in * the game to be used in image recognition) in the background. mSimilarityTestRx is subscribed * to this observable and will complete when the file has been loaded. */ private void startSimilarityTestLoading() { mSimilarityTestRx = AsyncSubject.create(); Observable.create(new Observable.OnSubscribe<SimilarityTest>() { @Override public void call(Subscriber<? super SimilarityTest> subscriber) { subscriber.onNext(new SimilarityTest(mMainActivityPresenter.getContext())); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(mSimilarityTestRx); }
private void startSqlLoading() { mAdvantagesSqlRx = AsyncSubject.create(); Observable.create(new Observable.OnSubscribe<SqlLoader>() { @Override public void call(Subscriber<? super SqlLoader> subscriber) { subscriber.onNext(new SqlLoader(mMainActivityPresenter.getContext())); subscriber.onCompleted(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(mAdvantagesSqlRx); }
@Test public void testUnsubscribeBeforeCompleted() { AsyncSubject<String> subject = AsyncSubject.create(); @SuppressWarnings("unchecked") Subscriber<String> observer = mock(Subscriber.class); DisposableSubscriber<String> disposable = DisposableSubscriber.from(observer); subject.subscribe(disposable); subject.onNext("one"); subject.onNext("two"); disposable.dispose(); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, Mockito.never()).onComplete(); subject.onNext("three"); subject.onComplete(); verify(observer, Mockito.never()).onNext(anyString()); verify(observer, Mockito.never()).onError(any(Throwable.class)); verify(observer, Mockito.never()).onComplete(); assertFalse(subject.hasSubscribers()); }
public void testAsyncSubject() { AsyncSubject<Integer> s = AsyncSubject.create(); s.subscribe(new SubAction()); s.onNext(0); s.onCompleted(); s.onNext(1); s.onNext(2); // s.onCompleted(); }