@Override protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { setConf(conf); return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { promise.setFailure(new RuntimeException("Injected fault")); } }); } }); }
@Override public void configureNewPipeline(ChannelPipeline pipeline) { serverPipelineConfigurator.configureNewPipeline(pipeline); pipeline.addLast(SSE_ENCODER_HANDLER_NAME, SERVER_SENT_EVENT_ENCODER); pipeline.addLast(SSE_RESPONSE_HEADERS_COMPLETER, new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (HttpServerResponse.class.isAssignableFrom(msg.getClass())) { @SuppressWarnings("rawtypes") HttpServerResponse rxResponse = (HttpServerResponse) msg; String contentTypeHeader = rxResponse.getHeaders().get(CONTENT_TYPE); if (null == contentTypeHeader) { rxResponse.getHeaders().set(CONTENT_TYPE, "text/event-stream"); } } super.write(ctx, msg, promise); } }); }
@Test public void invalid_http_call_should_result_in_expected_400_error() throws Exception { // given // Normal request, but fiddle with the first chunk as it's going out to remove the HTTP version and make it an // invalid HTTP call. NettyHttpClientRequestBuilder request = request() .withMethod(HttpMethod.GET) .withUri(BasicEndpoint.MATCHING_PATH) .withPipelineAdjuster( p -> p.addFirst(new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { String msgAsString = ((ByteBuf)msg).toString(CharsetUtil.UTF_8); if (msgAsString.contains("HTTP/1.1")) { msg = Unpooled.copiedBuffer(msgAsString.replace("HTTP/1.1", ""), CharsetUtil.UTF_8); } super.write(ctx, msg, promise); } }) ); // when NettyHttpClientResponse response = request.execute(downstreamServerConfig.endpointsPort(), 3000); // then verifyErrorReceived(response.payload, response.statusCode, new ApiErrorWithMetadata(SampleCoreApiError.MALFORMED_REQUEST, Pair.of("cause", "Invalid HTTP request")) ); }