Published on

10 - Reactive Programming with Spring WebFlux

Authors
  • avatar
    Name
    Samreach YAN
    Twitter

Table of Contents

  1. Project Setup
  2. Docker Compose for PostgreSQL
  3. Reactive Programming Paradigm
  4. Reactor Core
  5. Reactive Controllers
  6. Reactive Data Access with Postgres
  7. Reactive Spring Security
  8. Reactive WebClient
  9. Testing Reactive Components

Prerequisites

  • JDK 17 or later
  • Maven 3.8+
  • Docker (for PostgreSQL)
  • Basic knowledge of Spring Boot and reactive programming

Project Setup

Folder Structure

spring-reactive-advanced/
├── src/
│   ├── main/
│   │   ├── java/com/example/reactive/
│   │   │   ├── config/
│   │   │   ├── controller/
│   │   │   ├── dto/
│   │   │   ├── exception/
│   │   │   ├── model/
│   │   │   ├── repository/
│   │   │   ├── security/
│   │   │   ├── service/
│   │   │   └── Application.java
│   │   └── resources/
│   │       ├── application.yml
│   │       └── schema.sql
│   └── test/
│       └── java/com/example/reactive/
├── docker-compose.yml
└── pom.xml

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>reactive-advanced</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-reactive-advanced</name>
    <description>Demo project for Spring Reactive Advanced</description>

    <properties>
        <java.version>17</java.version>
        <r2dbc.version>1.0.0.RELEASE</r2dbc.version>
    </properties>

    <dependencies>
        <!-- Spring Reactive Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <!-- R2DBC PostgreSQL -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-postgresql</artifactId>
            <version>${r2dbc.version}</version>
        </dependency>

        <!-- Reactive Security -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Docker Compose for PostgreSQL

docker-compose.yml

services:
  postgres:
    image: postgres:15-alpine
    container_name: reactive_postgres
    environment:
      POSTGRES_DB: reactive_db
      POSTGRES_USER: reactive_user
      POSTGRES_PASSWORD: reactive_pass
    ports:
      - '5432:5432'
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ['CMD-SHELL', 'pg_isready -U reactive_user -d reactive_db']
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:

Run with:

docker-compose up -d

Reactive Programming Paradigm

Reactive programming is a programming paradigm oriented around data flows and the propagation of change. In Spring, we use Project Reactor which provides the Mono and Flux types.

Key concepts:

  • Publisher: Emits data (Flux for 0..N, Mono for 0..1)
  • Subscriber: Consumes data
  • Backpressure: Consumer controls the flow
  • Non-blocking: Asynchronous processing

Reactor Core

Basic Reactor Types

// Mono example
Mono<String> mono = Mono.just("Hello")
    .map(String::toUpperCase)
    .log();

// Flux example
Flux<Integer> flux = Flux.range(1, 5)
    .filter(i -> i % 2 == 0)
    .map(i -> i * 2);

Combining Publishers

// Merge flux
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux = Flux.merge(flux1, flux2);

// Zip mono
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("World");
Mono<String> zippedMono = Mono.zip(mono1, mono2)
    .map(tuple -> tuple.getT1() + " " + tuple.getT2());

Error Handling

Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorResume(e -> {
        System.err.println("Error: " + e.getMessage());
        return Flux.just(-1);
    })
    .subscribe();

Reactive Controllers

Model Class

package com.example.reactive.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Table("products")
public class Product {
    @Id
    private Long id;
    private String name;
    private Double price;
}

DTO Class

package com.example.reactive.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductDto {
    private String name;
    private Double price;
}

Controller

package com.example.reactive.controller;

import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.service.ProductService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {

    private final ProductService productService;

    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.getAllProducts();
    }

    @GetMapping("/{id}")
    public Mono<Product> getProductById(@PathVariable Long id) {
        return productService.getProductById(id);
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@RequestBody ProductDto productDto) {
        return productService.createProduct(productDto);
    }

    @PutMapping("/{id}")
    public Mono<Product> updateProduct(@PathVariable Long id, @RequestBody ProductDto productDto) {
        return productService.updateProduct(id, productDto);
    }

    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable Long id) {
        return productService.deleteProduct(id);
    }
}

Reactive Data Access with Postgres

application.yml

spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/reactive_db
    username: reactive_user
    password: reactive_pass
    pool:
      initial-size: 5
      max-size: 10
  sql:
    init:
      mode: always
  security:
    user:
      name: admin
      password: password
      roles: ADMIN

schema.sql

CREATE TABLE IF NOT EXISTS products (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 2) NOT NULL
);

Repository

package com.example.reactive.repository;

import com.example.reactive.model.Product;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;

@Repository
public interface ProductRepository extends R2dbcRepository<Product, Long> {
    Flux<Product> findByNameContainingIgnoreCase(String name);
}

Service

package com.example.reactive.service;

import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
public class ProductService {

    private final ProductRepository productRepository;

    public Flux<Product> getAllProducts() {
        return productRepository.findAll();
    }

    public Mono<Product> getProductById(Long id) {
        return productRepository.findById(id);
    }

    public Mono<Product> createProduct(ProductDto productDto) {
        Product product = new Product();
        product.setName(productDto.getName());
        product.setPrice(productDto.getPrice());
        return productRepository.save(product);
    }

    public Mono<Product> updateProduct(Long id, ProductDto productDto) {
        return productRepository.findById(id)
                .flatMap(existingProduct -> {
                    existingProduct.setName(productDto.getName());
                    existingProduct.setPrice(productDto.getPrice());
                    return productRepository.save(existingProduct);
                });
    }

    public Mono<Void> deleteProduct(Long id) {
        return productRepository.deleteById(id);
    }
}

Reactive Spring Security

Security Config

package com.example.reactive.security;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;

@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {

    @Bean
    public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
        return http
                .csrf(ServerHttpSecurity.CsrfSpec::disable)
                .authorizeExchange(exchanges -> exchanges
                        .pathMatchers("/api/products/**").authenticated()
                        .anyExchange().permitAll()
                )
                .httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
                .formLogin(ServerHttpSecurity.FormLoginSpec::disable)
                .build();
    }

    @Bean
    public MapReactiveUserDetailsService userDetailsService() {
        UserDetails user = User.withUsername("user")
                .password(passwordEncoder().encode("password"))
                .roles("USER")
                .build();

        UserDetails admin = User.withUsername("admin")
                .password(passwordEncoder().encode("password"))
                .roles("ADMIN")
                .build();

        return new MapReactiveUserDetailsService(user, admin);
    }

    @Bean
    public PasswordEncoder passwordEncoder() {
        return new BCryptPasswordEncoder();
    }
}

Reactive WebClient

WebClient Config

package com.example.reactive.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
                .baseUrl("https://jsonplaceholder.typicode.com")
                .build();
    }
}

Service using WebClient

package com.example.reactive.service;

import com.example.reactive.dto.PostDto;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@Service
@RequiredArgsConstructor
public class ExternalApiService {

    private final WebClient webClient;

    public Flux<PostDto> fetchPosts() {
        return webClient.get()
                .uri("/posts")
                .retrieve()
                .bodyToFlux(PostDto.class);
    }
}

DTO for External API

package com.example.reactive.dto;

import lombok.Data;

@Data
public class PostDto {
    private Long id;
    private Long userId;
    private String title;
    private String body;
}

Controller for External API

package com.example.reactive.controller;

import com.example.reactive.dto.PostDto;
import com.example.reactive.service.ExternalApiService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/external")
@RequiredArgsConstructor
public class ExternalApiController {

    private final ExternalApiService externalApiService;

    @GetMapping("/posts")
    public Flux<PostDto> getPosts() {
        return externalApiService.fetchPosts();
    }
}

Testing Reactive Components

Test Dependencies

Make sure these are in your pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

Repository Test

package com.example.reactive.repository;

import com.example.reactive.model.Product;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.r2dbc.DataR2dbcTest;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.math.BigDecimal;

@DataR2dbcTest
class ProductRepositoryTest {

    @Autowired
    private ProductRepository productRepository;

    @Test
    void shouldSaveAndFindProduct() {
        Product product = new Product();
        product.setName("Test Product");
        product.setPrice(19.99);

        Mono<Product> productMono = productRepository.save(product)
                .then(productRepository.findById(product.getId()));

        StepVerifier.create(productMono)
                .expectNextMatches(p -> p.getName().equals("Test Product") && p.getPrice() == 19.99)
                .verifyComplete();
    }
}

Controller Test

package com.example.reactive.controller;

import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.service.ProductService;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

@WebFluxTest(ProductController.class)
class ProductControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private ProductService productService;

    @Test
    void shouldGetAllProducts() {
        Product product1 = new Product(1L, "Product 1", 10.0);
        Product product2 = new Product(2L, "Product 2", 20.0);

        Mockito.when(productService.getAllProducts()).thenReturn(Flux.fromIterable(List.of(product1, product2)));

        webTestClient.get()
                .uri("/api/products")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(Product.class)
                .hasSize(2)
                .contains(product1, product2);
    }

    @Test
    void shouldCreateProduct() {
        ProductDto productDto = new ProductDto("New Product", 30.0);
        Product product = new Product(1L, "New Product", 30.0);

        Mockito.when(productService.createProduct(productDto)).thenReturn(Mono.just(product));

        webTestClient.post()
                .uri("/api/products")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(productDto)
                .exchange()
                .expectStatus().isCreated()
                .expectBody(Product.class)
                .isEqualTo(product);
    }
}

Service Test

package com.example.reactive.service;

import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.repository.ProductRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ProductServiceTest {

    @Mock
    private ProductRepository productRepository;

    @InjectMocks
    private ProductService productService;

    @Test
    void shouldReturnAllProducts() {
        Product product1 = new Product(1L, "Product 1", 10.0);
        Product product2 = new Product(2L, "Product 2", 20.0);

        when(productRepository.findAll()).thenReturn(Flux.just(product1, product2));

        Flux<Product> result = productService.getAllProducts();

        StepVerifier.create(result)
                .expectNext(product1)
                .expectNext(product2)
                .verifyComplete();
    }

    @Test
    void shouldCreateProduct() {
        ProductDto productDto = new ProductDto("New Product", 30.0);
        Product product = new Product(null, "New Product", 30.0);
        Product savedProduct = new Product(1L, "New Product", 30.0);

        when(productRepository.save(product)).thenReturn(Mono.just(savedProduct));

        Mono<Product> result = productService.createProduct(productDto);

        StepVerifier.create(result)
                .expectNext(savedProduct)
                .verifyComplete();
    }
}

WebClient Test

package com.example.reactive.service;

import com.example.reactive.config.WebClientConfig;
import com.example.reactive.dto.PostDto;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.io.IOException;

class ExternalApiServiceTest {

    private static MockWebServer mockWebServer;
    private static ExternalApiService externalApiService;

    @BeforeAll
    static void setUp() throws IOException {
        mockWebServer = new MockWebServer();
        mockWebServer.start();

        WebClient webClient = WebClient.builder()
                .baseUrl(mockWebServer.url("/").toString())
                .build();

        externalApiService = new ExternalApiService(webClient);
    }

    @AfterAll
    static void tearDown() throws IOException {
        mockWebServer.shutdown();
    }

    @Test
    void shouldFetchPosts() {
        String responseBody = "[{\"id\":1,\"userId\":1,\"title\":\"Test Title\",\"body\":\"Test Body\"}]";

        mockWebServer.enqueue(new MockResponse()
                .setBody(responseBody)
                .addHeader("Content-Type", "application/json"));

        Flux<PostDto> posts = externalApiService.fetchPosts();

        StepVerifier.create(posts)
                .expectNextMatches(post ->
                    post.getId() == 1L &&
                    post.getTitle().equals("Test Title"))
                .verifyComplete();
    }
}

Conclusion

This tutorial covered all the essential aspects of building a reactive Spring Boot application:

  1. Set up a reactive Spring Boot project
  2. Configured PostgreSQL with R2DBC
  3. Implemented reactive controllers and services
  4. Added reactive security
  5. Used WebClient for external API calls
  6. Tested all reactive components

The complete code is production-ready and follows best practices for reactive programming in Spring. You can extend this foundation to build more complex reactive applications.