Table of Contents

Creación de APIs reactivas: WebFlux

Introducción a WebFlux

https://dzone.com/articles/build-reactive-rest-apis-with-spring-webflux

Programación reactiva con WebFlux

Product.java
/**
 * Product document stored through Spring Data MongoDB in the
 * {@code products} collection named by {@code @Document}.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode},
 * {@code toString} and both the no-args and all-args constructors.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document("products")
public class Product implements Serializable {
  /** Document identifier (annotated with {@code @Id}). */
  @Id
  private String id;
  /** Product name; the only business attribute of the document. */
  @Field
  private String name;
}
ProductValidator.java
public class ProductValidator implements Validator {
 
  @Override
  public boolean supports(Class<?> clazz) {
    return Product.class.isAssignableFrom(clazz);
  }
 
  @Override
  public void validate(Object target, Errors errors) {
    ValidationUtils.rejectIfEmptyOrWhitespace(errors, "name", "field.required");
  }
}
ErrorResponse.java
/**
 * Simple error payload made of a numeric code and a message.
 *
 * <p>Lombok generates the accessors and both the no-args and all-args
 * constructors.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ErrorResponse {
  /** Numeric error code carried in the payload. */
  private int code;
  /** Human-readable description of the error. */
  private String message;
}
ProductRepository.java
/**
 * Reactive repository for {@link Product} documents keyed by a {@code String} id.
 *
 * <p>Declares no methods of its own; everything comes from
 * {@link ReactiveMongoRepository}.
 */
@Repository
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
}
ProductService.java
@Service
public class ProductService {
 
  @Autowired
  private ProductRepository productRepository;
 
  public Flux<Product> getAllProducts() {
    return productRepository.findAll();
  }
 
  public Mono<Product> getProduct(String id) {
    return productRepository.findById(id);
  }
 
  public Mono<Product> save(Product product) {
    Product newProduct = new Product();
    newProduct.setName(product.getName());
    return productRepository.save(newProduct);
  }
 
  public Mono<Void> deleteProduct(String id) {
    return productRepository.deleteById(id);
  }
 
  public Mono<Product> update(Mono<Product> product) {
    return product
      .flatMap((p) -> productRepository.findById(p.getId())
      .flatMap(product1 -> {
        product1.setName(p.getName());
        return productRepository.save(product1);
    }));
  }
}
ProductHandler.java
@Component
public class ProductHandler {
 
  @Autowired
  private ProductService productService;
 
  static Mono<ServerResponse> notFound = ServerResponse.notFound().build();
  private final Validator validator = new ProductValidator();
 
  public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
    return ServerResponse.ok()
       .contentType(MediaType.APPLICATION_JSON)
       .body(productService.getAllProducts(), Product.class);
  }
 
  public Mono<ServerResponse> getProduct(ServerRequest serverRequest) {
    String id = serverRequest.pathVariable("id");
    return productService.getProduct(id)
       .flatMap(p -> ServerResponse.ok()
           .contentType(MediaType.APPLICATION_JSON)
           .body(Mono.just(p), Product.class))
       .switchIfEmpty(ServerResponse.status(HttpStatus.NOT_FOUND)
           .contentType(MediaType.APPLICATION_JSON)
           .body(Mono.just(new ErrorResponse(404, "Product not found")), ErrorResponse.class));
  }
 
  public Mono<ServerResponse> createProduct(ServerRequest serverRequest) {
    Mono<Product> productToSave = serverRequest.bodyToMono(Product.class)
      .doOnNext(this::validate);
 
    return productToSave.flatMap(product ->
       ServerResponse.status(HttpStatus.CREATED)
         .contentType(MediaType.APPLICATION_JSON)
         .body(productService.save(product), Product.class));
  }
 
  private void validate(Product product) {
    Errors errors = new BeanPropertyBindingResult(product, "product");
    validator.validate(product, errors);
    if (errors.hasErrors()) {
      throw new ResponseStatusException(HttpStatus.BAD_REQUEST,
         errors.getAllErrors().toString());
    }
  }
 
  public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) {
    String id = serverRequest.pathVariable("id");
    return productService.deleteProduct(id)
      .flatMap(product -> ServerResponse.noContent().build());
  }
 
  public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) {
    return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->
       ServerResponse.ok()
         .contentType(MediaType.APPLICATION_JSON)
         .body(fromObject(product)))
         .switchIfEmpty(notFound);
  }
}
ProductRouter.java
@Configuration
public class ProductRouter {
 
  @Bean
  public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){
    return RouterFunctions
      .route(GET("/products").and(accept(MediaType.APPLICATION_JSON)), productHandler::getAllProducts)
      .andRoute(GET("/product/{id}").and(accept(MediaType.APPLICATION_JSON)), productHandler::getProduct)
      .andRoute(POST("/products").and(accept(MediaType.APPLICATION_JSON)), productHandler::createProduct)
      .andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)), productHandler::deleteProduct)
      .andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)), productHandler::updateProduct);
  }
}
ReactiveApi.java
@SpringBootApplication
public class ReactiveApi {
 
  public static void main(String[] args) {
    SpringApplication.run(ReactiveApi.class, args);
  }
}
application.properties
# HTTP port of the embedded server.
server.port=8080
 
# MongoDB connection: host, port and database name.
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=reactive-api
 
# Always include the error message in error responses.
server.error.include-message=always

Consumir una API reactiva con WebClient

/**
 * Client-side representation of a product received from the API.
 *
 * <p>NOTE(review): {@code @Data} already generates {@code toString()}, so the
 * explicit {@code @ToString} looks redundant; confirm before removing.
 */
@Data
@NoArgsConstructor
@ToString
public class Product {
 
    /** Product identifier. */
    private String id;
    /** Product name. */
    private String name;
}
// Client bound to the base URL of the reactive API.
WebClient webClient = WebClient.create("http://localhost:8080");

// Declares the GET /products call; nothing is sent until someone subscribes,
// and each subscription below issues its own request.
Flux<Product> productsFlux = webClient.get()
  .uri("/products")
  .retrieve()
  .bodyToFlux(Product.class);

// First consumer. Error and completion callbacks are passed to subscribe(..)
// so that a failure is actually handled by the subscriber instead of only
// being printed as a side effect and then reaching it unhandled.
// NOTE(review): every subscription creates its own cached thread pool that is
// never shut down; consider sharing a single scheduler.
productsFlux
  .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
  .subscribe(
    product -> System.out.println("Haciendo algo con " + product.getId() + " . . ."),
    System.out::println,
    () -> System.out.println("Terminado"));

// Second, independent consumer of the same publisher.
productsFlux
  .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
  .subscribe(
    product -> System.out.println("Consumidor 2: " + product.getName()),
    System.out::println,
    () -> System.out.println("Terminado"));

© 2022-2024 Santiago Faci