MongoDB: Aggregating by Day or by Hour
Requirements
I recently got a requirement to aggregate the status of the devices under each user account by day and by hour. The results are used to draw device status trend charts.
The approach is to run a scheduled task that aggregates each user's device status data by hour and by day. The results are stored in the database so users can query them later.
The tech stack involved: Spring Boot, MongoDB and Morphia.
Data model
@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// Indexes on device status
@Indexes({
// TTL index: MongoDB deletes documents in the background 72 days (3600 * 24 * 72 s) after their time value
@Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
private String userId;
private Instant time;
@Embedded("points")
private List<Point> protocolPoints;
@Data
@AllArgsConstructor
public static class Point {
/**
 * Protocol type
 */
private Protocol protocol;
/**
 * Total number of devices
 */
private Integer total;
/**
 * Number of online devices
 */
private Integer onlineNum;
/**
 * Number of enabled devices
 */
private Integer enableNum;
}
}
The code above is the device status entity. Device status data is broken down by the protocol each device belongs to.
@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
@Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
@Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
@Id
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private ObjectId objectId;
/**
 * User ID
 */
private String userId;
/**
 * Total number of devices
 */
private Double total;
/**
 * Number of online devices
 */
private Double onlineNum;
/**
 * Number of enabled devices
 */
private Double enableNum;
/**
 * Aggregation type (by hour or by day)
 */
@Property("aggDuration")
private AggregationDuration aggregationDuration;
private Instant time;
/**
 * Document expiry time, set per document
 */
private Instant expireAt;
}
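The code above is the expected aggregation result. Two indexes are built on it. The first is a TTL index on expireAt. Because expireAfterSeconds = 0, each document expires at the time stored in its own expireAt field. The second is a compound index on userId and time, because the program queries the aggregated device status by user ID and time.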
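The expireAt field works together with the expireAfterSeconds = 0 TTL index. MongoDB's background TTL task removes a document shortly after the current time passes its expireAt value, so each document can carry its own lifetime. Below is a minimal sketch of how the scheduled task might fill in expireAt. It assumes hourly buckets are kept for 7 days and daily buckets for 365 days. Both retention periods are hypothetical, as is this stand-in AggregationDuration enum.

```java
import java.time.Duration;
import java.time.Instant;

final class ExpireAtSketch {

    // Hypothetical stand-in for the article's AggregationDuration enum
    enum AggregationDuration { HOUR, DAY }

    // Assumed retention periods; the article does not state the real values
    static Duration retentionOf(AggregationDuration duration) {
        switch (duration) {
            case HOUR:
                return Duration.ofDays(7);
            case DAY:
                return Duration.ofDays(365);
            default:
                throw new IllegalArgumentException("Unknown duration: " + duration);
        }
    }

    // expireAt = bucket start + retention; with expireAfterSeconds = 0 the TTL task
    // deletes the document shortly after this instant
    static Instant expireAt(Instant bucketTime, AggregationDuration duration) {
        return bucketTime.plus(retentionOf(duration));
    }

    public static void main(String[] args) {
        Instant bucket = Instant.parse("2019-06-27T10:00:00Z");
        System.out.println(expireAt(bucket, AggregationDuration.HOUR)); // 2019-07-04T10:00:00Z
        System.out.println(expireAt(bucket, AggregationDuration.DAY));
    }
}
```

Keeping the lifetime in the document, rather than in the index, is what lets hourly and daily results share one collection while expiring at different ages.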
Aggregation operators
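Aggregation works like a pipeline. The intermediate output of each stage is the input to the next, and the final stage produces the aggregation result.
This aggregation mainly involves the following stages:
•$project: specifies the fields of the output documents;
•$unwind: deconstructs an array field, producing one document per array element;
•$match: selects the documents to process;
•$group: groups documents by a key and computes aggregated values.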
The raw aggregation statement
db.getCollection('rawDevStatus').aggregate([
    {$match:
        {
            time: {$gte: ISODate("2019-06-27T00:00:00Z")}
        }
    },
    {$unwind: "$points"},
    // keep only the entry whose protocol is "ALL", as the Morphia code below does
    {$match: {"points.protocol": "ALL"}},
    {$project:
        {
            userId: 1, points: 1,
            tmp: {$dateToString: {format: "%Y:%m:%dT%H:00:00Z", date: "$time"}}
        }
    },
    {$project:
        {
            userId: 1, points: 1,
            groupTime: {$dateFromString: {dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ"}}
        }
    },
    {$group:
        {
            _id: {user_id: '$userId', cal_time: '$groupTime'},
            devTotal: {'$avg': '$points.total'},
            onlineTotal: {'$avg': '$points.onlineNum'},
            enableTotal: {'$avg': '$points.enableNum'}
        }
    }
])
The statement above aggregates the data by hour. The stages are explained one at a time below:
(1) $match
The data is aggregated by hour, and only the results for the last 24 hours are needed. The documents are therefore filtered first.
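The shell statement hardcodes its lower bound with ISODate("2019-06-27T00:00:00Z"). In the Morphia code, the bound is the pastTime argument. The sketch below shows one way to compute it. It assumes the window is aligned down to the bucket boundary (in UTC) so that the oldest hourly or daily bucket is not cut in half. The helper name pastTime and the alignment choice are illustrative, not from the original code.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class MatchCutoffSketch {

    /**
     * Lower bound for the $match stage: go back by the window, then truncate to the
     * bucket unit (HOURS or DAYS, in UTC) so the first bucket is complete.
     */
    static Instant pastTime(Instant now, Duration window, ChronoUnit bucketUnit) {
        return now.minus(window).truncatedTo(bucketUnit);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2019-06-28T10:37:12Z");
        // Hourly aggregation over the last 24 hours
        System.out.println(pastTime(now, Duration.ofHours(24), ChronoUnit.HOURS)); // 2019-06-27T10:00:00Z
        // Daily aggregation over the last 7 days
        System.out.println(pastTime(now, Duration.ofDays(7), ChronoUnit.DAYS));    // 2019-06-21T00:00:00Z
    }
}
```

Without the truncation, the first bucket would only contain the minutes after the cutoff, and its averages would be based on fewer samples than the other buckets.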
(2) $unwind
In the raw documents, the device status is an array broken down by protocol. It therefore has to be unwound (one document per array element) before individual entries can be filtered and grouped.
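The sketch below mimics $unwind in plain Java to show what it produces. It uses simplified stand-in records, not the Morphia entities, and "MQTT" is a made-up protocol value. Each raw document with N entries in points becomes N documents that each hold a single point. A document with an empty array disappears, which matches preserveNullAndEmptyArrays(false) in the Morphia code later on.

```java
import java.util.List;
import java.util.stream.Collectors;

final class UnwindSketch {

    // Simplified stand-ins for RawDevStatus and RawDevStatus.Point
    record Point(String protocol, int total, int onlineNum, int enableNum) {}
    record RawDoc(String userId, String time, List<Point> points) {}
    // Shape after {$unwind: "$points"}: the points field now holds a single element
    record UnwoundDoc(String userId, String time, Point points) {}

    static List<UnwoundDoc> unwind(List<RawDoc> docs) {
        return docs.stream()
                // one output document per array element; empty arrays produce nothing
                .flatMap(doc -> doc.points().stream()
                        .map(p -> new UnwoundDoc(doc.userId(), doc.time(), p)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        RawDoc doc = new RawDoc("u1", "2019-06-27T10:05:00Z", List.of(
                new Point("ALL", 10, 8, 9),
                new Point("MQTT", 4, 3, 4)));
        RawDoc empty = new RawDoc("u2", "2019-06-27T10:05:00Z", List.of());
        unwind(List.of(doc, empty)).forEach(System.out::println);
    }
}
```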
(3) $project
{$project:
{
userId:1,points:1,
tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
}
}
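This stage selects the fields to output: userId, points and tmp.
Note that, to group by time, the $time field is transformed. Its year, month, day and hour (%Y:%m:%dT%H) are extracted into tmp, and the minutes and seconds are fixed to zero. tmp then serves as the grouping key in the following stages.
To aggregate by day instead, change the format to %Y:%m:%dT00:00:00Z.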
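The truncation trick can be reproduced in plain Java to check what a given format string yields. The sketch assumes UTC, which is what $dateToString uses when no timezone option is given. MongoDB 3.6+ accepts a timezone option if day buckets should follow a local calendar. The Java patterns are hand translations of the two MongoDB format strings, not something MongoDB provides.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class DateToStringSketch {

    // Java equivalents of "%Y:%m:%dT%H:00:00Z" and "%Y:%m:%dT00:00:00Z";
    // the literal ':' and '0' characters are copied to the output unchanged
    static final DateTimeFormatter HOUR_BUCKET =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T'HH:00:00'Z'").withZone(ZoneOffset.UTC);
    static final DateTimeFormatter DAY_BUCKET =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T'00:00:00'Z'").withZone(ZoneOffset.UTC);

    public static void main(String[] args) {
        Instant t = Instant.parse("2019-06-27T10:37:12Z");
        System.out.println(HOUR_BUCKET.format(t)); // 2019:06:27T10:00:00Z
        System.out.println(DAY_BUCKET.format(t));  // 2019:06:27T00:00:00Z
    }
}
```

All documents whose time falls within the same hour (or day) map to the same string, which is what makes the string usable as a grouping key.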
(4) $project
{$project:
{
userId:1,points:1,
groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
}
}
In the previous $project stage, tmp is a string, but the final aggregation result should hold a date (mostly out of laziness: I did not want to convert it in application code).
This stage therefore converts $tmp back into a date value, groupTime.
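The string round trip ($dateToString followed by $dateFromString) amounts to truncating time to the start of its hour or day. The sketch below parses a bucket string the way the second $project does and checks that it equals Instant.truncatedTo. The pattern is a hand translation of "%Y:%m:%dT%H:%M:%SZ". On MongoDB 5.0 or later, the $dateTrunc operator can do this truncation in a single expression and may be worth considering instead.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class DateFromStringSketch {

    // Java equivalent of "%Y:%m:%dT%H:%M:%SZ"; the trailing Z is matched literally
    // and the result is interpreted as UTC
    static final DateTimeFormatter BUCKET_PARSER =
            DateTimeFormatter.ofPattern("uuuu:MM:dd'T'HH:mm:ss'Z'");

    static Instant groupTime(String bucket) {
        return LocalDateTime.parse(bucket, BUCKET_PARSER).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2019-06-27T10:37:12Z");
        Instant parsed = groupTime("2019:06:27T10:00:00Z");
        System.out.println(parsed);                                         // 2019-06-27T10:00:00Z
        System.out.println(parsed.equals(t.truncatedTo(ChronoUnit.HOURS))); // true
    }
}
```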
(5) $group
This stage groups the documents and produces the final output.
{$group:
{
// group by _id, i.e. by userId and groupTime
_id:{user_id:'$userId', cal_time:'$groupTime'},
// average total number of devices
devTotal:{'$avg':'$points.total'},
// average number of online devices
onlineTotal:{'$avg':'$points.onlineNum'},
// average number of enabled devices
enableTotal:{'$avg':'$points.enableNum'}
}
}
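Because $avg yields a floating-point number, the averaged counts are presumably why total, onlineNum and enableNum are Double in AggregationDevStatus but Integer in RawDevStatus.Point. The plain-Java sketch below reproduces this $group stage on rows that have already been unwound and truncated. The record types are hypothetical. Rows are keyed by (userId, groupTime), and each count is averaged within its bucket.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupSketch {

    // Hypothetical row after $unwind and the two $project stages
    record Row(String userId, String groupTime, int total, int onlineNum, int enableNum) {}
    // Equivalent of _id: {user_id: '$userId', cal_time: '$groupTime'}
    record GroupKey(String userId, String groupTime) {}
    // Equivalent of devTotal / onlineTotal / enableTotal
    record Averages(double devTotal, double onlineTotal, double enableTotal) {}

    static Map<GroupKey, Averages> group(List<Row> rows) {
        Map<GroupKey, List<Row>> buckets = rows.stream()
                .collect(Collectors.groupingBy(r -> new GroupKey(r.userId(), r.groupTime())));
        Map<GroupKey, Averages> result = new HashMap<>();
        // groupingBy never creates empty lists, so orElse(0) is only a formality
        buckets.forEach((key, list) -> result.put(key, new Averages(
                list.stream().mapToInt(Row::total).average().orElse(0),
                list.stream().mapToInt(Row::onlineNum).average().orElse(0),
                list.stream().mapToInt(Row::enableNum).average().orElse(0))));
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("u1", "2019-06-27T10:00:00Z", 10, 8, 9),
                new Row("u1", "2019-06-27T10:00:00Z", 12, 6, 9),
                new Row("u1", "2019-06-27T11:00:00Z", 12, 7, 10));
        group(rows).forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```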
Writing the code
Morphia is used as the ODM here. MongoTemplate would also work, and the principle is the same.
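// Imports used by the snippets below (Morphia 1.x package names; Morphia 1.5+ moved them to dev.morphia)
import com.mongodb.AggregationOptions;
import com.mongodb.BasicDBObject;
import com.mongodb.client.model.UnwindOptions;
import org.mongodb.morphia.aggregation.AggregationPipeline;
import org.mongodb.morphia.aggregation.Group;
import org.mongodb.morphia.aggregation.Projection;
import org.mongodb.morphia.query.Query;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Builds the aggregation pipeline.
 *
 * @param pastTime     start of the time window to aggregate
 * @param dateToString format used to truncate the time (%Y:%m:%dT%H:00:00Z or %Y:%m:%dT00:00:00Z)
 * @param stringToDate format used to parse the truncated string back into a date (%Y:%m:%dT%H:%M:%SZ)
 * @return the aggregation pipeline
 */
private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
// One query per $match stage: field(...) adds criteria to the Query it is called on
Query<RawDevStatus> timeQuery = datastore.createQuery(RawDevStatus.class)
.field("time").greaterThanOrEq(pastTime);
Query<RawDevStatus> protocolQuery = datastore.createQuery(RawDevStatus.class)
.field("points.protocol").equal("ALL");
return datastore.createAggregation(RawDevStatus.class)
.match(timeQuery)
.unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
.match(protocolQuery)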
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateToString",
new BasicDBObject("format", dateToString)
.append("date", "$time"))
)
)
.project(Projection.projection("userId"),
Projection.projection("points"),
Projection.projection("convertTime",
Projection.expression("$dateFromString",
new BasicDBObject("format", stringToDate)
.append("dateString", "$convertTime"))
)
)
.group(
Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
Group.grouping("total", Group.average("points.total")),
Group.grouping("onlineNum", Group.average("points.onlineNum")),
Group.grouping("enableNum", Group.average("points.enableNum"))
);
}
/**
 * Runs the pipeline and collects the aggregation results.
 *
 * @param pipeline the aggregation pipeline
 * @return the aggregation results
 */
private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
List<AggregationMidDevStatus> statuses = new ArrayList<>();
// allowDiskUse lets memory-heavy stages such as $group spill to temporary files on the server
Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
while (resultIterator.hasNext()) {
statuses.add(resultIterator.next());
}
return statuses;
}
//......................................................................................
// Fetch the aggregation results (some code omitted)
AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
if (CollectionUtils.isEmpty(midStatuses)) {
log.warn("Can not get dev status aggregation result.");
return;
}
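The omitted code has to supply pastTime, dateToString and stringToDate for each mode. One possible way to keep the two sets of arguments together is a small enum. The enum, its name AggregationMode, and the 30-day window for daily aggregation are hypothetical. The format strings and the 24-hour window for hourly aggregation come from the text above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical holder for the arguments passed to createAggregationPipeline
enum AggregationMode {
    HOUR("%Y:%m:%dT%H:00:00Z", Duration.ofHours(24)),
    // Assumed look-back window for daily aggregation; not stated in the article
    DAY("%Y:%m:%dT00:00:00Z", Duration.ofDays(30));

    // Both modes parse the truncated string back with the same format
    static final String STRING_TO_DATE = "%Y:%m:%dT%H:%M:%SZ";

    final String dateToString;
    final Duration window;

    AggregationMode(String dateToString, Duration window) {
        this.dateToString = dateToString;
        this.window = window;
    }

    // Lower bound for the $match stage
    Instant pastTime(Instant now) {
        return now.minus(window);
    }
}
```

A call would then read createAggregationPipeline(mode.pastTime(Instant.now()), mode.dateToString, AggregationMode.STRING_TO_DATE), so the hourly and daily runs differ only in the enum constant they pass.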
Summary
The above describes how to aggregate MongoDB data by hour and by day using Morphia.