/**
 * Per-test fixture setup: creates a fresh {@link FailureCallback} mock and input exception, clears the
 * recording lists, and stubs the mock so every {@code onFailure(inObj)} call records the span stack and
 * MDC map that are current at that moment. When {@code throwExceptionDuringCall} is true the stubbed
 * call then throws a {@link RuntimeException}. Finishes by calling {@code resetTracing()}.
 */
@Before
public void beforeMethod() {
    failureCallbackMock = mock(FailureCallback.class);

    inObj = new Exception("kaboom");
    throwExceptionDuringCall = false;
    currentSpanStackWhenFailureCallbackWasCalled = new ArrayList<>();
    currentMdcInfoWhenFailureCallbackWasCalled = new ArrayList<>();

    doAnswer(onFailureInvocation -> {
        // Snapshot the tracing state visible to the callback while it is executing.
        currentSpanStackWhenFailureCallbackWasCalled.add(Tracer.getInstance().getCurrentSpanStackCopy());
        currentMdcInfoWhenFailureCallbackWasCalled.add(MDC.getCopyOfContextMap());
        if (throwExceptionDuringCall) {
            throw new RuntimeException("kaboom");
        }
        return null;
    }).when(failureCallbackMock).onFailure(inObj);

    resetTracing();
}
/**
 * Immediately invokes the success callback with the stored value. Anything thrown by the success
 * callback is handed to the failure callback.
 *
 * @param successCallback callback that receives {@code this.value}
 * @param failureCallback callback that receives any {@link Throwable} raised by {@code successCallback}
 */
@Override
public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) {
    try {
        successCallback.onSuccess(this.value);
    }
    catch (Throwable thrownBySuccessCallback) {
        failureCallback.onFailure(thrownBySuccessCallback);
    }
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.connect(..)} so that it synchronously calls the success callback
 * (first argument) with {@code "connectedClientId"}.
 */
private void simulateMqttConnectionSuccess() {
    doAnswer(invocation -> {
        SuccessCallback<String> onConnected = invocation.getArgumentAt(0, SuccessCallback.class);
        onConnected.onSuccess("connectedClientId");
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .connect(Matchers.any(), any(FailureCallback.class));
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.connect(..)} so that it synchronously calls the failure callback
 * (second argument) with an {@link Exception} whose message is {@code "simulateMqttConnectionFailure"}.
 */
private void simulateMqttConnectionFailure() {
    doAnswer(invocation -> {
        FailureCallback onConnectFailed = invocation.getArgumentAt(1, FailureCallback.class);
        onConnectFailed.onFailure(new Exception("simulateMqttConnectionFailure"));
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .connect(Matchers.any(), any(FailureCallback.class));
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.disconnect(..)} so that it synchronously calls the success
 * callback (first argument) with {@code "disconnectedClientId"}.
 */
private void simulateMqttDisconnectionSuccess() {
    doAnswer(invocation -> {
        SuccessCallback<String> onDisconnected = invocation.getArgumentAt(0, SuccessCallback.class);
        onDisconnected.onSuccess("disconnectedClientId");
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .disconnect(Matchers.any(), any(FailureCallback.class));
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.disconnect(..)} so that it synchronously calls the failure
 * callback (second argument) with an {@link Exception} whose message is
 * {@code "simulateMqttDisconnectionFailure"}.
 */
private void simulateMqttDisconnectionFailure() {
    doAnswer(invocation -> {
        FailureCallback onDisconnectFailed = invocation.getArgumentAt(1, FailureCallback.class);
        onDisconnectFailed.onFailure(new Exception("simulateMqttDisconnectionFailure"));
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .disconnect(Matchers.any(), any(FailureCallback.class));
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.subscribe(..)} so that it synchronously calls the success
 * callback (first argument) with {@code "subscribedClientId"}.
 */
private void simulateMqttSubscriptionSuccess() {
    doAnswer(invocation -> {
        SuccessCallback<String> onSubscribed = invocation.getArgumentAt(0, SuccessCallback.class);
        onSubscribed.onSuccess("subscribedClientId");
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .subscribe(Matchers.any(), any(FailureCallback.class));
}
/**
 * Stubs {@code mockOpenLpwaMqttProvider.subscribe(..)} so that it synchronously calls the failure
 * callback (second argument) with an {@link Exception} whose message is
 * {@code "simulateMqttSubscriptionFailure"}.
 */
private void simulateMqttSubscriptionFailure() {
    doAnswer(invocation -> {
        FailureCallback onSubscribeFailed = invocation.getArgumentAt(1, FailureCallback.class);
        onSubscribeFailed.onFailure(new Exception("simulateMqttSubscriptionFailure"));
        return null;
    })
        .when(mockOpenLpwaMqttProvider)
        .subscribe(Matchers.any(), any(FailureCallback.class));
}
/**
 * Makes {@code mockLpwaProvider.getDeviceInformation(..)} return a mocked future whose two-argument
 * {@code addCallback} immediately invokes the failure callback (second argument) with a plain
 * {@link Exception}.
 *
 * <p>The failure-callback argument is matched with {@code any(FailureCallback.class)}; the callback
 * itself is read from the invocation, so no argument captor is needed.
 *
 * @throws ConfigurationException declared because the stubbed provider method declares it
 */
private void simulateOpenLpwaProviderGetDeviceInformationFailure() throws ConfigurationException {
    ListenableFuture<DeviceInfo> responseFuture = mock(ListenableFuture.class);
    when(mockLpwaProvider.getDeviceInformation(anyString())).thenReturn(responseFuture);

    doAnswer(invocationOnMock -> {
        FailureCallback failureCallback = invocationOnMock.getArgumentAt(1, FailureCallback.class);
        failureCallback.onFailure(new Exception());
        return null;
    }).when(responseFuture).addCallback(any(), any(FailureCallback.class));
}
/**
 * Makes {@code mockNgsiManager.subscribeToCommands(..)} return a mocked future whose two-argument
 * {@code addCallback} immediately invokes the failure callback (second argument) with a plain
 * {@link Exception}.
 *
 * <p>The failure-callback argument is matched with {@code any(FailureCallback.class)}; the callback
 * itself is read from the invocation, so no argument captor is needed.
 *
 * @throws AgentException declared because the stubbed manager method declares it
 */
private void simulateNgsiManagerSubscribeToCommandsFailure() throws AgentException {
    ListenableFuture<SubscribeContextResponse> responseFuture = mock(ListenableFuture.class);
    when(mockNgsiManager.subscribeToCommands(any(Device.class))).thenReturn(responseFuture);

    doAnswer(invocationOnMock -> {
        FailureCallback failureCallback = invocationOnMock.getArgumentAt(1, FailureCallback.class);
        failureCallback.onFailure(new Exception());
        return null;
    }).when(responseFuture).addCallback(any(), any(FailureCallback.class));
}
/**
 * Makes {@code mockNgsiManager.unsubscribe(..)} return a mocked future whose two-argument
 * {@code addCallback} immediately invokes the failure callback (second argument) with a plain
 * {@link Exception}.
 *
 * <p>The failure-callback argument is matched with {@code any(FailureCallback.class)}; the callback
 * itself is read from the invocation, so no argument captor is needed.
 *
 * @throws AgentException declared because the stubbed manager method declares it
 */
private void simulateNgsiManagerUnsubscribeFailure() throws AgentException {
    ListenableFuture<UnsubscribeContextResponse> responseFuture = mock(ListenableFuture.class);
    when(mockNgsiManager.unsubscribe(anyString())).thenReturn(responseFuture);

    doAnswer(invocationOnMock -> {
        FailureCallback failureCallback = invocationOnMock.getArgumentAt(1, FailureCallback.class);
        failureCallback.onFailure(new Exception());
        return null;
    }).when(responseFuture).addCallback(any(), any(FailureCallback.class));
}
/**
 * Makes {@code mockLpwaProvider.registerDeviceCommand(..)} return a mocked future whose two-argument
 * {@code addCallback} immediately invokes the failure callback (second argument) with a plain
 * {@link Exception}.
 *
 * <p>The failure-callback argument is matched with {@code any(FailureCallback.class)}; the callback
 * itself is read from the invocation, so no argument captor is needed.
 *
 * @throws ConfigurationException declared because the stubbed provider method declares it
 */
private void simulateOpenLpwaProviderRegisterDeviceCommandFailure() throws ConfigurationException {
    ListenableFuture<DeviceCommand> responseFuture = mock(ListenableFuture.class);
    when(mockLpwaProvider.registerDeviceCommand(anyString(), any(RegisterDeviceCommandParameter.class)))
        .thenReturn(responseFuture);

    doAnswer(invocationOnMock -> {
        FailureCallback failureCallback = invocationOnMock.getArgumentAt(1, FailureCallback.class);
        failureCallback.onFailure(new Exception());
        return null;
    }).when(responseFuture).addCallback(any(), any(FailureCallback.class));
}
/**
 * Creates a wrapper around {@code origFailureCallback} that stores the supplied span stack and MDC
 * context map for use when the callback is executed.
 *
 * <p>The wrapped callback is mandatory. The span stack and the MDC map may each be null; a null value
 * simply means that piece of information will not be available to the thread during execution.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 * @param spanStackForExecution the span stack to associate with the executing thread (may be null)
 * @param mdcContextMapForExecution the MDC context map to associate with the executing thread (may be null)
 * @throws IllegalArgumentException if {@code origFailureCallback} is null
 */
public FailureCallbackWithTracing(FailureCallback origFailureCallback,
                                  Deque<Span> spanStackForExecution,
                                  Map<String, String> mdcContextMapForExecution) {
    if (origFailureCallback == null) {
        throw new IllegalArgumentException("origFailureCallback cannot be null");
    }

    this.origFailureCallback = origFailureCallback;
    this.spanStackForExecution = spanStackForExecution;
    this.mdcContextMapForExecution = mdcContextMapForExecution;
}
/**
 * Per-test fixture setup: calls {@code resetTracing()}, then creates fresh mocks for the HTTP message
 * (wired to return the mocked headers from {@code getHeaders()}) and for the three callback types.
 */
@Before
public void beforeMethod() {
    resetTracing();

    // HTTP message whose headers are themselves a mock.
    headersMock = mock(HttpHeaders.class);
    httpMessageMock = mock(HttpMessage.class);
    doReturn(headersMock).when(httpMessageMock).getHeaders();

    // Callback mocks.
    successCallbackMock = mock(SuccessCallback.class);
    failureCallbackMock = mock(FailureCallback.class);
    listenableFutureCallbackMock = mock(ListenableFutureCallback.class);
}
/**
 * Asserts that {@code result} is a {@link FailureCallbackWithTracing} whose internal fields hold the
 * expected delegate (same instance), span stack and MDC map (equal values).
 *
 * @param result the callback under inspection
 * @param expectedCoreInstance the delegate instance the wrapper is expected to hold
 * @param expectedSpanStack the span stack the wrapper is expected to hold
 * @param expectedMdcInfo the MDC map the wrapper is expected to hold
 */
private void verifyFailureCallbackWithTracing(FailureCallback result,
                                              FailureCallback expectedCoreInstance,
                                              Deque<Span> expectedSpanStack,
                                              Map<String, String> expectedMdcInfo) {
    assertThat(result).isInstanceOf(FailureCallbackWithTracing.class);

    // Fields are private, so they are read reflectively by name.
    assertThat(Whitebox.getInternalState(result, "origFailureCallback"))
        .isSameAs(expectedCoreInstance);
    assertThat(Whitebox.getInternalState(result, "spanStackForExecution"))
        .isEqualTo(expectedSpanStack);
    assertThat(Whitebox.getInternalState(result, "mdcContextMapForExecution"))
        .isEqualTo(expectedMdcInfo);
}
@Test public void failureCallbackWithTracing_using_current_thread_info_works_as_expected() { // given Pair<Deque<Span>, Map<String, String>> setupInfo = setupCurrentThreadWithTracingInfo(); // when FailureCallback result = failureCallbackWithTracing(failureCallbackMock); // then verifyFailureCallbackWithTracing(result, failureCallbackMock, setupInfo.getLeft(), setupInfo.getRight()); }
@Test public void failureCallbackWithTracing_pair_works_as_expected() { // given Pair<Deque<Span>, Map<String, String>> setupInfo = generateTracingInfo(); // when FailureCallback result = failureCallbackWithTracing(failureCallbackMock, setupInfo); // then verifyFailureCallbackWithTracing(result, failureCallbackMock, setupInfo.getLeft(), setupInfo.getRight()); }
@Test public void failureCallbackWithTracing_separate_args_works_as_expected() { // given Pair<Deque<Span>, Map<String, String>> setupInfo = generateTracingInfo(); // when FailureCallback result = failureCallbackWithTracing( failureCallbackMock, setupInfo.getLeft(), setupInfo.getRight() ); // then verifyFailureCallbackWithTracing(result, failureCallbackMock, setupInfo.getLeft(), setupInfo.getRight()); }
@Override public void addCallback(SuccessCallback<? super V> successCallback, FailureCallback failureCallback) { try { if (this.executionException != null) { Throwable cause = this.executionException.getCause(); failureCallback.onFailure(cause != null ? cause : this.executionException); } else { successCallback.onSuccess(this.value); } } catch (Throwable ex) { // Ignore } }
/**
 * Registers both callbacks with the callback registry: the success callback first, then the failure
 * callback.
 *
 * @param successCallback the callback to add via {@code addSuccessCallback}
 * @param failureCallback the callback to add via {@code addFailureCallback}
 */
@Override
public void addCallback(SuccessCallback<? super T> successCallback,
                        FailureCallback failureCallback) {
    registry.addSuccessCallback(successCallback);
    registry.addFailureCallback(failureCallback);
}
/**
 * Forwards the given failure callback to the underlying {@code callbacks} holder.
 *
 * @param callback the failure callback to register
 */
public void addFailureCallback(FailureCallback callback) {
    callbacks.addFailureCallback(callback);
}
/**
 * Registers both callbacks with the {@code callback} holder: the success callback first, then the
 * failure callback.
 *
 * @param successCallback the callback to add via {@code addSuccessCallback}
 * @param failureCallback the callback to add via {@code addFailureCallback}
 */
@Override
public void addCallback(SuccessCallback<? super ClientHttpResponse> successCallback,
                        FailureCallback failureCallback) {
    callback.addSuccessCallback(successCallback);
    callback.addFailureCallback(failureCallback);
}
/**
 * Wraps the given callback in a {@link FailureCallbackWithTracing} built from the supplied span stack
 * and MDC context map.
 *
 * @param failureCallback the callback to wrap
 * @param spanStackToLink the span stack to pass to the wrapper
 * @param mdcContextMapToLink the MDC context map to pass to the wrapper
 * @return A {@link FailureCallback} that wraps the given original so that the given distributed tracing
 * and MDC information is registered with the thread and therefore available during execution and
 * unregistered after execution.
 */
public static FailureCallback failureCallbackWithTracing(FailureCallback failureCallback,
                                                         Deque<Span> spanStackToLink,
                                                         Map<String, String> mdcContextMapToLink) {
    FailureCallback wrapped =
        new FailureCallbackWithTracing(failureCallback, spanStackToLink, mdcContextMapToLink);
    return wrapped;
}
@Override protected <T> ListenableFuture<T> doExecute(URI url, HttpMethod method, AsyncRequestCallback requestCallback, ResponseExtractor<T> responseExtractor) throws RestClientException { final ListenableFuture<T> future = super.doExecute(url, method, requestCallback, responseExtractor); final Span span = this.tracer.getCurrentSpan(); future.addCallback(new TraceListenableFutureCallback<>(this.tracer, span, this.errorParser)); // potential race can happen here if (span != null && span.equals(this.tracer.getCurrentSpan())) { Span parent = this.tracer.detach(span); if (parent != null) { this.tracer.continueSpan(parent); } } return new ListenableFuture<T>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return future.cancel(mayInterruptIfRunning); } @Override public boolean isCancelled() { return future.isCancelled(); } @Override public boolean isDone() { return future.isDone(); } @Override public T get() throws InterruptedException, ExecutionException { return future.get(); } @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return future.get(timeout, unit); } @Override public void addCallback(ListenableFutureCallback<? super T> callback) { future.addCallback(new TraceListenableFutureCallbackWrapper<>(TraceAsyncRestTemplate.this.tracer, span, callback)); } @Override public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { future.addCallback( new TraceSuccessCallback<>(TraceAsyncRestTemplate.this.tracer, span, successCallback), new TraceFailureCallback(TraceAsyncRestTemplate.this.tracer, span, failureCallback)); } }; }
/**
 * Stores the collaborators for this callback.
 *
 * @param tracer the tracer to keep
 * @param parent the span to keep as {@code parent}
 * @param delegate the wrapped failure callback
 */
private TraceFailureCallback(Tracer tracer, Span parent, FailureCallback delegate) {
    this.delegate = delegate;
    this.parent = parent;
    this.tracer = tracer;
}
/**
 * Registers both callbacks on the wrapped future through its {@code addCallbacks} method, adapting
 * each callback with a method reference.
 *
 * @param successCallback adapted via {@code successCallback::onSuccess}
 * @param failureCallback adapted via {@code failureCallback::onFailure}
 */
@Override
public void addCallback(SuccessCallback<? super T> successCallback,
                        FailureCallback failureCallback) {
    getWrappedFuture()
        .addCallbacks(successCallback::onSuccess, failureCallback::onFailure);
}
/**
 * Passes both callbacks unchanged to the wrapped future's {@code addCallback}.
 *
 * @param successCallback the success callback to forward
 * @param failureCallback the failure callback to forward
 */
@Override
public void addCallback(SuccessCallback<? super T> successCallback,
                        FailureCallback failureCallback) {
    getWrappedFuture()
        .addCallback(successCallback, failureCallback);
}
/**
 * Creates a wrapper from a {@link Pair} of span stack (left) and MDC context map (right) by delegating
 * to the three-argument constructor.
 *
 * <p>The wrapped callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * <p>The {@link Pair} itself may be null, as may either side of it, without causing an error. Whatever
 * trace or MDC info is null will simply not be available to the thread when the callback is executed.
 *
 * <p>A {@link TracingState} can be supplied for less verbose code since it extends
 * {@code Pair<Deque<Span>, Map<String, String>>}.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 * @param originalThreadInfo the span stack / MDC map pair to use (may be null)
 */
public FailureCallbackWithTracing(FailureCallback origFailureCallback,
                                  Pair<Deque<Span>, Map<String, String>> originalThreadInfo) {
    this(
        origFailureCallback,
        (originalThreadInfo != null) ? originalThreadInfo.getLeft() : null,
        (originalThreadInfo != null) ? originalThreadInfo.getRight() : null
    );
}
/**
 * Creates a wrapper that captures the calling thread's tracing state at construction time.
 *
 * <p>The span stack is obtained from {@link Tracer#getCurrentSpanStackCopy()} and the MDC map from
 * {@link MDC#getCopyOfContextMap()}; both are forwarded to
 * {@link FailureCallbackWithTracing#FailureCallbackWithTracing(FailureCallback, Deque, Map)}.
 *
 * <p>The wrapped callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 */
public FailureCallbackWithTracing(FailureCallback origFailureCallback) {
    this(
        origFailureCallback,
        Tracer.getInstance().getCurrentSpanStackCopy(),
        MDC.getCopyOfContextMap()
    );
}
/**
 * Static-import-friendly shorthand for {@code new FailureCallbackWithTracing(origFailureCallback)}.
 * The underlying constructor extracts the current thread's tracing and MDC information using
 * {@link Tracer#getCurrentSpanStackCopy()} and {@link MDC#getCopyOfContextMap()}.
 *
 * <p>The wrapped callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 * @return {@code new FailureCallbackWithTracing(origFailureCallback)}.
 * @see FailureCallbackWithTracing#FailureCallbackWithTracing(FailureCallback)
 * @see FailureCallbackWithTracing
 */
public static FailureCallbackWithTracing withTracing(FailureCallback origFailureCallback) {
    FailureCallbackWithTracing wrapped = new FailureCallbackWithTracing(origFailureCallback);
    return wrapped;
}
/**
 * Static-import-friendly shorthand for
 * {@code new FailureCallbackWithTracing(origFailureCallback, originalThreadInfo)}.
 *
 * <p>The wrapped callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * <p>The {@link Pair} itself may be null, as may either side of it, without causing an error. Whatever
 * trace or MDC info is null will simply not be available to the thread when the callback is executed.
 *
 * <p>A {@link TracingState} can be supplied for less verbose code since it extends
 * {@code Pair<Deque<Span>, Map<String, String>>}.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 * @param originalThreadInfo the span stack / MDC map pair to use (may be null)
 * @return {@code new FailureCallbackWithTracing(origFailureCallback, originalThreadInfo)}.
 * @see FailureCallbackWithTracing#FailureCallbackWithTracing(FailureCallback, Pair)
 * @see FailureCallbackWithTracing
 */
public static FailureCallbackWithTracing withTracing(
    FailureCallback origFailureCallback,
    Pair<Deque<Span>, Map<String, String>> originalThreadInfo
) {
    FailureCallbackWithTracing wrapped =
        new FailureCallbackWithTracing(origFailureCallback, originalThreadInfo);
    return wrapped;
}
/**
 * Static-import-friendly shorthand for
 * {@code new FailureCallbackWithTracing(origFailureCallback, spanStackForExecution, mdcContextMapForExecution)}.
 *
 * <p>The wrapped callback must not be null; an {@link IllegalArgumentException} is thrown otherwise.
 *
 * <p>The span stack and/or MDC map may be null without causing an error; whatever is null will simply
 * not be available to the thread when the callback is executed.
 *
 * @param origFailureCallback the callback to wrap; must not be null
 * @param spanStackForExecution the span stack to pass to the constructor (may be null)
 * @param mdcContextMapForExecution the MDC context map to pass to the constructor (may be null)
 * @return {@code new FailureCallbackWithTracing(origFailureCallback, spanStackForExecution, mdcContextMapForExecution)}.
 * @see FailureCallbackWithTracing#FailureCallbackWithTracing(FailureCallback, Deque, Map)
 * @see FailureCallbackWithTracing
 */
public static FailureCallbackWithTracing withTracing(
    FailureCallback origFailureCallback,
    Deque<Span> spanStackForExecution,
    Map<String, String> mdcContextMapForExecution
) {
    FailureCallbackWithTracing wrapped = new FailureCallbackWithTracing(
        origFailureCallback, spanStackForExecution, mdcContextMapForExecution
    );
    return wrapped;
}
/**
 * Wraps the given callback in a {@link FailureCallbackWithTracing} using its single-argument
 * constructor.
 *
 * <p>NOTE: The current thread's tracing and MDC info will be extracted using {@link
 * Tracer#getCurrentSpanStackCopy()} and {@link MDC#getCopyOfContextMap()}.
 *
 * @param failureCallback the callback to wrap
 * @return A {@link FailureCallback} that wraps the given original so that the <b>current thread's</b>
 * tracing and MDC information is registered with the thread and therefore available during execution
 * and unregistered after execution.
 */
public static FailureCallback failureCallbackWithTracing(FailureCallback failureCallback) {
    FailureCallback wrapped = new FailureCallbackWithTracing(failureCallback);
    return wrapped;
}
/**
 * Wraps the given callback in a {@link FailureCallbackWithTracing} built from the supplied pair of
 * span stack and MDC context map.
 *
 * @param failureCallback the callback to wrap
 * @param threadInfoToLink the span stack / MDC map pair to pass to the wrapper
 * @return A {@link FailureCallback} that wraps the given original so that the given distributed tracing
 * and MDC information is registered with the thread and therefore available during execution and
 * unregistered after execution. You can pass in a {@link TracingState} for clearer less verbose code
 * since it extends {@code Pair<Deque<Span>, Map<String, String>>}.
 */
public static FailureCallback failureCallbackWithTracing(
    FailureCallback failureCallback,
    Pair<Deque<Span>, Map<String, String>> threadInfoToLink
) {
    FailureCallback wrapped = new FailureCallbackWithTracing(failureCallback, threadInfoToLink);
    return wrapped;
}