Skip to content

Commit 79ac157

Browse files
committed
Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
1 parent a7fb2e8 commit 79ac157

4 files changed

Lines changed: 166 additions & 3 deletions

File tree

4-Finchley/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
<module>stream-consumer-self</module>
1414
<!-- 消息重试 -->
1515
<module>stream-exception-handler-1</module>
16+
<!-- 自定义错误处理逻辑 -->
17+
<module>stream-exception-handler-2</module>
1618

17-
<!-- 重新入队 -->
18-
19-
<!-- 自定义降级方法 -->
19+
<!-- 重新入队(RabbitMQ) -->
2020

2121
<!-- 使用DLQ队列(RabbitMQ) -->
2222

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4+
<modelVersion>4.0.0</modelVersion>
5+
6+
<groupId>com.didispace</groupId>
7+
<artifactId>stream-exception-handler-2</artifactId>
8+
<version>0.0.1-SNAPSHOT</version>
9+
<packaging>jar</packaging>
10+
11+
<parent>
12+
<groupId>org.springframework.boot</groupId>
13+
<artifactId>spring-boot-starter-parent</artifactId>
14+
<version>2.0.5.RELEASE</version>
15+
<relativePath/> <!-- lookup parent from repository -->
16+
</parent>
17+
18+
<properties>
19+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
20+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
21+
<java.version>1.8</java.version>
22+
</properties>
23+
24+
<dependencies>
25+
<dependency>
26+
<groupId>org.projectlombok</groupId>
27+
<artifactId>lombok</artifactId>
28+
<version>1.18.2</version>
29+
</dependency>
30+
<dependency>
31+
<groupId>org.springframework.boot</groupId>
32+
<artifactId>spring-boot-starter-test</artifactId>
33+
<scope>test</scope>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>org.springframework.cloud</groupId>
38+
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.springframework.boot</groupId>
42+
<artifactId>spring-boot-starter-actuator</artifactId>
43+
</dependency>
44+
</dependencies>
45+
46+
<dependencyManagement>
47+
<dependencies>
48+
<dependency>
49+
<groupId>org.springframework.cloud</groupId>
50+
<artifactId>spring-cloud-dependencies</artifactId>
51+
<version>Finchley.SR1</version>
52+
<type>pom</type>
53+
<scope>import</scope>
54+
</dependency>
55+
</dependencies>
56+
</dependencyManagement>
57+
58+
<build>
59+
<plugins>
60+
<plugin>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-maven-plugin</artifactId>
63+
</plugin>
64+
</plugins>
65+
</build>
66+
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.didispace.stream;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
import org.springframework.cloud.stream.annotation.EnableBinding;
8+
import org.springframework.cloud.stream.annotation.Input;
9+
import org.springframework.cloud.stream.annotation.Output;
10+
import org.springframework.cloud.stream.annotation.StreamListener;
11+
import org.springframework.integration.annotation.ServiceActivator;
12+
import org.springframework.integration.support.MessageBuilder;
13+
import org.springframework.messaging.Message;
14+
import org.springframework.messaging.MessageChannel;
15+
import org.springframework.messaging.SubscribableChannel;
16+
import org.springframework.stereotype.Component;
17+
import org.springframework.web.bind.annotation.GetMapping;
18+
import org.springframework.web.bind.annotation.RequestParam;
19+
import org.springframework.web.bind.annotation.RestController;
20+
21+
22+
@EnableBinding(TestApplication.TestTopic.class)
23+
@SpringBootApplication
24+
public class TestApplication {
25+
26+
public static void main(String[] args) {
27+
SpringApplication.run(TestApplication.class, args);
28+
}
29+
30+
@RestController
31+
static class TestController {
32+
33+
@Autowired
34+
private TestTopic testTopic;
35+
36+
/**
37+
* 消息生产接口
38+
*
39+
* @param message
40+
* @return
41+
*/
42+
@GetMapping("/sendMessage")
43+
public String messageWithMQ(@RequestParam String message) {
44+
testTopic.output().send(MessageBuilder.withPayload(message).build());
45+
return "ok";
46+
}
47+
48+
}
49+
50+
/**
51+
* 消息消费逻辑
52+
*/
53+
@Slf4j
54+
@Component
55+
static class TestListener {
56+
57+
@StreamListener(TestTopic.INPUT)
58+
public void receive(String payload) {
59+
log.info("Received payload : " + payload);
60+
throw new RuntimeException("Message consumer failed!");
61+
}
62+
63+
/**
64+
* 消息消费失败的降级处理逻辑
65+
*
66+
* @param message
67+
*/
68+
@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")
69+
public void error(Message<?> message) {
70+
log.info("Message consumer failed, call fallback!");
71+
}
72+
73+
}
74+
75+
interface TestTopic {
76+
77+
String OUTPUT = "example-topic-output";
78+
String INPUT = "example-topic-input";
79+
80+
@Output(OUTPUT)
81+
MessageChannel output();
82+
83+
@Input(INPUT)
84+
SubscribableChannel input();
85+
86+
}
87+
88+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
spring.application.name=stream-exception-handler-2
2+
server.port=8080
3+
4+
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
5+
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
6+
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
7+
8+
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
9+

0 commit comments

Comments
 (0)