提交 7f531ed7 作者: 方治民

feat: 新增 STOMP 实现、真源定位系统 RabbitMQ 消息采集、消息推送模块初步设计

上级 cd93f11f
dependencies {
implementation project(':basic-common:core')
implementation project(':basic-common:util')
implementation project(':basic-common:redis')
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
// 本地依赖
implementation fileTree(dir: project.rootDir.getPath() + '\\libs', includes: ['*jar'])
// hutool-core
implementation "cn.hutool:hutool-core:${hutoolVersion}"
// fastjson
implementation "com.alibaba:fastjson:${fastJsonVersion}"
// https://github.com/vladmihalcea/hibernate-types
// hibernate-types-52
implementation "com.vladmihalcea:hibernate-types-52:${hibernateTypesVersion}"
}
/**
* @author Jim
* @version 0.1
* 2022/4/14 14:53
*/
package com.yiring.app;
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.domain;
import com.alibaba.fastjson.JSON;
import com.vladmihalcea.hibernate.type.json.JsonType;
import java.io.Serializable;
import java.time.LocalDateTime;
import javax.persistence.*;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants;
import org.hibernate.annotaions.Comment;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import org.hibernate.snowflake.SnowflakeId;
/**
* 推送消息日志
*
* @author Jim
* @version 0.1
* 2022/4/14 14:55
*/
@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@FieldNameConstants
@FieldDefaults(level = AccessLevel.PRIVATE)
@Entity
@TypeDef(name = "json", typeClass = JsonType.class)
@Table(
name = "SYS_PUSH_MESSAGE",
indexes = {
@Index(columnList = "type"),
@Index(columnList = "status"),
@Index(columnList = "source"),
@Index(columnList = "sourceId"),
@Index(columnList = "target"),
@Index(columnList = "targetId"),
@Index(columnList = "createTime"),
@Index(columnList = "pushTime"),
@Index(columnList = "updateTime"),
}
)
@Comment("推送消息日志")
public class PushMessage implements Serializable {
private static final long serialVersionUID = 1478640364892356248L;
@Comment("主键")
@Id
@GeneratedValue(generator = SnowflakeId.GENERATOR)
@GenericGenerator(name = SnowflakeId.GENERATOR, strategy = SnowflakeId.Strategy.LONG)
Long id;
@Comment("消息类型")
@Enumerated(EnumType.STRING)
Type type;
@Comment("消息来源")
String source;
@Comment("消息来源标识")
String sourceId;
@Comment("消息目标")
String target;
@Comment("消息目标标识")
String targetId;
@Comment("消息内容")
String content;
@Comment("扩展数据")
@org.hibernate.annotations.Type(type = "json")
@Column(columnDefinition = "json")
JSON extra;
@Comment("消息状态")
@Enumerated(EnumType.STRING)
Status status;
@Comment("推送反馈结果")
@Column(columnDefinition = "TEXT")
String result;
@Comment("重试次数")
Integer retryCount;
@Comment("推送时间")
LocalDateTime pushTime;
@Comment("更新时间")
LocalDateTime updateTime;
@Comment("创建时间")
LocalDateTime createTime;
@SuppressWarnings({ "unused" })
public enum Type {
WS("STOMP WebSocket 消息"),
APP("APP 消息"),
SMS("短信消息"),
EMAIL("邮件消息"),
CALL_TTS("TTS 文本合成语音消息(电话)"),
CALL_AUDIO("自定义音频消息(电话)"),
DEVICE_TONE("自定义音频消息(音柱)");
final String text;
Type(String text) {
this.text = text;
}
public String text() {
return this.text;
}
public String queue() {
return String.format("push.%s.queue", this.name());
}
}
@SuppressWarnings({ "unused" })
public enum Status {
SUCCESS("成功"),
FAIL("失败");
final String text;
Status(String text) {
this.text = text;
}
public String text() {
return this.text;
}
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.domain;
import java.io.Serializable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
/**
* @author Jim
* @version 0.1
* 2022/4/14 15:33
*/
@Repository
public interface PushMessageRepository
extends JpaRepository<PushMessage, Serializable>, JpaSpecificationExecutor<PushMessage> {}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.rabbitmq;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 15:44
*/
@Configuration("PushRabbitConfig")
public class RabbitConfig {
public static final String PUSH_TOPIC_EXCHANGE = "push.topic.exchange";
public static final String PUSH_APP_QUEUE = "push.app.queue";
public static final String PUSH_WS_QUEUE = "push.ws.queue";
public static final String PUSH_SMS_QUEUE = "push.sms.queue";
public static final String PUSH_EMAIL_QUEUE = "push.email.queue";
public static final String PUSH_CALL_TTS_QUEUE = "push.call.tts.queue";
public static final String PUSH_CALL_AUDIO_QUEUE = "push.call.audio.queue";
public static final String PUSH_DEVICE_TONE_QUEUE = "push.device.tone.queue";
/**
* 订阅模式
*
* @return TopicExchange
*/
@Bean
TopicExchange exchange() {
return new TopicExchange(PUSH_TOPIC_EXCHANGE, true, false);
}
@Bean(PUSH_APP_QUEUE)
public Queue pushAppQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 1000);
return new Queue(PUSH_APP_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeApp(@Qualifier(PUSH_APP_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_APP_QUEUE);
}
@Bean(PUSH_WS_QUEUE)
public Queue pushWebSocketQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 30 * 1000);
return new Queue(PUSH_WS_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeWs(@Qualifier(PUSH_WS_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_WS_QUEUE);
}
@Bean(PUSH_SMS_QUEUE)
public Queue pushSmsQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 30 * 1000);
return new Queue(PUSH_SMS_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeSms(@Qualifier(PUSH_SMS_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_SMS_QUEUE);
}
@Bean(PUSH_EMAIL_QUEUE)
public Queue pushEmailQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 1000);
return new Queue(PUSH_EMAIL_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeEmail(@Qualifier(PUSH_EMAIL_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_EMAIL_QUEUE);
}
@Bean(PUSH_CALL_TTS_QUEUE)
public Queue pushCallTtsQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 10 * 1000);
return new Queue(PUSH_CALL_TTS_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeCallTts(@Qualifier(PUSH_CALL_TTS_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_CALL_TTS_QUEUE);
}
@Bean(PUSH_CALL_AUDIO_QUEUE)
public Queue pushCallAudioQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 10 * 1000);
return new Queue(PUSH_CALL_AUDIO_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeCallAudio(@Qualifier(PUSH_CALL_AUDIO_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_CALL_AUDIO_QUEUE);
}
@Bean(PUSH_DEVICE_TONE_QUEUE)
public Queue pushDeviceToneQueue() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 10 * 1000);
return new Queue(PUSH_DEVICE_TONE_QUEUE, true, false, false, args);
}
@Bean
Binding bindingExchangeDeviceTone(@Qualifier(PUSH_DEVICE_TONE_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_DEVICE_TONE_QUEUE);
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.rabbitmq;
import com.yiring.app.push.domain.PushMessageRepository;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 16:07
*/
@SuppressWarnings({ "unused" })
@Slf4j
@Component("PushRabbitReceiver")
@Transactional(rollbackFor = Exception.class)
public class RabbitReceiver {
@Resource
PushMessageRepository repository;
@RabbitHandler
@RabbitListener(queues = RabbitConfig.PUSH_WS_QUEUE)
public void receiverWebSocketMessage(Long id) {
log.info("接收到消息: " + id);
repository.findById(id).ifPresent(message -> log.info("{}", message));
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.service;
import com.alibaba.fastjson.JSONObject;
import com.yiring.app.push.domain.PushMessage;
/**
* 推送服务
*
* @author Jim
* @version 0.1
* 2022/4/14 16:01
*/
public interface PushService {
void push(PushMessage.Type type, JSONObject raw);
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.domain.PushMessageRepository;
import com.yiring.app.push.service.PushService;
import java.time.LocalDateTime;
import javax.annotation.Resource;
import javax.transaction.Transactional;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
/**
* 推送服务实现
*
* @author Jim
* @version 0.1
* 2022/4/14 16:08
*/
@Slf4j
@Service
@Transactional(rollbackOn = Exception.class)
public class PushServiceImpl implements PushService {
@Resource
PushMessageRepository repository;
@Resource
AmqpTemplate amqpTemplate;
@Override
public void push(PushMessage.Type type, JSONObject raw) {
PushMessage message = new PushMessage();
message.setType(type);
message.setExtra(raw);
message.setCreateTime(LocalDateTime.now());
repository.saveAndFlush(message);
amqpTemplate.convertAndSend(type.queue(), message.getId());
}
}
......@@ -8,7 +8,10 @@ bootJar {
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation "org.springframework.boot:spring-boot-starter-websocket"
implementation "org.springframework.boot:spring-boot-starter-reactor-netty"
// 💬 Mock/Test Env
runtimeOnly 'com.h2database:h2'
// 💬 Prod/Dev Env
......@@ -20,6 +23,8 @@ dependencies {
implementation project(":basic-common:core")
implementation project(":basic-common:util")
implementation project(":basic-common:redis")
implementation project(":app-push")
// Optional: Doc
implementation project(":basic-common:doc")
......@@ -38,9 +43,14 @@ dependencies {
// fastjson
implementation "com.alibaba:fastjson:${fastJsonVersion}"
// hutool-core
implementation "cn.hutool:hutool-core:${hutoolVersion}"
// JPA 增加空间字段支持
// https://blog.wuwii.com/jpa-spatial.html
implementation "org.hibernate:hibernate-spatial:${hibernateSpatialVersion}"
implementation("org.hibernate:hibernate-spatial:${hibernateSpatialVersion}") {
exclude group: 'org.postgresql', module: 'postgresql'
}
// JTS 几何对象操作库
implementation "org.locationtech.jts:jts-core:${jtsVersion}"
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.zy;
import javax.annotation.Resource;
import lombok.AccessLevel;
import lombok.Data;
import lombok.experimental.FieldDefaults;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 真源配置想
*
* @author ifzm
* @version 0.1
* 2019/9/25 21:31
*/
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config")
public class ZyConfigProperties {
String host;
@Resource
ZyConfigRabbitmq rabbitmq;
@Resource
ZyConfigOpen open;
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config.rabbitmq")
public static class ZyConfigRabbitmq {
String host;
int port;
String username;
String password;
String virtualHost;
String queueName;
}
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config.open")
public static class ZyConfigOpen {
String api;
String clientId;
String grantType;
String clientSecret;
String tenant;
}
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config.proxy")
public static class ZyConfigProxy {
String api;
String tenant;
@Resource
ZyConfigProxyClient client;
@Resource
ZyConfigProxyManage manage;
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config.proxy.client")
public static class ZyConfigProxyClient {
String username;
String password;
}
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "zy-config.proxy.manage")
public static class ZyConfigProxyManage {
String username;
String password;
}
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.zy;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 真源 RabbitMQ 订阅配置
*
* @author Jim
* @version 0.1
* 2022/4/13 17:05
*/
@Slf4j
@Configuration
public class ZyRabbitConfig {
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
@Resource
ConfigurableApplicationContext context;
public static final String TEMPLATE_NAME = "zyRabbitTemplate";
public static final String CONNECTION_FACTORY_NAME = "zyRabbitConnectionFactory";
public static final String LISTENER_FACTORY_NAME = "zyRabbitListenerFactory";
/**
* 消息队列名称(必须要与配置文件中的 queue-name 完全一致)
* 规则: tenant_msg_${open.client-secret}_${open.client-id}
* 参见文档: 定位平台接口规范V3.0.1 - 通用版.pdf #6
*/
public static final String MESSAGE_QUEUES_NAME = "tenant_msg_12A14FDC_sc21080400";
@Bean(name = CONNECTION_FACTORY_NAME)
public CachingConnectionFactory zyConnectionFactory() {
return connectionFactory(
rabbitmq.getHost(),
rabbitmq.getPort(),
rabbitmq.getUsername(),
rabbitmq.getPassword(),
rabbitmq.getVirtualHost()
);
}
@Bean(name = TEMPLATE_NAME)
public RabbitTemplate secondRabbitTemplate(
@Qualifier(CONNECTION_FACTORY_NAME) CachingConnectionFactory connectionFactory
) {
return new RabbitTemplate(connectionFactory);
}
@Bean(name = LISTENER_FACTORY_NAME)
public SimpleRabbitListenerContainerFactory secondFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(CONNECTION_FACTORY_NAME) CachingConnectionFactory connectionFactory
) {
// 检查队列名称是否与配置文件一致,避免监听错误
if (!MESSAGE_QUEUES_NAME.equals(rabbitmq.getQueueName())) {
log.error("队列名称不一致,请检查配置文件");
context.close();
return null;
}
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 最小消费者数量
factory.setConcurrentConsumers(1);
// 最大消费者数量
factory.setMaxConcurrentConsumers(1);
// 预读取一条消息
factory.setPrefetchCount(1);
// 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
public CachingConnectionFactory connectionFactory(
String host,
int port,
String username,
String password,
String virtualHost
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.constant;
/**
* Redis Key 常量类
*
* @author fangzhimin
* 2018/9/4 15:51
*/
public interface RedisKey {
/**
* STOMP 在线用户关键数据
*/
String STOMP_ONLINE_USERS = "STOMP_ONLINE_USERS";
}
......@@ -2,8 +2,8 @@
package com.yiring.app.domain.location;
import com.alibaba.fastjson.JSONObject;
import com.yiring.app.config.converter.JSONObjectConverter;
import com.yiring.auth.domain.user.User;
import com.yiring.common.config.converter.JSONObjectConverter;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 15:44
*/
@Configuration
public class RabbitConfig {}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 16:07
*/
@SuppressWarnings({ "unused" })
@Slf4j
@Component
@Transactional(rollbackFor = Exception.class)
public class RabbitReceiver {}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq.zy;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.yiring.app.config.zy.ZyRabbitConfig;
import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.service.PushService;
import java.io.IOException;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 真源 RabbitMQ 消息监听(消息消费者)
*
* @author Jim
* @version 0.1
* 2022/4/13 17:13
*/
@Slf4j
@Component
public class ZyRabbitmqReceiver {
// TODO
// 1. 新增消息订阅定时任务,检查是否正常订阅了真源定位系统的消息
// 2. 如果没有所需的订阅记录,则发起消息订阅(见接口文档: 6.2)
// 3. 订阅 position(定位数据)、lowPower(低电量报警)、deviceStatus(设备状态)、keyWarning(按键报警)
@Resource
PushService pushService;
/**
* 订阅真源定位系统 RabbitMQ 推送过来的消息(主动订阅的一些消息类别)
* 参见: 定位平台接口规范V3.0.1 - 通用版.pdf #6
* @param msg 消息内容
* @param channel 消息通道
* @param message 消息主体
* @throws IOException 消息确认异常
*/
@RabbitHandler
@RabbitListener(
queues = ZyRabbitConfig.MESSAGE_QUEUES_NAME,
containerFactory = ZyRabbitConfig.LISTENER_FACTORY_NAME
)
public void listen(String msg, Channel channel, Message message) throws IOException {
// 手动确认消息已收到
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
try {
// 将消息转换成 JSON 格式
JSONObject info = JSON.parseObject(msg);
log.info("Receiver Message: {}", info);
// 解构消息内容
JSONObject data = info.getJSONObject("params");
String method = info.getString("method");
switch (method) {
// 实时定位
case "position":
processPositionMessage(data);
break;
// 设备低电量
case "lowPower":
processLowPowerMessage(data);
break;
// 设备状态变更
case "deviceStatus":
processDeviceStatusMessage(data);
break;
// 按键报警
case "keyWarning":
processKeyWarningMessage(data);
break;
default:
log.warn("Unknown Message Type: {}", info);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 处理定位消息
* @param data 消息内容
*/
@Transactional(rollbackFor = Exception.class)
public void processPositionMessage(JSONObject data) {
// TODO
log.info("Position Message: {}", data);
// 包装消息
// TODO
// 1. 解析消息内容,进行围栏、出入标识判断等处理,将定位记录录入数据库
// 2. 创建一条需要进行消息推送的记录
// 3. 将记录推送的消息推送模块
// WebSocket 消息推送
pushService.push(PushMessage.Type.WS, data);
}
/**
* 处理低电量报警消息
* @param data 消息内容
*/
@Transactional(rollbackFor = Exception.class)
public void processLowPowerMessage(JSONObject data) {
// TODO
log.info("LowPower Message: {}", data);
}
/**
* 处理设备状态更新消息
* @param data 消息内容
*/
@Transactional(rollbackFor = Exception.class)
public void processDeviceStatusMessage(JSONObject data) {
// TODO
log.info("DeviceStatus Message: {}", data);
}
/**
* 处理按键报警消息
* @param data 消息内容
*/
@Transactional(rollbackFor = Exception.class)
public void processKeyWarningMessage(JSONObject data) {
// TODO
log.info("KeyWarning Message: {}", data);
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import com.yiring.app.constant.RedisKey;
import com.yiring.auth.domain.user.User;
import com.yiring.auth.util.Auths;
import com.yiring.common.core.Redis;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Resource;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.stereotype.Component;
/**
* ClientInboundChannelInterceptor
* 接收客户端消息的拦截器
*
* @author ifzm
* @version 0.1
* 2019/9/28 20:58
*/
@Slf4j
@Component
public class ClientInboundChannelInterceptor implements ChannelInterceptor {
@Resource
Redis redis;
@Resource
Auths auths;
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assert accessor != null;
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Object raw = message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (raw instanceof Map) {
StompPrincipal principal = new StompPrincipal();
principal.setSession(accessor.getSessionId());
Object tokens = ((Map<?, ?>) raw).get("token");
if (tokens instanceof LinkedList) {
String token = ((LinkedList<?>) tokens).getFirst().toString();
User user = auths.getUserByToken(token);
principal.setUser(user.getUsername());
principal.setType(StompPrincipal.Type.LOGIN_USER);
} else {
principal.setUser("Guest." + principal.getSession());
principal.setType(StompPrincipal.Type.GUEST_USER);
}
accessor.setUser(principal);
redis.hset(RedisKey.STOMP_ONLINE_USERS, principal.getSession(), principal);
log.info(
"STOMP Online Users: {} (incr: +1, session: {}, user: `{}`)",
redis.hsize(RedisKey.STOMP_ONLINE_USERS),
principal.getSession(),
principal.getUser()
);
}
} else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
StompPrincipal principal = (StompPrincipal) accessor.getUser();
assert principal != null;
if (
!message.getHeaders().containsKey(SimpMessageHeaderAccessor.HEART_BEAT_HEADER) &&
principal.getSession() != null
) {
redis.hdel(RedisKey.STOMP_ONLINE_USERS, principal.getSession());
log.info(
"STOMP Online Users: {} (incr: -1, session: {}, user: `{}`)",
redis.hsize(RedisKey.STOMP_ONLINE_USERS),
principal.getSession(),
principal.getUser()
);
}
}
return message;
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import com.alibaba.fastjson.JSON;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
/**
* ClientOutboundChannelInterceptor
* 向客户端输出消息的拦截器
*
* @author ifzm
* @version 0.1
* 2019/10/12 11:05
*/
@Slf4j
@Component
public class ClientOutboundChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assert accessor != null;
if (StompCommand.CONNECTED.equals(accessor.getCommand())) {
StompPrincipal principal = (StompPrincipal) accessor.getUser();
return MessageBuilder.createMessage(JSON.toJSONBytes(principal), message.getHeaders());
}
return message;
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import java.security.Principal;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.NonNull;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.web.socket.messaging.AbstractSubProtocolEvent;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
/**
* 自定义STOMP在线用户信息统计与操作
*
* @author ifzm
* @version 0.1
* 2019/10/10 21:19
*/
@Component
public class CustomStompUserRegistry implements StompUserRegistry, SmartApplicationListener {
/**
* sessionId, Principal
*/
private final Map<String, StompPrincipal> users = new ConcurrentHashMap<>();
private final Object userLock = new Object();
@Override
public boolean supportsEventType(@NonNull Class<? extends ApplicationEvent> eventType) {
return AbstractSubProtocolEvent.class.isAssignableFrom(eventType);
}
@Override
public void onApplicationEvent(@NonNull ApplicationEvent event) {
AbstractSubProtocolEvent subProtocolEvent = (AbstractSubProtocolEvent) event;
Message<?> message = subProtocolEvent.getMessage();
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
message,
SimpMessageHeaderAccessor.class
);
Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
String sessionId = accessor.getSessionId();
Assert.state(sessionId != null, "No session id");
if (event instanceof SessionConnectedEvent) {
Principal user = subProtocolEvent.getUser();
synchronized (this.userLock) {
this.users.put(sessionId, (StompPrincipal) user);
}
} else if (event instanceof SessionDisconnectEvent) {
synchronized (this.userLock) {
this.users.remove(sessionId);
}
}
}
@Override
public Set<StompPrincipal> getUsers() {
return new HashSet<>(this.users.values());
}
@Override
public int getUserCount() {
return this.users.size();
}
@Override
public StompPrincipal getUser(String sessionId) {
return this.users.get(sessionId);
}
@Override
public void updateUser(String sessionId, StompPrincipal principal) {
synchronized (this.userLock) {
if (this.users.containsKey(sessionId)) {
this.users.put(sessionId, principal);
}
}
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import lombok.AccessLevel;
import lombok.Data;
import lombok.experimental.FieldDefaults;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* RabbitmqProperties
*
* @author ifzm
* @version 0.1
* 2019/9/25 21:31
*/
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitmqProperties {
int port;
String host;
String username;
String password;
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.security.Principal;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants;
/**
* StompPrincipal
*
* @author ifzm
* @version 0.1
* 2019/9/28 21:28
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@FieldNameConstants
@FieldDefaults(level = AccessLevel.PRIVATE)
public class StompPrincipal implements Principal, Serializable {
private static final long serialVersionUID = 5351052642945180737L;
Type type;
String user;
String session;
JSONObject options;
@Override
public String getName() {
return this.session;
}
public enum Type {
/**
* 游客用户
*/
GUEST_USER,
/**
* 登录用户
*/
LOGIN_USER,
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import com.alibaba.fastjson.JSON;
import com.yiring.auth.domain.user.User;
import com.yiring.auth.util.Auths;
import com.yiring.common.core.Result;
import com.yiring.common.core.Status;
import com.yiring.common.util.Commons;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
/**
* STOMP Receiver Controller
*
* @author ifzm
* @version 0.1
* 2019/9/28 23:13
*/
@Slf4j
@Controller
public class StompReceiver {
@Resource
Auths auths;
@Resource
SimpMessagingTemplate simpMessagingTemplate;
@Resource
SimpUserRegistry simpUserRegistry;
@Resource
StompUserRegistry stompUserRegistry;
/**
* 登录
*
* @param accessor StompHeaderAccessor
*/
@MessageMapping("/login")
public void login(StompHeaderAccessor accessor, String token) {
try {
User user = auths.getUserByToken(token);
StompPrincipal principal = (StompPrincipal) accessor.getUser();
assert principal != null;
principal.setType(StompPrincipal.Type.LOGIN_USER);
principal.setUser(user.getUsername());
accessor.setUser(principal);
stompUserRegistry.updateUser(accessor.getSessionId(), principal);
log.info("STOMP user `{}` login success.", principal.getUser());
} catch (Exception e) {
simpMessagingTemplate.convertAndSendToUser(
Objects.requireNonNull(accessor.getSessionId()),
"/topic/notice",
Result.no(Status.UNAUTHORIZED, "token 无效,登录失败!")
);
}
}
/**
* 更新用户状态
*
* @param accessor 访问器
*/
@MessageMapping("/state")
public void state(StompHeaderAccessor accessor, String message) {
log.info("收到来自 STOMP Client `/app/state` 消息:{}", message);
StompPrincipal principal = (StompPrincipal) accessor.getUser();
assert principal != null;
principal.setOptions(JSON.parseObject(message));
accessor.setUser(principal);
log.info("principal info: {}", principal);
stompUserRegistry.updateUser(accessor.getSessionId(), principal);
}
@MessageMapping("/test")
public void test(StompHeaderAccessor accessor, String message) {
log.info("收到来自 STOMP Client `/app/test` 消息:{}", message);
Set<SimpUser> users = simpUserRegistry.getUsers();
log.info("{}", users);
simpMessagingTemplate.convertAndSendToUser(
Objects.requireNonNull(accessor.getSessionId()),
"/topic/reply",
Result.ok(Commons.uuid())
);
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import java.util.Set;
/**
* STOMP 用户注册器
*
* @author ifzm
* @version 0.1
* 2019/10/10 21:57
*/
public interface StompUserRegistry {
/**
* 获取所有在线的用户信息
*
* @return 用户信息集合
*/
Set<StompPrincipal> getUsers();
/**
* 获取所有在线用户的数量
*
* @return 在线用户的数量
*/
int getUserCount();
/**
* 根据SessionId获取用户信息
*
* @param sessionId sessionId
* @return StompPrincipal
*/
StompPrincipal getUser(String sessionId);
/**
* 更新用户信息
*
* @param sessionId sessionId
* @param principal StompPrincipal
*/
void updateUser(String sessionId, StompPrincipal principal);
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import com.yiring.app.constant.RedisKey;
import com.yiring.common.core.Redis;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
/**
* WebSocketStompConfig
*
* @author ifzm
* @version 0.1
* 2019/9/25 20:12
*/
@Slf4j
@Configuration
@EnableScheduling
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Resource
Redis redis;
@Resource
RabbitmqProperties rabbitmqProperties;
@Resource
ClientInboundChannelInterceptor clientInboundChannelInterceptor;
@Resource
ClientOutboundChannelInterceptor clientOutboundChannelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint("/stomp/ws")
.setAllowedOriginPatterns("*")
.addInterceptors(new HttpSessionHandshakeInterceptor())
.withSockJS();
log.info("Init STOMP Endpoints Success.");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 启动前先删除掉可能存在的残留STOMP连接缓存数据
redis.del(RedisKey.STOMP_ONLINE_USERS);
log.info("Clear STOMP online user info cache of redis.");
registry.setPreservePublishOrder(true);
registry.setUserDestinationPrefix("/user");
registry.setApplicationDestinationPrefixes("/app");
registry
.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost(rabbitmqProperties.getHost())
.setClientLogin(rabbitmqProperties.getUsername())
.setClientPasscode(rabbitmqProperties.getPassword())
.setSystemLogin(rabbitmqProperties.getUsername())
.setSystemPasscode(rabbitmqProperties.getPassword());
log.info("Init RabbitMQ STOMP MessageBroker Success.");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(clientInboundChannelInterceptor);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.interceptors(clientOutboundChannelInterceptor);
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.util;
import lombok.experimental.UtilityClass;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.geom.Point;
/**
* 空间函数工具类
*
* @author Jim
* @version 0.1
* 2022/4/15 9:52
*/
@SuppressWarnings({ "unused" })
@UtilityClass
public class GeoUtils {
private final GeometryFactory factory = new GeometryFactory();
/**
* 创建点
*
* @param lon 经度
* @param lat 纬度
* @return 点
*/
public Point createPoint(double lon, double lat) {
return factory.createPoint(new Coordinate(lon, lat));
}
}
......@@ -16,6 +16,7 @@ spring:
properties:
hibernate:
format_sql: true
types.print.banner: false
redis:
database: 5
host: ${env.host}
......@@ -24,7 +25,7 @@ spring:
port: 5672
username: admin
password: 123456
virtual-host: /
# knife4j
knife4j:
......@@ -49,3 +50,34 @@ logging:
level:
# sql bind parameter
org.hibernate.type.descriptor.sql.BasicBinder: trace
# 真源定位系统相关配置
zy-config:
host: project.yz-online.com
# RabbitMQ 订阅配置
rabbitmq:
host: ${zy-config.host}
port: 672
username: admin
password: admin
virtual-host: /
queue-name: tenant_msg_${zy-config.open.client-secret}_${zy-config.open.client-id}
# 开放接口信息配置
open:
api: http://${zy-config.host}:789/positionApi
client-id: sc21080400
grant-type: client_credentials
client-secret: 12A14FDC
tenant: sc21080400
# 代理接口信息配置
proxy:
api: https://nl.yz-cloud.com
tenant: ts00000006
# 应用平台账户信息
client:
username: test1234
password: 123456
# 管理后台账户信息
manage:
username: test123
password: test123
/* (C) 2022 YiRing, Inc. */
package com.yiring.auth.util;
import cn.dev33.satoken.exception.NotLoginException;
import cn.dev33.satoken.stp.StpUtil;
import com.yiring.auth.domain.user.User;
import com.yiring.auth.domain.user.UserRepository;
import java.util.Objects;
import java.util.Optional;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
/**
* 认证工具类
*
* @author Jim
* @version 0.1
* 2022/4/8 17:34
*/
@Component
public class Auths {
@Resource
UserRepository userRepository;
/**
* 根据 Token 获取用户信息
* 如果用户未登录或校验失败会抛出 NotLoginException
* @param token token
* @return 用户信息
*/
public User getUserByToken(String token) {
Object id = StpUtil.getLoginIdByToken(token);
if (id == null) {
throw NotLoginException.newInstance(StpUtil.TYPE, null);
}
Optional<User> optional = userRepository.findById(Long.valueOf(Objects.toString(id)));
if (optional.isEmpty()) {
throw NotLoginException.newInstance(StpUtil.TYPE, NotLoginException.INVALID_TOKEN);
}
return optional.get();
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.converter;
package com.yiring.common.config.converter;
import com.alibaba.fastjson.JSONArray;
import javax.persistence.AttributeConverter;
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.converter;
package com.yiring.common.config.converter;
import com.alibaba.fastjson.JSONObject;
import javax.persistence.AttributeConverter;
......
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
}
/**
* @author Jim
* @version 0.1
* 2022/3/26 10:36
*/
package com.yiring.common;
......@@ -6,7 +6,7 @@ buildscript {
// https://mvnrepository.com/artifact/com.github.xiaoymin/knife4j-spring-boot-starter
knife4jVersion = '2.0.9'
// https://mvnrepository.com/artifact/io.swagger/swagger-annotations
swaggerAnnotationsVersion = '1.6.5'
swaggerAnnotationsVersion = '1.6.6'
// https://mvnrepository.com/artifact/org.hibernate.validator/hibernate-validator
hibernateValidatorVersion = '7.0.4.Final'
// https://mvnrepository.com/artifact/org.hibernate.orm/hibernate-spatial
......@@ -25,6 +25,8 @@ buildscript {
minioVersion = '8.3.7'
// https://mvnrepository.com/artifact/org.locationtech.jts/jts-core
jtsVersion = '1.18.2'
// https://mvnrepository.com/artifact/com.vladmihalcea/hibernate-types-52
hibernateTypesVersion = '2.16.0'
}
}
......
......@@ -6,9 +6,11 @@ pluginManagement {
}
rootProject.name = 'kshg-api'
include 'app'
include 'app-push'
include 'basic-auth'
include 'basic-common:core'
include 'basic-common:util'
include 'basic-common:doc'
include 'basic-common:minio'
include 'basic-common:redis'
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论