使用Spring WebFlux构建响应式REST API


在本文中,我们将看到如何使用Spring WebFlux构建响应式REST API。在进入反应式API之前,让我们看看系统是如何发展的,传统REST实现遇到的问题以及现代API的需求。

如果您查看从旧版系统到下文所述的现代系统的期望,

legacy-modern-syetems-expectations.png

现代系统的期望是:应将应用程序分布式,Cloud Native,拥抱以实现高可用性和可伸缩性。因此,有效利用系统资源至关重要。进入为什么要使用响应式编程来构建REST API?让我们看看传统的REST API请求处理是如何工作的。

traditional-rest-api-requestresponse.png

以下是传统REST API遇到的问题,

阻止和同步→请求正在阻止和同步。请求线程将等待任何阻塞的I / O,并且直到I / O等待结束后,线程才可以释放以将响应返回给调用方。 每个请求的线程数→ Web容器使用每个请求 的线程数模型。这限制了要处理的并发请求的数量。除了某些请求之外,容器还会对请求进行排队,这些请求最终会影响API的性能。 处理高并发用户的限制 → 由于Web容器使用每个请求线程数模型,因此我们无法处理高并发请求。 无法更好地利用系统资源 → 线程将因I / O而阻塞并处于空闲状态。但是,Web容器不能接受更多请求。在这种情况下,我们将无法有效地利用系统资源。 没有反压支持→ 我们无法从客户端或服务器施加反压。如果突然出现大量请求,则服务器或客户端可能会中断。之后,用户将无法访问该应用程序。如果我们有背压支持,则应用程序应在重负载期间持续运行,而不是无法使用。 让我们看看如何使用反应式编程解决上述问题。以下是我们使用反应式API可获得的优势。

异步和非阻塞→ 反应式编程为编写异步和非阻塞应用程序提供了灵活性。 事件/消息驱动→ 系统将为任何活动生成事件或消息。例如,来自数据库的数据被视为事件流。 支持背压→ 我们可以通过施加背压来 优雅地处理从一个系统到另一个系统的压力,从而避免拒绝服务。 可预测的应用程序响应时间→ 由于线程是异步且非阻塞的,因此在负载下应用程序响应时间是可预测的。 更好地利用系统资源→ 由于线程是异步且非阻塞的,因此不会为I / O占用 线程。使用更少的线程,我们可以支持更多的用户请求。 根据负载缩放 远离每个请求的线程 → 通过反应式API,我们正在远离每个请求的线程模型,因为线程是异步且非阻塞的。发出请求后,它将与服务器一起创建事件,并且请求线程将被释放以处理其他请求。 现在,让我们看看反应式编程是如何工作的。在下面的示例中,一旦应用程序调用了从数据源获取数据的操作,线程将立即返回,并且来自数据源的数据将作为数据/事件流出现。在这里,应用程序是订阅者,数据源是发布者。数据流完成后,onComplete将触发事件。

back-pressure-on-reactive-streams.png

下面是另一种情况,onError如果发生任何异常,发布者将触发事件。 data-flow-as-an-eventmessage-driven-stream-2.png

在某些情况下,可能没有发布者要交付的任何物品。例如,从数据库中删除一个项目。在这种情况下,发布者将立即触发onComplete/onError事件,而无需调用onNext事件,因为没有数据可返回。

data-flow-as-an-eventmessage-driven-stream-3.png

现在,让我们看看什么是背压 以及如何将背压应用于反应性物流。 例如,我们有一个客户端应用程序正在从另一个服务请求数据。该服务能够以1000TPS的速率发布事件,但是客户端应用程序能够以200TPS的速率处理事件。

在这种情况下,客户端应用程序应缓冲其余数据以进行处理。在随后的调用中,客户端应用程序可能会缓冲更多数据,并最终耗尽内存。这会对依赖于客户端应用程序的其他应用程序造成级联效应。为了避免这种情况,客户端应用程序可以要求服务在事件末尾缓冲事件,并以客户端应用程序的速率推送事件。这称为背压。下图描述了相同的内容。

back-pressure-on-reactive-streams.png

现在,我们将看到反应流规范及其实现之一,称为Project Reactor。反应流规范定义了以下接口。让我们查看这些接口的详细信息。

发布者→ 发布者是数量可能不受限制的序列元素的提供者,可按其订阅者的要求发布它们

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber → Subscriber 是数量可能不受限制的已排序元素的使用者。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription → 订阅表示订阅者订阅发布者的一对一生命周期。

public interface Subscription {
    public void request(long n);
    public void cancel();
}

处理器 → 处理器代表一个处理阶段-既是订户又是发布者,并且服从两者的合同。

反应流规格的类图如下所示。 screen-shot-2020-06-07-at-112348-am.png

反应性流规范具有许多实现。Project Reactor是实现之一。该反应堆完全无阻塞,可提供有效的需求管理。Reactor提供了两个反应式和可组合的API,即Flux [N]和Mono [0 | 1],它们广泛实现了Reactive Extensions。Reactor提供了用于HTTP(包括Websockets),TCP和UDP的非阻塞,反压就绪的网络引擎。它非常适合微服务架构。

通量→ 它是Publisher带有rx运算符的反应性流,它发出0到N个元素,然后完成(成功或有错误)。助焊剂的大理石图如下所示。

project-reactor-flux-marble-diagram.png Mono → 它是Publisher具有基本rx运算符的反应式流,它通过发出0到1个元素或有错误而成功完成。Mono的大理石图如下所示。

project-reactor-mono-marble-diagram.png 由于Spring 5.x随Reactor实施一起提供,因此,如果我们想使用带有Spring servlet堆栈的命令式编程来构建REST API,它仍然支持。下图说明了Spring如何支持反应式和servlet堆栈实现。

-reactive.png

现在,我们将看到一个应用程序,以公开响应式REST API。在此应用程序中,我们使用了:

  • 带有WebFlux的Spring Boot
  • 具有响应式支持的Cassandra的Spring数据
  • 卡桑德拉数据库

下面是应用程序的高级体系结构。

-webflux.png 响应式演示应用程序工作流程

让我们看一下build.gradle文件,以查看与Spring WebFlux一起使用的依赖项。

plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}
group = 'org.smarttechie'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'io.projectreactor:reactor-test'
}

test {
    useJUnitPlatform()
}

在此应用程序中,我公开了以下提到的API。您可以从GitHub下载源代码 。

Endpoint URI Response
Create a Product /product Created product as Mono
All products /products returns all products as Flux
Delate a product /product/{id} Mono
Update a product /product/{id} Updated product as Mono
package org.smarttechie.controller;
import org.smarttechie.model.Product;
import org.smarttechie.repository.ProductRepository;
import org.smarttechie.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class ProductController {

    @Autowired
    private ProductService productService;

    /**
     * This endpoint allows to create a product.
     * @param product - to create
     * @return - the created product
     */
    @PostMapping("/product")
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody Product product){
        return productService.save(product);
    }

    /**
     * This endpoint gives all the products
     * @return - the list of products available
     */
    @GetMapping("/products")
    public Flux<Product> getAllProducts(){
        return productService.getAllProducts();
    }
    /**
     * This endpoint allows to delete a product
     * @param id - to delete
     * @return
     */
    @DeleteMapping("/product/{id}")
    public Mono<Void> deleteProduct(@PathVariable int id){
        return productService.deleteProduct(id);
    }
    /**
     * This endpoint allows to update a product
     * @param product - to update
     * @return - the updated product
     */
    @PutMapping("product/{id}")
    public Mono<ResponseEntity<Product>> updateProduct(@RequestBody Product product){
        return productService.update(product);
    }
}

在构建反应式API时,我们可以使用功能风格的编程模型来构建API,而无需使用RestController。在这种情况下,我们需要具有一个路由器和一个处理程序组件,如下所示。

package org.smarttechie.router;
import org.smarttechie.handler.ProductHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import static org.springframework.web.reactive.function.server.RequestPredicates.*;
@Configuration
public class ProductRouter {
    /**
     * The router configuration for the product handler.
     * @param productHandler
     * @return
     */
    @Bean
    public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){

        return RouterFunctions
                .route(GET("/products").and(accept(MediaType.APPLICATION_JSON))
                        ,productHandler::getAllProducts)
                .andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON))
                        ,productHandler::createProduct)
                .andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON))
                        ,productHandler::deleteProduct)
                .andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON))
                        ,productHandler::updateProduct);
    }
}
package org.smarttechie.handler;
import org.smarttechie.model.Product;
import org.smarttechie.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
@Component
public class ProductHandler {
    @Autowired
    private ProductService productService;
    static Mono<ServerResponse> notFound = ServerResponse.notFound().build();
    /**
     * The handler to get all the available products.
     * @param serverRequest
     * @return - all the products info as part of ServerResponse
     */
    public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(productService.getAllProducts(), Product.class);
    }
    /**
     * The handler to create a product
     * @param serverRequest
     * @return - return the created product as part of ServerResponse
     */
    public Mono<ServerResponse> createProduct(ServerRequest serverRequest) {

        Mono<Product> productToSave = serverRequest.bodyToMono(Product.class);

        return productToSave.flatMap(product ->
                ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(productService.save(product), Product.class));

    }
    /**
     * The handler to delete a product based on the product id.
     * @param serverRequest
     * @return - return the deleted product as part of ServerResponse
     */
    public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) {

        String id = serverRequest.pathVariable("id");
        Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id));

        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(deleteItem, Void.class);
    }

    /**
     * The handler to update a product.
     * @param serverRequest
     * @return - The updated product as part of ServerResponse
     */
    public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) {
        return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->
                ServerResponse.ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(fromObject(product)))
                .switchIfEmpty(notFound);
    }
}

到目前为止,我们已经看到了如何公开响应式REST API。通过这种实现,我使用Gatling在反应式API和非反应式API(使用Spring RestController构建非反应式API)上做了一个简单的基准测试。以下是反应式和非反应式API之间的比较指标。这不是一个广泛的基准测试。因此,在采用之前,请确保对您的用例进行广泛的基准测试。

-reactive.png

Gatling负载测试脚本也可以在GitHub上获得,以供您参考。到此,我结束了有关“使用Spring WebFlux构建反应性REST API ”的文章。我们将在另一个主题上见面。到那时,快乐学习!!


原文链接:http://codingdict.com