Acceso a Datos

2º DAM - Curso 2021-2022

User Tools

Site Tools


apuntes:webflux

Creación de APIs reactivas: WebFlux

Introducción a WebFlux

Programación reactiva con WebFlux

@Data
@NoArgsConstructor
@Document(value = "products")
public class Product {
 
    @Id
    private String id;
    @Field
    private String name;
    @Field
    private String description;
    @Field
    private String category;
    @Field
    private float price;
    @Field(name = "creation_date")
    private LocalDateTime creationDate;
}
@Repository
public interface ProductRepository extends ReactiveMongoRepository<Product, Long> {
 
    Flux<Product> findAll();
    Mono<Product> findByName(String name);
}
@Service
public class ProductServiceImpl implements ProductService {
 
    @Autowired
    private ProductRepository productRepository;
 
    @Override
    public Flux<Product> findAllProducts() {
        return productRepository.findAll();
    }
 
    @Override
    public Mono<Product> findProductByName(String name) {
        return productRepository.findByName(name);
    }
 
    @Override
    public Mono<Product> findProduct(long id) {
        return productRepository.findById(id);
    }
}
@RestController
public class ProductController {
 
    private final Logger logger = LoggerFactory.getLogger(ProductController.class);
 
    @Autowired
    private ProductService productService;
 
    @GetMapping(value = "/products", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> getProducts() {
        return productService.findAllProducts();
    }
 
    @GetMapping("/products/{id}")
    public Mono<Product> getProduct(@PathParam("id") long id) {
        return productService.findProduct(id);
    }
}
@SpringBootApplication
@EnableReactiveMongoRepositories
public class MyshopApplication {
 
	public static void main(String[] args) {
		SpringApplication.run(MyshopApplication.class, args);
	}
}
# Puerto donde escucha el servidor una vez se inicie
server.port=8081
 
# Configuración MongoDB
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=myshop-webflux
@Data
@NoArgsConstructor
@ToString
public class Product {
 
    private String id;
    private String name;
    private String description;
    private String category;
    private float price;
    private LocalDateTime creationDate;
}
WebClient webClient = WebClient.create("http://localhost:8081");
        Flux<Product> productsFlux = webClient.get()
                .uri("/products")
                .retrieve()
                .bodyToFlux(Product.class);
 
        productsFlux.doOnError((System.out::println))
                .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
                .doOnComplete(() -> System.out.println("Terminado"))
                .subscribe((product) -> {
                    // Simulamos una operación costosa en tiempo
                    try {
                        System.out.println("Haciendo algo con " + product.getName() + " . . .");
                        Thread.sleep(5000);
                    } catch (InterruptedException ie) {
                        ie.printStackTrace();
                    }
 
                    System.out.println("Consumidor 1: " + product.getName());
                });
productsFlux.doOnError((System.out::println))
                .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
                .doOnComplete(() -> System.out.println("Terminado"))
                .subscribe((product) -> System.out.println("Consumidor 2: " + product.getName()));
System.out.println(productsFlux.count().block());

© 2022-2022 Santiago Faci

apuntes/webflux.txt · Last modified: 13/02/2022 17:34 by Santiago Faci