建议大家看官网,我也是根据官网来写的
Java High Level REST Client
为什么要使用es:
主要有两点,一个是速度快,另外一个就是查询实时数据,不用每天定时去统计。
查询时间段 | sql | es |
---|---|---|
2019-01-01~2019-03-01 | 9336ms | 1098ms |
2019-01-01~2020-03-01 | 55318ms | 3031ms |
2019-11-01~2019-11-30 | 14602ms | 1323ms |
2019-11-01~2019-11-01 | 7513ms | 902ms |
这边分析一个聚合查询的写法
先来看下要统计的sql
SQL 写法
SELECT a.area_code,ifnull( o.count_order, 0 ) count_order,ifnull( o.sum_price, 0 ) sum_order_price,
ifnull( o.count_app_order, 0 ) count_app_order,ifnull( o.sum_app_price, 0 ) sum_app_order_price,
ifnull( o.count_wechat_order, 0 ) count_wechat_order,ifnull( o.sum_wechat_price, 0 ) sum_wechat_order_price,
ifnull( o.sum_refund_price, 0 ) sum_refund_price,ifnull( o.count_stand_wash_order, 0 ) count_stand_wash_order,
ifnull( o.sum_stand_wash_order_price, 0 ) sum_stand_wash_order_price,ifnull( o.count_quick_wash_order, 0 ) count_quick_wash_order,
ifnull( o.sum_quick_wash_order_price, 0 ) sum_quick_wash_order_price,ifnull( o.count_strong_wash_order, 0 ) count_strong_wash_order,
ifnull( o.sum_strong_wash_order_price, 0 ) sum_strong_wash_order_price,ifnull( o.count_single_dehydration_order, 0 ) count_single_dehydration_order,ifnull( o.sum_single_dehydration_price, 0 ) sum_single_dehydration_price
FROM ( SELECT area_code FROM t_area a WHERE a.area_level = 4 AND is_del = 0 ) a
LEFT JOIN (SELECT LEFT( o.area_code, 8 ) area_code,count( 1 ) count_order,sum( o.price ) sum_price,
count(IF( o.platform = 0, o.platform, null )) count_app_order, sum(IF( o.platform = 0, o.price, 0 )) sum_app_price,
count(IF( o.platform = 1, o.platform, null )) count_wechat_order,sum(IF( o.platform = 1, o.price, 0 )) sum_wechat_price,
sum(IF( r.refund_state = 1, r.refund_amount, 0 )) sum_refund_price,
count(IF(o.model_type =1, o.id,null)) count_stand_wash_order,
sum(IF(o.model_type =1, o.price,0)) sum_stand_wash_order_price,
count(IF(o.model_type =2, o.id,null)) count_quick_wash_order,
sum(IF(o.model_type =2, o.price,0)) sum_quick_wash_order_price,
count(IF(o.model_type =3, o.id,null)) count_strong_wash_order,
sum(IF(o.model_type =3, o.price,0)) sum_strong_wash_order_price,
count(IF(o.model_type =4, o.id,null)) count_single_dehydration_order,
sum(IF(o.model_type =4, o.price,0)) sum_single_dehydration_price
FROM t_order o
LEFT JOIN t_refund r ON o.order_number = r.order_number
WHERE o.is_del = 0 AND o.pay_way NOT IN ( 3, 4, 20 ) AND o.pay_state = 1
AND date_format( o.order_time, '%Y-%m-%d' ) BETWEEN '2019-11-01' and '2019-11-01'
GROUP BY LEFT ( o.area_code, 8 )) o ON a.area_code = o.area_code
DSL 写法
POST order/_search
{
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"payState": {
"value": 1,
"boost": 1
}
}
},
{
"term": {
"isDel": {
"value": 0,
"boost": 1
}
}
},
{
"range": {
"orderDate": {
"from": "2020-03-16",
"to": "2020-03-17",
"include_lower": true,
"include_upper": true,
"boost": 1
}
}
}
],
"must_not": [
{
"multi_match": {
"query": "payWay",
"fields": [
"20^1.0",
"3^1.0",
"4^1.0"
],
"type": "best_fields",
"operator": "OR",
"slop": 0,
"prefix_length": 0,
"max_expansions": 50,
"zero_terms_query": "NONE",
"auto_generate_synonyms_phrase_query": true,
"fuzzy_transpositions": true,
"boost": 1
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
},
"aggregations": {
"branchOfficeAreaCodes": {
"terms": {
"field": "branchOfficeAreaCode",
"size": 1000000,
"min_doc_count": 1,
"shard_min_doc_count": 0,
"show_term_doc_count_error": false,
"order": [
{
"_count": "desc"
},
{
"_key": "asc"
}
]
},
"aggregations": {
"branchOfficeAreaCode": {
"top_hits": {
"from": 0,
"size": 1,
"version": false,
"seq_no_primary_term": false,
"explain": false
}
},
"countOrder": {
"value_count": {
"field": "id"
}
},
"sumOrderPrice": {
"sum": {
"field": "price"
}
},
"sumRefundPrice": {
"sum": {
"field": "sumRefundPrice"
}
},
"countAppOrders": {
"filter": {
"term": {
"platform": {
"value": "0",
"boost": 1
}
}
},
"aggregations": {
"countAppOrder": {
"value_count": {
"field": "id"
}
},
"countWechatOrder": {
"value_count": {
"field": "id"
}
}
}
},
"sum_app_price": {
"filter": {
"term": {
"platform": {
"value": "0",
"boost": 1
}
}
},
"aggregations": {
"sumAppOrderPrice": {
"sum": {
"field": "price"
}
}
}
},
"countWechatOrders": {
"filter": {
"term": {
"platform": {
"value": "1",
"boost": 1
}
}
}
},
"sum_wechat_price": {
"filter": {
"term": {
"platform": {
"value": "1",
"boost": 1
}
}
},
"aggregations": {
"sumWechatOrderPrice": {
"sum": {
"field": "price"
}
}
}
},
"bucket_field": {
"bucket_sort": {
"sort": [
{
"sumOrderPrice": {
"order": "desc"
}
}
],
"from": 0,
"size": 100,
"gap_policy": "SKIP"
}
}
}
}
}
}
DSL
查询结果
High Level REST Client写法
StopWatch watch = new StopWatch();
watch.start();
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.mustNot(QueryBuilders.multiMatchQuery("payWay", "3", "4", "20"))
.must(QueryBuilders.termQuery("payState", 1))
.must(QueryBuilders.termQuery("isDel", 0))
.must(QueryBuilders.rangeQuery("orderDate").from("2020-01-03").to("2020-01-03"));
searchSourceBuilder.query(queryBuilder);
TermsAggregationBuilder aggregation = AggregationBuilders.terms("branchOfficeAreaCodes")
.field("branchOfficeAreaCode").size(10000);
aggregation.subAggregation(AggregationBuilders.topHits("branchOfficeAreaCode").size(1));
aggregation.subAggregation(AggregationBuilders.count("countOrder").field("id"));
aggregation.subAggregation(AggregationBuilders.sum("sumOrderPrice").field("price"));
aggregation.subAggregation(AggregationBuilders.sum("sumRefundPrice").field("sumRefundPrice"));
AggregationBuilder countAppOrder = AggregationBuilders.filter("countAppOrders", QueryBuilders.termQuery("platform", "0"));
countAppOrder.subAggregation(AggregationBuilders.count("countAppOrder").field("id"));
aggregation.subAggregation(countAppOrder);
AggregationBuilder sumAppPrice = AggregationBuilders.filter("sum_app_price", QueryBuilders.termQuery("platform", "0"));
sumAppPrice.subAggregation(AggregationBuilders.sum("sumAppOrderPrice").field("price"));
aggregation.subAggregation(sumAppPrice);
AggregationBuilder countWeChatOrder = AggregationBuilders.filter("countWechatOrders", QueryBuilders.termQuery("platform", "1"));
countAppOrder.subAggregation(AggregationBuilders.count("countWechatOrder").field("id"));
aggregation.subAggregation(countWeChatOrder);
AggregationBuilder sumWeChatPrice = AggregationBuilders.filter("sum_wechat_price", QueryBuilders.termQuery("platform", "1"));
sumWeChatPrice.subAggregation(AggregationBuilders.sum("sumWechatOrderPrice").field("price"));
aggregation.subAggregation(sumWeChatPrice);
List<FieldSortBuilder> fieldSorts=new ArrayList<>();
fieldSorts.add(new FieldSortBuilder("sumOrderPrice").order(SortOrder.DESC));
aggregation.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", fieldSorts).from(10).size(10));
searchSourceBuilder.aggregation(aggregation);
searchSourceBuilder.size(0);
searchRequest.source(searchSourceBuilder);
searchRequest.indices("order");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
watch.stop();
long millis = watch.getTotalTimeMillis();
log.info("查询用时:{}", millis);
Aggregations aggregations = searchResponse.getAggregations();
List<JSONObject> list = new ArrayList<>();
ParsedStringTerms agg1 = aggregations.get("branchOfficeAreaCodes");
List<? extends Terms.Bucket> buckets = agg1.getBuckets();
for (Terms.Bucket bucket : buckets) {
Aggregations aggregationResult = bucket.getAggregations();
Map<String, Aggregation> mapAgg = aggregationResult.getAsMap();
JSONObject data = new JSONObject();
data.put("areaCode", bucket.getKeyAsString());
data.put("areaCodeStr", "areaCodeStr" + bucket.getKeyAsString());
for (Map.Entry<String, Aggregation> aggs : mapAgg.entrySet()) {
String key = aggs.getKey();
Aggregation agg = aggs.getValue();
if ("filter".equals(agg.getType())) {
Aggregations filter = ((ParsedFilter) agg).getAggregations();
Map<String, Aggregation> sub = filter.getAsMap();
for (Map.Entry<String, Aggregation> subAggs : sub.entrySet()) {
Aggregation subAgg = subAggs.getValue();
if ("sum".equals(subAgg.getType())) {
ParsedSum parsedSum = (ParsedSum) subAgg;
double subSun = ((ParsedSum) subAgg).getValue();
data.put(parsedSum.getName(), parsedSum.getValue());
} else if ("value_count".equals(subAgg.getType())) {
ParsedValueCount parsedValueCount = (ParsedValueCount) subAgg;
long copunt = ((ParsedValueCount) subAgg).getValue();
data.put(parsedValueCount.getName(), parsedValueCount.getValue());
}
}
} else if ("sum".equals(agg.getType())) {
ParsedSum parsedSum = (ParsedSum) agg;
double sun = ((ParsedSum) agg).getValue();
data.put(parsedSum.getName(), parsedSum.getValue());
} else if ("value_count".equals(agg.getType())) {
ParsedValueCount parsedValueCount = (ParsedValueCount) agg;
long copunt = ((ParsedValueCount) agg).getValue();
data.put(parsedValueCount.getName(), parsedValueCount.getValue());
} else if ("top_hits".equals(agg.getType())) {
TopHits topHits = (TopHits) agg;
SearchHits searchHits = topHits.getHits();
SearchHit searchHit = searchHits.getAt(0);
Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
data.put("areaCodeStr", sourceAsMap.get("officeName") + " " + sourceAsMap.get("branchOfficeName"));
data.put("city", sourceAsMap.get("cityName"));
}
}
list.add(data);
}
ES JPA 写法
继承Repository 其他就跟jpa一样的
不用去写dsl语法
public interface AreaDetailRepository extends ElasticsearchRepository<EsAreaDetail, Integer> {
}
评论区