/**
 * Returns the buffer registered under {@code key}, creating and registering a
 * fresh one when {@code getFromEitherList} finds no entry.
 *
 * <p>For an existing entry the timestamp is overwritten and {@code liveList} is
 * re-sorted. A newly created entry is appended to {@code liveList}; if the list
 * then exceeds {@code maxComponents}, it is sorted and its first element is
 * removed.
 *
 * @param key identifier of the requested buffer
 * @param timestamp time to record on the entry
 * @return the buffer held by the (possibly new) entry for {@code key}
 */
public CyclicBuffer<E> getOrCreate(String key, long timestamp) {
  TEntry<E> entry = getFromEitherList(key);

  // Known key: refresh its timestamp and restore the list ordering.
  if (entry != null) {
    entry.timestamp = timestamp;
    Collections.sort(liveList);
    return entry.value;
  }

  // Unknown key: register a new buffer, evicting the head of the sorted list
  // when the component limit is exceeded.
  entry = new TEntry<E>(key, new CyclicBuffer<E>(bufferSize), timestamp);
  liveList.add(entry);
  if (liveList.size() > maxComponents) {
    Collections.sort(liveList);
    liveList.remove(0);
  }
  return entry.value;
}
/**
 * Replays a single simulation event against {@code tracker}.
 *
 * <p>{@code INSERT} calls {@code getOrCreate}, {@code END_OF_LIFE} calls
 * {@code endOfLife}, and {@code REMOVE_STALE} calls
 * {@code removeStaleComponents}; any other event type is ignored.
 *
 * @param simulationEvent the event to apply
 * @param tracker the tracker receiving the corresponding call
 */
void play(SimulationEvent simulationEvent, ComponentTracker<CyclicBuffer<Object>> tracker) {
  String id = simulationEvent.key;
  long when = simulationEvent.timestamp;
  EventType type = simulationEvent.eventType;

  switch (type) {
    case INSERT:
      tracker.getOrCreate(id, when);
      return;
    case END_OF_LIFE:
      tracker.endOfLife(id);
      return;
    case REMOVE_STALE:
      tracker.removeStaleComponents(when);
      return;
    default:
      // no other event type triggers a tracker call
      return;
  }
}
/**
 * Requests the same key twice and expects the same buffer back, then advances
 * the clock past {@code DEFAULT_TIMEOUT} and expects the tracker to be empty
 * after stale-component removal.
 */
@Test
public void smoke() {
  long clock = 3000;

  CyclicBuffer<Object> first = tracker.getOrCreate(key, clock);
  CyclicBuffer<Object> second = tracker.getOrCreate(key, clock);
  clock++;
  assertEquals(first, second);

  clock += CyclicBufferTracker.DEFAULT_TIMEOUT + 1000;
  tracker.removeStaleComponents(clock);

  assertEquals(0, tracker.liveKeysAsOrderedList().size());
  assertEquals(0, tracker.getComponentCount());
}
/**
 * Fills a buffer with one element, marks its key as end-of-life, advances the
 * clock past {@code LINGERING_TIMEOUT}, and expects both the tracker and the
 * buffer itself to be empty after stale-component removal.
 */
@Test
public void destroy() {
  long clock = 3000;

  CyclicBuffer<Object> buffer = tracker.getOrCreate(key, clock);
  buffer.add(new Object());
  assertEquals(1, buffer.length());

  tracker.endOfLife(key);
  clock += CyclicBufferTracker.LINGERING_TIMEOUT + 10;
  tracker.removeStaleComponents(clock);

  assertEquals(0, tracker.liveKeysAsOrderedList().size());
  assertEquals(0, tracker.getComponentCount());
  assertEquals(0, buffer.length());
}
/**
 * Performs the SMTPAppender-specific part of appending: readies the event for
 * later use and stores it in the given cyclic buffer.
 *
 * @param cb buffer that receives the event
 * @param event the logging event being appended
 */
protected void subAppend(CyclicBuffer<ILoggingEvent> cb, ILoggingEvent event) {
  // The return value is discarded; the call is presumably made so caller data
  // is computed before the event is buffered.
  if (includeCallerData) {
    event.getCallerData();
  }

  event.prepareForDeferredProcessing();
  cb.add(event);
}
/**
 * Appends the layout output of buffered events to {@code sbuf}.
 *
 * <p>The number of events to process is read from {@code cb.length()} once,
 * before the loop; one {@code cb.get()} call is made per counted event.
 *
 * @param cb buffer supplying the events
 * @param sbuf destination for the formatted text
 */
@Override
protected void fillBuffer(CyclicBuffer<ILoggingEvent> cb, StringBuffer sbuf) {
  int remaining = cb.length();
  while (remaining > 0) {
    sbuf.append(layout.doLayout(cb.get()));
    remaining--;
  }
}
@Override protected void fillBuffer(CyclicBuffer<IAccessEvent> cb, StringBuffer sbuf) { int len = cb.length(); for (int i = 0; i < len; i++) { // sbuf.append(MimeUtility.encodeText(layout.format(cb.getOrCreate()))); IAccessEvent event = cb.get(); sbuf.append(layout.doLayout(event)); } }
/** * Perform SMTPAppender specific appending actions, delegating some of them to a subclass and checking if the event * triggers an e-mail to be sent. */ protected void append(E eventObject) { if (!checkEntryConditions()) { return; } String key = discriminator.getDiscriminatingValue(eventObject); long now = System.currentTimeMillis(); final CyclicBuffer<E> cb = cbTracker.getOrCreate(key, now); subAppend(cb, eventObject); try { if (eventEvaluator.evaluate(eventObject)) { // clone the CyclicBuffer before sending out asynchronously CyclicBuffer<E> cbClone = new CyclicBuffer<E>(cb); // see http://jira.qos.ch/browse/LBCLASSIC-221 cb.clear(); if (asynchronousSending) { // perform actual sending asynchronously SenderRunnable senderRunnable = new SenderRunnable(cbClone, eventObject); context.getExecutorService().execute(senderRunnable); } else { // synchronous sending sendBuffer(cbClone, eventObject); } } } catch (EvaluationException ex) { errorCount++; if (errorCount < CoreConstants.MAX_ERROR_COUNT) { addError("SMTPAppender's EventEvaluator threw an Exception-", ex); } } // immediately remove the buffer if asked by the user if (eventMarksEndOfLife(eventObject)) { cbTracker.endOfLife(key); } cbTracker.removeStaleComponents(now); if ((lastTrackerStatusPrint + delayBetweenStatusMessages) < now) { addInfo("SMTPAppender [" + name + "] is tracking [" + cbTracker.getComponentCount() + "] buffers"); lastTrackerStatusPrint = now; // quadruple 'delay' assuming less than max delay if (delayBetweenStatusMessages < MAX_DELAY_BETWEEN_STATUS_MESSAGES) { delayBetweenStatusMessages *= 4; } } }
/**
 * Captures the buffer and the event this runnable will work with.
 *
 * @param cyclicBuffer buffer stored in the {@code cyclicBuffer} field
 * @param e event stored in the {@code e} field
 */
SenderRunnable(CyclicBuffer<E> cyclicBuffer, E e) {
  this.e = e;
  this.cyclicBuffer = cyclicBuffer;
}
/**
 * Empties the given buffer by calling {@code clear()} on it.
 *
 * @param component the buffer to clear
 */
@Override
protected void processPriorToRemoval(CyclicBuffer<E> component) {
  component.clear();
}
/**
 * Creates a new cyclic buffer sized by {@code bufferSize}.
 *
 * @param key not used by this implementation
 * @return a newly constructed buffer
 */
@Override
protected CyclicBuffer<E> buildComponent(String key) {
  CyclicBuffer<E> fresh = new CyclicBuffer<E>(bufferSize);
  return fresh;
}
/**
 * Reports whether the given buffer is stale; this implementation never
 * considers a buffer stale.
 *
 * @param eCyclicBuffer the buffer being checked (not inspected)
 * @return always {@code false}
 */
@Override
protected boolean isComponentStale(CyclicBuffer<E> eCyclicBuffer) {
  return false;
}
/** * Perform SMTPAppender specific appending actions, delegating some of them to * a subclass and checking if the event triggers an e-mail to be sent. */ protected void append(E eventObject) { if (!checkEntryConditions()) { return; } String key = discriminator.getDiscriminatingValue(eventObject); long now = System.currentTimeMillis(); final CyclicBuffer<E> cb = cbTracker.getOrCreate(key, now); subAppend(cb, eventObject); try { if (eventEvaluator.evaluate(eventObject)) { // clone the CyclicBuffer before sending out asynchronously CyclicBuffer<E> cbClone = new CyclicBuffer<E>(cb); // see http://jira.qos.ch/browse/LBCLASSIC-221 cb.clear(); if (asynchronousSending) { // perform actual sending asynchronously SenderRunnable senderRunnable = new SenderRunnable(cbClone, eventObject); context.getExecutorService().execute(senderRunnable); } else { // synchronous sending sendBuffer(cbClone, eventObject); } } } catch (EvaluationException ex) { errorCount++; if (errorCount < CoreConstants.MAX_ERROR_COUNT) { addError("SMTPAppender's EventEvaluator threw an Exception-", ex); } } // immediately remove the buffer if asked by the user if (eventMarksEndOfLife(eventObject)) { cbTracker.endOfLife(key); } cbTracker.removeStaleComponents(now); if (lastTrackerStatusPrint + delayBetweenStatusMessages < now) { addInfo("SMTPAppender [" + name + "] is tracking [" + cbTracker.getComponentCount() + "] buffers"); lastTrackerStatusPrint = now; // quadruple 'delay' assuming less than max delay if (delayBetweenStatusMessages < MAX_DELAY_BETWEEN_STATUS_MESSAGES) { delayBetweenStatusMessages *= 4; } } }
/**
 * Allocates the cyclic buffer with capacity {@code maxSize} and then delegates
 * to the superclass {@code start()}.
 */
public void start() {
  // the buffer is assigned before the superclass start logic runs
  cb = new CyclicBuffer<E>(maxSize);
  super.start();
}
/**
 * Looks up the buffer registered under {@code key} without creating one.
 *
 * @param key identifier of the requested buffer
 * @return the buffer of the entry found by {@code getFromEitherList}, or
 *     {@code null} when there is no such entry
 */
public CyclicBuffer<E> find(String key) {
  TEntry<E> entry = getFromEitherList(key);
  return (entry != null) ? entry.value : null;
}
/**
 * Creates an entry binding a key to a buffer and a timestamp.
 *
 * @param k value for the {@code key} field
 * @param v value for the {@code value} field
 * @param timestamp value for the {@code timestamp} field
 */
TEntry(String k, CyclicBuffer<E> v, long timestamp) {
  this.timestamp = timestamp;
  this.key = k;
  this.value = v;
}
/**
 * Performs the SMTPAppender-specific part of appending an access event, which
 * here consists solely of adding the event to the supplied cyclic buffer.
 *
 * @param cb buffer that receives the event
 * @param event the access event being appended
 */
@Override
protected void subAppend(CyclicBuffer<IAccessEvent> cb, IAccessEvent event) {
  cb.add(event);
}
/**
 * Subclass hook for the event-type-specific part of appending.
 *
 * <p>NOTE(review): implementations presumably add {@code eventObject} to
 * {@code cb}; confirm against the concrete subclasses.
 *
 * @param cb the cyclic buffer the event belongs to
 * @param eventObject the event being appended
 */
protected abstract void subAppend(CyclicBuffer<E> cb, E eventObject);
/**
 * Subclass hook that writes the contents of {@code cb} into {@code sbuf}.
 *
 * <p>NOTE(review): implementations presumably render each buffered event as
 * text and append it; confirm against the concrete subclasses.
 *
 * @param cb the cyclic buffer supplying the events
 * @param sbuf destination buffer for the rendered text
 */
protected abstract void fillBuffer(CyclicBuffer<E> cb, StringBuffer sbuf);
/**
 * Subclass hook for the event-type-specific part of appending.
 *
 * <p>NOTE(review): implementations presumably add {@code eventObject} to
 * {@code cb}; confirm against the concrete subclasses.
 *
 * @param cb the cyclic buffer the event belongs to
 * @param eventObject the event being appended
 */
protected abstract void subAppend(CyclicBuffer<E> cb, E eventObject);
/**
 * Subclass hook that writes the contents of {@code cb} into {@code sbuf}.
 *
 * <p>NOTE(review): implementations presumably render each buffered event as
 * text and append it; confirm against the concrete subclasses.
 *
 * @param cb the cyclic buffer supplying the events
 * @param sbuf destination buffer for the rendered text
 */
protected abstract void fillBuffer(CyclicBuffer<E> cb, StringBuffer sbuf);