完成components.mq 组件, 完成activeMQ和rabbitMQ的封装, 对消息生产者和消费者的配置透明化。

This commit is contained in:
terrfly 2021-07-23 17:32:32 +08:00
parent 8e9e60879c
commit 06a5cc9b02
19 changed files with 1088 additions and 4 deletions

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.constant;
/**
* 定义MQ消息类型
* QUEUE - 点对点 只有1个消费者可消费
* TOPIC - 订阅模式 (所有接收者都可接收到)
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:49
*/
public enum MQSendTypeEnum {
QUEUE, TOPIC
}

View File

@ -0,0 +1,33 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.constant;
/**
* MQ 厂商定义类
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:50
*/
public class MQVenderCS {
public static final String YML_VENDER_KEY = "isys.mq.vender";
public static final String ACTIVE_MQ = "activeMQ";
public static final String RABBIT_MQ = "rabbitMQ";
public static final String ROCKET_MQ = "rocketMQ";
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/*
* MQ 线程池配置
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:33
*/
@Configuration
@EnableAsync
public class MqThreadExecutor {
public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor";
/*
* 功能描述:
* 支付结果通知到商户的异步执行器 由于量大 单独新建一个线程池处理 之前的不做变动
* 20, 300, 10, 60 该配置 同一时间最大并发量300已经验证通过 商户都可以收到请求消息
* 缓存队列尽量减少否则将堵塞在队列中无法执行 corePoolSize 根据机器的配置进行添加此处设置的为20
*/
@Bean
public Executor mqQueue4PayOrderMchNotifyExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 线程池维护线程的最少数量
executor.setMaxPoolSize(300); // 线程池维护线程的最大数量
executor.setQueueCapacity(10); // 缓存队列
executor.setThreadNamePrefix("payOrderMchNotifyExecutor-");
// rejection-policy当pool已经达到max size的时候如何处理新任务
// CALLER_RUNS不在新线程中执行任务而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //对拒绝task的处理策略
executor.setKeepAliveSeconds(60); // 允许的空闲时间
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
/**
* 定义MQ消息格式
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:33
*/
public abstract class AbstractMQ {
/** MQ名称 **/
public abstract String getMQName();
/** MQ 类型 **/
public abstract MQSendTypeEnum getMQType();
/** 构造MQ消息体 String类型 **/
public abstract String toMessage();
}

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* 定义MQ消息格式
* 业务场景 [ 支付订单的商户通知消息 ]
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderMchNotifyMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "QUEUE_PAY_ORDER_MCH_NOTIFY";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
/** 【!重要配置项!】 定义Msg消息载体 **/
@Data
@AllArgsConstructor
public static class MsgPayload {
/** 支付订单号 **/
private String payOrderId;
}
@Override
public String getMQName() {
return MQ_NAME;
}
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 Topic - 订阅模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static PayOrderMchNotifyMQ build(String payOrderId){
return new PayOrderMchNotifyMQ(new MsgPayload(payOrderId));
}
/** 解析MQ消息 一般用于接收MQ消息时 **/
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
/** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* 定义MQ消息格式
* 业务场景 [ 支付订单补单一般用于没有回调的接口比如微信的条码支付 ]
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PayOrderReissueMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "QUEUE_PAY_ORDER_REISSUE";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
/** 【!重要配置项!】 定义Msg消息载体 **/
@Data
@AllArgsConstructor
public static class MsgPayload {
/** 支付订单号 **/
private String payOrderId;
}
@Override
public String getMQName() {
return MQ_NAME;
}
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 Topic - 订阅模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static PayOrderReissueMQ build(String payOrderId){
return new PayOrderReissueMQ(new MsgPayload(payOrderId));
}
/** 解析MQ消息 一般用于接收MQ消息时 **/
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
/** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* 定义MQ消息格式
* 业务场景 [ 更新系统配置参数 ]
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResetAppConfigMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "TOPIC_RESET_APP_CONFIG";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
/** 【!重要配置项!】 定义Msg消息载体 **/
@Data
@AllArgsConstructor
public static class MsgPayload {
}
@Override
public String getMQName() {
return MQ_NAME;
}
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.TOPIC; // QUEUE - 点对点 Topic - 订阅模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static ResetAppConfigMQ build(){
return new ResetAppConfigMQ(new MsgPayload());
}
/** 解析MQ消息 一般用于接收MQ消息时 **/
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
/** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender;
/**
* MQ 消息接收器 接口定义
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/21 16:09
*/
public interface IMQMsgReceiver {
/** 接收消息 **/
void receiveMsg(String msg);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
/**
* MQ 消息发送器 接口定义
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/21 16:09
*/
public interface IMQSender {
/** 推送MQ消息 实时 **/
void send(AbstractMQ mqModel);
/** 推送MQ消息 延迟接收单位s **/
void send(AbstractMQ mqModel, int delay);
}

View File

@ -0,0 +1,76 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/*
* activeMQ的配置项
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:51
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
public class ActiveMQConfig {
Map<String, Destination> map = new ConcurrentHashMap<>();
public Destination getDestination(AbstractMQ mqModel){
if(map.get(mqModel.getMQName()) == null){
this.init(mqModel.getMQName(), mqModel.getMQType());
}
return map.get(mqModel.getMQName());
}
private synchronized void init(String mqName, MQSendTypeEnum mqSendTypeEnum){
if(mqSendTypeEnum == MQSendTypeEnum.QUEUE){
map.put(mqName, new ActiveMQQueue(mqName) );
}else{
map.put(mqName, new ActiveMQTopic(mqName) );
}
}
public static final String TOPIC_LISTENER_CONTAINER = "jmsTopicListenerContainer";
/** 新增jmsListenerContainer, 用于接收topic类型的消息 **/
@Bean
public JmsListenerContainerFactory<?> jmsTopicListenerContainer(ConnectionFactory factory){
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(factory);
return bean;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.vender.IMQSender;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
/**
* activeMQ 消息发送器的实现
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:52
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
public class ActiveMQSender implements IMQSender {
@Autowired
private ActiveMQConfig activeMQConfig;
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void send(AbstractMQ mqModel) {
jmsTemplate.convertAndSend(activeMQConfig.getDestination(mqModel), mqModel.toMessage());
}
@Override
public void send(AbstractMQ mqModel, int delay) {
jmsTemplate.send(activeMQConfig.getDestination(mqModel), session -> {
TextMessage tm = session.createTextMessage(mqModel.toMessage());
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
});
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq.receive;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* activeMQ 消息接收器仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 支付订单商户通知
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class)
public class PayOrderMchNotifyActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private PayOrderMchNotifyMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@JmsListener(destination = PayOrderMchNotifyMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg));
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq.receive;
import com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.vender.activemq.ActiveMQConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* activeMQ消息接收器仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 更新系统配置参数
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
@ConditionalOnBean(ResetAppConfigMQ.IMQReceiver.class)
public class ResetAppConfigActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetAppConfigMQ.IMQReceiver mqReceiver;
/** 接收 【 topic 】 类型的消息 **/
@JmsListener(destination = ResetAppConfigMQ.MQ_NAME, containerFactory = ActiveMQConfig.TOPIC_LISTENER_CONTAINER)
public void receiveMsg(String msg){
mqReceiver.receive(ResetAppConfigMQ.parse(msg));
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 将spring容器的 [bean注册器]放置到属性中 RabbitConfig提供访问
* 顺序
* 1. postProcessBeanDefinitionRegistry (存放注册器)
* 2. postProcessBeanFactory 没有使用
* 3. 注册延迟消息交换机的bean: delayedExchange
* 4. 动态配置RabbitMQ所需的bean
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:27
*/
@Configuration
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQBeanProcessor implements BeanDefinitionRegistryPostProcessor {
/** bean注册器 **/
protected BeanDefinitionRegistry beanDefinitionRegistry;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
this.beanDefinitionRegistry = beanDefinitionRegistry;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
/** 自定义交换机: 用于延迟消息 **/
@Bean(name = RabbitMQConfig.DELAYED_EXCHANGE_NAME)
CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.ReflectUtil;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.core.utils.SpringBeansUtil;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Set;
/**
* RabbitMQ的配置项
* 1. 注册全部定义好的Queue Bean
* 2. 动态注册fanout交换机
* 3. 将Queue模式绑定到延时消息的交换机
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:33
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQConfig {
/** 全局定义延迟交换机名称 **/
public static final String DELAYED_EXCHANGE_NAME = "delayedExchange";
/** 扇形交换机前缀activeMQ中的topic模式 需根据queue动态拼接 **/
public static final String FANOUT_EXCHANGE_NAME_PREFIX = "fanout_exchange_";
/** 注入延迟交换机Bean **/
@Autowired
@Qualifier(DELAYED_EXCHANGE_NAME)
private CustomExchange delayedExchange;
/** 注入rabbitMQBeanProcessor **/
@Autowired
private RabbitMQBeanProcessor rabbitMQBeanProcessor;
/** 在全部bean注册完成后再执行 **/
@PostConstruct
public void init(){
// 获取到所有的MQ定义
Set<Class<?>> set = ClassUtil.scanPackageBySuper(ClassUtil.getPackage(AbstractMQ.class), AbstractMQ.class);
for (Class<?> aClass : set) {
// 实例化
AbstractMQ amq = (AbstractMQ) ReflectUtil.newInstance(aClass);
// 注册Queue === new Queue(name) queue名称/bean名称 = mqName
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(),
BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition());
// topic模式
if(amq.getMQType() == MQSendTypeEnum.TOPIC){
// 动态注册交换机 交换机名称/bean名称 = FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName()
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),
BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () ->{
// 普通FanoutExchange 交换机
return new FanoutExchange(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),true,false);
// 支持 延迟的 FanoutExchange 交换机 配置无效果
// Map<String, Object> args = new HashMap<>();
// args.put("x-delayed-type", ExchangeTypes.FANOUT);
// return new CustomExchange(RabbitMQConfig.DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
).getBeanDefinition()
);
}else{
// 延迟交换机与Queue进行绑定 绑定Bean名称 = mqName_DelayedBind
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName() + "_DelayedBind",
BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () ->
BindingBuilder.bind(SpringBeansUtil.getBean(amq.getMQName(), Queue.class)).to(delayedExchange).with(amq.getMQName()).noargs()
).getBeanDefinition()
);
}
}
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.components.mq.vender.IMQSender;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* rabbitMQ 消息发送器的实现
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:52
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
public class RabbitMQSender implements IMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send(AbstractMQ mqModel) {
if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){
rabbitTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage());
}else{
// fanout模式 routeKEY 没意义
this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage());
}
}
@Override
public void send(AbstractMQ mqModel, int delay) {
if(mqModel.getMQType() == MQSendTypeEnum.QUEUE){
rabbitTemplate.convertAndSend(RabbitMQConfig.DELAYED_EXCHANGE_NAME, mqModel.getMQName(), mqModel.toMessage(), messagePostProcessor ->{
messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delay * 1000));
return messagePostProcessor;
});
}else{
// fanout模式 routeKEY 没意义 没有延迟属性
this.rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + mqModel.getMQName(), null, mqModel.toMessage());
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* rabbitMQ消息接收器仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 支付订单商户通知
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(PayOrderMchNotifyMQ.IMQReceiver.class)
public class PayOrderMchNotifyRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private PayOrderMchNotifyMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@RabbitListener(queues = PayOrderMchNotifyMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(PayOrderMchNotifyMQ.parse(msg));
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive;
import com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.vender.rabbitmq.RabbitMQConfig;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* rabbitMQ消息接收器仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务 更新系统配置参数
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(ResetAppConfigMQ.IMQReceiver.class)
public class ResetAppConfigRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetAppConfigMQ.IMQReceiver mqReceiver;
/** 接收 topic 类型的消息
*
* 注意
* RabbitMQ的广播模式fanout交换机 --全部的Queue
* 如果queue包含多个消费者 例如manager和payment的监听器是名称相同的queue下的消费者Consumers 两个消费者是工作模式且存在竞争关系 导致只能一个来消费
* 解决
* 每个topic的QUEUE都声明一个FANOUT交换机 消费者声明一个系统产生的随机队列绑定到这个交换机上然后往交换机发消息只要绑定到这个交换机上都能收到消息
* 参考 https://bbs.csdn.net/topics/392509262?list=70088931
*
* **/
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(), // 注意这里不要定义队列名称,系统会随机产生
exchange = @Exchange(name = RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + ResetAppConfigMQ.MQ_NAME,
type = ExchangeTypes.FANOUT ))} )
public void receiveMsg(String msg){
mqReceiver.receive(ResetAppConfigMQ.parse(msg));
}
}

View File

@ -25,13 +25,13 @@ import java.util.concurrent.ThreadPoolExecutor;
/*
* MQ 线程池配置
*
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:33
*/
@Configuration
@EnableAsync
//@Configuration
//@EnableAsync
public class MqThreadExecutor {
public static final String EXECUTOR_PAYORDER_MCH_NOTIFY = "mqQueue4PayOrderMchNotifyExecutor";
@ -59,4 +59,4 @@ public class MqThreadExecutor {
}
}