- Published on
10 - Reactive Programming with Spring WebFlux
- Authors

- Name
- Samreach YAN
Table of Contents
- Project Setup
- Docker Compose for PostgreSQL
- Reactive Programming Paradigm
- Reactor Core
- Reactive Controllers
- Reactive Data Access with Postgres
- Reactive Spring Security
- Reactive WebClient
- Testing Reactive Components
Prerequisites
- JDK 17 or later
- Maven 3.8+
- Docker (for PostgreSQL)
- Basic knowledge of Spring Boot and reactive programming
Project Setup
Folder Structure
spring-reactive-advanced/
├── src/
│ ├── main/
│ │ ├── java/com/example/reactive/
│ │ │ ├── config/
│ │ │ ├── controller/
│ │ │ ├── dto/
│ │ │ ├── exception/
│ │ │ ├── model/
│ │ │ ├── repository/
│ │ │ ├── security/
│ │ │ ├── service/
│ │ │ └── Application.java
│ │ └── resources/
│ │ ├── application.yml
│ │ └── schema.sql
│ └── test/
│ └── java/com/example/reactive/
├── docker-compose.yml
└── pom.xml
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>reactive-advanced</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-reactive-advanced</name>
<description>Demo project for Spring Reactive Advanced</description>
<properties>
<java.version>17</java.version>
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
</properties>
<dependencies>
<!-- Spring Reactive Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- R2DBC PostgreSQL -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>${r2dbc.version}</version>
</dependency>
<!-- Reactive Security -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Docker Compose for PostgreSQL
docker-compose.yml
services:
postgres:
image: postgres:15-alpine
container_name: reactive_postgres
environment:
POSTGRES_DB: reactive_db
POSTGRES_USER: reactive_user
POSTGRES_PASSWORD: reactive_pass
ports:
- '5432:5432'
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U reactive_user -d reactive_db']
interval: 5s
timeout: 5s
retries: 5
volumes:
postgres_data:
Run with:
docker-compose up -d
Reactive Programming Paradigm
Reactive programming is a programming paradigm oriented around data flows and the propagation of change. In Spring, we use Project Reactor which provides the Mono and Flux types.
Key concepts:
- Publisher: Emits data (Flux for 0..N, Mono for 0..1)
- Subscriber: Consumes data
- Backpressure: Consumer controls the flow
- Non-blocking: Asynchronous processing
Reactor Core
Basic Reactor Types
// Mono example
Mono<String> mono = Mono.just("Hello")
.map(String::toUpperCase)
.log();
// Flux example
Flux<Integer> flux = Flux.range(1, 5)
.filter(i -> i % 2 == 0)
.map(i -> i * 2);
Combining Publishers
// Merge flux
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux = Flux.merge(flux1, flux2);
// Zip mono
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("World");
Mono<String> zippedMono = Mono.zip(mono1, mono2)
.map(tuple -> tuple.getT1() + " " + tuple.getT2());
Error Handling
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorResume(e -> {
System.err.println("Error: " + e.getMessage());
return Flux.just(-1);
})
.subscribe();
Reactive Controllers
Model Class
package com.example.reactive.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Table;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table("products")
public class Product {
@Id
private Long id;
private String name;
private Double price;
}
DTO Class
package com.example.reactive.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ProductDto {
private String name;
private Double price;
}
Controller
package com.example.reactive.controller;
import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.service.ProductService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {
private final ProductService productService;
@GetMapping
public Flux<Product> getAllProducts() {
return productService.getAllProducts();
}
@GetMapping("/{id}")
public Mono<Product> getProductById(@PathVariable Long id) {
return productService.getProductById(id);
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<Product> createProduct(@RequestBody ProductDto productDto) {
return productService.createProduct(productDto);
}
@PutMapping("/{id}")
public Mono<Product> updateProduct(@PathVariable Long id, @RequestBody ProductDto productDto) {
return productService.updateProduct(id, productDto);
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> deleteProduct(@PathVariable Long id) {
return productService.deleteProduct(id);
}
}
Reactive Data Access with Postgres
application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/reactive_db
username: reactive_user
password: reactive_pass
pool:
initial-size: 5
max-size: 10
sql:
init:
mode: always
security:
user:
name: admin
password: password
roles: ADMIN
schema.sql
CREATE TABLE IF NOT EXISTS products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL
);
Repository
package com.example.reactive.repository;
import com.example.reactive.model.Product;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
@Repository
public interface ProductRepository extends R2dbcRepository<Product, Long> {
Flux<Product> findByNameContainingIgnoreCase(String name);
}
Service
package com.example.reactive.service;
import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.repository.ProductRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class ProductService {
private final ProductRepository productRepository;
public Flux<Product> getAllProducts() {
return productRepository.findAll();
}
public Mono<Product> getProductById(Long id) {
return productRepository.findById(id);
}
public Mono<Product> createProduct(ProductDto productDto) {
Product product = new Product();
product.setName(productDto.getName());
product.setPrice(productDto.getPrice());
return productRepository.save(product);
}
public Mono<Product> updateProduct(Long id, ProductDto productDto) {
return productRepository.findById(id)
.flatMap(existingProduct -> {
existingProduct.setName(productDto.getName());
existingProduct.setPrice(productDto.getPrice());
return productRepository.save(existingProduct);
});
}
public Mono<Void> deleteProduct(Long id) {
return productRepository.deleteById(id);
}
}
Reactive Spring Security
Security Config
package com.example.reactive.security;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.security.web.server.SecurityWebFilterChain;
@Configuration
@EnableWebFluxSecurity
public class SecurityConfig {
@Bean
public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
return http
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.authorizeExchange(exchanges -> exchanges
.pathMatchers("/api/products/**").authenticated()
.anyExchange().permitAll()
)
.httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
.formLogin(ServerHttpSecurity.FormLoginSpec::disable)
.build();
}
@Bean
public MapReactiveUserDetailsService userDetailsService() {
UserDetails user = User.withUsername("user")
.password(passwordEncoder().encode("password"))
.roles("USER")
.build();
UserDetails admin = User.withUsername("admin")
.password(passwordEncoder().encode("password"))
.roles("ADMIN")
.build();
return new MapReactiveUserDetailsService(user, admin);
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
}
Reactive WebClient
WebClient Config
package com.example.reactive.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://jsonplaceholder.typicode.com")
.build();
}
}
Service using WebClient
package com.example.reactive.service;
import com.example.reactive.dto.PostDto;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
@Service
@RequiredArgsConstructor
public class ExternalApiService {
private final WebClient webClient;
public Flux<PostDto> fetchPosts() {
return webClient.get()
.uri("/posts")
.retrieve()
.bodyToFlux(PostDto.class);
}
}
DTO for External API
package com.example.reactive.dto;
import lombok.Data;
@Data
public class PostDto {
private Long id;
private Long userId;
private String title;
private String body;
}
Controller for External API
package com.example.reactive.controller;
import com.example.reactive.dto.PostDto;
import com.example.reactive.service.ExternalApiService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
@RestController
@RequestMapping("/api/external")
@RequiredArgsConstructor
public class ExternalApiController {
private final ExternalApiService externalApiService;
@GetMapping("/posts")
public Flux<PostDto> getPosts() {
return externalApiService.fetchPosts();
}
}
Testing Reactive Components
Test Dependencies
Make sure these are in your pom.xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
Repository Test
package com.example.reactive.repository;
import com.example.reactive.model.Product;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.r2dbc.DataR2dbcTest;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.math.BigDecimal;
@DataR2dbcTest
class ProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
@Test
void shouldSaveAndFindProduct() {
Product product = new Product();
product.setName("Test Product");
product.setPrice(19.99);
Mono<Product> productMono = productRepository.save(product)
.then(productRepository.findById(product.getId()));
StepVerifier.create(productMono)
.expectNextMatches(p -> p.getName().equals("Test Product") && p.getPrice() == 19.99)
.verifyComplete();
}
}
Controller Test
package com.example.reactive.controller;
import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.service.ProductService;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
@WebFluxTest(ProductController.class)
class ProductControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private ProductService productService;
@Test
void shouldGetAllProducts() {
Product product1 = new Product(1L, "Product 1", 10.0);
Product product2 = new Product(2L, "Product 2", 20.0);
Mockito.when(productService.getAllProducts()).thenReturn(Flux.fromIterable(List.of(product1, product2)));
webTestClient.get()
.uri("/api/products")
.exchange()
.expectStatus().isOk()
.expectBodyList(Product.class)
.hasSize(2)
.contains(product1, product2);
}
@Test
void shouldCreateProduct() {
ProductDto productDto = new ProductDto("New Product", 30.0);
Product product = new Product(1L, "New Product", 30.0);
Mockito.when(productService.createProduct(productDto)).thenReturn(Mono.just(product));
webTestClient.post()
.uri("/api/products")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(productDto)
.exchange()
.expectStatus().isCreated()
.expectBody(Product.class)
.isEqualTo(product);
}
}
Service Test
package com.example.reactive.service;
import com.example.reactive.dto.ProductDto;
import com.example.reactive.model.Product;
import com.example.reactive.repository.ProductRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class ProductServiceTest {
@Mock
private ProductRepository productRepository;
@InjectMocks
private ProductService productService;
@Test
void shouldReturnAllProducts() {
Product product1 = new Product(1L, "Product 1", 10.0);
Product product2 = new Product(2L, "Product 2", 20.0);
when(productRepository.findAll()).thenReturn(Flux.just(product1, product2));
Flux<Product> result = productService.getAllProducts();
StepVerifier.create(result)
.expectNext(product1)
.expectNext(product2)
.verifyComplete();
}
@Test
void shouldCreateProduct() {
ProductDto productDto = new ProductDto("New Product", 30.0);
Product product = new Product(null, "New Product", 30.0);
Product savedProduct = new Product(1L, "New Product", 30.0);
when(productRepository.save(product)).thenReturn(Mono.just(savedProduct));
Mono<Product> result = productService.createProduct(productDto);
StepVerifier.create(result)
.expectNext(savedProduct)
.verifyComplete();
}
}
WebClient Test
package com.example.reactive.service;
import com.example.reactive.config.WebClientConfig;
import com.example.reactive.dto.PostDto;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.io.IOException;
class ExternalApiServiceTest {
private static MockWebServer mockWebServer;
private static ExternalApiService externalApiService;
@BeforeAll
static void setUp() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start();
WebClient webClient = WebClient.builder()
.baseUrl(mockWebServer.url("/").toString())
.build();
externalApiService = new ExternalApiService(webClient);
}
@AfterAll
static void tearDown() throws IOException {
mockWebServer.shutdown();
}
@Test
void shouldFetchPosts() {
String responseBody = "[{\"id\":1,\"userId\":1,\"title\":\"Test Title\",\"body\":\"Test Body\"}]";
mockWebServer.enqueue(new MockResponse()
.setBody(responseBody)
.addHeader("Content-Type", "application/json"));
Flux<PostDto> posts = externalApiService.fetchPosts();
StepVerifier.create(posts)
.expectNextMatches(post ->
post.getId() == 1L &&
post.getTitle().equals("Test Title"))
.verifyComplete();
}
}
Conclusion
This tutorial covered all the essential aspects of building a reactive Spring Boot application:
- Set up a reactive Spring Boot project
- Configured PostgreSQL with R2DBC
- Implemented reactive controllers and services
- Added reactive security
- Used WebClient for external API calls
- Tested all reactive components
The complete code is production-ready and follows best practices for reactive programming in Spring. You can extend this foundation to build more complex reactive applications.