protected HystrixObservable<Response> forceFallbackCommand(Invocation invocation) { return new HystrixObservable<Response>() { @Override public Observable<Response> observe() { ReplaySubject<Response> subject = ReplaySubject.create(); final Subscription sourceSubscription = toObservable().subscribe(subject); return subject.doOnUnsubscribe(sourceSubscription::unsubscribe); } @Override public Observable<Response> toObservable() { return Observable.create(f -> { try { f.onNext(FallbackPolicyManager.getFallbackResponse(handler.groupname, invocation)); } catch (Exception e) { f.onError(e); } }); } }; }
void replaySubject(){ ReplaySubject<Integer> replaySub = ReplaySubject.create(); replaySub.onNext(1); replaySub.onNext(2); Subscription subscription = replaySub.doOnSubscribe(new Action0() { @Override public void call() { Log.i(TAG, "Observer subscribed to ReplaySubject"); } }).doOnUnsubscribe(new Action0() { @Override public void call() { Log.i(TAG, "Observer unsubscribed to ReplaySubject "); } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i(TAG, "New Event received from ReplaySubject: " + integer); } }); replaySub.onNext(3); replaySub.onNext(4); subscription.unsubscribe(); replaySub.onNext(5); replaySub.onCompleted(); }
@Test public void observe() throws Exception { ReplaySubject<Cursor> collector = ReplaySubject.create(); Subscription subscription = mSimplDb.get().observe(TableTest.class).subscribe(collector); mInsert.contentValues.put(DATA, "Data"); mSimplDb.get().insert(mInsert).toBlocking().single(); mInsert.contentValues.put(DATA, "data"); mSimplDb.get().insert(mInsert).toBlocking().single(); collector.onCompleted(); subscription.unsubscribe(); Cursor cursor = collector.toList().toBlocking().single().get(collector.size() - 1); assertEquals(2, cursor.getCount()); cursor.moveToFirst(); assertData("Data", cursor); cursor.moveToLast(); assertData("data", cursor); }
@Test public void testObserveAuth() throws Exception { Observable<Boolean> isAuthenticated = observeAuth(firebaseAuth) .map(new Func1<FirebaseUser, Boolean>() { @Override public Boolean call(FirebaseUser user) { return user != null; } }); ReplaySubject<Boolean> userState = ReplaySubject.create(); isAuthenticated.subscribe(userState); await(authAnonymously(firebaseAuth)); firebaseAuth.signOut(); List<Boolean> observedState = await(userState.take(3).toList()); assertThat(observedState, contains(false, true, false)); }
@Override public Observable<CharactersResponse> loadCharacter(String query, String privateKey, String publicKey, long timestamp) { if (characterSubscription == null || characterSubscription.isUnsubscribed()) { characterSubject = ReplaySubject.create(); // generate hash using timestamp and API keys String hash = HashGenerator.generate(timestamp, privateKey, publicKey); characterSubscription = api.getCharacters(query, publicKey, hash, timestamp) .subscribeOn(scheduler.backgroundThread()) .subscribe(characterSubject); } return characterSubject.asObservable(); }
@Override public Observable<String> getAdIdObservable() { // Observable that emits Google Advertising adIdSubscription if (adIdSubscription == null || adIdSubscription.isUnsubscribed()) { adIdSubject = ReplaySubject.create(); adIdSubscription = Observable .concat(getAdIdFromMemoryObservable(), getAdIdFromGoogleObservable()) .first(entity -> entity != null) // lets retry if something goes wrong! .retry(2) .subscribe(adIdSubject); } return adIdSubject.asObservable(); }
@Override public Observable<Boolean> getAdIdEnabledObservable() { // Observable that emits Google Tracking Enabled if (adIdEnabledSubscription == null || adIdEnabledSubscription.isUnsubscribed()) { adIdEnabledSubject = ReplaySubject.create(); adIdEnabledSubscription = Observable .concat(getAdIdEnabledFromMemoryObservable(), getAdIdEnabledFromGoogleObservable()) .first(entity -> entity != null) // lets retry if something goes wrong! .retry(2) .subscribe(adIdEnabledSubject); } return adIdEnabledSubject.asObservable(); }
@Test public void operatorUnbufferedObserveOn() throws Exception { ReplaySubject<Integer> numbers = ReplaySubject.createWithSize(1); final AtomicInteger counter = new AtomicInteger(); numbers.lift(new Rx.OperatorUnbufferedObserveOn<Integer>(Schedulers.immediate())) .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { counter.addAndGet(number); } }); numbers.onNext(3); numbers.onNext(3); numbers.onNext(3); assertEquals(9, counter.get()); }
/** * Creates an Observable stream from this queue. There should only ever be one subscriber to * this method. Calling this method twice will complete any previously opened observable * (leaving unprocessed elements in the queue). * @return An observable containing the contents of the queue in order */ public Observable<T> toObservable() { synchronized (mLock) { if (mSubject != null) { mSubject.onCompleted(); } if (mQueue.isEmpty()) { mSubject = ReplaySubject.create(); } else { mSubject = ReplaySubject.create(mQueue.size()); for (T data : mQueue) { mSubject.onNext(data); } } } return mSubject.map(item -> { mQueue.remove(); return item; }).asObservable(); }
public static void replaySubject() { ReplaySubject<Integer> subject = ReplaySubject.create(); subject.onNext(5); Action1<Integer> action1 = integer -> Log.i("From action1", String.valueOf(integer)); Subscription subscription1 = subject.subscribe(action1); subject.onNext(10); Action1<Integer> action2 = integer -> Log.i("From action2", String.valueOf(integer)); Subscription subscription2 = subject.subscribe(action2); subject.onNext(20); subscription1.unsubscribe(); subject.onNext(40); subscription2.unsubscribe(); subject.onNext(80); }
@Override public Observable<T> call(final Observable<T> observable) { final ReplaySubject<Notification<T>> subject = ReplaySubject.create(); final Subscription subscription = observable.materialize().subscribe(subject); return view .switchMap(new Func1<Boolean, Observable<Notification<T>>>() { @Override public Observable<Notification<T>> call(final Boolean flag) { if (flag) { return subject; } else { return Observable.empty(); } } }) .doOnUnsubscribe(new Action0() { @Override public void call() { subscription.unsubscribe(); } }) .dematerialize(); }
/** * Builds and validates the structure of the expected header then * returns an {@link Observable} that can be used to submit info to the * {@link Network} * @return a new Publisher */ public Publisher build() { subject = ReplaySubject.createWithSize(3); for(int i = 0;i < HEADER_SIZE;i++) { if(lines[i] == null) { throw new IllegalStateException("Header not properly formed (must contain 3 lines) see Header.java"); } subject.onNext(lines[i]); } Publisher p = new Publisher(); p.subject = subject; if(notifier != null) { notifier.accept(p); } return p; }
@Override public final void zugSuchen(Stellung stellung, Observer<Zug> subject) { Collection<Zug> zuege = spielregeln.liefereGueltigeZuege(stellung); if (zuege.size() > 0) { ReplaySubject<BewerteterZug> suchErgebnisse = ReplaySubject.create(); aktuelleSuchErgebnisse = suchErgebnisse; ErgebnisMelden melder = new ErgebnisMelden(subject, zuege.size()); suchErgebnisse.subscribe(melder); for (Zug zug : zuege) { EinzelnenZugUntersuchen zugUntersuchen = new EinzelnenZugUntersuchen(stellung, zug, suchErgebnisse); suchErgebnisse.subscribe(zugUntersuchen); executorService.execute(zugUntersuchen); } } else { subject.onCompleted(); } }
private Observable<List<String>> getCityNames() { if (cityNames == null) { cityNames = ReplaySubject.create(1); geoNamesClient.getCities() .flatMap(cities -> Observable.from(cities.geonames)) .map(city -> { String format = String.format("%s,%s", city.name, city.countrycode); return format; }) .toList() .subscribe(cityNames); } Log.d(TAG, "Returning city names"); return cityNames.asObservable(); }
@Test public void testServeListObservables(){ // setup Observable<Integer> os1 = Observable.range(0, 101); Observable<Integer> os2 = Observable.range(100, 101); List<Observable<Integer>> toServe = new LinkedList<Observable<Integer>>(); toServe.add(os1); toServe.add(os2); ReplaySubject <List<Observable<Integer>>> subject = ReplaySubject.create(); subject.onNext(toServe); // serve PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000); int serverPort = portSelector.acquirePort(); RemoteObservable.serveMany(serverPort, subject, Codecs.integer()); // connect Observable<Integer> oc = RemoteObservable.connect("localhost", serverPort, Codecs.integer()); // assert Observable.sumInteger(oc).toBlockingObservable().forEach(new Action1<Integer>(){ @Override public void call(Integer t1) { Assert.assertEquals(20200, t1.intValue()); // sum of number 0-200 } }); }
public ReplaySubject<String> saveBitmap(final Bitmap bitmap, final File folder, final String name) { bitmapSubject = ReplaySubject.create(); try { String path = FileUtil.savePhoto(bitmap, folder, name); onSuccess(path); } catch (IOException e) { e.printStackTrace(); onFail(e); } return bitmapSubject; }
@Override protected final void initObservable() { subject = ReplaySubject.create(); subject.onBackpressureBuffer() .observeOn(EventThread.getScheduler(observeThread)) .subscribeOn(EventThread.getScheduler(subscribeThread)) // .subscribe(new Observer<Object>() { // @Override // public void onCompleted() { // // } // // @Override // public void onError(Throwable e) { // // } // // @Override // public void onNext(Object event) { // try { // if (valid) { // handleEvent(event); // } // } catch (InvocationTargetException e) { // throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this, e); // } // } // }); .subscribe(event -> { try { if (valid) { handleEvent(event); } } catch (InvocationTargetException e) { throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this, e); } }); }
@Override public Observable<Response<LoginResponse>> login(String email, String password) { if (loginSubscription == null || loginSubscription.isUnsubscribed()) { loginSubject = ReplaySubject.create(); loginSubscription = api.login(email, password) .subscribeOn(scheduler.backgroundThread()) .subscribe(loginSubject); } return loginSubject.asObservable(); }
@Override public Observable<Response<ProfileResponse>> getProfile(String token, int userId) { if (profileSubscription == null || profileSubscription.isUnsubscribed()) { profileSubject = ReplaySubject.create(); profileSubscription = api.getProfile(token, userId) .subscribeOn(scheduler.backgroundThread()) .subscribe(profileSubject); } return profileSubject.asObservable(); }
@Override public Observable<Response<AvatarResponse>> setAvatar(String token, int userId, String avatarBase64) { if (avatarSubscription == null || avatarSubscription.isUnsubscribed()) { avatarSubject = ReplaySubject.create(); avatarSubscription = api.setAvatar(token, userId, avatarBase64) .subscribeOn(scheduler.backgroundThread()) .subscribe(avatarSubject); } return avatarSubject.asObservable(); }
@NonNull private ReplaySubject<Result<T>> getOrCreateSubjectFor(Class<T> itemClass) { if (subjects.get(itemClass) != null) { return subjects.get(itemClass); } ReplaySubject<Result<T>> subject = ReplaySubject.create(); subjects.put(itemClass, subject); return subject; }
public Observable<Account> createAccount(final Activity callingActivity, final User me, final String user, final String password) { ReplaySubject<Account> subject = ReplaySubject.create(); // Create the account if necessary Account account = new Account(user, accountType); final Bundle extraData = new Bundle(); extraData.putString(USER_DATA, gson.toJson(me)); boolean accountCreated = accountManager.addAccountExplicitly(account, password, extraData); if(accountCreated) { FetLife.getBus().post(new AccountCreatedEvent(account)); }else{ // If we didn't create the account, at least make sure to update it accountManager.setPassword(account, password); } subject.onNext(account); setCurrentAccount(account); if(callingActivity != null) { // Check whether we were launched explicitly to create the account Bundle extras = callingActivity.getIntent().getExtras(); if(extras != null && accountCreated) { AccountAuthenticatorResponse response = extras.getParcelable(AccountManager.KEY_ACCOUNT_AUTHENTICATOR_RESPONSE); if(response != null) { // Let them know we succeeded if so... Bundle result = new Bundle(); result.putString(AccountManager.KEY_ACCOUNT_NAME, user); result.putString(AccountManager.KEY_ACCOUNT_TYPE, accountType); response.onResult(result); } } } return subject.asObservable(); }
@Override public Observable<WeatherMix> loadWeather(String city) { if (weatherSubscription == null || weatherSubscription.isUnsubscribed()) { weatherSubject = ReplaySubject.create(); weatherSubscription = Observable.concat(memoryWeather(), diskWeather(), networkWeather(city)) .first(entity -> entity != null && isSameCity(city, entity) && isUpToDate(entity)) .subscribe(weatherSubject); } return weatherSubject.asObservable(); }
@Override public Observable<WeatherHistory> loadWeatherHistory(String city, long start, long end) { if (weatherHistorySubscription == null || weatherHistorySubscription.isUnsubscribed()) { weatherHistorySubject = ReplaySubject.create(); weatherHistorySubscription = networkWeatherHistory(city, start, end) .subscribe(weatherHistorySubject); } return weatherHistorySubject.asObservable(); }
@Test public void testObserve() throws Exception { ReplaySubject<String> values = ReplaySubject.create(); observe(reference) .map(new Func1<DataSnapshot, String>() { @Override public String call(DataSnapshot dataSnapshot) { return dataSnapshot.getValue(String.class); } }) .distinctUntilChanged() .subscribe(values); Observable .concat( setValue(reference, null), setValue(reference, "foo"), setValue(reference, "bar"), setValue(reference, null) ) .subscribe(Subscribers.<Void>empty()); List<String> observedValues = await(values.take(4).toList()); assertThat(observedValues, contains(null, "foo", "bar", null)); }
public Observable<Net.HttpResponse> write (ByteString data) { Buffer buffer = Buffer.buffer (data.toByteArray ()); ReplaySubject<Net.HttpResponse> subject = ReplaySubject.createWithSize(1); request.handler (response -> { if (!okStatuses.contains (response.statusCode ())) { throw new IllegalStateException (format("Unexpected statusCode %s and message %S", response.statusCode (), response.statusMessage ())); } response.bodyHandler (body -> { subject.onNext ( new Net.HttpResponse () { @Override public BytesEvent data () { return Codecs.BYTES.from (body.getBytes ()); } @Override public int status () { return response.statusCode (); } }); subject.onCompleted (); }); }) .exceptionHandler ( ex -> subject.onError (ex)) .putHeader ("Content-Length", String.valueOf (buffer.length ())) .end (buffer); return subject; }
@Override public Observable<boolean[]> loadTables() { if (tablesSubscription == null || tablesSubscription.isUnsubscribed()) { tablesSubject = ReplaySubject.create(); tablesSubscription = api.getTables() .subscribeOn(scheduler.backgroundThread()) .observeOn(scheduler.mainThread()) .subscribe(tablesSubject); } return tablesSubject.asObservable(); }
@Override public Observable<CustomerResponse[]> loadCustomers() { if (customersSubscription == null || customersSubscription.isUnsubscribed()) { customersSubject = ReplaySubject.create(); customersSubscription = api.getCustomers() .subscribeOn(scheduler.backgroundThread()) .observeOn(scheduler.mainThread()) .subscribe(customersSubject); } return customersSubject.asObservable(); }
@Test public void should_emits_all_items_and_call_onComplete_without_throwing_exception_when_ReplaySubject_without_error() { Subject<String, String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); replaySubject.onCompleted(); mTestSubscriber.assertValues("One", "Two"); mTestSubscriber.assertNoErrors(); mTestSubscriber.assertCompleted(); }
@Test public void should_still_emits_all_items_and_throw_exception_but_not_onComplete_when_ReplaySubject_with_error() { Subject<String, String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); replaySubject.onError(new RuntimeException("Error occurs")); replaySubject.onCompleted(); mTestSubscriber.assertValues("One", "Two"); mTestSubscriber.assertError(RuntimeException.class); mTestSubscriber.assertNotCompleted(); }
@Test public void should_also_emit_item_from_subscriber_when_subscriber_executes_onNext_from_the_same_thread() { Subject<String, String> replaySubject = ReplaySubject.<String>create(); replaySubject.subscribe(mTestSubscriber); replaySubject.onNext("One"); replaySubject.onNext("Two"); mTestSubscriber.onNext("Three"); replaySubject.onCompleted(); mTestSubscriber.assertValues("One", "Two", "Three"); mTestSubscriber.assertNoErrors(); mTestSubscriber.assertCompleted(); }
private RefreshableRemoteData<Feed> getFeed(String tag) { ReplaySubject<RemoteStatus> statusSubject = ReplaySubject.createWithSize(1); ReplaySubject<Feed> feedSubject = ReplaySubject.createWithSize(1); PublishSubject<Void> refresh = PublishSubject.create(); refresh.observeOn(scheduler).forEach($ -> { CacheMode mode = CacheMode.DISABLE_CACHE; fetch(tag, feedSubject, statusSubject, mode); }); fetch(tag, feedSubject, statusSubject, CacheMode.ENABLE_CACHE); return new RefreshableRemoteData<>(feedSubject, statusSubject, refresh); }
private void fetch(String tag, ReplaySubject<Feed> feedSubject, ReplaySubject<RemoteStatus> statusSubject, CacheMode mode) { statusSubject.onNext(RemoteStatus.LOADING); Action1<Feed> action = (x) -> { statusSubject.onNext(RemoteStatus.NOT_LOADING); feedSubject.onNext(x); }; if (tag == null) { rest.defaultFeed(mode).subscribe(action); } else { rest.search(mode, tag).subscribe(action); } }
public Observable<Boolean> init(String eventUid) { return eventInteractor.get(eventUid) .switchMap(new Func1<Event, Observable<? extends Boolean>>() { @Override public Observable<? extends Boolean> call(final Event event) { final OrganisationUnit organisationUnit = new OrganisationUnit(); final Program program = new Program(); organisationUnit.setUId(event.getOrgUnit()); program.setUId(event.getProgram()); return Observable.zip(loadRulesEngine(program), eventInteractor.list(organisationUnit, program), new Func2<RuleEngine, List<Event>, Boolean>() { @Override public Boolean call(RuleEngine engine, List<Event> events) { // assign rules engine ruleEngine = engine; currentEvent = event; // clear events map eventsMap.clear(); // put all existing events into map eventsMap.putAll(ModelUtils.toMap(eventInteractor.list( organisationUnit, program).toBlocking().first())); // ruleEffectSubject = BehaviorSubject.create(); ruleEffectSubject = ReplaySubject.createWithSize(1); ruleEffectSubject.subscribeOn(Schedulers.computation()); ruleEffectSubject.observeOn(AndroidSchedulers.mainThread()); return true; } }); } }); }
@SuppressWarnings("MissingPermission") @Override public void stopUpdates() { Log.d(TAG, "stopUpdates()"); ((LocationManager) context .getSystemService(Context.LOCATION_SERVICE)) .removeUpdates(locationListener); locationSubject.onCompleted(); locationSubject = ReplaySubject.createWithSize(BUFFER_SIZE); }
@Test public void testErrorThrownIssue1685() { Subject<Object, Object> subject = ReplaySubject.create(); Observable.error(new RuntimeException("oops")) .materialize() .delay(1, TimeUnit.SECONDS) .dematerialize() .subscribe(subject); subject.subscribe(); subject.materialize().toBlocking().first(); System.out.println("Done"); }
@Test public void testAlreadyUnsubscribedInterleavesWithClient() { ReplaySubject<Integer> source = ReplaySubject.create(); Subscriber<Integer> done = Subscribers.empty(); done.unsubscribe(); @SuppressWarnings("unchecked") Observer<Integer> o = mock(Observer.class); InOrder inOrder = inOrder(o); Observable<Integer> result = source.publish().refCount(); result.subscribe(o); source.onNext(1); result.subscribe(done); source.onNext(2); source.onComplete(); inOrder.verify(o).onNext(1); inOrder.verify(o).onNext(2); inOrder.verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); }
/** * @param composite The subject containing multiple child subjects. * @param subscription The subscription to cancel the main composite subject. * @param unsubscribeAction Action performed when a child unsubscribes. */ public CompositeRequestManager( ReplaySubject<ENTITY> composite, Subscription subscription, Action0 unsubscribeAction ) { this.composite = composite; this.subscription = subscription; this.unsubscribeAction = unsubscribeAction; }
/** * Create or join with previous subscription for a collection of the entity. * * Ensures that the request logic is only run once and additional calls made * before completion will be added as a subscriber causing the observer * callbacks to be invoked, but not the on-subscribe logic. * * @param onSubscribe Logic to run for the request. * @param observer Callback to invoke on request events. * @param key A unique key to identify this request type separate from others. * @return A subscription for the observer that may be unsubscribed if necessary. */ final public Subscription createCollectionSubscription( OnSubscribe<List<ENTITY>> onSubscribe, Observer<List<ENTITY>> observer, String key ) { this.logger.trace("Creating collection subscription for Key: " + key); Observable<List<ENTITY>> callback = Observable.create(onSubscribe); callback = callback.subscribeOn(this.subscribeScheduler); callback = callback.observeOn(this.observeScheduler); CompositeRequestManager<List<ENTITY>> previousRequest = this.collectionRequests.get(key); Subscription subscription; if (null == previousRequest) { this.logger.debug("No previous request to join."); ReplaySubject<List<ENTITY>> composite = ReplaySubject.create(); Action0 unsubscribeCleanup = new UnsubscribeCleanup<List<ENTITY>>(this.logger, this.collectionRequests, key); Action0 completeCleanup = new CompleteCleanup<List<ENTITY>>(this.logger, this.collectionRequests, key); callback = callback.doOnCompleted(completeCleanup); Subscription mainSubscription = callback.subscribe(composite); CompositeRequestManager<List<ENTITY>> requestManager = new CompositeRequestManager<List<ENTITY>>( composite, mainSubscription, unsubscribeCleanup ); this.collectionRequests.put(key, requestManager); subscription = requestManager.subscribe(observer); } else { this.logger.debug("Joining with previous request."); subscription = previousRequest.subscribe(observer); } return subscription; }
/** * Create or join with previous subscription for an entity. * * Ensures that the request logic is only run once and additional calls made * before completion will be added as a subscriber causing the observer * callbacks to be invoked, but not the on-subscribe logic. * * @param onSubscribe Logic to run for the request. * @param observer Callback to invoke on request events. * @param key A unique key to identify this request type separate from others. * @return A subscription for the observer that may be unsubscribed if necessary. */ final public Subscription createSubscription( OnSubscribe<ENTITY> onSubscribe, Observer<ENTITY> observer, String key ) { this.logger.trace("Creating subscription for Key: " + key); Observable<ENTITY> callback = Observable.create(onSubscribe); callback = callback.subscribeOn(this.subscribeScheduler); callback = callback.observeOn(this.observeScheduler); CompositeRequestManager<ENTITY> previousRequest = this.requests.get(key); Subscription subscription; if (null == previousRequest) { this.logger.debug("No previous request to join."); ReplaySubject<ENTITY> composite = ReplaySubject.create(); Action0 unsubscribeCleanup = new UnsubscribeCleanup<ENTITY>(this.logger, this.requests, key); Action0 completeCleanup = new CompleteCleanup<ENTITY>(this.logger, this.requests, key); callback = callback.doOnCompleted(completeCleanup); Subscription mainSubscription = callback.subscribe(composite); CompositeRequestManager<ENTITY> manager = new CompositeRequestManager<ENTITY>( composite, mainSubscription, unsubscribeCleanup ); this.requests.put(key, manager); subscription = manager.subscribe(observer); } else { this.logger.debug("Joining with previous request."); subscription = previousRequest.subscribe(observer); } return subscription; }