/** * Get the value which %percent% of the values are less than. This works * just like median (where median represents the 50% point). A typical * desire is to see the 90% point - the value that 90% of the data points * are below, the remaining 10% are above. * * @param percent * number representing the wished percent (between <code>0</code> * and <code>1.0</code>) * @return the value which %percent% of the values are less than */ public T getPercentPoint(double percent) { if (count <= 0) { return ZERO; } if (percent >= 1.0) { return getMax(); } // use Math.round () instead of simple (long) to provide correct value rounding long target = Math.round (count * percent); try { for (Entry<T, MutableLong> val : valuesMap.entrySet()) { target -= val.getValue().longValue(); if (target <= 0){ return val.getKey(); } } } catch (ConcurrentModificationException ignored) { // ignored. May happen occasionally, but no harm done if so. } return ZERO; // TODO should this be getMin()? }
@Override public <STREAM extends WindowedStream<Tuple.WindowedTuple<Long>>> STREAM count(Option... opts) { Function.MapFunction<T, Tuple<Long>> kVMap = new Function.MapFunction<T, Tuple<Long>>() { @Override public Tuple<Long> f(T input) { if (input instanceof Tuple.TimestampedTuple) { return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)input).getTimestamp(), 1L); } else { return new Tuple.TimestampedTuple<>(System.currentTimeMillis(), 1L); } } }; WindowedStream<Tuple<Long>> innerstream = map(kVMap); WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong()); return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); }
void bucketAccessed(long bucketId) { long now = System.currentTimeMillis(); if (accessedBucketIds.add(bucketId) || now - lastUpdateAccessTime > updateAccessTimeInterval) { synchronized (bucketLastAccess) { for (long id : accessedBucketIds) { MutableLong lastAccessTime = bucketLastAccess.get(id); if (lastAccessTime != null) { lastAccessTime.setValue(now); } else { bucketLastAccess.put(id, new MutableLong(now)); } } } accessedBucketIds.clear(); lastUpdateAccessTime = now; } }
@Test public void testSimpleRemoveEmpty() { WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>(); wqqm.setup(null); wqqm.beginWindow(0); QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue(); Query queryD = qb == null ? null : qb.getQuery(); Assert.assertEquals("The queries must match.", null, queryD); qb = wqqm.dequeue(); queryD = qb == null ? null : qb.getQuery(); Assert.assertEquals("The queries must match.", null, queryD); wqqm.endWindow(); wqqm.teardown(); }
@Test public void testSimpleAddOneRemove() { WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>(); wqqm.setup(null); wqqm.beginWindow(0); Query query = new MockQuery("1"); wqqm.enqueue(query, null, new MutableLong(1L)); Query queryD = wqqm.dequeue().getQuery(); QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue(); Query queryD1 = qb == null ? null : qb.getQuery(); wqqm.endWindow(); wqqm.teardown(); Assert.assertEquals("The queries must match.", query, queryD); Assert.assertEquals("The queries must match.", null, queryD1); }
@Override public void run() { int numLoops = totalTuples / batchSize; for (int loopCounter = 0, tupleCounter = 0; loopCounter < numLoops; loopCounter++, tupleCounter++) { for (int batchCounter = 0; batchCounter < batchSize; batchCounter++, tupleCounter++) { queueManager.enqueue(new MockQuery(tupleCounter + ""), null, new MutableLong(1L)); if (rand.nextDouble() < waitMillisProb) { try { Thread.sleep(1); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } } } }
private WindowedOperatorImpl<Long, MutableLong, Long> createDefaultWindowedOperator() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>(); if (useSpillable) { sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. windowStateStorage = new InMemoryWindowedStorage<>(); SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>(); pds.setSpillableComplexComponent(sccImpl); plainDataStorage = pds; SpillableWindowedPlainStorage<Long> prs = new SpillableWindowedPlainStorage<>(); prs.setSpillableComplexComponent(sccImpl); plainRetractionStorage = prs; windowedOperator.addComponent("SpillableComplexComponent", sccImpl); } else { windowStateStorage = new InMemoryWindowedStorage<>(); plainDataStorage = new InMemoryWindowedStorage<>(); plainRetractionStorage = new InMemoryWindowedStorage<>(); } windowedOperator.setDataStorage(plainDataStorage); windowedOperator.setRetractionStorage(plainRetractionStorage); windowedOperator.setWindowStateStorage(windowStateStorage); windowedOperator.setAccumulation(new SumAccumulation()); return windowedOperator; }
@Test public void testValidation() throws Exception { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>(); verifyValidationFailure(windowedOperator, "nothing is configured"); windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); verifyValidationFailure(windowedOperator, "data storage is not set"); windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutableLong>()); verifyValidationFailure(windowedOperator, "accumulation is not set"); windowedOperator.setAccumulation(new SumAccumulation()); windowedOperator.validate(); windowedOperator.setTriggerOption(new TriggerOption().accumulatingAndRetractingFiredPanes()); verifyValidationFailure(windowedOperator, "retracting storage is not set for ACCUMULATING_AND_RETRACTING"); windowedOperator.setRetractionStorage(new InMemoryWindowedStorage<Long>()); windowedOperator.validate(); windowedOperator.setTriggerOption(new TriggerOption().discardingFiredPanes().firingOnlyUpdatedPanes()); verifyValidationFailure(windowedOperator, "DISCARDING is not valid for option firingOnlyUpdatedPanes"); windowedOperator.setTriggerOption(new TriggerOption().accumulatingFiredPanes().firingOnlyUpdatedPanes()); windowedOperator.setRetractionStorage(null); verifyValidationFailure(windowedOperator, "retracting storage is not set for option firingOnlyUpdatedPanes"); }
@Test public void testSlidingWindowAssignment() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200))); windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Window[] winArray = windows.toArray(new Window[]{}); Assert.assertEquals(5, winArray.length); Assert.assertEquals(BASE + 800, winArray[0].getBeginTimestamp()); Assert.assertEquals(1000, winArray[0].getDurationMillis()); Assert.assertEquals(BASE + 1000, winArray[1].getBeginTimestamp()); Assert.assertEquals(1000, winArray[1].getDurationMillis()); Assert.assertEquals(BASE + 1200, winArray[2].getBeginTimestamp()); Assert.assertEquals(1000, winArray[2].getDurationMillis()); Assert.assertEquals(BASE + 1400, winArray[3].getBeginTimestamp()); Assert.assertEquals(1000, winArray[3].getDurationMillis()); Assert.assertEquals(BASE + 1600, winArray[4].getBeginTimestamp()); Assert.assertEquals(1000, winArray[4].getDurationMillis()); windowedOperator.teardown(); }
@Test public void testKeyedAccumulation() { KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = createDefaultKeyedWindowedOperator(false); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L))); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L))); windowedOperator.endWindow(); Assert.assertEquals(1, keyedDataStorage.size()); Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue()); Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue()); windowedOperator.teardown(); }
@Override public void populateDAG(DAG dag, Configuration configuration) { WordGenerator inputOperator = new WordGenerator(); KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); Accumulation<Long, MutableLong, Long> sum = new SumAccumulation(); windowedOperator.setAccumulation(sum); windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>()); windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>()); windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1))); windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingAndRetractingFiredPanes()); //windowedOperator.setAllowedLateness(Duration.millis(14000)); ConsoleOutputOperator outputOperator = new ConsoleOutputOperator(); dag.addOperator("inputOperator", inputOperator); dag.addOperator("windowedOperator", windowedOperator); dag.addOperator("outputOperator", outputOperator); dag.addStream("input_windowed", inputOperator.output, windowedOperator.input); dag.addStream("windowed_output", windowedOperator.output, outputOperator.input); }
@Override public void populateDAG(DAG dag, Configuration configuration) { RandomNumberPairGenerator inputOperator = new RandomNumberPairGenerator(); WindowedOperatorImpl<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> windowedOperator = new WindowedOperatorImpl<>(); Accumulation<MutablePair<Double, Double>, MutablePair<MutableLong, MutableLong>, Double> piAccumulation = new PiAccumulation(); windowedOperator.setAccumulation(piAccumulation); windowedOperator.setDataStorage(new InMemoryWindowedStorage<MutablePair<MutableLong, MutableLong>>()); windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.millis(1000)).accumulatingFiredPanes()); ConsoleOutputOperator outputOperator = new ConsoleOutputOperator(); dag.addOperator("inputOperator", inputOperator); dag.addOperator("windowedOperator", windowedOperator); dag.addOperator("outputOperator", outputOperator); dag.addStream("input_windowed", inputOperator.output, windowedOperator.input); dag.addStream("windowed_output", windowedOperator.output, outputOperator.input); }
@Test public void SumTest() { SumInt si = new SumInt(); SumLong sl = new SumLong(); SumFloat sf = new SumFloat(); SumDouble sd = new SumDouble(); Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10)); Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10)); Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21))); Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L)); Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L)); Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L))); Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F)); Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F)); Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F))); Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0)); Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0)); Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9))); }
/** * Compute gauss mask 0. * * @param num * the num * @param sigma * the sigma * @return the double[] */ /* * num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert * wird. Übergebe es deswegen als MutableDouble aus CommonsLang */ public double[] compute_gauss_mask_0(MutableLong num, double sigma) { int i, n; double limit; double[] h; limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_0, sigma); /* Error < 0.001 on each side */ n = (int) limit; h = new double[2 * n + 1]; for (i = -n + 1; i <= n - 1; i++) h[n + i] = phi0(-i + 0.5, sigma) - phi0(-i - 0.5, sigma); h[0] = 1.0 - phi0(n - 0.5, sigma); h[2 * n] = phi0(-n + 0.5, sigma); num.setValue(n); return h; }
/** * Compute gauss mask 1. * * @param num * the num * @param sigma * the sigma * @return the double[] */ /* * num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert * wird. Übergebe es deswegen als MutableDouble aus CommonsLang */ public double[] compute_gauss_mask_1(MutableLong num, double sigma) { int i, n; double limit; double[] h; limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_1, sigma); /* Error < 0.001 on each side */ n = (int) limit; h = new double[2 * n + 1]; for (i = -n + 1; i <= n - 1; i++) h[n + i] = phi1(-i + 0.5, sigma) - phi1(-i - 0.5, sigma); h[0] = -phi1(n - 0.5, sigma); h[2 * n] = phi1(-n + 0.5, sigma); num.setValue(n); return h; }
/** * Compute gauss mask 2. * * @param num * the num * @param sigma * the sigma * @return the double[] */ /* * num ist eigentlich pointer - aufrufende Funkion nimmt an, dass num geändert * wird. Übergebe es deswegen als MutableDouble aus CommonsLang */ public double[] compute_gauss_mask_2(MutableLong num, double sigma) { int i, n; double limit; double[] h; limit = LinesUtil.MASK_SIZE(LinesUtil.MAX_SIZE_MASK_2, sigma); /* Error < 0.001 on each side */ n = (int) limit; h = new double[2 * n + 1]; for (i = -n + 1; i <= n - 1; i++) h[n + i] = phi2(-i + 0.5, sigma) - phi2(-i - 0.5, sigma); h[0] = -phi2(n - 0.5, sigma); h[2 * n] = phi2(-n + 0.5, sigma); num.setValue(n); return h; }
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer) throws IOException { if (this.closed) { throw new IOException( "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString()); } MutableLong txidHolder = new MutableLong(); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> { txidHolder.setValue(ringBuffer.next()); }); long txid = txidHolder.longValue(); try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); entry.stampRegionSequenceId(we); ringBuffer.get(txid).load(entry); } finally { ringBuffer.publish(txid); } return txid; }
@Override public long freeMemory() { MutableLong totalBytesReleased = new MutableLong(0); ifNotClosed(() -> { for (FileBucket bucket : fileBuckets) { bucket.lockRead(); for (FileInfo fileInfo : bucket.getFiles()) { long bytesReleased = fileInfo.discardFileContents(); updateSizeOfCachedFileContents(-bytesReleased); totalBytesReleased.add(bytesReleased); } bucket.unlockRead(); } }); return totalBytesReleased.longValue(); }
/** * Get the value which %percent% of the values are less than. This works * just like median (where median represents the 50% point). A typical * desire is to see the 90% point - the value that 90% of the data points * are below, the remaining 10% are above. * * @param percent * @return the value which %percent% of the values are less than */ public T getPercentPoint(double percent) { if (count <= 0) { return ZERO; } if (percent >= 1.0) { return getMax(); } // use Math.round () instead of simple (long) to provide correct value rounding long target = Math.round (count * percent); try { for (Entry<T, MutableLong> val : valuesMap.entrySet()) { target -= val.getValue().longValue(); if (target <= 0){ return val.getKey(); } } } catch (ConcurrentModificationException ignored) { // ignored. May happen occasionally, but no harm done if so. } return ZERO; // TODO should this be getMin()? }
@Override public void add(String value) { if (values.containsKey(value)) { values.get(value).increment(); } else { values.put(value, new MutableLong(1)); } }
@Override public void merge(Accumulator<String, ConcurrentSkipListMap<String, MutableLong>> other) { other.getLocalValue().forEach((k, v) -> { if (values.containsKey(k)) { values.get(k).add(v.longValue()); } else { values.put(k, v); } }); }
/** * <p>Default constructor. Creates an empty design</p> * * @since 0.4.0 */ public NetPlan() { super(null, 0, 0, new AttributeMap()); this.netPlan = this; DEFAULT_ROUTING_TYPE = RoutingType.SOURCE_ROUTING; isModifiable = true; networkDescription = ""; networkName = ""; nextElementId = new MutableLong(1); layers = new ArrayList<NetworkLayer>(); nodes = new ArrayList<Node>(); srgs = new ArrayList<SharedRiskGroup>(); resources = new ArrayList<Resource>(); cache_nodesDown = new HashSet<Node>(); this.cache_type2Resources = new HashMap<String, Set<Resource>>(); this.cache_id2NodeMap = new HashMap<Long, Node>(); this.cache_id2ResourceMap = new HashMap<Long, Resource>(); this.cache_id2LayerMap = new HashMap<Long, NetworkLayer>(); this.cache_id2srgMap = new HashMap<Long, SharedRiskGroup>(); this.cache_id2LinkMap = new HashMap<Long, Link>(); this.cache_id2DemandMap = new HashMap<Long, Demand>(); this.cache_id2MulticastDemandMap = new HashMap<Long, MulticastDemand>(); this.cache_id2RouteMap = new HashMap<Long, Route>(); this.cache_id2MulticastTreeMap = new HashMap<Long, MulticastTree>(); this.cache_taggedElements = new HashMap<> (); this.cache_nodesPerSiteName = new HashMap<> (); this.cache_planningDomain2nodes = new HashMap<> (); interLayerCoupling = new DirectedAcyclicGraph<NetworkLayer, DemandLinkMapping>(DemandLinkMapping.class); defaultLayer = addLayer("Layer 0", null, null, null, null, null); }
/** * Returns the distribution of the values in the list. * * @return map containing either Integer or Long keys; entries are a Number array containing the key and the [Integer] count. * TODO - why is the key value also stored in the entry array? See Bug 53825 */ public Map<Number, Number[]> getDistribution() { Map<Number, Number[]> items = new HashMap<>(); for (Entry<T, MutableLong> entry : valuesMap.entrySet()) { Number[] dis = new Number[2]; dis[0] = entry.getKey(); dis[1] = entry.getValue(); items.put(entry.getKey(), dis); } return items; }
private void updateValueCount(T actualValue, long sampleCount) { MutableLong count = valuesMap.get(actualValue); if (count != null) { count.add(sampleCount); } else { // insert new value valuesMap.put(actualValue, new MutableLong(sampleCount)); } }
@Override public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts) { WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue); KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong()); return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); }
@Override public void endWindow() { for (QueueListNode<QueryBundle<QUERY_TYPE, META_QUERY, MutableLong>> tempNode = queryQueue.getHead(); tempNode != null; tempNode = tempNode.getNext()) { MutableLong qc = tempNode.getPayload().getQueueContext(); qc.decrement(); } }
@Override public boolean enqueue(QUERY query, META_QUERY metaQuery, MutableLong context) { if (context != null) { query.setCountdown(context.getValue()); } if (query.isOneTime()) { return super.enqueue(query, metaQuery, new MutableLong(1L)); } else { return super.enqueue(query, metaQuery, new MutableLong(query.getCountdown())); } }
@Override public boolean addingFilter(QueryBundle<QUERY, META_QUERY, MutableLong> queryBundle) { QueueListNode<QueryBundle<QUERY, META_QUERY, MutableLong>> queryNode = queryIDToNode.get(queryBundle.getQuery().getId()); if (queryNode == null) { return true; } queryNode.setPayload(queryBundle); return false; }
@Override public Result executeQuery(Query query, Void metaQuery, MutableLong queueContext) { return new DataResultSnapshot(query, currentData, queueContext.getValue()); }
@Test public void testSimpleAddRemove2() { WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>(); wqqm.setup(null); wqqm.beginWindow(0); Query query = new MockQuery("1"); wqqm.enqueue(query, null, new MutableLong(1L)); Query queryD = wqqm.dequeue().getQuery(); QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue(); Query queryD1 = qb == null ? null : qb.getQuery(); Query query1 = new MockQuery("2"); wqqm.enqueue(query1, null, new MutableLong(1L)); Query query1D = wqqm.dequeue().getQuery(); qb = wqqm.dequeue(); Query query1D1 = qb == null ? null : qb.getQuery(); wqqm.endWindow(); wqqm.teardown(); Assert.assertEquals("The queries must match.", query, queryD); Assert.assertEquals("The queries must match.", null, queryD1); Assert.assertEquals("The queries must match.", query1, query1D); Assert.assertEquals("The queries must match.", null, query1D1); }
@Test public void testSimpleAddAfterStarted() { WindowEndQueueManager<Query, Void> wqqm = new WindowEndQueueManager<>(); wqqm.setup(null); wqqm.beginWindow(0); Query query = new MockQuery("0"); wqqm.enqueue(query, null, new MutableLong(1L)); Query query1 = new MockQuery("1"); wqqm.enqueue(query1, null, new MutableLong(1L)); Query queryD = wqqm.dequeue().getQuery(); Query query2 = new MockQuery("2"); wqqm.enqueue(query2, null, new MutableLong(1L)); Query query1D = wqqm.dequeue().getQuery(); Query query2D = wqqm.dequeue().getQuery(); QueryBundle<Query, Void, MutableLong> qb = wqqm.dequeue(); Query query3D = qb == null ? null : qb.getQuery(); wqqm.endWindow(); wqqm.teardown(); Assert.assertEquals("The queries must match.", query, queryD); Assert.assertEquals("The queries must match.", query1, query1D); Assert.assertEquals("The queries must match.", query2, query2D); Assert.assertEquals("The queries must match.", null, query3D); }
@Override public MockResult executeQuery(MockQuery query, Void metaQuery, MutableLong queueContext) { if (rand.nextDouble() < waitMillisProb) { try { Thread.sleep(1); } catch (InterruptedException ex) { throw new RuntimeException(ex); } } return new MockResult(query); }
@Test public void testTimeBucketKeyExpiry() { final MutableLong purgeLessThanEqualTo = new MutableLong(-2); testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1)); testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1)); testMeta.timeBucketAssigner.setPurgeListener(new TimeBucketAssigner.PurgeListener() { @Override public void purgeTimeBucketsLessThanEqualTo(long timeBucket) { purgeLessThanEqualTo.setValue(timeBucket); } }); long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); Assert.assertEquals("purgeLessThanEqualTo", -2L, purgeLessThanEqualTo.longValue()); long time0 = Duration.standardSeconds(0).getMillis() + referenceTime; Assert.assertEquals("time bucket", 1, testMeta.timeBucketAssigner.getTimeBucket(time0) ); testMeta.timeBucketAssigner.endWindow(); Assert.assertEquals("purgeLessThanEqualTo", -1, purgeLessThanEqualTo.longValue()); long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucket(time1) ); testMeta.timeBucketAssigner.endWindow(); Assert.assertEquals("purgeLessThanEqualTo", 8, purgeLessThanEqualTo.longValue()); long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucket(time2) ); testMeta.timeBucketAssigner.endWindow(); Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue()); //Check for expiry of time1 now Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucket(time1) ); testMeta.timeBucketAssigner.endWindow(); Assert.assertEquals("purgeLessThanEqualTo", 9, purgeLessThanEqualTo.longValue()); testMeta.timeBucketAssigner.teardown(); }
private KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> createDefaultKeyedWindowedOperator(boolean forSession) { KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); if (useSpillable) { sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. windowStateStorage = new InMemoryWindowedStorage<>(); if (forSession) { SpillableSessionWindowedStorage<String, MutableLong> sws = new SpillableSessionWindowedStorage<>(); sws.setSpillableComplexComponent(sccImpl); keyedDataStorage = sws; } else { SpillableWindowedKeyedStorage<String, MutableLong> kds = new SpillableWindowedKeyedStorage<>(); kds.setSpillableComplexComponent(sccImpl); keyedDataStorage = kds; } SpillableWindowedKeyedStorage<String, Long> krs = new SpillableWindowedKeyedStorage<>(); krs.setSpillableComplexComponent(sccImpl); keyedRetractionStorage = krs; windowedOperator.addComponent("SpillableComplexComponent", sccImpl); } else { windowStateStorage = new InMemoryWindowedStorage<>(); if (forSession) { keyedDataStorage = new InMemorySessionWindowedStorage<>(); } else { keyedDataStorage = new InMemoryWindowedKeyedStorage<>(); } keyedRetractionStorage = new InMemoryWindowedKeyedStorage<>(); } windowedOperator.setDataStorage(keyedDataStorage); windowedOperator.setRetractionStorage(keyedRetractionStorage); windowedOperator.setWindowStateStorage(windowStateStorage); windowedOperator.setAccumulation(new SumAccumulation()); return windowedOperator; }
@Test public void testImplicitWatermarks() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); CollectorTestSink controlSink = new CollectorTestSink(); windowedOperator.controlOutput.setSink(controlSink); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setAllowedLateness(Duration.millis(1000)); windowedOperator.setImplicitWatermarkGenerator(new FixedDiffEventTimeWatermarkGen(100)); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); windowedOperator.endWindow(); Assert.assertEquals("We should get no watermark tuple", 0, controlSink.getCount(false)); windowedOperator.beginWindow(2); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L)); windowedOperator.endWindow(); Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false)); Assert.assertEquals("Check Watermark value", ((ControlTuple.Watermark)controlSink.collectedTuples.get(0)).getTimestamp(), BASE); windowedOperator.beginWindow(3); windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L)); windowedOperator.endWindow(); Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false)); Assert.assertEquals("Check Watermark value", ((ControlTuple.Watermark)controlSink.collectedTuples.get(1)).getTimestamp(), BASE + 800); }
@Test public void testGlobalWindowAssignment() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next()); windowedOperator.teardown(); }
@Test public void testTimeWindowAssignment() { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setup(testMeta.operatorContext); Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); Window window = windows.iterator().next(); Assert.assertEquals(BASE + 1000, window.getBeginTimestamp()); Assert.assertEquals(1000, window.getDurationMillis()); }
@Override public MutablePair<MutableLong, MutableLong> accumulate(MutablePair<MutableLong, MutableLong> accumulatedValue, MutablePair<Double, Double> input) { if (input.getLeft() * input.getLeft() + input.getRight() * input.getRight() < 1) { accumulatedValue.getLeft().increment(); } accumulatedValue.getRight().increment(); return accumulatedValue; }
@Override public MutablePair<MutableLong, MutableLong> merge(MutablePair<MutableLong, MutableLong> accumulatedValue1, MutablePair<MutableLong, MutableLong> accumulatedValue2) { accumulatedValue1.getLeft().add(accumulatedValue2.getLeft()); accumulatedValue1.getRight().add(accumulatedValue2.getRight()); return accumulatedValue1; }
@Override public void populateDAG(DAG dag, Configuration configuration) { WordGenerator inputOperator = new WordGenerator(); KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); Accumulation<Long, MutableLong, Long> sum = new SumLong(); windowedOperator.setAccumulation(sum); windowedOperator.setDataStorage(new InMemoryWindowedKeyedStorage<String, MutableLong>()); windowedOperator.setRetractionStorage(new InMemoryWindowedKeyedStorage<String, Long>()); windowedOperator.setWindowStateStorage(new InMemoryWindowedStorage<WindowState>()); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1))); windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery( Duration.millis(1000)).accumulatingAndRetractingFiredPanes()); ConsoleOutputOperator consoleOutput = new ConsoleOutputOperator(); GenericFileOutputOperator<Object> fileOutput = new GenericFileOutputOperator<>(); fileOutput.setConverter(new ToStringConverter()); dag.addOperator("inputOperator", inputOperator); dag.addOperator("windowedOperator", windowedOperator); dag.addOperator("consoleOutput", consoleOutput); dag.addOperator("output", fileOutput); dag.addStream("input_windowed", inputOperator.output, windowedOperator.input); dag.addStream("windowed_output", windowedOperator.output, consoleOutput.input, fileOutput.input); }