@Override @SuppressWarnings("unchecked") public void call(final Subscriber<? super T> subscriber) { final PropertyChangeListener changeListener = new PropertyChangeListener() { @Override public void propertyChange(PropertyChangeEvent event) { if (!subscriber.isUnsubscribed() && property.equals(event.getPropertyName())) { subscriber.onNext((T) event.getNewValue()); } } }; propertyChangeSupport.addPropertyChangeListener(changeListener); subscriber.add(BooleanSubscription.create(new Action0() { @Override public void call() { propertyChangeSupport.removePropertyChangeListener(changeListener); } })); }
private Observable<ManagedState> getState(final Id id) { return Observable.create(new Observable.OnSubscribe<ManagedState>() { @Override public void call(final Subscriber<? super ManagedState> subscriber) { final ManagedState state = state(id, true); subscriber.add(BooleanSubscription.create(new Action0() { @Override public void call() { state.removeSubscriber(subscriber); } })); int publishCount = state.publishCount; state.addSubscriber(subscriber); // check to avoid double-publishing if (publishCount == state.publishCount) { subscriber.onNext(state); } } }); }
@Override public void call(Subscriber<? super T> subscriber) { final ValueEventListener eventListener = query.addValueEventListener(new RxValueListener<>(subscriber, marshaller)); subscriber.add(BooleanSubscription.create(new Action0() { @Override public void call() { query.removeEventListener(eventListener); } })); }
private AddUnsubscribe clearOnUnsubscribe(final Object observable) { return new AddUnsubscribe( BooleanSubscription.create( new Action0() { @Override public void call() { mapSubject.get(observable).first.clear(); } } ) ); }
public Subscription outSubscription() { return BooleanSubscription.create(new Action0() { @Override public void call() { unsubscribe(); subscribers = subscribers.removing(Bridge.this); } }); }
private InnerImmediateScheduler() { this.innerSubscription = new BooleanSubscription(); }
private InnerCurrentThreadScheduler() { this.counter = new AtomicInteger(); this.queue = new PriorityBlockingQueue(); this.innerSubscription = new BooleanSubscription(); this.wip = new AtomicInteger(); }
private InnerTestScheduler() { this.s = new BooleanSubscription(); }
/** * The error from the user provided Observer is not handled by the subscribe method try/catch. * * It is handled by the AtomicObserver that wraps the provided Observer. * * Result: Passes (if AtomicObserver functionality exists) */ @Test public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); Observable.create(new OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> observer) { final BooleanSubscription s = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { try { if (!s.isUnsubscribed()) { observer.onNext("1"); observer.onNext("2"); observer.onNext("three"); observer.onNext("4"); observer.onComplete(); } } finally { latch.countDown(); } } }).start(); } }).subscribe(new Subscriber<String>() { @Override public void onComplete() { System.out.println("completed"); } @Override public void onError(Throwable e) { error.set(e); System.out.println("error"); e.printStackTrace(); } @Override public void onNext(String v) { int num = Integer.parseInt(v); System.out.println(num); // doSomething(num); count.incrementAndGet(); } }); // wait for async sequence to complete latch.await(); assertEquals(2, count.get()); assertNotNull(error.get()); if (!(error.get() instanceof NumberFormatException)) { fail("It should be a NumberFormatException"); } }
/** * The error from the user provided Observer is not handled by the subscribe method try/catch. * * It is handled by the AtomicObserver that wraps the provided Observer. * * Result: Passes (if AtomicObserver functionality exists) */ @Test public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); Observable.create(new OnSubscribeFunc<String>() { @Override public Subscription onSubscribe(final Observer<? super String> observer) { final BooleanSubscription s = new BooleanSubscription(); new Thread(new Runnable() { @Override public void run() { try { if (!s.isUnsubscribed()) { observer.onNext("1"); observer.onNext("2"); observer.onNext("three"); observer.onNext("4"); observer.onCompleted(); } } finally { latch.countDown(); } } }).start(); return s; } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("completed"); } @Override public void onError(Throwable e) { error.set(e); System.out.println("error"); e.printStackTrace(); } @Override public void onNext(String v) { int num = Integer.parseInt(v); System.out.println(num); // doSomething(num); count.incrementAndGet(); } }); // wait for async sequence to complete latch.await(); assertEquals(2, count.get()); assertNotNull(error.get()); if (!(error.get() instanceof NumberFormatException)) { fail("It should be a NumberFormatException"); } }