apuntes:webflux
Table of Contents
Creación de APIs reactivas: WebFlux
Introducción a WebFlux
Programación reactiva con WebFlux
/**
 * MongoDB document describing a bike, stored in the {@code bikes} collection.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}/{@code toString} and both constructors are
 * generated by Lombok ({@code @Data}, {@code @AllArgsConstructor}, {@code @NoArgsConstructor}).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(value = "bikes")
public class Bike {

    /** Document identifier. */
    @Id
    private String id;

    @Field
    private boolean available;

    // No @NotNull here: a primitive int can never be null, so that constraint could never fail.
    // An absent value simply shows up as 0, which @PositiveOrZero accepts.
    @Field
    @PositiveOrZero
    private int kilometers;

    // No @NotNull for the same reason as above.
    // NOTE(review): no range constraint is declared; the unit/scale of this value is not
    // visible here - confirm before adding one.
    @Field
    private int battery;

    /** Persisted under the field name {@code baby_chair}. */
    @Field(name = "baby_chair")
    private boolean babyChair;

    /** Persisted under the field name {@code station_id}; validated as strictly positive. */
    @Field(name = "station_id")
    @Positive
    private int stationId;
}
/**
 * Reactive repository for {@link Bike} documents, keyed by their {@code String} id.
 *
 * <p>Every finder returns a {@link Flux}; the finders other than {@code findAll} are named
 * after the {@link Bike} property they filter on.
 */
@Repository
public interface BikeRepository extends ReactiveMongoRepository<Bike, String> {

    /**
     * @param battery threshold to compare the {@code battery} property against
     * @return bikes selected by the "battery greater than" finder
     */
    Flux<Bike> findByBatteryGreaterThan(int battery);

    /**
     * @param babyChair value to match against the {@code babyChair} property
     * @return bikes selected by the "baby chair" finder
     */
    Flux<Bike> findByBabyChair(boolean babyChair);

    /**
     * @param stationId value to match against the {@code stationId} property
     * @return bikes selected by the "station id" finder
     */
    Flux<Bike> findByStationId(int stationId);

    /**
     * @return every stored bike
     */
    Flux<Bike> findAll();
}
@Service public class BikeServiceImpl implements BikeService { @Autowired private BikeRepository bikeRepository; @Override public Flux<Bike> findAllBikes() { return bikeRepository.findAll(); } @Override public Flux<Bike> findAllBikes(int stationId) { return bikeRepository.findByStationId(stationId); } @Override public Mono<Bike> findBike(String id) throws BikeNotFoundException { return bikeRepository.findById(id).onErrorReturn(new Bike()); } @Override public void repairBike(Bike bike) { } @Override public Mono<Bike> addBike(Bike bike) { return bikeRepository.save(bike); } @Override public Mono<Bike> deleteBike(String id) throws BikeNotFoundException { Mono<Bike> bike = bikeRepository.findById(id).onErrorReturn(new Bike()); bikeRepository.delete(bike.block()); return bike; } @Override public Mono<Bike> modifyBike(String id, Bike newBike) throws BikeNotFoundException { Mono<Bike> monoBike = bikeRepository.findById(id).onErrorReturn(new Bike()); Bike bike = monoBike.block(); bike.setAvailable(newBike.isAvailable()); bike.setBabyChair(newBike.isBabyChair()); bike.setBattery(newBike.getBattery()); bike.setKilometers(newBike.getKilometers()); bike.setStationId(newBike.getStationId()); return bikeRepository.save(bike); } }
@RestController public class BikeController { private final Logger logger = LoggerFactory.getLogger(BikeController.class); @Autowired private BikeService bikeService; @Autowired private RouteService routeService; @GetMapping("/bikes") public ResponseEntity<Flux<Bike>> getBikesByStationId(@RequestParam(name = "station", defaultValue = "0") int stationId) { Flux<Bike> bikes; if (stationId == 0) { bikes = bikeService.findAllBikes(); } else { bikes = bikeService.findAllBikes(stationId); } return ResponseEntity.ok(bikes); } @GetMapping("/bike/{id}") public ResponseEntity<Mono<Bike>> getBike(@PathVariable String id) throws BikeNotFoundException { Mono<Bike> bike = bikeService.findBike(id); return ResponseEntity.ok(bike); } @DeleteMapping("/bike/{id}") public ResponseEntity<Mono<Bike>> removeBike(@PathVariable String id) throws BikeNotFoundException { Mono<Bike> bike = bikeService.deleteBike(id); return ResponseEntity.ok(bike); } @PostMapping("/bikes") public ResponseEntity<Mono<Bike>> addBike(@Valid @RequestBody Bike bike) { Mono<Bike> newBike = bikeService.addBike(bike); return ResponseEntity.ok(newBike); } @PutMapping("/bike/{id}") public ResponseEntity<Mono<Bike>> modifyBike(@RequestBody Bike bike, @PathVariable String id) throws BikeNotFoundException { // TODO Falta controlar algún error 400 Mono<Bike> newBike = bikeService.modifyBike(id, bike); return ResponseEntity.ok(newBike); } @ExceptionHandler(BikeNotFoundException.class) public ResponseEntity<ErrorResponse> handleBikeNotFoundException(BikeNotFoundException bnfe) { ErrorResponse errorResponse = ErrorResponse.generalError(101, bnfe.getMessage()); logger.error(bnfe.getMessage(), bnfe); return new ResponseEntity<>(errorResponse, HttpStatus.NOT_FOUND); } }
/**
 * Application entry point; reactive MongoDB repositories are enabled on this class.
 */
@EnableReactiveMongoRepositories
@SpringBootApplication
public class BikesApplication {

    /**
     * Starts the application by handing this class and the arguments to
     * {@code SpringApplication.run}.
     *
     * @param args command-line arguments, forwarded unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(BikesApplication.class, args);
    }
}
# Puerto donde escucha el servidor una vez se inicie
server.port=8080

# Configuración MongoDB
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=bikes-webflux
/**
 * Plain data holder for a product.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}, {@code toString} and the no-argument
 * constructor are generated by Lombok.
 */
@ToString
@NoArgsConstructor
@Data
public class Product {

    private String id;

    private String name;

    private String description;

    private String category;

    // NOTE(review): float loses precision for monetary amounts; changing the type would
    // alter the generated accessor signatures, so it is left as is here.
    private float price;

    private LocalDateTime creationDate;
}
// Client bound to the base URL of the bikes API.
WebClient webClient = WebClient.create("http://localhost:8080");

// GET /bikes, with the response body decoded as a stream of Bike elements.
Flux<Bike> bikesFlux = webClient.get()
        .uri("/bikes")
        .retrieve()
        .bodyToFlux(Bike.class);

// First consumer: prints any error, prints "Terminado" on completion and one line per bike.
// The subscription is made on a scheduler backed by a cached thread pool.
bikesFlux
        .doOnError(error -> System.out.println(error))
        .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
        .doOnComplete(() -> System.out.println("Terminado"))
        .subscribe(bike -> System.out.println("Haciendo algo con " + bike.getId() + " . . ."));
// Second, independent consumer of the same Flux<Bike>: prints any error, prints "Terminado"
// on completion and one line per bike. The elements are Bike instances, which expose an id
// but no name, so the id is what gets printed.
bikesFlux
        .doOnError(System.out::println)
        .subscribeOn(Schedulers.fromExecutor(Executors.newCachedThreadPool()))
        .doOnComplete(() -> System.out.println("Terminado"))
        .subscribe(bike -> System.out.println("Consumidor 2: " + bike.getId()));
System.out.println(bikesFlux.count().block());
© 2022-2023 Santiago Faci
apuntes/webflux.txt · Last modified: 09/10/2022 19:42 by Santiago Faci