提交 91d4643d 作者: 涂茂林

合并分支 'merge_dev' 到 'dev_tml'

Merge dev

查看合并请求 chemical-kesai/kshg-api!23
......@@ -4,10 +4,7 @@ package com.yiring.app.domain.broadcast;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Table;
import javax.persistence.*;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants;
......@@ -52,6 +49,7 @@ public class Broadcast implements Serializable {
String broadcastName;
@Comment("坐标点信息")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@Comment("播报设备地址")
......
......@@ -59,7 +59,7 @@ public class District implements Serializable {
@Comment("区域信息")
@Type(type = "jts_geometry")
@Column(columnDefinition = "geometry")
@Column(columnDefinition = "geometry(Geometry,4326)")
private Geometry geometry;
@Comment("创建时间")
......
......@@ -3,6 +3,7 @@ package com.yiring.app.domain.district;
import java.io.Serializable;
import java.util.List;
import org.locationtech.jts.geom.Geometry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
......@@ -36,4 +37,12 @@ public interface DistrictRepository extends JpaRepository<District, Serializable
*/
@Query("SELECT d FROM District d WHERE d.name like %?1%")
List<District> findLikeName(String name);
/**
* 查询空间信息在区域内的区域信息
* @param geometry 空间信息
* @return 区域信息
*/
@Query(value = "select d.* from bs_district d where st_contains(d.geometry, :geometry)", nativeQuery = true)
List<District> findByGeometryContains(Geometry geometry);
}
......@@ -80,6 +80,7 @@ public class LocationBeacon extends BasicEntity implements Serializable {
Double distance;
@Comment("坐标点信息")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@FieldMapping
......
......@@ -51,7 +51,7 @@ public class LocationFence extends BasicEntity implements Serializable {
private String mapName;
@Comment("摄像头")
@ManyToOne
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "video_id")
private Video video;
......@@ -60,7 +60,7 @@ public class LocationFence extends BasicEntity implements Serializable {
@Comment("空间信息")
@Type(type = "jts_geometry")
@Column(columnDefinition = "geometry")
@Column(columnDefinition = "geometry(Geometry,4326)")
private Geometry geometry;
@Comment("滞留时间(秒)")
......@@ -87,6 +87,12 @@ public class LocationFence extends BasicEntity implements Serializable {
@OneToMany(mappedBy = "fence")
@ToString.Exclude
private Set<LocationFenceRule> rules = new HashSet<>(0);
@ToString.Exclude
@Comment("围栏中的标签集合")
@Builder.Default
@ManyToMany(fetch = FetchType.LAZY)
Set<LocationTag> tags = new HashSet<>(0);
/*@SuppressWarnings({ "unused" })
public enum Mode {
NORMAL("常规区域"),
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.domain.location;
import com.yiring.app.domain.alarm.AlarmType;
import com.yiring.auth.domain.user.User;
import com.yiring.common.domain.BasicEntity;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
import javax.persistence.*;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants;
import org.hibernate.annotations.Comment;
import org.locationtech.jts.geom.Point;
/**
* 围栏报警记录
*
* @author Jim
* @version 0.1
* 2022/5/12 21:33
*/
@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@FieldNameConstants
@FieldDefaults(level = AccessLevel.PRIVATE)
@Entity
@Table(name = "BS_LOCATION_FENCE_ALARM")
@Comment("围栏报警记录")
public class LocationFenceAlarm extends BasicEntity implements Serializable {
@Serial
private static final long serialVersionUID = 2984248537199016912L;
@Comment("围栏")
@ManyToOne(fetch = FetchType.LAZY)
LocationFence fence;
@Comment("触警位置")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@Comment("地图编号")
Long areaId;
@Comment("触警人员")
@ManyToOne(fetch = FetchType.LAZY)
User user;
@Comment("触警标签")
@ManyToOne(fetch = FetchType.LAZY)
LocationTag tag;
@Comment("报警开始时间")
LocalDateTime startTime;
@Comment("报警结束时间")
LocalDateTime endTime;
@Comment("报警类型")
@ManyToOne(fetch = FetchType.LAZY)
AlarmType type;
@Comment("状态")
@Enumerated(EnumType.STRING)
Status status;
// 推送记录集合(含接收状态)
// TODO
@SuppressWarnings({ "unused" })
public enum Status {
ING("进行中"),
OVER("结束");
final String text;
Status(String text) {
this.text = text;
}
public String text() {
return this.text;
}
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.domain.location;
import java.io.Serializable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
/**
* @author Jim
* @version 0.1
* 2022/5/12 21:34
*/
@Repository
public interface LocationFenceAlarmRepository
extends JpaRepository<LocationFenceAlarm, Serializable>, JpaSpecificationExecutor<LocationFenceAlarm> {}
......@@ -3,6 +3,7 @@ package com.yiring.app.domain.location;
import java.io.Serializable;
import java.util.List;
import org.locationtech.jts.geom.Geometry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
......@@ -37,4 +38,12 @@ public interface LocationFenceRepository
*/
@Query("SELECT f FROM LocationFence f WHERE name like %?1% AND deleted = false")
List<LocationFence> findLikeName(String name);
/**
* 查询空间信息在围栏内的围栏信息
* @param geometry 空间信息
* @return 围栏信息
*/
@Query(value = "select f.* from bs_location_fence f where st_contains(f.geometry, :geometry)", nativeQuery = true)
List<LocationFence> findByGeometryContains(Geometry geometry);
}
......@@ -9,7 +9,7 @@ import org.springframework.stereotype.Repository;
/**
* @author tml
* @version 1.0
* @date 2022/4/29 11:40
* 2022/4/29 11:40
*/
@Repository
public interface LocationFenceRuleRepository
......
......@@ -71,6 +71,7 @@ public class LocationLog implements Serializable {
User.Status status;
@Comment("坐标点信息")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@Comment("信标集合")
......@@ -83,6 +84,11 @@ public class LocationLog implements Serializable {
@Column(columnDefinition = "jsonb")
JSONArray fences;
@Comment("区域集合")
@Type(type = "jsonb")
@Column(columnDefinition = "jsonb")
JSONArray districts;
@Comment("静止/运动")
Boolean silent;
......
......@@ -103,6 +103,7 @@ public class LocationTag extends BasicEntity implements Serializable {
Integer category;
@Comment("最后定位坐标")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@SuppressWarnings({ "unused" })
......
......@@ -12,7 +12,7 @@ import org.springframework.stereotype.Repository;
/**
* 定位标签JPA
* @author LJ-2204
* @date 2022/4/14
* 2022/4/14
*/
@Repository
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.domain.location;
import com.vladmihalcea.hibernate.type.json.JsonBinaryType;
import com.yiring.common.domain.BasicEntity;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
import javax.persistence.*;
import lombok.*;
import lombok.experimental.FieldDefaults;
import lombok.experimental.FieldNameConstants;
import org.hibernate.annotations.Comment;
import org.hibernate.annotations.TypeDef;
/**
* 定位进出记录
*
* @author Jim
* @version 0.1
* 2022/5/12 17:54
*/
@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@FieldNameConstants
@FieldDefaults(level = AccessLevel.PRIVATE)
@Entity
@TypeDef(name = "jsonb", typeClass = JsonBinaryType.class)
@Table(name = "BS_LOCATION_TURNOVER", indexes = { @Index(columnList = "type"), @Index(columnList = "sourceId") })
@Comment("定位进出记录")
public class LocationTurnover extends BasicEntity implements Serializable {
@Serial
private static final long serialVersionUID = 887764448464587364L;
@Comment("进出区域/围栏表主键")
Long sourceId;
@Comment("类型")
@Enumerated(EnumType.STRING)
Type type;
@Comment("定位标签")
@ManyToOne(fetch = FetchType.LAZY)
LocationTag tag;
@Comment("定位时间")
LocalDateTime time;
@Comment("进入")
Boolean enter;
@Comment("是否为最新状态")
Boolean isLatest;
@SuppressWarnings({ "unused" })
public enum Type {
FENCE("围栏"),
DISTRICT("区域");
final String text;
Type(String text) {
this.text = text;
}
public String text() {
return this.text;
}
}
}
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.domain.location;
import java.io.Serializable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
/**
* @author Jim
* @version 0.1
* 2022/5/12 18:13
*/
@Repository
public interface LocationTurnoverRepository
extends JpaRepository<LocationTurnover, Serializable>, JpaSpecificationExecutor<LocationTurnover> {}
......@@ -48,7 +48,7 @@ public class AccidentSpot extends BasicEntity implements Serializable {
@Comment("空间信息")
@Type(type = "jts_geometry")
@Column(columnDefinition = "geometry")
@Column(columnDefinition = "geometry(Geometry,4326)")
private Geometry geometry;
@Comment("摄像头")
......
......@@ -48,7 +48,7 @@ public class EvacuationZone extends BasicEntity implements Serializable {
@Comment("空间信息")
@Type(type = "jts_geometry")
@Column(columnDefinition = "geometry")
@Column(columnDefinition = "geometry(Geometry,4326)")
private Geometry geometry;
@Comment("摄像头")
......
......@@ -44,6 +44,7 @@ public class Video implements Serializable {
Long id;
@Comment("坐标点信息")
@Column(columnDefinition = "geometry(Point,4326)")
Point point;
@Comment("标识")
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.job;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.yiring.app.domain.location.LocationLog;
import com.yiring.app.domain.location.LocationLogRepository;
import com.yiring.app.rabbit.config.ZyRabbitConfig;
import com.yiring.app.rabbit.config.ZyConfigProperties;
import com.yiring.app.util.GeoUtils;
import com.yiring.common.constant.DateFormatter;
import java.time.LocalDateTime;
......@@ -29,7 +31,7 @@ import org.springframework.stereotype.Component;
@SuppressWarnings("unused")
@Slf4j
@Component
public class MockZyMessageJob {
public class MockPositionMessageJob {
@Resource
RabbitTemplate rabbitTemplate;
......@@ -37,19 +39,36 @@ public class MockZyMessageJob {
@Resource
LocationLogRepository locationLogRepository;
@XxlJob("MockZyMessageHandler")
public void mockMessageHandler() {
log.info("MockZyMessageHandler: {}", LocalDateTime.now().format(DateFormatter.DATE_TIME));
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
@XxlJob("MockPositionHandler")
public void MockPositionHandler() {
JSONObject extra = toJSON(XxlJobHelper.getJobParam());
log.info("[Mock] Position: {}, {}", mockPositionMessage(extra), extra);
}
@XxlJob("MockLowPowerHandler")
public void MockLowPowerHandler() {
JSONObject extra = toJSON(XxlJobHelper.getJobParam());
log.info("[Mock] LowPower: {}, {}", mockLowPowerMessage(extra), extra);
}
@XxlJob("MockDeviceStatusHandler")
public void MockDeviceStatusHandler() {
JSONObject extra = toJSON(XxlJobHelper.getJobParam());
log.info("[Mock] DeviceStatus: {}, {}", mockDeviceStatusMessage(extra), extra);
}
log.info("[Mock] Position: {}", mockPositionMessage());
log.info("[Mock] LowPower: {}", mockLowPowerMessage());
log.info("[Mock] DeviceStatus: {}", mockDeviceStatusMessage());
log.info("[Mock] KeyWarning: {}", mockKeyWarningMessage());
@XxlJob("MockKeyWarningHandler")
public void MockKeyWarningHandler() {
JSONObject extra = toJSON(XxlJobHelper.getJobParam());
log.info("[Mock] KeyWarning: {}, {}", mockKeyWarningMessage(extra), extra);
}
@XxlJob("QueryMessageHandler")
public void queryMessageHandler() {
log.info("QueryZyMessageHandler: {}", LocalDateTime.now().format(DateFormatter.DATE_TIME));
log.info("QueryMessageHandler: {}", LocalDateTime.now().format(DateFormatter.DATE_TIME));
try {
Specification<LocationLog> spec = (root, query, cb) -> {
......@@ -59,17 +78,31 @@ public class MockZyMessageJob {
};
List<LocationLog> logs = locationLogRepository.findAll(spec);
log.info("QueryZyMessageHandler: {}", logs.size());
log.info("log size: {}", logs.size());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
public JSONObject send(JSONObject body) {
rabbitTemplate.convertAndSend(ZyRabbitConfig.MESSAGE_QUEUES_NAME, body.toJSONString());
if (rabbitmq.isMock()) {
rabbitTemplate.convertAndSend(rabbitmq.getQueueName(), body.toJSONString());
}
return body;
}
public JSONObject toJSON(String params) {
JSONObject extra = new JSONObject();
try {
extra = JSON.parseObject(params);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return extra == null ? new JSONObject() : extra;
}
private String mockTag() {
return "BTT33333331";
}
......@@ -78,7 +111,7 @@ public class MockZyMessageJob {
return 10019L;
}
private JSONObject mockPositionMessage() {
private JSONObject mockPositionMessage(JSONObject extra) {
// 随机生成一个坐标点
Point point = GeoUtils.randomPoint(GeoUtils.defaultBounds(), 0);
......@@ -95,6 +128,7 @@ public class MockZyMessageJob {
params.put("volt", 3650);
params.put("voltUnit", "mV");
params.put("floor", 1);
params.putAll(extra);
JSONObject body = new JSONObject();
body.put("method", "position");
......@@ -102,11 +136,12 @@ public class MockZyMessageJob {
return send(body);
}
private JSONObject mockLowPowerMessage() {
private JSONObject mockLowPowerMessage(JSONObject extra) {
JSONObject params = new JSONObject();
params.put("tagId", mockTag());
params.put("volt", 3650);
params.put("voltUnit", "mV");
params.putAll(extra);
JSONObject body = new JSONObject();
body.put("method", "lowPower");
......@@ -114,7 +149,7 @@ public class MockZyMessageJob {
return send(body);
}
private JSONObject mockDeviceStatusMessage() {
private JSONObject mockDeviceStatusMessage(JSONObject extra) {
JSONObject params = new JSONObject();
params.put("deviceId", mockTag());
params.put("areaId", mockAreaId());
......@@ -122,6 +157,7 @@ public class MockZyMessageJob {
params.put("volt", 3650);
params.put("field_21", "mV");
params.put("updateTime", System.currentTimeMillis());
params.putAll(extra);
JSONObject body = new JSONObject();
body.put("method", "deviceStatus");
......@@ -129,7 +165,7 @@ public class MockZyMessageJob {
return send(body);
}
private JSONObject mockKeyWarningMessage() {
private JSONObject mockKeyWarningMessage(JSONObject extra) {
JSONObject params = new JSONObject();
params.put("tagId", mockTag());
params.put("entityId", "1522770547178475520");
......@@ -139,6 +175,7 @@ public class MockZyMessageJob {
params.put("y", 100);
params.put("z", 0);
params.put("floor", 1);
params.putAll(extra);
JSONObject body = new JSONObject();
body.put("method", "keyWarning");
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbit.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import com.yiring.app.rabbit.receiver.PositionMessageHandler;
import javax.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -16,8 +18,12 @@ import org.springframework.context.annotation.Configuration;
* 2019/8/21 15:44
*/
@Configuration
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "mock", havingValue = "true")
public class MockZyRabbitConfig {
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
/**
* 消息交换机
*/
......@@ -33,16 +39,20 @@ public class MockZyRabbitConfig {
return new TopicExchange(ZY_TOPIC_EXCHANGE, true, false);
}
@Bean(ZyRabbitConfig.MESSAGE_QUEUES_NAME)
public Queue mockMessageQueue() {
return new Queue(ZyRabbitConfig.MESSAGE_QUEUES_NAME, true, false, false);
}
@Bean
Binding bindingExchangeMock(
@Qualifier(ZyRabbitConfig.MESSAGE_QUEUES_NAME) Queue queue,
@Qualifier(ZyRabbitConfig.QUEUE_BEAN_NAME) Queue queue,
@Qualifier(ZY_TOPIC_EXCHANGE) TopicExchange exchange
) {
return BindingBuilder.bind(queue).to(exchange).with(ZyRabbitConfig.MESSAGE_QUEUES_NAME);
return BindingBuilder.bind(queue).to(exchange).with(rabbitmq.getQueueName());
}
@Bean(ZyRabbitConfig.LISTENER_CONTAINER_NAME)
public SimpleMessageListenerContainer simpleMessageListenerContainer(
PositionMessageHandler handler,
@Qualifier(ZyRabbitConfig.QUEUE_BEAN_NAME) Queue queue,
@Qualifier(RabbitConfig.CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
return ZyRabbitConfig.buildPositionMessageListenerContainer(handler, queue, connectionFactory);
}
}
......@@ -2,15 +2,11 @@
package com.yiring.app.rabbit.config;
import javax.annotation.Resource;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
......@@ -43,19 +39,6 @@ public class RabbitConfig {
@Bean
@Primary
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
@Primary
public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
......
......@@ -36,6 +36,7 @@ public class ZyConfigProperties {
@ConfigurationProperties(prefix = "zy-config.rabbitmq")
public static class ZyConfigRabbitmq {
boolean mock;
boolean enabled;
String host;
int port;
......
/* (C) 2022 YiRing, Inc. */
package com.yiring.app.rabbit.config;
import com.yiring.app.rabbit.receiver.PositionMessageHandler;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
......@@ -31,19 +29,9 @@ public class ZyRabbitConfig {
@Resource
ZyConfigProperties.ZyConfigRabbitmq rabbitmq;
@Resource
ConfigurableApplicationContext context;
public static final String CONNECTION_FACTORY_NAME = "zyRabbitConnectionFactory";
public static final String LISTENER_FACTORY_NAME = "zyRabbitListenerFactory";
public static final String TEMPLATE_NAME = "zyRabbitTemplate";
/**
* 消息队列名称(必须要与配置文件中的 queue-name 完全一致)
* 规则: tenant_msg_${open.client-secret}_${open.client-id}
* 参见文档: 定位平台接口规范V3.0.1 - 通用版.pdf #6
*/
public static final String MESSAGE_QUEUES_NAME = "tenant_msg_12A14FDC_sc21080400";
public static final String LISTENER_CONTAINER_NAME = "zyPositionMessageListenerContainer";
public static final String QUEUE_BEAN_NAME = "messageHandlerQueue";
@Bean(CONNECTION_FACTORY_NAME)
public ConnectionFactory zyConnectionFactory() {
......@@ -56,27 +44,35 @@ public class ZyRabbitConfig {
);
}
@Bean(LISTENER_FACTORY_NAME)
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Bean(LISTENER_CONTAINER_NAME)
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "mock", havingValue = "false")
public SimpleMessageListenerContainer simpleMessageListenerContainer(
PositionMessageHandler handler,
@Qualifier(QUEUE_BEAN_NAME) Queue queue,
@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory
) {
// 检查队列名称是否与配置文件一致,避免监听错误
if (!MESSAGE_QUEUES_NAME.equals(rabbitmq.getQueueName())) {
log.error("队列名称不一致,请检查配置文件");
context.close();
return null;
return buildPositionMessageListenerContainer(handler, queue, connectionFactory);
}
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 手动确认消息模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, connectionFactory);
return factory;
@Bean(QUEUE_BEAN_NAME)
public Queue messageHandlerQueue() {
return new Queue(rabbitmq.getQueueName(), true, false, false);
}
@Bean(TEMPLATE_NAME)
public RabbitTemplate rabbitTemplate(@Qualifier(CONNECTION_FACTORY_NAME) ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
public static SimpleMessageListenerContainer buildPositionMessageListenerContainer(
PositionMessageHandler handler,
Queue queue,
ConnectionFactory connectionFactory
) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setBatchSize(10);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setExposeListenerChannel(true);
container.setMessageListener(handler);
container.addQueues(queue);
return container;
}
}
......@@ -2,31 +2,29 @@
package com.yiring.app.rabbit.receiver;
import com.rabbitmq.client.Channel;
import com.yiring.app.rabbit.config.ZyRabbitConfig;
import com.yiring.app.service.message.ZyMessageService;
import com.yiring.app.service.message.PositionMessageService;
import com.yiring.common.annotation.Times;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
* 真源 RabbitMQ 消息监听(消息消费者)
* 真源消息监听处理器
*
* @author Jim
* @version 0.1
* 2022/4/13 17:13
* 2022/5/11 16:35
*/
@Slf4j
@Component
@Configuration
@ConditionalOnProperty(prefix = "zy-config.rabbitmq", value = "enabled", havingValue = "true")
public class ZyRabbitReceiver {
@Transactional(rollbackFor = Exception.class)
public class PositionMessageHandler implements ChannelAwareMessageListener {
// TODO
// 1. 新增消息订阅定时任务,检查是否正常订阅了真源定位系统的消息
......@@ -34,26 +32,15 @@ public class ZyRabbitReceiver {
// 3. 订阅 position(定位数据)、lowPower(低电量报警)、deviceStatus(设备状态)、keyWarning(按键报警)
@Resource
ZyMessageService zyMessageService;
PositionMessageService positionMessageService;
/**
* 订阅真源定位系统 RabbitMQ 推送过来的消息(主动订阅的一些消息类别)
* 参见: 定位平台接口规范V3.0.1 - 通用版.pdf #6
* @param msg 消息内容
* @param channel 消息通道
* @param message 消息主体
* @throws IOException 消息确认异常
*/
@RabbitHandler
// @RabbitListener(
// queues = ZyRabbitConfig.MESSAGE_QUEUES_NAME,
// containerFactory = ZyRabbitConfig.LISTENER_FACTORY_NAME
// )
@RabbitListener(queues = ZyRabbitConfig.MESSAGE_QUEUES_NAME, containerFactory = "rabbitListenerContainerFactory")
public void listen(String msg, Channel channel, Message message) throws IOException {
@Times("Position System Message Handler")
@Override
public void onMessage(Message message, Channel channel) throws IOException {
// 消费消息
zyMessageService.consume(msg);
positionMessageService.consume(new String(message.getBody(), StandardCharsets.UTF_8));
// 手动确认消息已收到
assert channel != null;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
......@@ -8,7 +8,7 @@ package com.yiring.app.service.message;
* @version 0.1
* 2022/5/9 10:16
*/
public interface ZyMessageService {
public interface PositionMessageService {
/**
* 消费消息
* @param message 消息内容
......
......@@ -4,24 +4,27 @@ package com.yiring.app.service.message.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yiring.app.domain.district.District;
import com.yiring.app.domain.district.DistrictRepository;
import com.yiring.app.domain.location.*;
import com.yiring.app.domain.log.ZyRealtimeLog;
import com.yiring.app.domain.log.ZyRealtimeLogRepository;
import com.yiring.app.service.message.ZyMessageService;
import com.yiring.app.service.message.PositionMessageService;
import com.yiring.app.util.GeoUtils;
import com.yiring.auth.domain.user.User;
import com.yiring.common.annotation.Times;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Resource;
import javax.persistence.criteria.Expression;
import javax.persistence.criteria.Predicate;
import lombok.extern.slf4j.Slf4j;
import org.locationtech.jts.geom.Point;
import org.springframework.data.domain.Example;
import org.springframework.data.domain.*;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -37,7 +40,7 @@ import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class ZyMessageServiceImpl implements ZyMessageService {
public class PositionMessageServiceImpl implements PositionMessageService {
@Resource
LocationTagRepository locationTagRepository;
......@@ -46,12 +49,21 @@ public class ZyMessageServiceImpl implements ZyMessageService {
LocationLogRepository locationLogRepository;
@Resource
LocationFenceRepository locationFenceRepository;
@Resource
DistrictRepository districtRepository;
@Resource
LocationTurnoverRepository locationTurnoverRepository;
@Resource
SimpMessagingTemplate simpMessagingTemplate;
@Resource
ZyRealtimeLogRepository zyRealtimeLogRepository;
@Times
@Times("Message Consume")
@Override
public void consume(String message) {
// 将消息转换成 JSON 格式
......@@ -118,13 +130,13 @@ public class ZyMessageServiceImpl implements ZyMessageService {
LocationLog locationLog = LocationLog
.builder()
.id(id)
.raw(data)
.locationTime(locationTime)
.areaId(data.getLong("areaId"))
.floor(data.getString("floor"))
.silent(data.getBoolean("silent"))
.volt(data.getInteger("volt"))
.voltUnit(data.getString("voltUnit"))
.raw(data)
.build();
// 获取定位卡当前绑定的用户
......@@ -145,16 +157,113 @@ public class ZyMessageServiceImpl implements ZyMessageService {
locationLog.setPoint(point);
// 定位信标
Set<String> codes = Arrays
Set<String> beaconCodes = Arrays
.stream(data.getString("beacons").split(","))
.map(beacon -> beacon.replaceAll("\\(.*\\)", ""))
.collect(Collectors.toSet());
locationLog.setBeacons(new JSONArray().fluentAddAll(codes));
locationLog.setBeacons(new JSONArray().fluentAddAll(beaconCodes));
// TODO
// 并计算出入标记(围栏、区域)
// 计算出入标记(围栏、区域)
List<LocationTurnover> turnovers = new ArrayList<>();
// 查询定位在围栏内的围栏信息
List<LocationFence> fences = locationFenceRepository.findByGeometryContains(point);
Set<Long> fenceIds = fences.stream().map(LocationFence::getId).collect(Collectors.toSet());
locationLog.setFences(new JSONArray().fluentAddAll(fenceIds));
// 计算围栏进出
for (LocationFence fence : fences) {
// 查询当前围栏的防抖时间内的所有定位记录
List<LocationLog> logs = findByTagTimeIdAndThreshold(id, fence.getThreshold());
List<JSONArray> list = logs.stream().map(LocationLog::getFences).toList();
if (list.isEmpty()) {
continue;
}
// 写入数据
// 检查是否进入区域
Boolean isEnter = checkEnter(list, fence.getId());
if (isEnter != null) {
// 检查是否为重复进入/退出
Optional<LocationTurnover> enter = findRepeatTurnoverRecord(
fence.getId(),
LocationTurnover.Type.FENCE,
id.getTag(),
isEnter
);
if (enter.isEmpty()) {
// 尝试将上一次记录标记为非最新记录
trySetPrevTurnoverExpired(fence.getId(), LocationTurnover.Type.FENCE, id.getTag());
// 当前标签进入/离开围栏的动作标记为最新记录
LocationTurnover turnover = LocationTurnover
.builder()
.enter(isEnter)
.type(LocationTurnover.Type.FENCE)
.sourceId(fence.getId())
.tag(id.getTag())
.time(id.getTime())
.isLatest(true)
.build();
turnovers.add(turnover);
// 更新围栏内的标签
Set<LocationTag> tags = fence.getTags();
if (Boolean.TRUE.equals(isEnter)) {
tags.add(id.getTag());
} else {
tags.remove(id.getTag());
}
fence.setTags(tags);
// 有人员进出围栏,需要检查是否触发围栏报警规则
// TODO: 通过定时任务调度异步实现,提高定位消息消费能力
// 1. 判断是否触发围栏报警规则,触发则记录报警记录,同时记录报警记录触发所需推送的消息
// 2. 同时进行 WebSocket 消息推送
}
}
}
// 查询定位在区域内的区域信息
List<District> districts = districtRepository.findByGeometryContains(point);
Set<Long> districtIds = districts.stream().map(District::getId).collect(Collectors.toSet());
locationLog.setDistricts(new JSONArray().fluentAddAll(districtIds));
// 计算区域进出
for (District district : districts) {
// 查询当前区域的防抖时间内的所有定位记录
List<LocationLog> logs = findByTagTimeIdAndThreshold(id, district.getDebouncingDuration());
List<JSONArray> list = logs.stream().map(LocationLog::getDistricts).toList();
if (list.isEmpty()) {
continue;
}
// 检查是否进入区域
Boolean isEnter = checkEnter(list, district.getId());
if (isEnter != null) {
// 检查是否为重复进入/退出
Optional<LocationTurnover> enter = findRepeatTurnoverRecord(
district.getId(),
LocationTurnover.Type.DISTRICT,
id.getTag(),
isEnter
);
if (enter.isEmpty()) {
// 尝试将上一次记录标记为非最新记录
trySetPrevTurnoverExpired(district.getId(), LocationTurnover.Type.DISTRICT, id.getTag());
// 当前标签进入/离开围栏的动作标记为最新记录
LocationTurnover turnover = LocationTurnover
.builder()
.enter(isEnter)
.type(LocationTurnover.Type.DISTRICT)
.sourceId(district.getId())
.tag(id.getTag())
.time(id.getTime())
.isLatest(true)
.build();
turnovers.add(turnover);
}
}
}
// 写入定位数据
locationLogRepository.saveAndFlush(locationLog);
// 更新定位标签卡状态信息
......@@ -164,11 +273,109 @@ public class ZyMessageServiceImpl implements ZyMessageService {
tag.setSilent(locationLog.getSilent());
locationTagRepository.save(tag);
// 更新围栏记录的标签数据
locationFenceRepository.saveAll(fences);
// 写入围栏/区域进出记录
locationTurnoverRepository.saveAll(turnovers);
// WebSocket 推送定位消息
// 消息内容需要确定 TODO
simpMessagingTemplate.convertAndSend("/topic/position", "{}");
// TODO
// 判断围栏告警是否触发,触发写入告警记录,并推送消息
JSONObject message = new JSONObject();
message.put("type", "location");
message.put("time", id.getTime());
message.put("tagId", id.getTag().getId());
message.put("tagCode", id.getTag().getCode());
message.put("entityId", locationLog.getPoint());
message.put("point", locationLog.getPoint());
simpMessagingTemplate.convertAndSend("/topic/position", message.toJSONString());
}
/**
* 根据定位日志 ID 和防抖时间查询定位日志
* @param id 定位日志 ID
* @param threshold 防抖时间
* @return 定位日志集合
*/
public List<LocationLog> findByTagTimeIdAndThreshold(TagTimeId id, int threshold) {
return locationLogRepository.findAll((root, query, cb) -> {
Predicate predicate = cb.conjunction();
List<Expression<Boolean>> expressions = predicate.getExpressions();
expressions.add(cb.equal(root.get("id").get("tag"), id.getTag()));
expressions.add(cb.greaterThanOrEqualTo(root.get("id").get("time"), id.getTime().minusSeconds(threshold)));
expressions.add(cb.lessThanOrEqualTo(root.get("id").get("time"), id.getTime()));
return predicate;
});
}
/**
* 根据防抖期间进入的区域/围栏以及新产生的围栏/区域记录,判断是否进入
* @param array 区域/围栏集合
* @param id 区域/围栏 ID
* @return true 进入,false 退出,null 未发生变化
*/
public Boolean checkEnter(List<JSONArray> array, Long id) {
long count = array.stream().filter(ids -> ids != null && ids.contains(id)).count();
Boolean isEnter = null;
if (count == array.size()) {
isEnter = true;
} else if (count == 0) {
isEnter = false;
}
return isEnter;
}
/**
* 查找
* @param sourceId 区域/围栏 ID
* @param type 类型(区域/围栏)
* @param tag 定位标签
* @param enter true 进入,false 退出
* @return true 重复进入,false 非重复进入
*/
public Optional<LocationTurnover> findRepeatTurnoverRecord(
Long sourceId,
LocationTurnover.Type type,
LocationTag tag,
Boolean enter
) {
Pageable pageable = PageRequest.of(0, 1, Sort.by(Sort.Order.desc(LocationTurnover.Fields.time)));
Page<LocationTurnover> page = locationTurnoverRepository.findAll(
(root, query, cb) -> {
Predicate predicate = cb.conjunction();
List<Expression<Boolean>> expressions = predicate.getExpressions();
expressions.add(cb.equal(root.get(LocationTurnover.Fields.sourceId), sourceId));
expressions.add(cb.equal(root.get(LocationTurnover.Fields.type), type));
expressions.add(cb.equal(root.get(LocationTurnover.Fields.tag), tag));
return predicate;
},
pageable
);
if (page.getTotalElements() > 0) {
// 检查是否重复进入
Stream<LocationTurnover> stream = page.get();
if (enter == null || stream.anyMatch(turnover -> turnover.getEnter() == enter)) {
return stream.findFirst();
}
}
return Optional.empty();
}
/**
* 尝试设置上一个进出记录为过期标识
* @param sourceId 区域/围栏 ID
* @param type 类型(区域/围栏)
* @param tag 定位标签
*/
public void trySetPrevTurnoverExpired(Long sourceId, LocationTurnover.Type type, LocationTag tag) {
Optional<LocationTurnover> record = findRepeatTurnoverRecord(sourceId, type, tag, null);
if (record.isPresent()) {
LocationTurnover turnoverRecord = record.get();
turnoverRecord.setIsLatest(false);
locationTurnoverRepository.save(turnoverRecord);
}
}
/**
......
......@@ -23,6 +23,8 @@ public class GeoUtils {
public final GeometryFactory factory = new GeometryFactory();
public final int DEFAULT_SRID = 4326;
/**
* 创建点
*
......@@ -31,7 +33,7 @@ public class GeoUtils {
* @return 点
*/
public Point createPoint(double lon, double lat) {
return factory.createPoint(new Coordinate(lon, lat));
return createPoint(lon, lat, 0);
}
/**
......@@ -42,7 +44,9 @@ public class GeoUtils {
* @return 点
*/
public Point createPoint(double lon, double lat, double alt) {
return factory.createPoint(new Coordinate(lon, lat, alt));
Point point = factory.createPoint(new Coordinate(lon, lat, alt));
point.setSRID(DEFAULT_SRID);
return point;
}
/**
......@@ -71,13 +75,12 @@ public class GeoUtils {
y
);
// 构建经纬度坐标信息
Coordinate coordinate = new Coordinate(
// 构建一个坐标点
return createPoint(
result.getDoubleValue("lon"),
result.getDoubleValue("lat"),
root.getDoubleValue("altitude") + z
result.getDoubleValue("altitude") + z
);
return factory.createPoint(coordinate);
}
/**
......@@ -111,7 +114,7 @@ public class GeoUtils {
public Point randomPoint(double minX, double minY, double maxX, double maxY, double z) {
double x = minX + (maxX - minX) * Math.random();
double y = minY + (maxY - minY) * Math.random();
return factory.createPoint(new Coordinate(x, y, z));
return createPoint(x, y, z);
}
/**
......
......@@ -4,6 +4,7 @@ package com.yiring.app.util.zy;
import com.alibaba.fastjson.JSONObject;
/**
* 从真源拿到的引擎计算经纬度的工具类
* @author Jim
* @version 0.1
* 2022/5/9 16:41
......@@ -16,8 +17,8 @@ public class LonLatUtil {
public static JSONObject getRoot() {
JSONObject root = new JSONObject();
root.put("lon", 0D);
root.put("lat", 0D);
root.put("lon", 112.85893346021206);
root.put("lat", 30.473384230484854);
root.put("x", 0D);
root.put("y", 0D);
root.put("altitude", 0D);
......
......@@ -83,8 +83,8 @@ xxl:
logging:
level:
# sql bind parameter
# org.hibernate.type.descriptor.sql.BasicBinder: trace
org.hibernate.type.descriptor.sql.BasicBinder: error
org.hibernate.type.descriptor.sql.BasicBinder: trace
# org.hibernate.type.descriptor.sql.BasicBinder: error
# 真源定位系统相关配置
zy-config:
......@@ -97,7 +97,7 @@ zy-config:
username: admin
password: admin
virtual-host: /
queue-name: tenant_msg_${zy-config.open.client-secret}_${zy-config.open.client-id}
queue-name: tenant_msg_${zy-config.open.client-secret}_${zy-config.open.client-id}_mock
# 开放接口信息配置
open:
api: http://${zy-config.host}:789/positionApi
......
......@@ -25,9 +25,9 @@ import org.springframework.stereotype.Component;
public class TimesAspect {
@Pointcut("@annotation(com.yiring.common.annotation.Times)")
public void pointCut() {}
public void times() {}
@Around("pointCut()")
@Around("times()")
public Object around(ProceedingJoinPoint point) throws Throwable {
long start = System.currentTimeMillis();
Object result = point.proceed();
......
/* (C) 2021 YiRing, Inc. */
package com.yiring.common.aspect;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
/**
* XxlJob 注解切面
*
* @author ifzm
* @version 0.1
*/
@Slf4j
@Aspect
@Component
public class XxlJobAspect {
@Pointcut("@annotation(com.xxl.job.core.handler.annotation.XxlJob)")
public void log() {}
@Around("log()")
public Object around(ProceedingJoinPoint point) throws Throwable {
try {
return point.proceed();
} catch (Exception e) {
log.error("XxlJob Execute Error: " + e.getMessage(), e);
throw e;
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论