@Test public void test6SourcesOverload() { Observable<Integer> s1 = Observable.just(1); Observable<Integer> s2 = Observable.just(2); Observable<Integer> s3 = Observable.just(3); Observable<Integer> s4 = Observable.just(4); Observable<Integer> s5 = Observable.just(5); Observable<Integer> s6 = Observable.just(6); Observable<List<Integer>> result = Observable.combineLatest(s1, s2, s3, s4, s5, s6, new Func6<Integer, Integer, Integer, Integer, Integer, Integer, List<Integer>>() { @Override public List<Integer> call(Integer t1, Integer t2, Integer t3, Integer t4, Integer t5, Integer t6) { return Arrays.asList(t1, t2, t3, t4, t5, t6); } }); @SuppressWarnings("unchecked") Observer<Object> o = mock(Observer.class); result.subscribe(o); verify(o).onNext(Arrays.asList(1, 2, 3, 4, 5, 6)); verify(o).onComplete(); verify(o, never()).onError(any(Throwable.class)); }
@SuppressWarnings("unchecked") private <T> Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> getDataPointFinder( MetricType<T> metricType) { Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder; finder = (Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>>) dataPointFinders .get(metricType); if (finder == null) { throw new UnsupportedOperationException(metricType.getText()); } return finder; }
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) { return combineLatest(Arrays.asList(new Observable[]{o1, o2, o3, o4, o5, o6}), Functions.fromFunc(combineFunction)); }
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) { return just(new Observable[]{o1, o2, o3, o4, o5, o6}).lift(new OperatorZip(zipFunction)); }
public static final <T1, T2, T3, T4, T5, T6, R> Single<R> zip(Single<? extends T1> o1, Single<? extends T2> o2, Single<? extends T3> o3, Single<? extends T4> o4, Single<? extends T5> o5, Single<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) { return just(new Observable[]{asObservable(o1), asObservable(o2), asObservable(o3), asObservable(o4), asObservable(o5), asObservable(o6)}).lift(new OperatorZip((Func6) zipFunction)); }
public OperatorZip(Func6 f) { this.zipFunction = Functions.fromFunc(f); }
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) { return combineLatest(Arrays.asList(new Observable[]{o1, o2, o3, o4, o5, o6}), Functions.fromFunc(combineFunction)); }
public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) { return just(new Observable[]{o1, o2, o3, o4, o5, o6}).lift(new OperatorZip(zipFunction)); }
@SuppressWarnings("unchecked") public static final <T1, T2, T3, T4, T5, T6, R> Observable<R> combineSinglesDelayError(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> combineFunction) { return combineSinglesDelayError(asList(o1.single(), o2.single(), o3.single(), o4.single(), o5.single(), o6.single()), fromFunc(combineFunction)); }
@SuppressWarnings({ "unchecked", "rawtypes" }) public OperatorZip(Func6 f) { this.zipFunction = Functions.fromFunc(f); }
public void startUp(Session session, String keyspace, boolean resetDb, boolean createSchema, HawkularMetricRegistry metricRegistry) { session.execute("USE " + keyspace); log.infoKeyspaceUsed(keyspace); metricsTasks = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4, new MetricsThreadFactory())); loadDataRetentions(); this.metricRegistry = metricRegistry; pointsInserter = ImmutableMap .<MetricType<?>, Func1<Observable<? extends Metric<?>>, Observable<Integer>>>builder() .put(GAUGE, metric -> { @SuppressWarnings("unchecked") Observable<Metric<Double>> gauge = (Observable<Metric<Double>>) metric; return dataAccess.insertData(gauge); }) .put(COUNTER, metric -> { @SuppressWarnings("unchecked") Observable<Metric<Long>> counter = (Observable<Metric<Long>>) metric; return dataAccess.insertData(counter); }) .put(AVAILABILITY, metric -> { @SuppressWarnings("unchecked") Observable<Metric<AvailabilityType>> avail = (Observable<Metric<AvailabilityType>>) metric; return dataAccess.insertData(avail); }) .put(STRING, metric -> { @SuppressWarnings("unchecked") Observable<Metric<String>> string = (Observable<Metric<String>>) metric; return dataAccess.insertStringDatas(string, this::getTTL, maxStringSize); }) .build(); dataPointFinders = ImmutableMap .<MetricType<?>, Func6<? extends MetricId<?>, Long, Long, Integer, Order, Integer, Observable<Row>>>builder() .put(STRING, (metricId, start, end, limit, order, pageSize) -> { @SuppressWarnings("unchecked") MetricId<String> stringId = (MetricId<String>) metricId; return dataAccess.findStringData(stringId, start, end, limit, order, pageSize); }) .build(); dataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder() .put(GAUGE, Functions::getGaugeDataPoint) .put(AVAILABILITY, Functions::getAvailabilityDataPoint) .put(COUNTER, Functions::getCounterDataPoint) .put(STRING, Functions::getStringDataPoint) .build(); tempDataPointMappers = ImmutableMap.<MetricType<?>, Func1<Row, ? extends DataPoint<?>>> builder() .put(GAUGE, Functions::getTempGaugeDataPoint) .put(COUNTER, Functions::getTempCounterDataPoint) .put(AVAILABILITY, Functions::getTempAvailabilityDataPoint) .build(); initConfiguration(session); setDefaultTTL(session, keyspace); initMetrics(); verifyAndCreateTempTables(); tagQueryParser = new SimpleTagQueryParser(this.dataAccess, this); expresssionTagQueryParser = new ExpressionTagQueryParser(this.dataAccess, this); }
@Override public <T> Observable<DataPoint<T>> findDataPoints(MetricId<T> metricId, long start, long end, int limit, Order order, int pageSize) { Timer.Context context = getRawDataReadLatency().time(); checkArgument(isValidTimeRange(start, end), "Invalid time range"); Order safeOrder = (null == order) ? Order.ASC : order; MetricType<T> metricType = metricId.getType(); Func1<Row, DataPoint<T>> mapper = getDataPointMapper(metricType); if (metricType == GAUGE || metricType == AVAILABILITY || metricType == COUNTER) { long sliceStart = DateTimeService.getTimeSlice(start, Duration.standardHours(2)); Func1<Row, DataPoint<T>> tempMapper = (Func1<Row, DataPoint<T>>) tempDataPointMappers.get(metricType); // Calls mostly deprecated methods.. // Observable<DataPoint<T>> uncompressedPoints = dataAccess.findOldData(metricId, start, end, limit, safeOrder, // pageSize).map(mapper).doOnError(Throwable::printStackTrace); Observable<DataPoint<T>> compressedPoints = dataAccess.findCompressedData(metricId, sliceStart, end, limit, safeOrder) .compose(new DataPointDecompressTransformer(metricType, safeOrder, limit, start, end)); Observable<DataPoint<T>> tempStoragePoints = dataAccess.findTempData(metricId, start, end, limit, safeOrder, pageSize) .map(tempMapper); Comparator<DataPoint<T>> comparator = getDataPointComparator(safeOrder); List<Observable<? extends DataPoint<T>>> sources = new ArrayList<>(3); // sources.add(uncompressedPoints); sources.add(compressedPoints); sources.add(tempStoragePoints); Observable<DataPoint<T>> dataPoints = SortedMerge.create(sources, comparator, false) .distinctUntilChanged( (tDataPoint, tDataPoint2) -> comparator.compare(tDataPoint, tDataPoint2) == 0); if (limit > 0) { dataPoints = dataPoints.take(limit); } return dataPoints; } Func6<MetricId<T>, Long, Long, Integer, Order, Integer, Observable<Row>> finder = getDataPointFinder(metricType); Observable<DataPoint<T>> results = finder.call(metricId, start, end, limit, safeOrder, pageSize).map(mapper); return results.doOnCompleted(context::stop); }