From 06a5cc9b02b0c8354d264650418f338c8eeed387 Mon Sep 17 00:00:00 2001 From: terrfly Date: Fri, 23 Jul 2021 17:32:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90components.mq=20=E7=BB=84?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=20=E5=AE=8C=E6=88=90activeMQ=E5=92=8CrabbitM?= =?UTF-8?q?Q=E7=9A=84=E5=B0=81=E8=A3=85=EF=BC=8C=20=E5=AF=B9=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E7=94=9F=E4=BA=A7=E8=80=85=E5=92=8C=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E8=80=85=E7=9A=84=E9=85=8D=E7=BD=AE=E9=80=8F=E6=98=8E=E5=8C=96?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/constant/MQSendTypeEnum.java | 29 +++++ .../components/mq/constant/MQVenderCS.java | 33 ++++++ .../mq/executor/MqThreadExecutor.java | 60 ++++++++++ .../components/mq/model/AbstractMQ.java | 38 ++++++ .../mq/model/PayOrderMchNotifyMQ.java | 86 ++++++++++++++ .../mq/model/PayOrderReissueMQ.java | 86 ++++++++++++++ .../components/mq/model/ResetAppConfigMQ.java | 82 +++++++++++++ .../components/mq/vender/IMQMsgReceiver.java | 29 +++++ .../components/mq/vender/IMQSender.java | 35 ++++++ .../mq/vender/activemq/ActiveMQConfig.java | 76 ++++++++++++ .../mq/vender/activemq/ActiveMQSender.java | 62 ++++++++++ .../PayOrderMchNotifyActiveMQReceiver.java | 52 +++++++++ .../ResetAppConfigActiveMQReceiver.java | 50 ++++++++ .../rabbitmq/RabbitMQBeanProcessor.java | 67 +++++++++++ .../mq/vender/rabbitmq/RabbitMQConfig.java | 110 ++++++++++++++++++ .../mq/vender/rabbitmq/RabbitMQSender.java | 71 +++++++++++ .../PayOrderMchNotifyRabbitMQReceiver.java | 52 +++++++++ .../ResetAppConfigRabbitMQReceiver.java | 66 +++++++++++ .../pay/mq/config/MqThreadExecutor.java | 8 +- 19 files changed, 1088 insertions(+), 4 deletions(-) create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQSendTypeEnum.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQVenderCS.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/executor/MqThreadExecutor.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/AbstractMQ.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderMchNotifyMQ.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderReissueMQ.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/ResetAppConfigMQ.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQMsgReceiver.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQSender.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQConfig.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQSender.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/PayOrderMchNotifyActiveMQReceiver.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/ResetAppConfigActiveMQReceiver.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQBeanProcessor.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQConfig.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQSender.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/PayOrderMchNotifyRabbitMQReceiver.java create mode 100644 jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/ResetAppConfigRabbitMQReceiver.java diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQSendTypeEnum.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQSendTypeEnum.java new file mode 100644 index 0000000..6ad490e --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQSendTypeEnum.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.constant; + +/** +* 定义MQ消息类型: +* QUEUE - 点对点 (只有1个消费者可消费) +* TOPIC - 订阅模式 (所有接收者都可接收到) +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:49 +*/ +public enum MQSendTypeEnum { + QUEUE, TOPIC +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQVenderCS.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQVenderCS.java new file mode 100644 index 0000000..4af5fc3 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQVenderCS.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.constant; + +/** +* MQ 厂商定义类 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:50 +*/ +public class MQVenderCS { + + public static final String YML_VENDER_KEY = "isys.mq.vender"; + + public static final String ACTIVE_MQ = "activeMQ"; + public static final String RABBIT_MQ = "rabbitMQ"; + public static final String ROCKET_MQ = "rocketMQ"; + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/executor/MqThreadExecutor.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/executor/MqThreadExecutor.java new file mode 100644 index 0000000..59f9855 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/executor/MqThreadExecutor.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.executor; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/* +* MQ 线程池配置 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/6/8 17:33 +*/ +@Configuration +@EnableAsync +public class MqThreadExecutor { + + public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor"; + + /* + * 功能描述: + * 支付结果通知到商户的异步执行器 (由于量大, 单独新建一个线程池处理, 之前的不做变动 ) + * 20, 300, 10, 60 该配置: 同一时间最大并发量300,(已经验证通过, 商户都可以收到请求消息) + * 缓存队列尽量减少,否则将堵塞在队列中无法执行。 corePoolSize 根据机器的配置进行添加。此处设置的为20 + */ + @Bean + public Executor mqQueue4PayOrderMchNotifyExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(20); // 线程池维护线程的最少数量 + executor.setMaxPoolSize(300); // 线程池维护线程的最大数量 + executor.setQueueCapacity(10); // 缓存队列 + executor.setThreadNamePrefix("payOrderMchNotifyExecutor-"); + // rejection-policy:当pool已经达到max size的时候,如何处理新任务 + // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //对拒绝task的处理策略 + executor.setKeepAliveSeconds(60); // 允许的空闲时间 + executor.initialize(); + return executor; + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/AbstractMQ.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/AbstractMQ.java new file mode 100644 index 0000000..2b310eb --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/AbstractMQ.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.model; + +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; + +/** +* 定义MQ消息格式 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 15:33 +*/ +public abstract class AbstractMQ { + + /** MQ名称 **/ + public abstract String getMQName(); + + /** MQ 类型 **/ + public abstract MQSendTypeEnum getMQType(); + + /** 构造MQ消息体 String类型 **/ + public abstract String toMessage(); + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderMchNotifyMQ.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderMchNotifyMQ.java new file mode 100644 index 0000000..b8120a4 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderMchNotifyMQ.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.model; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** +* +* 定义MQ消息格式 +* 业务场景: [ 支付订单的商户通知消息 ] +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 15:25 +*/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PayOrderMchNotifyMQ extends AbstractMQ { + + /** 【!重要配置项!】 定义MQ名称 **/ + public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY"; + + /** 内置msg 消息体定义 **/ + private MsgPayload payload; + + /** 【!重要配置项!】 定义Msg消息载体 **/ + @Data + @AllArgsConstructor + public static class MsgPayload { + + /** 支付订单号 **/ + private String payOrderId; + + } + + @Override + public String getMQName() { + return MQ_NAME; + } + + /** 【!重要配置项!】 **/ + @Override + public MQSendTypeEnum getMQType(){ + return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 Topic - 订阅模式 + } + + @Override + public String toMessage() { + return JSONObject.toJSONString(payload); + } + + /** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/ + public static PayOrderMchNotifyMQ build(String payOrderId){ + return new PayOrderMchNotifyMQ(new MsgPayload(payOrderId)); + } + + /** 解析MQ消息, 一般用于接收MQ消息时 **/ + public static MsgPayload parse(String msg){ + return JSON.parseObject(msg, MsgPayload.class); + } + + /** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/ + public interface IMQReceiver{ + void receive(MsgPayload payload); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderReissueMQ.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderReissueMQ.java new file mode 100644 index 0000000..c772135 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/PayOrderReissueMQ.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.model; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** +* +* 定义MQ消息格式 +* 业务场景: [ 支付订单补单(一般用于没有回调的接口,比如微信的条码支付) ] +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 15:25 +*/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PayOrderReissueMQ extends AbstractMQ { + + /** 【!重要配置项!】 定义MQ名称 **/ + public static final String MQ_NAME = "QUEUE_PAY_ORDER_REISSUE"; + + /** 内置msg 消息体定义 **/ + private MsgPayload payload; + + /** 【!重要配置项!】 定义Msg消息载体 **/ + @Data + @AllArgsConstructor + public static class MsgPayload { + + /** 支付订单号 **/ + private String payOrderId; + + } + + @Override + public String getMQName() { + return MQ_NAME; + } + + /** 【!重要配置项!】 **/ + @Override + public MQSendTypeEnum getMQType(){ + return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 Topic - 订阅模式 + } + + @Override + public String toMessage() { + return JSONObject.toJSONString(payload); + } + + /** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/ + public static PayOrderReissueMQ build(String payOrderId){ + return new PayOrderReissueMQ(new MsgPayload(payOrderId)); + } + + /** 解析MQ消息, 一般用于接收MQ消息时 **/ + public static MsgPayload parse(String msg){ + return JSON.parseObject(msg, MsgPayload.class); + } + + /** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/ + public interface IMQReceiver{ + void receive(MsgPayload payload); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/ResetAppConfigMQ.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/ResetAppConfigMQ.java new file mode 100644 index 0000000..d5d47ef --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/model/ResetAppConfigMQ.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.model; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** +* +* 定义MQ消息格式 +* 业务场景: [ 更新系统配置参数 ] +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 15:25 +*/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ResetAppConfigMQ extends AbstractMQ { + + /** 【!重要配置项!】 定义MQ名称 **/ + public static final String MQ_NAME = "TOPIC_RESET_APP_CONFIG"; + + /** 内置msg 消息体定义 **/ + private MsgPayload payload; + + /** 【!重要配置项!】 定义Msg消息载体 **/ + @Data + @AllArgsConstructor + public static class MsgPayload { + } + + @Override + public String getMQName() { + return MQ_NAME; + } + + /** 【!重要配置项!】 **/ + @Override + public MQSendTypeEnum getMQType(){ + return MQSendTypeEnum.TOPIC; // QUEUE - 点对点 、 Topic - 订阅模式 + } + + @Override + public String toMessage() { + return JSONObject.toJSONString(payload); + } + + /** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/ + public static ResetAppConfigMQ build(){ + return new ResetAppConfigMQ(new MsgPayload()); + } + + /** 解析MQ消息, 一般用于接收MQ消息时 **/ + public static MsgPayload parse(String msg){ + return JSON.parseObject(msg, MsgPayload.class); + } + + /** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/ + public interface IMQReceiver{ + void receive(MsgPayload payload); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQMsgReceiver.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQMsgReceiver.java new file mode 100644 index 0000000..a108bf4 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQMsgReceiver.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender; + +/** +* MQ 消息接收器 接口定义 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/21 16:09 +*/ +public interface IMQMsgReceiver { + + /** 接收消息 **/ + void receiveMsg(String msg); +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQSender.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQSender.java new file mode 100644 index 0000000..a6c1618 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/IMQSender.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender; + +import com.jeequan.jeepay.components.mq.model.AbstractMQ; + +/** +* MQ 消息发送器 接口定义 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/21 16:09 +*/ +public interface IMQSender { + + /** 推送MQ消息, 实时 **/ + void send(AbstractMQ mqModel); + + /** 推送MQ消息, 延迟接收,单位:s **/ + void send(AbstractMQ mqModel, int delay); + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQConfig.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQConfig.java new file mode 100644 index 0000000..39c2e03 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQConfig.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.activemq; + +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import com.jeequan.jeepay.components.mq.model.AbstractMQ; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.config.JmsListenerContainerFactory; +import org.springframework.stereotype.Component; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/* +* activeMQ的配置项 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:51 +*/ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ) +public class ActiveMQConfig { + + Map map = new ConcurrentHashMap<>(); + + public Destination getDestination(AbstractMQ mqModel){ + + if(map.get(mqModel.getMQName()) == null){ + this.init(mqModel.getMQName(), mqModel.getMQType()); + } + return map.get(mqModel.getMQName()); + } + + private synchronized void init(String mqName, MQSendTypeEnum mqSendTypeEnum){ + + if(mqSendTypeEnum == MQSendTypeEnum.QUEUE){ + map.put(mqName, new ActiveMQQueue(mqName) ); + }else{ + map.put(mqName, new ActiveMQTopic(mqName) ); + } + } + + + public static final String TOPIC_LISTENER_CONTAINER = "jmsTopicListenerContainer"; + + /** 新增jmsListenerContainer, 用于接收topic类型的消息 **/ + @Bean + public JmsListenerContainerFactory jmsTopicListenerContainer(ConnectionFactory factory){ + DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); + bean.setPubSubDomain(true); + bean.setConnectionFactory(factory); + return bean; + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQSender.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQSender.java new file mode 100644 index 0000000..c1dc6b6 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/ActiveMQSender.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.activemq; + +import com.jeequan.jeepay.components.mq.model.AbstractMQ; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import com.jeequan.jeepay.components.mq.vender.IMQSender; +import org.apache.activemq.ScheduledMessage; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.stereotype.Component; + +import javax.jms.TextMessage; + +/** +* activeMQ 消息发送器的实现 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:52 +*/ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ) +public class ActiveMQSender implements IMQSender { + + @Autowired + private ActiveMQConfig activeMQConfig; + + @Autowired + private JmsTemplate jmsTemplate; + + @Override + public void send(AbstractMQ mqModel) { + jmsTemplate.convertAndSend(activeMQConfig.getDestination(mqModel), mqModel.toMessage()); + } + + @Override + public void send(AbstractMQ mqModel, int delay) { + jmsTemplate.send(activeMQConfig.getDestination(mqModel), session -> { + TextMessage tm = session.createTextMessage(mqModel.toMessage()); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000); + tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1); + return tm; + }); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/PayOrderMchNotifyActiveMQReceiver.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/PayOrderMchNotifyActiveMQReceiver.java new file mode 100644 index 0000000..f40083a --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/PayOrderMchNotifyActiveMQReceiver.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.activemq.receive; + +import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor; +import com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ; +import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * activeMQ 消息接收器:仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化 + * 业务: 支付订单商户通知 + * + * @author terrfly + * @site https://www.jeepay.vip + * @date 2021/7/22 17:06 + */ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ) +@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class) +public class PayOrderMchNotifyActiveMQReceiver implements IMQMsgReceiver { + + @Autowired + private PayOrderMchNotifyMQ.IMQReceiver mqReceiver; + + /** 接收 【 queue 】 类型的消息 **/ + @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) + @JmsListener(destination = PayOrderMchNotifyMQ.MQ_NAME) + public void receiveMsg(String msg){ + mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg)); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/ResetAppConfigActiveMQReceiver.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/ResetAppConfigActiveMQReceiver.java new file mode 100644 index 0000000..465dc28 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/activemq/receive/ResetAppConfigActiveMQReceiver.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.activemq.receive; + +import com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver; +import com.jeequan.jeepay.components.mq.vender.activemq.ActiveMQConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Component; + +/** +* activeMQ消息接收器:仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化 +* 业务: 更新系统配置参数 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 17:06 +*/ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ) +@ConditionalOnBean(ResetAppConfigMQ.IMQReceiver.class) +public class ResetAppConfigActiveMQReceiver implements IMQMsgReceiver { + + @Autowired + private ResetAppConfigMQ.IMQReceiver mqReceiver; + + /** 接收 【 topic 】 类型的消息 **/ + @JmsListener(destination = ResetAppConfigMQ.MQ_NAME, containerFactory = ActiveMQConfig.TOPIC_LISTENER_CONTAINER) + public void receiveMsg(String msg){ + mqReceiver.receive(ResetAppConfigMQ.parse(msg)); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQBeanProcessor.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQBeanProcessor.java new file mode 100644 index 0000000..6700f2a --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQBeanProcessor.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.rabbitmq; + +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import org.springframework.amqp.core.CustomExchange; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** +* 将spring容器的 [bean注册器]放置到属性中,为 RabbitConfig提供访问。 +* 顺序: + * 1. postProcessBeanDefinitionRegistry (存放注册器) + * 2. postProcessBeanFactory (没有使用) + * 3. 注册延迟消息交换机的bean: delayedExchange + * 4. 动态配置RabbitMQ所需的bean。 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:27 +*/ +@Configuration +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) +public class RabbitMQBeanProcessor implements BeanDefinitionRegistryPostProcessor { + + /** bean注册器 **/ + protected BeanDefinitionRegistry beanDefinitionRegistry; + + @Override + public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException { + this.beanDefinitionRegistry = beanDefinitionRegistry; + } + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { + } + + /** 自定义交换机: 用于延迟消息 **/ + @Bean(name = RabbitMQConfig.DELAYED_EXCHANGE_NAME) + CustomExchange delayedExchange() { + Map args = new HashMap<>(); + args.put("x-delayed-type", "direct"); + return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQConfig.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQConfig.java new file mode 100644 index 0000000..b9295b9 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQConfig.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.rabbitmq; + +import cn.hutool.core.util.ClassUtil; +import cn.hutool.core.util.ReflectUtil; +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import com.jeequan.jeepay.components.mq.model.AbstractMQ; +import com.jeequan.jeepay.core.utils.SpringBeansUtil; +import org.springframework.amqp.core.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Set; + +/** +* RabbitMQ的配置项 +* 1. 注册全部定义好的Queue Bean +* 2. 动态注册fanout交换机 +* 3. 将Queue模式绑定到延时消息的交换机 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/23 16:33 +*/ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) +public class RabbitMQConfig { + + /** 全局定义延迟交换机名称 **/ + public static final String DELAYED_EXCHANGE_NAME = "delayedExchange"; + + /** 扇形交换机前缀(activeMQ中的topic模式), 需根据queue动态拼接 **/ + public static final String FANOUT_EXCHANGE_NAME_PREFIX = "fanout_exchange_"; + + /** 注入延迟交换机Bean **/ + @Autowired + @Qualifier(DELAYED_EXCHANGE_NAME) + private CustomExchange delayedExchange; + + /** 注入rabbitMQBeanProcessor **/ + @Autowired + private RabbitMQBeanProcessor rabbitMQBeanProcessor; + + /** 在全部bean注册完成后再执行 **/ + @PostConstruct + public void init(){ + + // 获取到所有的MQ定义 + Set> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(AbstractMQ.class), AbstractMQ.class); + + for (Class aClass : set) { + + // 实例化 + AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass); + + // 注册Queue === new Queue(name), queue名称/bean名称 = mqName + rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(), + BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition()); + + // topic模式 + if(amq.getMQType() == MQSendTypeEnum.TOPIC){ + + // 动态注册交换机, 交换机名称/bean名称 = FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName() + rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(), + BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () ->{ + + // 普通FanoutExchange 交换机 + return new FanoutExchange(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),true,false); + + // 支持 延迟的 FanoutExchange 交换机, 配置无效果。 +// Map args = new HashMap<>(); +// args.put("x-delayed-type", ExchangeTypes.FANOUT); +// return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); + } + + ).getBeanDefinition() + ); + + }else{ + + // 延迟交换机与Queue进行绑定, 绑定Bean名称 = mqName_DelayedBind + rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName() + "_DelayedBind", + BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> + BindingBuilder.bind(SpringBeansUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs() + + ).getBeanDefinition() + ); + } + } + } +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQSender.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQSender.java new file mode 100644 index 0000000..0ed677d --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/RabbitMQSender.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.rabbitmq; + +import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import com.jeequan.jeepay.components.mq.model.AbstractMQ; +import com.jeequan.jeepay.components.mq.vender.IMQSender; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * rabbitMQ 消息发送器的实现 + * + * @author terrfly + * @site https://www.jeepay.vip + * @date 2021/7/23 16:52 + */ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) +public class RabbitMQSender implements IMQSender { + + @Autowired + private RabbitTemplate rabbitTemplate; + + @Override + public void send(AbstractMQ mqModel) { + + if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){ + + rabbitTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage()); + }else{ + + // fanout模式 的 routeKEY 没意义。 + this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage()); + } + } + + @Override + public void send(AbstractMQ mqModel, int delay) { + + + if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){ + + rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, mqModel.getMQName(), mqModel.toMessage(), messagePostProcessor ->{ + messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delay * 1000)); + return messagePostProcessor; + }); + }else{ + + // fanout模式 的 routeKEY 没意义。 没有延迟属性 + this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage()); + } + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/PayOrderMchNotifyRabbitMQReceiver.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/PayOrderMchNotifyRabbitMQReceiver.java new file mode 100644 index 0000000..39f85f8 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/PayOrderMchNotifyRabbitMQReceiver.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive; + +import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor; +import com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ; +import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * rabbitMQ消息接收器:仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化 + * 业务: 支付订单商户通知 + * + * @author terrfly + * @site https://www.jeepay.vip + * @date 2021/7/22 17:06 + */ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) +@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class) +public class PayOrderMchNotifyRabbitMQReceiver implements IMQMsgReceiver { + + @Autowired + private PayOrderMchNotifyMQ.IMQReceiver mqReceiver; + + /** 接收 【 queue 】 类型的消息 **/ + @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) + @RabbitListener(queues = PayOrderMchNotifyMQ.MQ_NAME) + public void receiveMsg(String msg){ + mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg)); + } + +} diff --git a/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/ResetAppConfigRabbitMQReceiver.java b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/ResetAppConfigRabbitMQReceiver.java new file mode 100644 index 0000000..af5f2e2 --- /dev/null +++ b/jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/rabbitmq/receive/ResetAppConfigRabbitMQReceiver.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com). + *

+ * Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.gnu.org/licenses/lgpl.html + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive; + +import com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ; +import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver; +import com.jeequan.jeepay.components.mq.constant.MQVenderCS; +import com.jeequan.jeepay.components.mq.vender.rabbitmq.RabbitMQConfig; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** +* rabbitMQ消息接收器:仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化 +* 业务: 更新系统配置参数 +* +* @author terrfly +* @site https://www.jeepay.vip +* @date 2021/7/22 17:06 +*/ +@Component +@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ) +@ConditionalOnBean(ResetAppConfigMQ.IMQReceiver.class) +public class ResetAppConfigRabbitMQReceiver implements IMQMsgReceiver { + + @Autowired + private ResetAppConfigMQ.IMQReceiver mqReceiver; + + /** 接收 【 topic 】 类型的消息 + * + * 注意: + * RabbitMQ的广播模式(fanout)交换机 --》全部的Queue + * 如果queue包含多个消费者, 【例如,manager和payment的监听器是名称相同的queue下的消费者(Consumers) 】, 两个消费者是工作模式且存在竞争关系, 导致只能一个来消费。 + * 解决: + * 每个topic的QUEUE都声明一个FANOUT交换机, 消费者声明一个系统产生的【随机队列】绑定到这个交换机上,然后往交换机发消息,只要绑定到这个交换机上都能收到消息。 + * 参考: https://bbs.csdn.net/topics/392509262?list=70088931 + * + * **/ + @RabbitListener( + bindings = {@QueueBinding(value = @Queue(), // 注意这里不要定义队列名称,系统会随机产生 + exchange = @Exchange(name = RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + ResetAppConfigMQ.MQ_NAME, + type = ExchangeTypes.FANOUT ))} ) + public void receiveMsg(String msg){ + mqReceiver.receive(ResetAppConfigMQ.parse(msg)); + } + +} diff --git a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java index 019d3bf..aea26c0 100644 --- a/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java +++ b/jeepay-payment/src/main/java/com/jeequan/jeepay/pay/mq/config/MqThreadExecutor.java @@ -25,13 +25,13 @@ import java.util.concurrent.ThreadPoolExecutor; /* * MQ 线程池配置 -* +* * @author terrfly * @site https://www.jeepay.vip * @date 2021/6/8 17:33 */ -@Configuration -@EnableAsync +//@Configuration +//@EnableAsync public class MqThreadExecutor { public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor"; @@ -59,4 +59,4 @@ public class MqThreadExecutor { -} \ No newline at end of file +}