Spring Integration
Full Reference: See adapters.md for File, HTTP, Kafka adapters, Error Handling, and Testing patterns.
Overview
┌─────────────────────────────────────────────────────────────────────┐
│ Spring Integration Flow │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ [Inbound] [Channel] [Transformer] [Channel] [Outbound] │
│ Adapter ──▶ ════════ ──▶ ┌─────────┐ ──▶ ════════ ──▶ Adapter │
│ (File, (Queue/ │ Convert │ (Direct/ (DB, │
│ HTTP, Direct) │ Enrich │ PubSub) Kafka, │
│ Kafka) └─────────┘ HTTP) │
│ │
└─────────────────────────────────────────────────────────────────────┘
Quick Start
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
</dependency>
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File("/input"))
.patternFilter("*.csv"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.handle((payload, headers) -> {
System.out.println("Processing: " + payload);
return payload;
})
.get();
}
}
Message & Channels
// Message structure
Message<String> message = MessageBuilder
.withPayload("Hello Integration")
.setHeader("contentType", "text/plain")
.setHeader("priority", 1)
.setCorrelationId(UUID.randomUUID())
.build();
// Channel types
@Configuration
public class ChannelConfig {
// Direct Channel (point-to-point, synchronous)
@Bean
public DirectChannel orderChannel() {
return new DirectChannel();
}
// Queue Channel (point-to-point, async with buffer)
@Bean
public QueueChannel processingQueue() {
return new QueueChannel(100);
}
// PublishSubscribe Channel (broadcast to all subscribers)
@Bean
public PublishSubscribeChannel notificationChannel() {
return new PublishSubscribeChannel();
}
// Executor Channel (async with thread pool)
@Bean
public ExecutorChannel asyncChannel() {
return new ExecutorChannel(Executors.newFixedThreadPool(10));
}
}
Gateway (Entry Point)
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel")
void submitOrder(Order order);
@Gateway(requestChannel = "orderChannel", replyChannel = "orderResponseChannel")
OrderConfirmation submitOrderAndWait(Order order);
@Gateway(requestChannel = "orderChannel", replyTimeout = 5000)
@Async
CompletableFuture<OrderConfirmation> submitOrderAsync(Order order);
}
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderGateway orderGateway;
public OrderConfirmation createOrder(CreateOrderRequest request) {
Order order = mapToOrder(request);
return orderGateway.submitOrderAndWait(order);
}
}
Integration Flow DSL
@Bean
public IntegrationFlow orderFlow() {
return IntegrationFlow
.from("orderChannel")
// Validation
.filter(Order.class, order -> order.getTotal().compareTo(BigDecimal.ZERO) > 0,
f -> f.discardChannel("invalidOrderChannel"))
// Enrichment
.enrich(e -> e
.requestChannel("customerLookupChannel")
.propertyExpression("customer", "payload"))
// Transformation
.transform(Order.class, order -> {
order.setStatus(OrderStatus.VALIDATED);
return order;
})
// Routing
.<Order, String>route(order ->
order.getTotal().compareTo(new BigDecimal("1000")) > 0
? "highValueOrder" : "standardOrder",
r -> r
.subFlowMapping("highValueOrder", sf -> sf
.handle("priorityOrderHandler", "process"))
.subFlowMapping("standardOrder", sf -> sf
.handle("standardOrderHandler", "process")))
.handle("orderRepository", "save")
.get();
}
Splitter & Aggregator
@Bean
public IntegrationFlow batchOrderFlow() {
return IntegrationFlow
.from("batchOrderChannel")
// Split batch into individual orders
.split(BatchOrder.class, BatchOrder::getOrders)
.channel(c -> c.executor(Executors.newFixedThreadPool(5)))
.handle("orderProcessor", "process")
// Aggregate results
.aggregate(a -> a
.correlationStrategy(m -> m.getHeaders().get("correlationId"))
.releaseStrategy(g -> g.size() == g.getSequenceSize())
.outputProcessor(g -> new BatchResult(
g.getMessages().stream()
.map(m -> (OrderResult) m.getPayload())
.toList()
))
.expireGroupsUponCompletion(true)
.groupTimeout(30000))
.get();
}
Transformers
@Bean
public IntegrationFlow transformFlow() {
return IntegrationFlow
.from("inputChannel")
.transform(String.class, String::toUpperCase)
.transform(Transformers.toJson())
.transform(Transformers.fromJson(Order.class))
.enrichHeaders(h -> h
.header("timestamp", Instant.now())
.headerExpression("orderValue", "payload.total"))
.channel("outputChannel")
.get();
}
Routers
// Header-based router
@Bean
public IntegrationFlow headerRouterFlow() {
return IntegrationFlow
.from("inboundChannel")
.<Message<?>, String>route(m -> m.getHeaders().get("type", String.class),
r -> r
.subFlowMapping("ORDER", sf -> sf.channel("orderChannel"))
.subFlowMapping("PAYMENT", sf -> sf.channel("paymentChannel"))
.defaultOutputChannel("unknownChannel"))
.get();
}
// Payload-based router
@Bean
public IntegrationFlow payloadRouterFlow() {
return IntegrationFlow
.from("orderChannel")
.<Order, OrderType>route(Order::getType,
r -> r
.subFlowMapping(OrderType.STANDARD, sf -> sf
.handle("standardProcessor", "process"))
.subFlowMapping(OrderType.EXPRESS, sf -> sf
.handle("expressProcessor", "process")))
.get();
}
Service Activator
@Component
public class OrderHandler {
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "resultChannel")
public OrderResult processOrder(Order order,
@Header("priority") int priority) {
validateOrder(order);
calculateTotals(order);
return new OrderResult(order.getId(), "PROCESSED");
}
}
// DSL equivalent
@Bean
public IntegrationFlow serviceActivatorFlow() {
return IntegrationFlow
.from("orderChannel")
.handle(Order.class, (order, headers) -> {
return new OrderResult(order.getId(), "PROCESSED");
})
.channel("resultChannel")
.get();
}
Best Practices
| Do | Don't | |----|-------| | Use DSL for readable flows | Build flows with XML only | | Configure error channels | Ignore errors silently | | Implement retry with backoff | Fail on first error | | Use queue channels for decoupling | Block in synchronous handlers | | Monitor channel metrics | Deploy without observability |
Production Checklist
- [ ] Error channels configured
- [ ] Retry policies implemented
- [ ] Queue channel capacities set
- [ ] Idempotent receivers where needed
- [ ] Proper timeout configuration
- [ ] Dead letter channels for failures
Anti-Patterns
| Anti-Pattern | Problem | Solution | |--------------|---------|----------| | No error channel | Silent failures | Configure global errorChannel | | Blocking in DirectChannel | Thread exhaustion | Use QueueChannel or async | | Unbounded queue | Memory leak | Set queue capacity | | Missing idempotency | Duplicate processing | Use IdempotentReceiverInterceptor |
Quick Troubleshooting
| Problem | Fix | |---------|-----| | No channel found | Define @Bean for channel | | Dispatcher has no subscribers | Add handler to flow | | Reply timeout | Use nullChannel or return value | | Duplicate messages | Add IdempotentReceiverInterceptor |
微信扫一扫