提交 264c18a3 作者: 方治民

feat: 调整 RabbitMQ 消息订阅及推送实现、数据表集成 BasicEntity 等

上级 cb49a3ae
...@@ -9,8 +9,13 @@ dependencies { ...@@ -9,8 +9,13 @@ dependencies {
// 本地依赖 // 本地依赖
implementation fileTree(dir: project.rootDir.getPath() + '\\libs', includes: ['*jar']) implementation fileTree(dir: project.rootDir.getPath() + '\\libs', includes: ['*jar'])
// 文档及参数校验
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation "io.swagger:swagger-annotations:${swaggerAnnotationsVersion}"
// hutool-core // hutool-core
implementation "cn.hutool:hutool-core:${hutoolVersion}" implementation "cn.hutool:hutool-core:${hutoolVersion}"
implementation "cn.hutool:hutool-http:${hutoolVersion}"
// fastjson // fastjson
implementation "com.alibaba:fastjson:${fastJsonVersion}" implementation "com.alibaba:fastjson:${fastJsonVersion}"
...@@ -19,4 +24,15 @@ dependencies { ...@@ -19,4 +24,15 @@ dependencies {
// hibernate-types-55 // hibernate-types-55
implementation "com.vladmihalcea:hibernate-types-55:${hibernateTypesVersion}" implementation "com.vladmihalcea:hibernate-types-55:${hibernateTypesVersion}"
// ======================= 推送相关 =======================
// 个推 SDK(App 推送)
// https://mvnrepository.com/artifact/com.getui.push/restful-sdk
implementation 'com.getui.push:restful-sdk:1.0.0.6'
// 阿里云短信 SDK(SMS 推送)
// https://mvnrepository.com/artifact/com.aliyun/dysmsapi20170525
implementation 'com.aliyun:dysmsapi20170525:2.0.9'
// 邮件 SDK(EMAIL 推送)
implementation 'org.springframework.boot:spring-boot-starter-mail'
// ======================================================
} }
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.domain; package com.yiring.app.push.domain;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;
import com.vladmihalcea.hibernate.type.json.JsonType; import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import com.yiring.common.domain.BasicEntity; import com.yiring.common.domain.BasicEntity;
import java.io.Serial; import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
...@@ -34,7 +34,7 @@ import org.hibernate.annotations.TypeDef; ...@@ -34,7 +34,7 @@ import org.hibernate.annotations.TypeDef;
@FieldNameConstants @FieldNameConstants
@FieldDefaults(level = AccessLevel.PRIVATE) @FieldDefaults(level = AccessLevel.PRIVATE)
@Entity @Entity
@TypeDef(name = "json", typeClass = JsonType.class) @TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
@Table( @Table(
name = "SYS_PUSH_MESSAGE", name = "SYS_PUSH_MESSAGE",
indexes = { indexes = {
...@@ -73,18 +73,18 @@ public class PushMessage extends BasicEntity implements Serializable { ...@@ -73,18 +73,18 @@ public class PushMessage extends BasicEntity implements Serializable {
String content; String content;
@Comment("扩展数据") @Comment("扩展数据")
@org.hibernate.annotations.Type(type = "json") @org.hibernate.annotations.Type(type = "jsonb")
@Column(columnDefinition = "json") @Column(columnDefinition = "jsonb")
JSON extra; JSONObject extra;
@Comment("消息状态") @Comment("消息状态")
@Enumerated(EnumType.STRING) @Enumerated(EnumType.STRING)
Status status; Status status;
@Comment("推送反馈结果") @Comment("推送反馈结果")
@org.hibernate.annotations.Type(type = "json") @org.hibernate.annotations.Type(type = "jsonb")
@Column(columnDefinition = "json") @Column(columnDefinition = "jsonb")
JSON result; JSONObject result;
@Comment("重试次数") @Comment("重试次数")
Integer retryCount; Integer retryCount;
...@@ -94,7 +94,7 @@ public class PushMessage extends BasicEntity implements Serializable { ...@@ -94,7 +94,7 @@ public class PushMessage extends BasicEntity implements Serializable {
@SuppressWarnings({ "unused" }) @SuppressWarnings({ "unused" })
public enum Type { public enum Type {
WS("STOMP WebSocket 消息"), WEBHOOK("webhook"),
APP("APP 消息"), APP("APP 消息"),
SMS("短信消息"), SMS("短信消息"),
EMAIL("邮件消息"), EMAIL("邮件消息"),
...@@ -113,7 +113,7 @@ public class PushMessage extends BasicEntity implements Serializable { ...@@ -113,7 +113,7 @@ public class PushMessage extends BasicEntity implements Serializable {
} }
public String queue() { public String queue() {
return String.format("push.%s.queue", this.name()); return String.format("push.%s.queue", this.name().toLowerCase());
} }
} }
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.push.param;
import io.swagger.annotations.ApiModel;
import lombok.*;
import lombok.experimental.FieldDefaults;
/**
* App 推送参数
*
* @author Jim
* @version 0.1
* 2022/4/21 10:27
*/
@ApiModel("A")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public class PushAppParam {
String webhook;
}
...@@ -20,14 +20,37 @@ import org.springframework.context.annotation.Configuration; ...@@ -20,14 +20,37 @@ import org.springframework.context.annotation.Configuration;
@Configuration @Configuration
public class PushRabbitConfig { public class PushRabbitConfig {
/**
* 消息交换机
*/
public static final String PUSH_TOPIC_EXCHANGE = "push.topic.exchange"; public static final String PUSH_TOPIC_EXCHANGE = "push.topic.exchange";
/**
* WebHook 推送队列
*/
public static final String PUSH_WEBHOOK_QUEUE = "push.webhook.queue";
/**
* App 消息推送队列
*/
public static final String PUSH_APP_QUEUE = "push.app.queue"; public static final String PUSH_APP_QUEUE = "push.app.queue";
public static final String PUSH_WS_QUEUE = "push.ws.queue"; /**
* 短信推送队列
*/
public static final String PUSH_SMS_QUEUE = "push.sms.queue"; public static final String PUSH_SMS_QUEUE = "push.sms.queue";
public static final String PUSH_EMAIL_QUEUE = "push.email.queue"; /**
* 邮箱推送队列
*/
public static final String PUSH_MAIL_QUEUE = "push.mail.queue";
/**
* TTS 语音电话队列
*/
public static final String PUSH_CALL_TTS_QUEUE = "push.call.tts.queue"; public static final String PUSH_CALL_TTS_QUEUE = "push.call.tts.queue";
/**
* 语音电话推送队列
*/
public static final String PUSH_CALL_AUDIO_QUEUE = "push.call.audio.queue"; public static final String PUSH_CALL_AUDIO_QUEUE = "push.call.audio.queue";
/**
* 设备广播推送队列
*/
public static final String PUSH_DEVICE_TONE_QUEUE = "push.device.tone.queue"; public static final String PUSH_DEVICE_TONE_QUEUE = "push.device.tone.queue";
/** /**
...@@ -52,16 +75,16 @@ public class PushRabbitConfig { ...@@ -52,16 +75,16 @@ public class PushRabbitConfig {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_APP_QUEUE); return BindingBuilder.bind(queue).to(exchange).with(PUSH_APP_QUEUE);
} }
@Bean(PUSH_WS_QUEUE) @Bean(PUSH_WEBHOOK_QUEUE)
public Queue pushWebSocketQueue() { public Queue pushWebHookQueue() {
Map<String, Object> args = new HashMap<>(1); Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 30 * 1000); args.put("x-message-ttl", 30 * 1000);
return new Queue(PUSH_WS_QUEUE, true, false, false, args); return new Queue(PUSH_WEBHOOK_QUEUE, true, false, false, args);
} }
@Bean @Bean
Binding bindingExchangeWs(@Qualifier(PUSH_WS_QUEUE) Queue queue, TopicExchange exchange) { Binding bindingExchangeWebHook(@Qualifier(PUSH_WEBHOOK_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_WS_QUEUE); return BindingBuilder.bind(queue).to(exchange).with(PUSH_WEBHOOK_QUEUE);
} }
@Bean(PUSH_SMS_QUEUE) @Bean(PUSH_SMS_QUEUE)
...@@ -76,16 +99,16 @@ public class PushRabbitConfig { ...@@ -76,16 +99,16 @@ public class PushRabbitConfig {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_SMS_QUEUE); return BindingBuilder.bind(queue).to(exchange).with(PUSH_SMS_QUEUE);
} }
@Bean(PUSH_EMAIL_QUEUE) @Bean(PUSH_MAIL_QUEUE)
public Queue pushEmailQueue() { public Queue pushMailQueue() {
Map<String, Object> args = new HashMap<>(1); Map<String, Object> args = new HashMap<>(1);
args.put("x-message-ttl", 60 * 1000); args.put("x-message-ttl", 60 * 1000);
return new Queue(PUSH_EMAIL_QUEUE, true, false, false, args); return new Queue(PUSH_MAIL_QUEUE, true, false, false, args);
} }
@Bean @Bean
Binding bindingExchangeEmail(@Qualifier(PUSH_EMAIL_QUEUE) Queue queue, TopicExchange exchange) { Binding bindingExchangeMail(@Qualifier(PUSH_MAIL_QUEUE) Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(PUSH_EMAIL_QUEUE); return BindingBuilder.bind(queue).to(exchange).with(PUSH_MAIL_QUEUE);
} }
@Bean(PUSH_CALL_TTS_QUEUE) @Bean(PUSH_CALL_TTS_QUEUE)
......
...@@ -26,10 +26,61 @@ public class PushRabbitReceiver { ...@@ -26,10 +26,61 @@ public class PushRabbitReceiver {
PushMessageRepository repository; PushMessageRepository repository;
@RabbitHandler @RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_WS_QUEUE) @RabbitListener(queues = PushRabbitConfig.PUSH_WEBHOOK_QUEUE)
public void receiverWebSocketMessage(Long id) { public void receiverWebHookMessage(Long id) {
log.info("接收到消息: " + id); log.info("[WebHook] Message: " + id);
repository.findById(id).ifPresent(message -> log.info("{}", message)); repository.findById(id).ifPresent(message -> log.info("{}", message));
// TODO
// 实现调用 WebHook,基于 WebHook API 发起 HTTPs 请求
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_APP_QUEUE)
public void receiverAppMessage(Long id) {
log.info("[App] Message: " + id);
// TODO
// 实现推送消息到 App,基于个推 API
// https://github.com/GetuiLaboratory/getui-pushapi-java-client-v2
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_SMS_QUEUE)
public void receiverSmsMessage(Long id) {
log.info("[SMS] Message: " + id);
// TODO
// 实现发送短信,基于阿里云短信平台 API
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_MAIL_QUEUE)
public void receiverMailMessage(Long id) {
log.info("[Mail] Message: " + id);
// TODO
// 实现发送邮件,基于邮件配置
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_CALL_TTS_QUEUE)
public void receiverTtsMessage(Long id) {
log.info("[Phone TTS] Message: " + id);
// TODO
// 实现拨打电话播放 TTS 转语音,基于壹润外呼系统 API
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_CALL_AUDIO_QUEUE)
public void receiverAudioMessage(Long id) {
log.info("[Phone Audio] Message: " + id);
// TODO
// 实现拨打电话播放音频文件,基于壹润外呼系统 API
}
@RabbitHandler
@RabbitListener(queues = PushRabbitConfig.PUSH_DEVICE_TONE_QUEUE)
public void receiverToneMessage(Long id) {
log.info("[Device Tone] Message: " + id);
// TODO
// 实现播放设备铃声,基于音柱广播设备 API
// SDK 暂未拿到,设备未采购,未进行联调
} }
} }
...@@ -3,6 +3,7 @@ package com.yiring.app.push.service; ...@@ -3,6 +3,7 @@ package com.yiring.app.push.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.yiring.app.push.domain.PushMessage; import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.param.PushAppParam;
/** /**
* 推送服务 * 推送服务
...@@ -13,4 +14,10 @@ import com.yiring.app.push.domain.PushMessage; ...@@ -13,4 +14,10 @@ import com.yiring.app.push.domain.PushMessage;
*/ */
public interface PushService { public interface PushService {
void push(PushMessage.Type type, JSONObject raw); void push(PushMessage.Type type, JSONObject raw);
/**
* 推送到 App
* @param param 参数
*/
void app(PushAppParam param);
} }
...@@ -4,12 +4,13 @@ package com.yiring.app.push.service.impl; ...@@ -4,12 +4,13 @@ package com.yiring.app.push.service.impl;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.yiring.app.push.domain.PushMessage; import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.domain.PushMessageRepository; import com.yiring.app.push.domain.PushMessageRepository;
import com.yiring.app.push.param.PushAppParam;
import com.yiring.app.push.rabbitmq.PushRabbitConfig; import com.yiring.app.push.rabbitmq.PushRabbitConfig;
import com.yiring.app.push.service.PushService; import com.yiring.app.push.service.PushService;
import javax.annotation.Resource; import javax.annotation.Resource;
import javax.transaction.Transactional; import javax.transaction.Transactional;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
...@@ -29,7 +30,7 @@ public class PushServiceImpl implements PushService { ...@@ -29,7 +30,7 @@ public class PushServiceImpl implements PushService {
PushMessageRepository repository; PushMessageRepository repository;
@Resource @Resource
AmqpTemplate amqpTemplate; RabbitTemplate rabbitTemplate;
@Override @Override
public void push(PushMessage.Type type, JSONObject raw) { public void push(PushMessage.Type type, JSONObject raw) {
...@@ -37,6 +38,10 @@ public class PushServiceImpl implements PushService { ...@@ -37,6 +38,10 @@ public class PushServiceImpl implements PushService {
message.setType(type); message.setType(type);
message.setExtra(raw); message.setExtra(raw);
repository.saveAndFlush(message); repository.saveAndFlush(message);
amqpTemplate.convertAndSend(PushRabbitConfig.PUSH_TOPIC_EXCHANGE, type.queue(), message.getId());
rabbitTemplate.convertAndSend(PushRabbitConfig.PUSH_TOPIC_EXCHANGE, type.queue(), message.getId());
} }
@Override
public void app(PushAppParam param) {}
} }
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.domain.location; package com.yiring.app.domain.location;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;
import com.vladmihalcea.hibernate.type.json.JsonType; import com.vladmihalcea.hibernate.type.json.JsonType;
import com.yiring.auth.domain.user.User; import com.yiring.auth.domain.user.User;
import java.io.Serial; import java.io.Serial;
...@@ -116,7 +116,7 @@ public class LocationLog implements Serializable { ...@@ -116,7 +116,7 @@ public class LocationLog implements Serializable {
@Comment("原始数据") @Comment("原始数据")
@org.hibernate.annotations.Type(type = "json") @org.hibernate.annotations.Type(type = "json")
@Column(columnDefinition = "json") @Column(columnDefinition = "json")
JSON raw; JSONObject raw;
@Comment("创建时间") @Comment("创建时间")
LocalDateTime createTime; LocalDateTime createTime;
......
...@@ -3,6 +3,7 @@ package com.yiring.app.domain.location; ...@@ -3,6 +3,7 @@ package com.yiring.app.domain.location;
import com.yiring.auth.domain.user.User; import com.yiring.auth.domain.user.User;
import com.yiring.common.annotation.FieldMapping; import com.yiring.common.annotation.FieldMapping;
import com.yiring.common.domain.BasicEntity;
import java.io.Serial; import java.io.Serial;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
...@@ -11,10 +12,9 @@ import javax.persistence.*; ...@@ -11,10 +12,9 @@ import javax.persistence.*;
import lombok.*; import lombok.*;
import lombok.experimental.FieldDefaults; import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants; import lombok.experimental.FieldNameConstants;
import lombok.experimental.SuperBuilder;
import org.hibernate.Hibernate; import org.hibernate.Hibernate;
import org.hibernate.annotations.Comment; import org.hibernate.annotations.Comment;
import org.hibernate.annotations.GenericGenerator;
import org.hibernate.snowflake.SnowflakeId;
/** /**
* 定位标签 * 定位标签
...@@ -30,7 +30,7 @@ import org.hibernate.snowflake.SnowflakeId; ...@@ -30,7 +30,7 @@ import org.hibernate.snowflake.SnowflakeId;
@Getter @Getter
@Setter @Setter
@ToString @ToString
@Builder @SuperBuilder(toBuilder = true)
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@FieldNameConstants @FieldNameConstants
...@@ -47,17 +47,11 @@ import org.hibernate.snowflake.SnowflakeId; ...@@ -47,17 +47,11 @@ import org.hibernate.snowflake.SnowflakeId;
} }
) )
@Comment("定位标签") @Comment("定位标签")
public class LocationTag implements Serializable { public class LocationTag extends BasicEntity implements Serializable {
@Serial @Serial
private static final long serialVersionUID = 5419734189897829250L; private static final long serialVersionUID = 5419734189897829250L;
@Comment("主键")
@Id
@GeneratedValue(generator = SnowflakeId.GENERATOR)
@GenericGenerator(name = SnowflakeId.GENERATOR, strategy = SnowflakeId.Strategy.LONG)
Long id;
/** /**
* 数据来源于【真源人员定位系统 - 定位标签】 * 数据来源于【真源人员定位系统 - 定位标签】
* 作用: 用于双向联动进行数据同步 * 作用: 用于双向联动进行数据同步
...@@ -134,7 +128,7 @@ public class LocationTag implements Serializable { ...@@ -134,7 +128,7 @@ public class LocationTag implements Serializable {
if (this == o) return true; if (this == o) return true;
if (o == null || Hibernate.getClass(this) != Hibernate.getClass(o)) return false; if (o == null || Hibernate.getClass(this) != Hibernate.getClass(o)) return false;
LocationTag that = (LocationTag) o; LocationTag that = (LocationTag) o;
return id != null && Objects.equals(id, that.id); return getId() != null && Objects.equals(getId(), that.getId());
} }
@Override @Override
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbit.config;
import javax.annotation.Resource;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 15:44
*/
@Configuration
public class RabbitConfig {
@Resource
RabbitProperties rabbitProperties;
private static final String CONNECTION_FACTORY_NAME = "rabbitConnectionFactory";
@Bean(CONNECTION_FACTORY_NAME)
@Primary
public ConnectionFactory connectionFactory() {
return RabbitConfig.connectionFactory(
rabbitProperties.getHost(),
rabbitProperties.getPort(),
rabbitProperties.getUsername(),
rabbitProperties.getPassword(),
rabbitProperties.getVirtualHost()
);
}
@Bean
@Primary
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
@Primary
public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
/**
* 创建连接工厂
*
* @param host 主机
* @param port 端口
* @param username 用户名
* @param password 密码
* @param virtualHost 虚拟主机
* @return 连接工厂
*/
public static CachingConnectionFactory connectionFactory(
String host,
int port,
String username,
String password,
String virtualHost
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
}
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.zy; package com.yiring.app.rabbit.config;
import javax.annotation.Resource; import javax.annotation.Resource;
import lombok.AccessLevel; import lombok.AccessLevel;
......
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.config.zy; package com.yiring.app.rabbit.config;
import javax.annotation.Resource; import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
...@@ -43,8 +44,8 @@ public class ZyRabbitConfig { ...@@ -43,8 +44,8 @@ public class ZyRabbitConfig {
public static final String MESSAGE_QUEUES_NAME = "tenant_msg_12A14FDC_sc21080400"; public static final String MESSAGE_QUEUES_NAME = "tenant_msg_12A14FDC_sc21080400";
@Bean(CONNECTION_FACTORY_NAME) @Bean(CONNECTION_FACTORY_NAME)
public CachingConnectionFactory zyConnectionFactory() { public ConnectionFactory zyConnectionFactory() {
return connectionFactory( return RabbitConfig.connectionFactory(
rabbitmq.getHost(), rabbitmq.getHost(),
rabbitmq.getPort(), rabbitmq.getPort(),
rabbitmq.getUsername(), rabbitmq.getUsername(),
...@@ -54,9 +55,9 @@ public class ZyRabbitConfig { ...@@ -54,9 +55,9 @@ public class ZyRabbitConfig {
} }
@Bean(LISTENER_FACTORY_NAME) @Bean(LISTENER_FACTORY_NAME)
public SimpleRabbitListenerContainerFactory secondFactory( public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(CONNECTION_FACTORY_NAME) CachingConnectionFactory connectionFactory @Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) { ) {
// 检查队列名称是否与配置文件一致,避免监听错误 // 检查队列名称是否与配置文件一致,避免监听错误
if (!MESSAGE_QUEUES_NAME.equals(rabbitmq.getQueueName())) { if (!MESSAGE_QUEUES_NAME.equals(rabbitmq.getQueueName())) {
...@@ -66,31 +67,9 @@ public class ZyRabbitConfig { ...@@ -66,31 +67,9 @@ public class ZyRabbitConfig {
} }
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 最小消费者数量
factory.setConcurrentConsumers(1);
// 最大消费者数量
factory.setMaxConcurrentConsumers(1);
// 预读取一条消息
factory.setPrefetchCount(1);
// 手动确认消息模式 // 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory); configurer.configure(factory, connectionFactory);
return factory; return factory;
} }
public CachingConnectionFactory connectionFactory(
String host,
int port,
String username,
String password,
String virtualHost
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
} }
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq; package com.yiring.app.rabbit.receiver;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
......
/* (C) 2022 YiRing, Inc. */ /* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq.zy; package com.yiring.app.rabbit.receiver;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import com.yiring.app.config.zy.ZyRabbitConfig;
import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.service.PushService; import com.yiring.app.push.service.PushService;
import com.yiring.app.rabbit.config.ZyRabbitConfig;
import java.io.IOException; import java.io.IOException;
import javax.annotation.Resource; import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -15,6 +14,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler; ...@@ -15,6 +14,7 @@ import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -30,7 +30,7 @@ import org.springframework.transaction.annotation.Transactional; ...@@ -30,7 +30,7 @@ import org.springframework.transaction.annotation.Transactional;
@Component @Component
@Configuration @Configuration
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "enabled", havingValue = "true") @ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "enabled", havingValue = "true")
public class ZyRabbitmqReceiver { public class ZyRabbitReceiver {
// TODO // TODO
// 1. 新增消息订阅定时任务,检查是否正常订阅了真源定位系统的消息 // 1. 新增消息订阅定时任务,检查是否正常订阅了真源定位系统的消息
...@@ -40,6 +40,9 @@ public class ZyRabbitmqReceiver { ...@@ -40,6 +40,9 @@ public class ZyRabbitmqReceiver {
@Resource @Resource
PushService pushService; PushService pushService;
@Resource
SimpMessagingTemplate simpMessagingTemplate;
/** /**
* 订阅真源定位系统 RabbitMQ 推送过来的消息(主动订阅的一些消息类别) * 订阅真源定位系统 RabbitMQ 推送过来的消息(主动订阅的一些消息类别)
* 参见: 定位平台接口规范V3.0.1 - 通用版.pdf #6 * 参见: 定位平台接口规范V3.0.1 - 通用版.pdf #6
...@@ -74,6 +77,8 @@ public class ZyRabbitmqReceiver { ...@@ -74,6 +77,8 @@ public class ZyRabbitmqReceiver {
case "deviceStatus" -> processDeviceStatusMessage(data); case "deviceStatus" -> processDeviceStatusMessage(data);
// 按键报警 // 按键报警
case "keyWarning" -> processKeyWarningMessage(data); case "keyWarning" -> processKeyWarningMessage(data);
// 围栏报警
case "enclosure" -> log.warn("Ignore Message Type [enclosure]: {}", info);
default -> log.warn("Unknown Message Type: {}", info); default -> log.warn("Unknown Message Type: {}", info);
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -95,9 +100,10 @@ public class ZyRabbitmqReceiver { ...@@ -95,9 +100,10 @@ public class ZyRabbitmqReceiver {
// 1. 解析消息内容,进行围栏、出入标识判断等处理,将定位记录录入数据库 // 1. 解析消息内容,进行围栏、出入标识判断等处理,将定位记录录入数据库
// 2. 创建一条需要进行消息推送的记录 // 2. 创建一条需要进行消息推送的记录
// 3. 将记录推送的消息推送模块 // 3. 将记录推送的消息推送模块
// 4. 坚持是否触发围栏告警,记录告警数据,并推送消息
// WebSocket 消息推送 // WebSocket 消息推送
pushService.push(PushMessage.Type.WS, data); simpMessagingTemplate.convertAndSend("/topic/position", data);
} }
/** /**
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbitmq;
import org.springframework.context.annotation.Configuration;
/**
* rabbitmq配置
*
* @author ifzm
* 2019/8/21 15:44
*/
@Configuration
public class RabbitConfig {}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.stomp;
import lombok.AccessLevel;
import lombok.Data;
import lombok.experimental.FieldDefaults;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* RabbitmqProperties
*
* @author ifzm
* @version 0.1
* 2019/9/25 21:31
*/
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
@Component
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitmqProperties {
int port;
String host;
String username;
String password;
}
...@@ -5,6 +5,7 @@ import com.yiring.app.constant.RedisKey; ...@@ -5,6 +5,7 @@ import com.yiring.app.constant.RedisKey;
import com.yiring.common.core.Redis; import com.yiring.common.core.Redis;
import javax.annotation.Resource; import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.messaging.simp.config.MessageBrokerRegistry;
...@@ -32,7 +33,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { ...@@ -32,7 +33,7 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
Redis redis; Redis redis;
@Resource @Resource
RabbitmqProperties rabbitmqProperties; RabbitProperties rabbitProperties;
@Resource @Resource
ClientInboundChannelInterceptor clientInboundChannelInterceptor; ClientInboundChannelInterceptor clientInboundChannelInterceptor;
...@@ -61,11 +62,11 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { ...@@ -61,11 +62,11 @@ public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
registry.setApplicationDestinationPrefixes("/app"); registry.setApplicationDestinationPrefixes("/app");
registry registry
.enableStompBrokerRelay("/topic", "/queue") .enableStompBrokerRelay("/topic", "/queue")
.setRelayHost(rabbitmqProperties.getHost()) .setRelayHost(rabbitProperties.getHost())
.setClientLogin(rabbitmqProperties.getUsername()) .setClientLogin(rabbitProperties.getUsername())
.setClientPasscode(rabbitmqProperties.getPassword()) .setClientPasscode(rabbitProperties.getPassword())
.setSystemLogin(rabbitmqProperties.getUsername()) .setSystemLogin(rabbitProperties.getUsername())
.setSystemPasscode(rabbitmqProperties.getPassword()); .setSystemPasscode(rabbitProperties.getPassword());
log.info("Init RabbitMQ STOMP MessageBroker Success."); log.info("Init RabbitMQ STOMP MessageBroker Success.");
} }
......
/* (C) 2021 YiRing, Inc. */ /* (C) 2021 YiRing, Inc. */
package com.yiring.app.web; package com.yiring.app.web;
import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSONObject;
import com.yiring.app.constant.Code; import com.yiring.app.constant.Code;
import com.yiring.app.domain.location.LocationTag;
import com.yiring.app.exception.CodeException; import com.yiring.app.exception.CodeException;
import com.yiring.app.push.domain.PushMessage;
import com.yiring.app.push.service.PushService;
import com.yiring.app.util.GeoUtils; import com.yiring.app.util.GeoUtils;
import com.yiring.auth.domain.user.User;
import com.yiring.common.core.Result; import com.yiring.common.core.Result;
import com.yiring.common.domain.BasicEntity;
import com.yiring.common.param.PageParam; import com.yiring.common.param.PageParam;
import com.yiring.common.vo.PageVo; import com.yiring.common.vo.PageVo;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.persistence.criteria.*;
import javax.validation.Valid; import javax.validation.Valid;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.FieldDefaults;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.Point; import org.locationtech.jts.geom.Point;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
...@@ -53,11 +61,55 @@ public class HelloController { ...@@ -53,11 +61,55 @@ public class HelloController {
return Result.ok(vo); return Result.ok(vo);
} }
@Resource
SimpMessagingTemplate simpMessagingTemplate;
@GetMapping("test") @GetMapping("test")
public Result<Point> test() { public Result<Point> test() {
PushService service = SpringUtil.getBean(PushService.class);
service.push(PushMessage.Type.WS, new JSONObject().fluentPut("msg", "hello"));
Point point = GeoUtils.createPoint(112.1, 23.56); Point point = GeoUtils.createPoint(112.1, 23.56);
simpMessagingTemplate.convertAndSend("/topic/position", point);
return Result.ok(point); return Result.ok(point);
} }
@Resource
EntityManager em;
@Data
@AllArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE)
public static class UserVo {
Long id;
String avatar;
String code;
}
@GetMapping("test2")
public Result<ArrayList<UserVo>> query() {
CriteriaBuilder cb = em.getCriteriaBuilder();
CriteriaQuery<UserVo> cq = cb.createQuery(UserVo.class);
Root<User> root = cq.from(User.class);
Expression<Long> id = root.get(BasicEntity.Fields.id);
Expression<String> avatar = root.get(User.Fields.avatar);
// 子查询
Subquery<String> query = cq.subquery(String.class);
Root<LocationTag> tagRoot = query.from(LocationTag.class);
query.select(tagRoot.get(LocationTag.Fields.code));
query.where(cb.equal(tagRoot.get(LocationTag.Fields.user), root));
// 构建查询字段
cq.multiselect(id, avatar, query);
// 查询条件
List<Predicate> predicates = new ArrayList<>();
// 可根据入参判断是否需要查询指定字段
predicates.add(cb.isNotNull(root.get(User.Fields.avatar)));
predicates.add(cb.equal(query, "BTT22222222"));
cq.where(predicates.toArray(new Predicate[0]));
List<UserVo> users = em.createQuery(cq).getResultList();
return Result.ok(new ArrayList<>(users));
}
} }
...@@ -26,6 +26,17 @@ spring: ...@@ -26,6 +26,17 @@ spring:
username: admin username: admin
password: 123456 password: 123456
virtual-host: / virtual-host: /
# 开启发送端确认
publisher-confirm-type: correlated
# 开启接收端确认
publisher-returns: true
template:
# 消息抵达队列,异步回调 confirm
mandatory: true
listener:
simple:
# 手动确认消息
acknowledge-mode: manual
# knife4j # knife4j
knife4j: knife4j:
...@@ -56,7 +67,7 @@ zy-config: ...@@ -56,7 +67,7 @@ zy-config:
host: project.yz-online.com host: project.yz-online.com
# RabbitMQ 订阅配置 # RabbitMQ 订阅配置
rabbitmq: rabbitmq:
enabled: false enabled: true
host: ${zy-config.host} host: ${zy-config.host}
port: 672 port: 672
username: admin username: admin
......
...@@ -27,5 +27,4 @@ dependencies { ...@@ -27,5 +27,4 @@ dependencies {
// myexcel // myexcel
implementation "com.github.liaochong:myexcel:${myexcelVersion}" implementation "com.github.liaochong:myexcel:${myexcelVersion}"
} }
...@@ -16,6 +16,7 @@ import javax.persistence.*; ...@@ -16,6 +16,7 @@ import javax.persistence.*;
import lombok.*; import lombok.*;
import lombok.experimental.FieldDefaults; import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants; import lombok.experimental.FieldNameConstants;
import lombok.experimental.SuperBuilder;
import org.hibernate.Hibernate; import org.hibernate.Hibernate;
import org.hibernate.annotations.Comment; import org.hibernate.annotations.Comment;
...@@ -29,7 +30,7 @@ import org.hibernate.annotations.Comment; ...@@ -29,7 +30,7 @@ import org.hibernate.annotations.Comment;
@Getter @Getter
@Setter @Setter
@ToString @ToString
@Builder @SuperBuilder(toBuilder = true)
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
@FieldNameConstants @FieldNameConstants
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论