Java 类rx.subjects.ReplaySubject 实例源码

项目:incubator-servicecomb-java-chassis    文件:BizkeeperHandlerDelegate.java   
protected HystrixObservable<Response> forceFallbackCommand(Invocation invocation) {
  return new HystrixObservable<Response>() {
    @Override
    public Observable<Response> observe() {
      ReplaySubject<Response> subject = ReplaySubject.create();
      final Subscription sourceSubscription = toObservable().subscribe(subject);
      return subject.doOnUnsubscribe(sourceSubscription::unsubscribe);
    }

    @Override
    public Observable<Response> toObservable() {
      return Observable.create(f -> {
        try {
          f.onNext(FallbackPolicyManager.getFallbackResponse(handler.groupname, invocation));
        } catch (Exception e) {
          f.onError(e);
        }
      });
    }
  };
}
项目:Asynchronous-Android-Programming    文件:SubjectActivity.java   
void replaySubject(){
    ReplaySubject<Integer> replaySub = ReplaySubject.create();
    replaySub.onNext(1);
    replaySub.onNext(2);
    Subscription subscription = replaySub.doOnSubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "Observer subscribed to ReplaySubject");
        }
    }).doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            Log.i(TAG, "Observer unsubscribed to ReplaySubject ");
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "New Event received from ReplaySubject: " + integer);
        }
    });
    replaySub.onNext(3);
    replaySub.onNext(4);
    subscription.unsubscribe();
    replaySub.onNext(5);
    replaySub.onCompleted();
}
项目:simplDb    文件:SimplDbRxTest.java   
@Test
public void observe() throws Exception {
    ReplaySubject<Cursor> collector = ReplaySubject.create();
    Subscription subscription = mSimplDb.get().observe(TableTest.class).subscribe(collector);
    mInsert.contentValues.put(DATA, "Data");
    mSimplDb.get().insert(mInsert).toBlocking().single();
    mInsert.contentValues.put(DATA, "data");
    mSimplDb.get().insert(mInsert).toBlocking().single();
    collector.onCompleted();
    subscription.unsubscribe();

    Cursor cursor = collector.toList().toBlocking().single().get(collector.size() - 1);
    assertEquals(2, cursor.getCount());
    cursor.moveToFirst();
    assertData("Data", cursor);
    cursor.moveToLast();
    assertData("data", cursor);
}
项目:RxRoboBase    文件:AuthenticationTest.java   
@Test
public void testObserveAuth() throws Exception {
    Observable<Boolean> isAuthenticated =
            observeAuth(firebaseAuth)
            .map(new Func1<FirebaseUser, Boolean>() {
                @Override
                public Boolean call(FirebaseUser user) {
                    return user != null;
                }
            });

    ReplaySubject<Boolean> userState = ReplaySubject.create();
    isAuthenticated.subscribe(userState);

    await(authAnonymously(firebaseAuth));
    firebaseAuth.signOut();

    List<Boolean> observedState = await(userState.take(3).toList());
    assertThat(observedState, contains(false, true, false));
}
项目:marvel    文件:SearchInteractorImpl.java   
@Override
public Observable<CharactersResponse> loadCharacter(String query,
                                                    String privateKey,
                                                    String publicKey,
                                                    long timestamp) {
    if (characterSubscription == null || characterSubscription.isUnsubscribed()) {
        characterSubject = ReplaySubject.create();

        // generate hash using timestamp and API keys
        String hash = HashGenerator.generate(timestamp, privateKey, publicKey);

        characterSubscription = api.getCharacters(query, publicKey, hash, timestamp)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(characterSubject);
    }

    return characterSubject.asObservable();
}
项目:fyber_mobile_offers    文件:AppGoogleAds.java   
@Override
public Observable<String> getAdIdObservable() {
    // Observable that emits Google Advertising adIdSubscription
    if (adIdSubscription == null || adIdSubscription.isUnsubscribed()) {
        adIdSubject = ReplaySubject.create();

        adIdSubscription = Observable
                .concat(getAdIdFromMemoryObservable(),
                        getAdIdFromGoogleObservable())
                .first(entity -> entity != null)
                // lets retry if something goes wrong!
                .retry(2)
                .subscribe(adIdSubject);
    }

    return adIdSubject.asObservable();
}
项目:fyber_mobile_offers    文件:AppGoogleAds.java   
@Override
public Observable<Boolean> getAdIdEnabledObservable() {
    // Observable that emits Google Tracking Enabled
    if (adIdEnabledSubscription == null || adIdEnabledSubscription.isUnsubscribed()) {
        adIdEnabledSubject = ReplaySubject.create();

        adIdEnabledSubscription = Observable
                .concat(getAdIdEnabledFromMemoryObservable(),
                        getAdIdEnabledFromGoogleObservable())
                .first(entity -> entity != null)
                // lets retry if something goes wrong!
                .retry(2)
                .subscribe(adIdEnabledSubject);
    }

    return adIdEnabledSubject.asObservable();
}
项目:android-buruberi    文件:RxTests.java   
@Test
public void operatorUnbufferedObserveOn() throws Exception {
    ReplaySubject<Integer> numbers = ReplaySubject.createWithSize(1);

    final AtomicInteger counter = new AtomicInteger();
    numbers.lift(new Rx.OperatorUnbufferedObserveOn<Integer>(Schedulers.immediate()))
            .subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer number) {
                    counter.addAndGet(number);
                }
            });

    numbers.onNext(3);
    numbers.onNext(3);
    numbers.onNext(3);

    assertEquals(9, counter.get());
}
项目:Jockey    文件:ObservableQueue.java   
/**
 * Creates an Observable stream from this queue. There should only ever be one subscriber to
 * this method. Calling this method twice will complete any previously opened observable
 * (leaving unprocessed elements in the queue).
 * @return An observable containing the contents of the queue in order
 */
public Observable<T> toObservable() {
    synchronized (mLock) {
        if (mSubject != null) {
            mSubject.onCompleted();
        }

        if (mQueue.isEmpty()) {
            mSubject = ReplaySubject.create();
        } else {
            mSubject = ReplaySubject.create(mQueue.size());

            for (T data : mQueue) {
                mSubject.onNext(data);
            }
        }
    }

    return mSubject.map(item -> {
        mQueue.remove();
        return item;
    }).asObservable();
}
项目:RxSamplesPractice    文件:Samples.java   
public static void replaySubject() {
    ReplaySubject<Integer> subject = ReplaySubject.create();
    subject.onNext(5);

    Action1<Integer> action1 = integer -> Log.i("From action1", String.valueOf(integer));
    Subscription subscription1 = subject.subscribe(action1);
    subject.onNext(10);

    Action1<Integer> action2 = integer -> Log.i("From action2", String.valueOf(integer));
    Subscription subscription2 = subject.subscribe(action2);
    subject.onNext(20);

    subscription1.unsubscribe();
    subject.onNext(40);

    subscription2.unsubscribe();
    subject.onNext(80);
}
项目:arctor    文件:WaitViewReplayTransformer.java   
@Override
public Observable<T> call(final Observable<T> observable) {
    final ReplaySubject<Notification<T>>
            subject = ReplaySubject.create();
    final Subscription subscription = observable.materialize().subscribe(subject);
    return view
            .switchMap(new Func1<Boolean, Observable<Notification<T>>>() {
                @Override
                public Observable<Notification<T>> call(final Boolean flag) {
                    if (flag) {
                        return subject;
                    } else {
                        return Observable.empty();
                    }
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    subscription.unsubscribe();
                }
            })
            .dematerialize();
}
项目:htm.java    文件:Publisher.java   
/**
 * Builds and validates the structure of the expected header then
 * returns an {@link Observable} that can be used to submit info to the
 * {@link Network}
 * @return  a new Publisher
 */
public Publisher build() {
    subject = ReplaySubject.createWithSize(3);
    for(int i = 0;i < HEADER_SIZE;i++) {
        if(lines[i] == null) {
            throw new IllegalStateException("Header not properly formed (must contain 3 lines) see Header.java");
        }
        subject.onNext(lines[i]);
    }

    Publisher p = new Publisher();
    p.subject = subject;

    if(notifier != null) {
        notifier.accept(p);
    }

    return p;
}
项目:dokchess    文件:MinimaxParalleleSuche.java   
@Override
public final void zugSuchen(Stellung stellung, Observer<Zug> subject) {
    Collection<Zug> zuege = spielregeln.liefereGueltigeZuege(stellung);
    if (zuege.size() > 0) {
        ReplaySubject<BewerteterZug> suchErgebnisse = ReplaySubject.create();
        aktuelleSuchErgebnisse = suchErgebnisse;

        ErgebnisMelden melder = new ErgebnisMelden(subject, zuege.size());
        suchErgebnisse.subscribe(melder);

        for (Zug zug : zuege) {
            EinzelnenZugUntersuchen zugUntersuchen = new EinzelnenZugUntersuchen(stellung, zug, suchErgebnisse);
            suchErgebnisse.subscribe(zugUntersuchen);
            executorService.execute(zugUntersuchen);
        }
    } else {
        subject.onCompleted();
    }
}
项目:rain-or-shine    文件:CoordinatedWeatherManager.java   
private Observable<List<String>> getCityNames() {
    if (cityNames == null) {
        cityNames = ReplaySubject.create(1);

        geoNamesClient.getCities()
                .flatMap(cities -> Observable.from(cities.geonames))
                .map(city -> {
                    String format = String.format("%s,%s", city.name, city.countrycode);
                    return format;
                })
                .toList()
                .subscribe(cityNames);
    }
    Log.d(TAG, "Returning city names");
    return cityNames.asObservable();
}
项目:RxNetty    文件:RemoteObservableTest.java   
@Test
public void testServeListObservables(){
    // setup
    Observable<Integer> os1 = Observable.range(0, 101);
    Observable<Integer> os2 = Observable.range(100, 101);
    List<Observable<Integer>> toServe = new LinkedList<Observable<Integer>>();
    toServe.add(os1);
    toServe.add(os2);
    ReplaySubject <List<Observable<Integer>>> subject = ReplaySubject.create();
    subject.onNext(toServe);
    // serve
    PortSelectorWithinRange portSelector = new PortSelectorWithinRange(8000, 9000);
    int serverPort = portSelector.acquirePort();
    RemoteObservable.serveMany(serverPort, subject, Codecs.integer());
    // connect
    Observable<Integer> oc = RemoteObservable.connect("localhost", serverPort, Codecs.integer());
    // assert
    Observable.sumInteger(oc).toBlockingObservable().forEach(new Action1<Integer>(){
        @Override
        public void call(Integer t1) {
            Assert.assertEquals(20200, t1.intValue()); // sum of number 0-200
        }
    });
}
项目:LiteReader    文件:RxFile.java   
public ReplaySubject<String> saveBitmap(final Bitmap bitmap, final File folder, final String name) {
    bitmapSubject = ReplaySubject.create();
    try {
        String path = FileUtil.savePhoto(bitmap, folder, name);
        onSuccess(path);
    } catch (IOException e) {
        e.printStackTrace();
        onFail(e);
    }
    return bitmapSubject;
}
项目:RxBusLib    文件:SubscriberReplayEvent.java   
@Override
    protected final void initObservable() {
        subject = ReplaySubject.create();
        subject.onBackpressureBuffer()
                .observeOn(EventThread.getScheduler(observeThread))
                .subscribeOn(EventThread.getScheduler(subscribeThread))
//                .subscribe(new Observer<Object>() {
//                    @Override
//                    public void onCompleted() {
//
//                    }
//
//                    @Override
//                    public void onError(Throwable e) {
//
//                    }
//
//                    @Override
//                    public void onNext(Object event) {
//                        try {
//                            if (valid) {
//                                handleEvent(event);
//                            }
//                        } catch (InvocationTargetException e) {
//                            throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this, e);
//                        }
//                    }
//                });
                .subscribe(event -> {
                    try {
                        if (valid) {
                            handleEvent(event);
                        }
                    } catch (InvocationTargetException e) {
                        throwRuntimeException("Could not dispatch event: " + event.getClass() + " to subscriber " + SubscriberReplayEvent.this, e);
                    }
                });
    }
项目:bcg    文件:UserInteractorImpl.java   
@Override
public Observable<Response<LoginResponse>> login(String email, String password) {
    if (loginSubscription == null || loginSubscription.isUnsubscribed()) {
        loginSubject = ReplaySubject.create();

        loginSubscription = api.login(email, password)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(loginSubject);
    }

    return loginSubject.asObservable();
}
项目:bcg    文件:UserInteractorImpl.java   
@Override
public Observable<Response<ProfileResponse>> getProfile(String token, int userId) {
    if (profileSubscription == null || profileSubscription.isUnsubscribed()) {
        profileSubject = ReplaySubject.create();

        profileSubscription = api.getProfile(token, userId)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(profileSubject);
    }

    return profileSubject.asObservable();
}
项目:bcg    文件:UserInteractorImpl.java   
@Override
public Observable<Response<AvatarResponse>> setAvatar(String token, int userId, String avatarBase64) {
    if (avatarSubscription == null || avatarSubscription.isUnsubscribed()) {
        avatarSubject = ReplaySubject.create();

        avatarSubscription = api.setAvatar(token, userId, avatarBase64)
                .subscribeOn(scheduler.backgroundThread())
                .subscribe(avatarSubject);
    }

    return avatarSubject.asObservable();
}
项目:Mondo    文件:FakeObservableCache.java   
@NonNull
private ReplaySubject<Result<T>> getOrCreateSubjectFor(Class<T> itemClass) {
    if (subjects.get(itemClass) != null) {
        return subjects.get(itemClass);
    }
    ReplaySubject<Result<T>> subject = ReplaySubject.create();
    subjects.put(itemClass, subject);
    return subject;
}
项目:fetlife-oss    文件:FetLifeAccountManager.java   
public Observable<Account> createAccount(final Activity callingActivity, final User me, final String user, final String password)
{
    ReplaySubject<Account> subject = ReplaySubject.create();

    // Create the account if necessary
    Account account = new Account(user, accountType);
    final Bundle extraData = new Bundle();
    extraData.putString(USER_DATA, gson.toJson(me));
    boolean accountCreated = accountManager.addAccountExplicitly(account, password, extraData);
    if(accountCreated) {
        FetLife.getBus().post(new AccountCreatedEvent(account));
    }else{
        // If we didn't create the account, at least make sure to update it
        accountManager.setPassword(account, password);
    }

    subject.onNext(account);
    setCurrentAccount(account);

    if(callingActivity != null)
    {
        // Check whether we were launched explicitly to create the account
        Bundle extras = callingActivity.getIntent().getExtras();
        if(extras != null && accountCreated)
        {
            AccountAuthenticatorResponse response = extras.getParcelable(AccountManager.KEY_ACCOUNT_AUTHENTICATOR_RESPONSE);
            if(response != null)
            {
                // Let them know we succeeded if so...
                Bundle result = new Bundle();
                result.putString(AccountManager.KEY_ACCOUNT_NAME, user);
                result.putString(AccountManager.KEY_ACCOUNT_TYPE, accountType);
                response.onResult(result);
            }
        }
    }

    return subject.asObservable();
}
项目:weather_app    文件:WeatherInteractorImpl.java   
@Override
public Observable<WeatherMix> loadWeather(String city) {
    if (weatherSubscription == null || weatherSubscription.isUnsubscribed()) {
        weatherSubject = ReplaySubject.create();

        weatherSubscription = Observable.concat(memoryWeather(), diskWeather(), networkWeather(city))
                .first(entity -> entity != null && isSameCity(city, entity) && isUpToDate(entity))
                .subscribe(weatherSubject);
    }

    return weatherSubject.asObservable();

}
项目:weather_app    文件:WeatherInteractorImpl.java   
@Override
public Observable<WeatherHistory> loadWeatherHistory(String city, long start, long end) {
    if (weatherHistorySubscription == null || weatherHistorySubscription.isUnsubscribed()) {
        weatherHistorySubject = ReplaySubject.create();

        weatherHistorySubscription = networkWeatherHistory(city, start, end)
                .subscribe(weatherHistorySubject);
    }

    return weatherHistorySubject.asObservable();

}
项目:RxRoboBase    文件:WriteTests.java   
@Test
public void testObserve() throws Exception {
    ReplaySubject<String> values = ReplaySubject.create();

    observe(reference)
            .map(new Func1<DataSnapshot, String>() {
                @Override
                public String call(DataSnapshot dataSnapshot) {
                    return dataSnapshot.getValue(String.class);
                }
            })
            .distinctUntilChanged()
            .subscribe(values);

    Observable
            .concat(
                    setValue(reference, null),
                    setValue(reference, "foo"),
                    setValue(reference, "bar"),
                    setValue(reference, null)
            )
            .subscribe(Subscribers.<Void>empty());

    List<String> observedValues = await(values.take(4).toList());

    assertThat(observedValues, contains(null, "foo", "bar", null));
}
项目:lumber-mill    文件:VertxHttpRequestWrapper.java   
public Observable<Net.HttpResponse> write (ByteString data) {

    Buffer buffer = Buffer.buffer (data.toByteArray ());
      ReplaySubject<Net.HttpResponse> subject = ReplaySubject.createWithSize(1);

      request.handler (response ->  {

          if (!okStatuses.contains (response.statusCode ())) {
            throw new IllegalStateException (format("Unexpected statusCode %s and message %S",
              response.statusCode (), response.statusMessage ()));
          }

          response.bodyHandler (body -> {

            subject.onNext (
              new Net.HttpResponse () {

                @Override
                public BytesEvent data () {
                  return Codecs.BYTES.from (body.getBytes ());
                }

                @Override
                public int status () {
                  return response.statusCode ();
                }
            });
            subject.onCompleted ();
          });
        })
        .exceptionHandler ( ex -> subject.onError (ex))
        .putHeader ("Content-Length", String.valueOf (buffer.length ()))
        .end (buffer);

      return subject;
    }
项目:quandoo    文件:TablesInteractorImpl.java   
@Override
public Observable<boolean[]> loadTables() {
    if (tablesSubscription == null || tablesSubscription.isUnsubscribed()) {
        tablesSubject = ReplaySubject.create();

        tablesSubscription = api.getTables()
                .subscribeOn(scheduler.backgroundThread())
                .observeOn(scheduler.mainThread())
                .subscribe(tablesSubject);
    }

    return tablesSubject.asObservable();
}
项目:quandoo    文件:CustomersInteractorImpl.java   
@Override
public Observable<CustomerResponse[]> loadCustomers() {
    if (customersSubscription == null || customersSubscription.isUnsubscribed()) {
        customersSubject = ReplaySubject.create();

        customersSubscription = api.getCustomers()
                .subscribeOn(scheduler.backgroundThread())
                .observeOn(scheduler.mainThread())
                .subscribe(customersSubject);
    }

    return customersSubject.asObservable();
}
项目:PocketBeer    文件:RxReplaySubjectTester.java   
@Test
public void should_emits_all_items_and_call_onComplete_without_throwing_exception_when_ReplaySubject_without_error() {
  Subject<String, String> replaySubject = ReplaySubject.<String>create();

  replaySubject.subscribe(mTestSubscriber);
  replaySubject.onNext("One");
  replaySubject.onNext("Two");
  replaySubject.onCompleted();

  mTestSubscriber.assertValues("One", "Two");
  mTestSubscriber.assertNoErrors();
  mTestSubscriber.assertCompleted();
}
项目:PocketBeer    文件:RxReplaySubjectTester.java   
@Test
public void should_still_emits_all_items_and_throw_exception_but_not_onComplete_when_ReplaySubject_with_error() {
  Subject<String, String> replaySubject = ReplaySubject.<String>create();

  replaySubject.subscribe(mTestSubscriber);
  replaySubject.onNext("One");
  replaySubject.onNext("Two");
  replaySubject.onError(new RuntimeException("Error occurs"));
  replaySubject.onCompleted();

  mTestSubscriber.assertValues("One", "Two");
  mTestSubscriber.assertError(RuntimeException.class);
  mTestSubscriber.assertNotCompleted();
}
项目:PocketBeer    文件:RxReplaySubjectTester.java   
@Test
public void should_also_emit_item_from_subscriber_when_subscriber_executes_onNext_from_the_same_thread() {
  Subject<String, String> replaySubject = ReplaySubject.<String>create();

  replaySubject.subscribe(mTestSubscriber);
  replaySubject.onNext("One");
  replaySubject.onNext("Two");
  mTestSubscriber.onNext("Three");
  replaySubject.onCompleted();

  mTestSubscriber.assertValues("One", "Two", "Three");
  mTestSubscriber.assertNoErrors();
  mTestSubscriber.assertCompleted();
}
项目:progscrape-android    文件:Model.java   
private RefreshableRemoteData<Feed> getFeed(String tag) {
    ReplaySubject<RemoteStatus> statusSubject = ReplaySubject.createWithSize(1);
    ReplaySubject<Feed> feedSubject = ReplaySubject.createWithSize(1);
    PublishSubject<Void> refresh = PublishSubject.create();
    refresh.observeOn(scheduler).forEach($ -> {
        CacheMode mode = CacheMode.DISABLE_CACHE;
        fetch(tag, feedSubject, statusSubject, mode);
    });
    fetch(tag, feedSubject, statusSubject, CacheMode.ENABLE_CACHE);
    return new RefreshableRemoteData<>(feedSubject, statusSubject, refresh);
}
项目:progscrape-android    文件:Model.java   
private void fetch(String tag, ReplaySubject<Feed> feedSubject, ReplaySubject<RemoteStatus> statusSubject, CacheMode mode) {
    statusSubject.onNext(RemoteStatus.LOADING);
    Action1<Feed> action = (x) -> {
        statusSubject.onNext(RemoteStatus.NOT_LOADING);
        feedSubject.onNext(x);
    };
    if (tag == null) {
        rest.defaultFeed(mode).subscribe(action);
    } else {
        rest.search(mode, tag).subscribe(action);
    }
}
项目:dhis2-android-eventcapture    文件:RxRulesEngine.java   
public Observable<Boolean> init(String eventUid) {
    return eventInteractor.get(eventUid)
            .switchMap(new Func1<Event, Observable<? extends Boolean>>() {
                @Override
                public Observable<? extends Boolean> call(final Event event) {
                    final OrganisationUnit organisationUnit = new OrganisationUnit();
                    final Program program = new Program();

                    organisationUnit.setUId(event.getOrgUnit());
                    program.setUId(event.getProgram());

                    return Observable.zip(loadRulesEngine(program),
                            eventInteractor.list(organisationUnit, program),
                            new Func2<RuleEngine, List<Event>, Boolean>() {
                                @Override
                                public Boolean call(RuleEngine engine, List<Event> events) {
                                    // assign rules engine
                                    ruleEngine = engine;
                                    currentEvent = event;

                                    // clear events map
                                    eventsMap.clear();

                                    // put all existing events into map
                                    eventsMap.putAll(ModelUtils.toMap(eventInteractor.list(
                                            organisationUnit, program).toBlocking().first()));

                                    // ruleEffectSubject = BehaviorSubject.create();
                                    ruleEffectSubject = ReplaySubject.createWithSize(1);
                                    ruleEffectSubject.subscribeOn(Schedulers.computation());
                                    ruleEffectSubject.observeOn(AndroidSchedulers.mainThread());

                                    return true;
                                }
                            });
                }
            });
}
项目:dhis2-android-eventcapture    文件:LocationProviderImpl.java   
@SuppressWarnings("MissingPermission")
@Override
public void stopUpdates() {
    Log.d(TAG, "stopUpdates()");
    ((LocationManager) context
            .getSystemService(Context.LOCATION_SERVICE))
            .removeUpdates(locationListener);
    locationSubject.onCompleted();
    locationSubject = ReplaySubject.createWithSize(BUFFER_SIZE);
}
项目:RxJavaFlow    文件:ObservableTests.java   
@Test
public void testErrorThrownIssue1685() {
    Subject<Object, Object> subject = ReplaySubject.create();

    Observable.error(new RuntimeException("oops"))
        .materialize()
        .delay(1, TimeUnit.SECONDS)
        .dematerialize()
        .subscribe(subject);

    subject.subscribe();
    subject.materialize().toBlocking().first();

    System.out.println("Done");
}
项目:RxJavaFlow    文件:OnSubscribeRefCountTest.java   
@Test
public void testAlreadyUnsubscribedInterleavesWithClient() {
    ReplaySubject<Integer> source = ReplaySubject.create();

    Subscriber<Integer> done = Subscribers.empty();
    done.unsubscribe();

    @SuppressWarnings("unchecked")
    Observer<Integer> o = mock(Observer.class);
    InOrder inOrder = inOrder(o);

    Observable<Integer> result = source.publish().refCount();

    result.subscribe(o);

    source.onNext(1);

    result.subscribe(done);

    source.onNext(2);
    source.onComplete();

    inOrder.verify(o).onNext(1);
    inOrder.verify(o).onNext(2);
    inOrder.verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
项目:ground-control    文件:CompositeRequestManager.java   
/**
 * @param composite The subject containing multiple child subjects.
 * @param subscription The subscription to cancel the main composite subject.
 * @param unsubscribeAction Action performed when a child unsubscribes.
 */
public CompositeRequestManager(
    ReplaySubject<ENTITY> composite,
    Subscription subscription,
    Action0 unsubscribeAction
) {
    this.composite = composite;
    this.subscription = subscription;
    this.unsubscribeAction = unsubscribeAction;
}
项目:ground-control    文件:SubscriptionFactory.java   
/**
 * Create or join with previous subscription for a collection of the entity.
 *
 * Ensures that the request logic is only run once and additional calls made
 * before completion will be added as a subscriber causing the observer
 * callbacks to be invoked, but not the on-subscribe logic.
 *
 * @param onSubscribe Logic to run for the request.
 * @param observer Callback to invoke on request events.
 * @param key A unique key to identify this request type separate from others.
 * @return A subscription for the observer that may be unsubscribed if necessary.
 */
final public Subscription createCollectionSubscription(
    OnSubscribe<List<ENTITY>> onSubscribe,
    Observer<List<ENTITY>> observer,
    String key
) {
    this.logger.trace("Creating collection subscription for Key: " + key);
    Observable<List<ENTITY>> callback = Observable.create(onSubscribe);
    callback = callback.subscribeOn(this.subscribeScheduler);
    callback = callback.observeOn(this.observeScheduler);


    CompositeRequestManager<List<ENTITY>> previousRequest = this.collectionRequests.get(key);
    Subscription subscription;
    if (null == previousRequest) {
        this.logger.debug("No previous request to join.");
        ReplaySubject<List<ENTITY>> composite = ReplaySubject.create();
        Action0 unsubscribeCleanup = new UnsubscribeCleanup<List<ENTITY>>(this.logger, this.collectionRequests, key);
        Action0 completeCleanup = new CompleteCleanup<List<ENTITY>>(this.logger, this.collectionRequests, key);

        callback = callback.doOnCompleted(completeCleanup);

        Subscription mainSubscription = callback.subscribe(composite);
        CompositeRequestManager<List<ENTITY>> requestManager = new CompositeRequestManager<List<ENTITY>>(
            composite,
            mainSubscription,
            unsubscribeCleanup
        );
        this.collectionRequests.put(key, requestManager);
        subscription = requestManager.subscribe(observer);
    } else {
        this.logger.debug("Joining with previous request.");
        subscription = previousRequest.subscribe(observer);
    }

    return subscription;
}
项目:ground-control    文件:SubscriptionFactory.java   
/**
 * Create or join with previous subscription for an entity.
 *
 * Ensures that the request logic is only run once and additional calls made
 * before completion will be added as a subscriber causing the observer
 * callbacks to be invoked, but not the on-subscribe logic.
 *
 * @param onSubscribe Logic to run for the request.
 * @param observer Callback to invoke on request events.
 * @param key A unique key to identify this request type separate from others.
 * @return A subscription for the observer that may be unsubscribed if necessary.
 */
final public Subscription createSubscription(
    OnSubscribe<ENTITY> onSubscribe,
    Observer<ENTITY> observer,
    String key
) {
    this.logger.trace("Creating subscription for Key: " + key);
    Observable<ENTITY> callback = Observable.create(onSubscribe);
    callback = callback.subscribeOn(this.subscribeScheduler);
    callback = callback.observeOn(this.observeScheduler);

    CompositeRequestManager<ENTITY> previousRequest = this.requests.get(key);
    Subscription subscription;
    if (null == previousRequest) {
        this.logger.debug("No previous request to join.");
        ReplaySubject<ENTITY> composite = ReplaySubject.create();
        Action0 unsubscribeCleanup = new UnsubscribeCleanup<ENTITY>(this.logger, this.requests, key);
        Action0 completeCleanup = new CompleteCleanup<ENTITY>(this.logger, this.requests, key);

        callback = callback.doOnCompleted(completeCleanup);

        Subscription mainSubscription = callback.subscribe(composite);
        CompositeRequestManager<ENTITY> manager = new CompositeRequestManager<ENTITY>(
            composite,
            mainSubscription,
            unsubscribeCleanup
        );
        this.requests.put(key, manager);
        subscription = manager.subscribe(observer);
    } else {
        this.logger.debug("Joining with previous request.");
        subscription = previousRequest.subscribe(observer);
    }

    return subscription;
}