/** * 用SerializedSubject包装PublishSubject,序列化 */ private RxBus() { //private final PublishSubject<Object> _bus = PublishSubject.create(); // If multiple threads are going to emit events to this // then it must be made thread-safe like this instead _bus = new SerializedSubject<>(PublishSubject.create()); }
/** * Creates a new request with the given method. * * @param method the request {@link Method} to use * @param url URL to fetch the Object * @param clazz the Object class to return */ private ObjectRequest(int method, String url, Class clazz) { super(method, url, null); mClazz = clazz; mHasCache = BaseApplication.getVolleyCache().get(getCacheKey()) != null; setShouldCache(false); addEntryListener(); setRetryPolicy(new DefaultRetryPolicy(DEFAULT_TIMEOUT_MS, DEFAULT_MAX_RETRIES, DEFAULT_BACKOFF_MULT)); mSubject = new SerializedSubject<>(PublishSubject.create()); }
/** * Sends an event to the bus * <p> * @param event the event that should be broadcasted to the bus * @param key the key this event should be broadcasted to * @param sendToDefaultBusAsWell if true, all observers of the event class will receive this event as well */ public synchronized <T> void sendEvent(T event, Integer key, boolean sendToDefaultBusAsWell) { RXBusEventIsNullException.checkEvent(event); RXBusKeyIsNullException.checkKey(key); // 1) send to key bound bus SerializedSubject subject = getSubject(new RXQueueKey(event.getClass(), key), false); // only send event, if subject exists => this means someone has at least once subscribed to it if (subject != null) subject.onNext(event); // 2) send to unbound bus if (sendToDefaultBusAsWell) sendEvent(event); }
/** * Sends an event to the bus * <p> * @param event the event that should be broadcasted to the bus * @param key the key this event should be broadcasted to * @param sendToDefaultBusAsWell if true, all observers of the event class will receive this event as well */ public synchronized <T> void sendEvent(T event, String key, boolean sendToDefaultBusAsWell) { RXBusEventIsNullException.checkEvent(event); RXBusKeyIsNullException.checkKey(key); // 1) send to key bound bus SerializedSubject subject = getSubject(new RXQueueKey(event.getClass(), key), false); // only send event, if subject exists => this means someone has at least once subscribed to it if (subject != null) subject.onNext(event); // 2) send to unbound bus if (sendToDefaultBusAsWell) sendEvent(event); }
public Observable<LogRow> getLogs() { SerializedSubject<LogRow, LogRow> subject = new SerializedSubject<>(PublishSubject.create()); ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(Example_8_Model.class.getSimpleName() + "-thread-%d").build(); final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); IntStream.range(1, Runtime.getRuntime().availableProcessors() + 1).forEach(value -> { executorService.submit(() -> { SysOutUtils.sysout(Thread.currentThread().getName() + " will briefly start creating lots of log rows..."); VariousUtils.sleep(1000); long incrementingNumber = 1; while (true) { subject.onNext(new LogRow( DateTimeFormatter.ISO_DATE_TIME.format(LocalDateTime.now()), "Status " + Integer.toString(ThreadLocalRandom.current().nextInt(1, 5)), "Action " + incrementingNumber + " from " + Thread.currentThread().getName())); } }); }); return subject; }
private RxBusManager(){ mSubject = new SerializedSubject<>(PublishSubject.create()); mSubscriptionMap = new HashMap<>(); }
private RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); }
public RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); }
/** * Constructor. * * @since 2016/7/5 */ public RxBus() { // Subject that, once an Observer has subscribed, // emits all subsequently observed items to the subscriber. // PublishSubject is not thread-safe, // PublishSubject need to be converted into SerializedSubject this.mBus = new SerializedSubject<>(PublishSubject.create()); }
/** * Sends an event to the bus * ATTENTION: all observers that are observing the class of the event will retrieve it * <p> * @param event the event that should be broadcasted to the bus */ public synchronized <T> void sendEvent(T event) { RXBusEventIsNullException.checkEvent(event); SerializedSubject subject = getSubject(event.getClass(), false); // only send event, if subject exists => this means someone has at least once subscribed to it if (subject != null) subject.onNext(event); }
/** * Get an observable that observes all events of the the class the * <p> * @param eventClass the class of event you want to observe * @return an Observable, that will observe all events of the @param key class */ public synchronized <T> Observable<T> observeEvent(Class<T> eventClass) { RXBusEventIsNullException.checkEvent(eventClass); SerializedSubject subject = getSubject(eventClass, true); return subject; }
/** * Get an observable that observes all events that are send with the key and are of the type of the event class * <p> * @param key the event key you want to observe * @return an Observable, that will observe all events of the @param key class */ public synchronized <T> Observable<T> observeEvent(RXQueueKey key) { if (key == null) throw new RuntimeException("You can't use a null key"); SerializedSubject subject = getSubject(key, true); return subject; }
private synchronized SerializedSubject getSubject(Class<?> key, boolean createIfMissing) { // 1) look if key already has a publisher subject, if so, return it if (mSubjectsClasses.containsKey(key)) return mSubjectsClasses.get(key); // 2) else, create a new one and put it into the map else if (createIfMissing) { SerializedSubject subject = new SerializedSubject(PublishSubject.create()); mSubjectsClasses.put(key, subject); return subject; } else return null; }
private synchronized SerializedSubject getSubject(RXQueueKey key, boolean createIfMissing) { // 1) look if key already has a publisher subject, if so, return it if (mSubjectsKeys.containsKey(key)) return mSubjectsKeys.get(key); // 2) else, create a new one and put it into the map else if (createIfMissing) { SerializedSubject subject = new SerializedSubject(PublishSubject.create()); mSubjectsKeys.put(key, subject); return subject; } else return null; }
@Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); presenter = new ListNotePresenter(Inject.get().notesStore()); clickedNotesSubject = new SerializedSubject<>(PublishSubject.<String>create()); setContentView(R.layout.view_listnotes); }
private RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); }
private RxBus() { BUS = new SerializedSubject<>(PublishSubject.create()); }
public <E> void post(E event){ //subject.onNext(event); Class clazz = event.getClass(); Subject<E,E> subject = subjectHashMap.get(clazz); if(subject==null){ subject = new SerializedSubject<>(PublishSubject.<E>create()); subjectHashMap.put(clazz,subject); } subject.onNext(event); }
public <E> Observable<E> observeEvents(Class<E> eventClass) { Subject<E,E> subject = subjectHashMap.get(eventClass); if(subject==null){ subject = new SerializedSubject<>(PublishSubject.<E>create()); subjectHashMap.put(eventClass,subject); } if(eventClass.isAssignableFrom(UniqueEvent.class)&&subject.hasObservers()){ return Observable.empty();//if the event should only be received by one observer } return subject.ofType(eventClass);//pass only events of specified type, filter all other }
@Override public synchronized void start() { if (!this.running) { this.subject = new SerializedSubject(PublishSubject.create()); Observable<?> outputStream = this.processor.process(this.subject); this.subscription = outputStream.subscribe(new Action1<Object>() { @Override public void call(Object outputObject) { if (ClassUtils.isAssignable(Message.class, outputObject.getClass())) { getOutputChannel().send((Message) outputObject); } else { getOutputChannel().send(MessageBuilder.withPayload(outputObject).build()); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { SubjectMessageHandler.this.logger.error(throwable.getMessage(), throwable); } }, new Action0() { @Override public void call() { SubjectMessageHandler.this.logger .info("Subscription close for [" + SubjectMessageHandler.this.subscription + "]"); } }); this.running = true; } }
@Before public void setUp() { mFragment = spy(new SimpleDatafeedFragment()); mSubject = spy(new SerializedSubject<>(PublishSubject.create())); mFragment.setObservable(mSubject); mController = FragmentTestDriver.getController(mFragment) .makeTestActivityController() .makeActivity(); mActivity = mController.getActivity(); mRefreshController = mActivity.getRefreshController(); mStatusController = mActivity.getStatusController(); mEventBus = mActivity.getEventBus(); }
public RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); }
public RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); mSubscriptionMap = new HashMap<>(); }
private RxBus(){ observable=new SerializedSubject<>(PublishSubject.create()); }
public SerializedSubject getObservable(){ return observable; }
/** * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者 */ public RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); }
private void rebuild() { bus = new SerializedSubject(PublishSubject.create()); stickyEvents.clear(); }
private RxBus() { bus = new SerializedSubject<>(PublishSubject.create()); stickyBus = new SerializedSubject<>(BehaviorSubject.create()); }
private RxBus() { subject = new SerializedSubject<>(PublishSubject.create()); }
/** * Create new progress subject. */ private void createProgressSubject() { if (progressSubject == null) progressSubject = new SerializedSubject<>(BehaviorSubject.create()); }