提交 8e6f4263 作者: 方治民

feat: 完善真源 xyz 转经纬度工具配置、重构定位消息监听队列采用动态配置方式兼容模拟与真实环境

上级 552903ac
......@@ -2,6 +2,8 @@
package com.yiring.app.domain.location;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
......@@ -9,9 +11,16 @@ import org.springframework.stereotype.Repository;
/**
* 定位标签JPA
* @author LJ-2204
* @date 2022/4/14
* 2022/4/14
*/
@Repository
public interface LocationTagRepository
extends JpaRepository<LocationTag, Serializable>, JpaSpecificationExecutor<LocationTag> {}
extends JpaRepository<LocationTag, Serializable>, JpaSpecificationExecutor<LocationTag> {
/**
* 根据标签号集合查询标签集合
* @param codes 标签号集合
* @return 标签集合
*/
List<LocationTag> findByCodeIn(Set<String> codes);
}
......@@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.yiring.app.domain.location.LocationLog;
import com.yiring.app.domain.location.LocationLogRepository;
import com.yiring.app.rabbit.config.ZyRabbitConfig;
import com.yiring.app.rabbit.config.ZyConfigProperties;
import com.yiring.app.util.GeoUtils;
import com.yiring.common.constant.DateFormatter;
import java.time.LocalDateTime;
......@@ -37,6 +37,9 @@ public class MockZyMessageJob {
@Resource
LocationLogRepository locationLogRepository;
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
@XxlJob("MockZyMessageHandler")
public void mockMessageHandler() {
log.info("MockZyMessageHandler: {}", LocalDateTime.now().format(DateFormatter.DATE_TIME));
......@@ -66,7 +69,7 @@ public class MockZyMessageJob {
}
public JSONObject send(JSONObject body) {
rabbitTemplate.convertAndSend(ZyRabbitConfig.MESSAGE_QUEUES_NAME, body.toJSONString());
rabbitTemplate.convertAndSend(rabbitmq.getQueueName(), body.toJSONString());
return body;
}
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbit.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import com.yiring.app.rabbit.receiver.PositionMessageHandler;
import javax.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -16,8 +18,12 @@ import org.springframework.context.annotation.Configuration;
* 2019/8/21 15:44
*/
@Configuration
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "mock", havingValue = "true")
public class MockZyRabbitConfig {
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
/**
* 消息交换机
*/
......@@ -33,16 +39,20 @@ public class MockZyRabbitConfig {
return new TopicExchange(ZY_TOPIC_EXCHANGE, true, false);
}
@Bean(ZyRabbitConfig.MESSAGE_QUEUES_NAME)
public Queue mockMessageQueue() {
return new Queue(ZyRabbitConfig.MESSAGE_QUEUES_NAME, true, false, false);
}
@Bean
Binding bindingExchangeMock(
@Qualifier(ZyRabbitConfig.MESSAGE_QUEUES_NAME) Queue queue,
@Qualifier(ZyRabbitConfig.QUEUE_BEAN_NAME) Queue queue,
@Qualifier(ZY_TOPIC_EXCHANGE) TopicExchange exchange
) {
return BindingBuilder.bind(queue).to(exchange).with(ZyRabbitConfig.MESSAGE_QUEUES_NAME);
return BindingBuilder.bind(queue).to(exchange).with(rabbitmq.getQueueName());
}
@Bean(ZyRabbitConfig.LISTENER_CONTAINER_NAME)
public SimpleMessageListenerContainer simpleMessageListenerContainer(
PositionMessageHandler handler,
@Qualifier(ZyRabbitConfig.QUEUE_BEAN_NAME) Queue queue,
@Qualifier(RabbitConfig.CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
return ZyRabbitConfig.buildPositionMessageListenerContainer(handler, queue, connectionFactory);
}
}
......@@ -2,15 +2,11 @@
package com.yiring.app.rabbit.config;
import javax.annotation.Resource;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
......@@ -43,19 +39,6 @@ public class RabbitConfig {
@Bean
@Primary
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
@Primary
public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
......
......@@ -36,6 +36,7 @@ public class ZyConfigProperties {
@ConfigurationProperties(prefix = "zy-config.rabbitmq")
public static class ZyConfigRabbitmq {
boolean mock;
boolean enabled;
String host;
int port;
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbit.config;
import com.yiring.app.rabbit.receiver.PositionMessageHandler;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -31,19 +29,9 @@ public class ZyRabbitConfig {
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
@Resource
ConfigurableApplicationContext context;
public static final String CONNECTION_FACTORY_NAME = "zyRabbitConnectionFactory";
public static final String LISTENER_FACTORY_NAME = "zyRabbitListenerFactory";
public static final String TEMPLATE_NAME = "zyRabbitTemplate";
/**
* 消息队列名称(必须要与配置文件中的 queue-name 完全一致)
* 规则: tenant_msg_${open.client-secret}_${open.client-id}
* 参见文档: 定位平台接口规范V3.0.1 - 通用版.pdf #6
*/
public static final String MESSAGE_QUEUES_NAME = "tenant_msg_12A14FDC_sc21080400";
public static final String LISTENER_CONTAINER_NAME = "zyPositionMessageListenerContainer";
public static final String QUEUE_BEAN_NAME = "messageHandlerQueue";
@Bean(CONNECTION_FACTORY_NAME)
public ConnectionFactory zyConnectionFactory() {
......@@ -56,27 +44,35 @@ public class ZyRabbitConfig {
);
}
@Bean(LISTENER_FACTORY_NAME)
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Bean(LISTENER_CONTAINER_NAME)
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "mock", havingValue = "false")
public SimpleMessageListenerContainer simpleMessageListenerContainer(
PositionMessageHandler handler,
@Qualifier(QUEUE_BEAN_NAME) Queue queue,
@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
// 检查队列名称是否与配置文件一致,避免监听错误
if (!MESSAGE_QUEUES_NAME.equals(rabbitmq.getQueueName())) {
log.error("队列名称不一致,请检查配置文件");
context.close();
return null;
return buildPositionMessageListenerContainer(handler, queue, connectionFactory);
}
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
@Bean(QUEUE_BEAN_NAME)
public Queue messageHandlerQueue() {
return new Queue(rabbitmq.getQueueName(), true, false, false);
}
@Bean(TEMPLATE_NAME)
public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
public static SimpleMessageListenerContainer buildPositionMessageListenerContainer(
PositionMessageHandler handler,
Queue queue,
ConnectionFactory connectionFactory
) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setBatchSize(10);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setExposeListenerChannel(true);
container.setMessageListener(handler);
container.addQueues(queue);
return container;
}
}
......@@ -2,31 +2,29 @@
package com.yiring.app.rabbit.receiver;
import com.rabbitmq.client.Channel;
import com.yiring.app.rabbit.config.ZyRabbitConfig;
import com.yiring.app.service.message.ZyMessageService;
import com.yiring.app.service.message.PositionMessageService;
import com.yiring.common.annotation.Times;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 真源 RabbitMQ 消息监听(消息消费者)
* 真源消息监听处理器
*
* @author Jim
* @version 0.1
* 2022/4/13 17:13
* 2022/5/11 16:35
*/
@Slf4j
@Component
@Configuration
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "enabled", havingValue = "true")
public class ZyRabbitReceiver {
@Transactional(rollbackFor = Exception.class)
public class PositionMessageHandler implements ChannelAwareMessageListener {
// TODO
// 1. 新增消息订阅定时任务,检查是否正常订阅了真源定位系统的消息
......@@ -34,26 +32,15 @@ public class ZyRabbitReceiver {
// 3. 订阅 position(定位数据)、lowPower(低电量报警)、deviceStatus(设备状态)、keyWarning(按键报警)
@Resource
ZyMessageService zyMessageService;
PositionMessageService positionMessageService;
/**
* 订阅真源定位系统 RabbitMQ 推送过来的消息(主动订阅的一些消息类别)
* 参见: 定位平台接口规范V3.0.1 - 通用版.pdf #6
* @param msg 消息内容
* @param channel 消息通道
* @param message 消息主体
* @throws IOException 消息确认异常
*/
@RabbitHandler
// @RabbitListener(
// queues = ZyRabbitConfig.MESSAGE_QUEUES_NAME,
// containerFactory = ZyRabbitConfig.LISTENER_FACTORY_NAME
// )
@RabbitListener(queues = ZyRabbitConfig.MESSAGE_QUEUES_NAME, containerFactory = "rabbitListenerContainerFactory")
public void listen(String msg, Channel channel, Message message) throws IOException {
// 消费消息
zyMessageService.consume(msg);
@Times
@Override
public void onMessage(Message message, Channel channel) throws IOException {
// 手动确认消息已收到
assert channel != null;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 消费消息
positionMessageService.consume(new String(message.getBody(), StandardCharsets.UTF_8));
}
}
......@@ -8,7 +8,7 @@ package com.yiring.app.service.message;
* @version 0.1
* 2022/5/9 10:16
*/
public interface ZyMessageService {
public interface PositionMessageService {
/**
* 消费消息
* @param message 消息内容
......
......@@ -7,7 +7,7 @@ import com.alibaba.fastjson.JSONObject;
import com.yiring.app.domain.location.*;
import com.yiring.app.domain.log.ZyRealtimeLog;
import com.yiring.app.domain.log.ZyRealtimeLogRepository;
import com.yiring.app.service.message.ZyMessageService;
import com.yiring.app.service.message.PositionMessageService;
import com.yiring.app.util.GeoUtils;
import com.yiring.auth.domain.user.User;
import com.yiring.common.annotation.Times;
......@@ -37,7 +37,7 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class ZyMessageServiceImpl implements ZyMessageService {
public class PositionMessageServiceImpl implements PositionMessageService {
@Resource
LocationTagRepository locationTagRepository;
......
......@@ -4,6 +4,7 @@ package com.yiring.app.util.zy;
import com.alibaba.fastjson.JSONObject;
/**
* 从真源拿到的引擎计算经纬度的工具类
* @author Jim
* @version 0.1
* 2022/5/9 16:41
......@@ -16,8 +17,8 @@ public class LonLatUtil {
public static JSONObject getRoot() {
JSONObject root = new JSONObject();
root.put("lon", 0D);
root.put("lat", 0D);
root.put("lon", 112.85893346021206);
root.put("lat", 30.473384230484854);
root.put("x", 0D);
root.put("y", 0D);
root.put("altitude", 0D);
......
......@@ -91,6 +91,7 @@ zy-config:
host: project.yz-online.com
# RabbitMQ 订阅配置
rabbitmq:
mock: false
enabled: true
host: ${zy-config.host}
port: 672
......
......@@ -25,9 +25,9 @@ import org.springframework.stereotype.Component;
public class TimesAspect {
@Pointcut("@annotation(com.yiring.common.annotation.Times)")
public void pointCut() {}
public void times() {}
@Around("pointCut()")
@Around("times()")
public Object around(ProceedingJoinPoint point) throws Throwable {
long start = System.currentTimeMillis();
Object result = point.proceed();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论