Working with Elasticsearch Data Using the High Level REST Client

程彬彬
Published 2020-03-18 (last updated 2020-03-18)

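I recommend reading the official documentation; this post is based on it:
Java High Level REST Client

Demo code on GitHub

Why use ES:
There are two main reasons. The first is speed. The second is that statistics can be queried in real time, so no scheduled daily job is needed to precompute them.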

Query date range         SQL         ES
2019-01-01~2019-03-01    9336 ms     1098 ms
2019-01-01~2020-03-01    55318 ms    3031 ms
2019-11-01~2019-11-30    14602 ms    1323 ms
2019-11-01~2019-11-01    7513 ms     902 ms
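
To put the timings above in proportion, the speed-up for each date range can be worked out directly. This is only a small arithmetic sketch: the class and method names are made up for illustration, and the millisecond figures are the ones from the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SpeedupTable {
    /** Ratio of SQL time to ES time, rounded to one decimal place. */
    static double speedup(long sqlMillis, long esMillis) {
        return Math.round(10.0 * sqlMillis / esMillis) / 10.0;
    }

    public static void main(String[] args) {
        // {SQL ms, ES ms} per date range, copied from the table above.
        Map<String, long[]> timings = new LinkedHashMap<>();
        timings.put("2019-01-01~2019-03-01", new long[]{9336, 1098});
        timings.put("2019-01-01~2020-03-01", new long[]{55318, 3031});
        timings.put("2019-11-01~2019-11-30", new long[]{14602, 1323});
        timings.put("2019-11-01~2019-11-01", new long[]{7513, 902});

        for (Map.Entry<String, long[]> e : timings.entrySet()) {
            long[] t = e.getValue();
            System.out.println(e.getKey() + ": " + speedup(t[0], t[1]) + "x faster");
        }
    }
}
```

For these four measurements the ratio works out to roughly 8.5, 18.3, 11.0 and 8.3, so the gap is widest for the longest date range.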


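The rest of this post walks through how one aggregation query is written.

First, here is the SQL for the statistics we want.

SQL version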

SELECT a.area_code,
       IFNULL(o.count_order, 0) count_order,
       IFNULL(o.sum_price, 0) sum_order_price,
       IFNULL(o.count_app_order, 0) count_app_order,
       IFNULL(o.sum_app_price, 0) sum_app_order_price,
       IFNULL(o.count_wechat_order, 0) count_wechat_order,
       IFNULL(o.sum_wechat_price, 0) sum_wechat_order_price,
       IFNULL(o.sum_refund_price, 0) sum_refund_price,
       IFNULL(o.count_stand_wash_order, 0) count_stand_wash_order,
       IFNULL(o.sum_stand_wash_order_price, 0) sum_stand_wash_order_price,
       IFNULL(o.count_quick_wash_order, 0) count_quick_wash_order,
       IFNULL(o.sum_quick_wash_order_price, 0) sum_quick_wash_order_price,
       IFNULL(o.count_strong_wash_order, 0) count_strong_wash_order,
       IFNULL(o.sum_strong_wash_order_price, 0) sum_strong_wash_order_price,
       IFNULL(o.count_single_dehydration_order, 0) count_single_dehydration_order,
       IFNULL(o.sum_single_dehydration_price, 0) sum_single_dehydration_price
FROM (SELECT area_code FROM t_area a WHERE a.area_level = 4 AND is_del = 0) a
LEFT JOIN (
    SELECT LEFT(o.area_code, 8) area_code,
           COUNT(1) count_order,
           SUM(o.price) sum_price,
           COUNT(IF(o.platform = 0, o.platform, NULL)) count_app_order,
           SUM(IF(o.platform = 0, o.price, 0)) sum_app_price,
           COUNT(IF(o.platform = 1, o.platform, NULL)) count_wechat_order,
           SUM(IF(o.platform = 1, o.price, 0)) sum_wechat_price,
           SUM(IF(r.refund_state = 1, r.refund_amount, 0)) sum_refund_price,
           COUNT(IF(o.model_type = 1, o.id, NULL)) count_stand_wash_order,
           SUM(IF(o.model_type = 1, o.price, 0)) sum_stand_wash_order_price,
           COUNT(IF(o.model_type = 2, o.id, NULL)) count_quick_wash_order,
           SUM(IF(o.model_type = 2, o.price, 0)) sum_quick_wash_order_price,
           COUNT(IF(o.model_type = 3, o.id, NULL)) count_strong_wash_order,
           SUM(IF(o.model_type = 3, o.price, 0)) sum_strong_wash_order_price,
           COUNT(IF(o.model_type = 4, o.id, NULL)) count_single_dehydration_order,
           SUM(IF(o.model_type = 4, o.price, 0)) sum_single_dehydration_price
    FROM t_order o
    LEFT JOIN t_refund r ON o.order_number = r.order_number
    WHERE o.is_del = 0
      AND o.pay_way NOT IN (3, 4, 20)
      AND o.pay_state = 1
      AND DATE_FORMAT(o.order_time, '%Y-%m-%d') BETWEEN '2019-11-01' AND '2019-11-01'
    GROUP BY LEFT(o.area_code, 8)
) o ON a.area_code = o.area_code
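
What the inner query computes may be easier to see in plain Java: drop the excluded payment methods, group the remaining orders by the first 8 characters of area_code, then count and sum per platform. This is a rough in-memory sketch, not how either database executes it. `Order` and `AreaStats` are hypothetical types, the price is assumed to be a whole number of cents, and the is_del, pay_state, date, refund and model_type parts are left out for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OrderStatsSketch {
    /** Hypothetical in-memory stand-in for one row of t_order. */
    static final class Order {
        final String areaCode;
        final int platform; // 0 = app, 1 = WeChat, as in the SQL above
        final int payWay;
        final long price;   // assumed unit: cents

        Order(String areaCode, int platform, int payWay, long price) {
            this.areaCode = areaCode;
            this.platform = platform;
            this.payWay = payWay;
            this.price = price;
        }
    }

    /** Per-area totals, mirroring a subset of the inner SELECT's columns. */
    static final class AreaStats {
        long countOrder, sumPrice;
        long countAppOrder, sumAppPrice;
        long countWechatOrder, sumWechatPrice;
    }

    private static final List<Integer> EXCLUDED_PAY_WAYS = Arrays.asList(3, 4, 20);

    static Map<String, AreaStats> aggregate(List<Order> orders) {
        Map<String, AreaStats> byArea = new TreeMap<>();
        for (Order o : orders) {
            if (EXCLUDED_PAY_WAYS.contains(o.payWay)) {
                continue; // pay_way NOT IN (3, 4, 20)
            }
            // LEFT(area_code, 8)
            String key = o.areaCode.substring(0, Math.min(8, o.areaCode.length()));
            AreaStats s = byArea.computeIfAbsent(key, k -> new AreaStats());
            s.countOrder++;
            s.sumPrice += o.price;
            if (o.platform == 0) {
                s.countAppOrder++;
                s.sumAppPrice += o.price;
            } else if (o.platform == 1) {
                s.countWechatOrder++;
                s.sumWechatPrice += o.price;
            }
        }
        return byArea;
    }
}
```

The per-platform `if` branches correspond to the `COUNT(IF(...))` / `SUM(IF(...))` pairs in SQL, and to the `filter` sub-aggregations in the Elasticsearch request.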

DSL version


POST order/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "payState": {
              "value": 1,
              "boost": 1
            }
          }
        },
        {
          "term": {
            "isDel": {
              "value": 0,
              "boost": 1
            }
          }
        },
        {
          "range": {
            "orderDate": {
              "from": "2020-03-16",
              "to": "2020-03-17",
              "include_lower": true,
              "include_upper": true,
              "boost": 1
            }
          }
        }
      ],
      "must_not": [
        {
          "terms": {
            "payWay": [
              3,
              4,
              20
            ],
            "boost": 1
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1
    }
  },
  "aggregations": {
    "branchOfficeAreaCodes": {
      "terms": {
        "field": "branchOfficeAreaCode",
        "size": 1000000,
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "branchOfficeAreaCode": {
          "top_hits": {
            "from": 0,
            "size": 1,
            "version": false,
            "seq_no_primary_term": false,
            "explain": false
          }
        },
        "countOrder": {
          "value_count": {
            "field": "id"
          }
        },
        "sumOrderPrice": {
          "sum": {
            "field": "price"
          }
        },
        "sumRefundPrice": {
          "sum": {
            "field": "sumRefundPrice"
          }
        },
        "countAppOrders": {
          "filter": {
            "term": {
              "platform": {
                "value": "0",
                "boost": 1
              }
            }
          },
          "aggregations": {
            "countAppOrder": {
              "value_count": {
                "field": "id"
              }
            }
          }
        },
        "sum_app_price": {
          "filter": {
            "term": {
              "platform": {
                "value": "0",
                "boost": 1
              }
            }
          },
          "aggregations": {
            "sumAppOrderPrice": {
              "sum": {
                "field": "price"
              }
            }
          }
        },
        "countWechatOrders": {
          "filter": {
            "term": {
              "platform": {
                "value": "1",
                "boost": 1
              }
            }
          },
          "aggregations": {
            "countWechatOrder": {
              "value_count": {
                "field": "id"
              }
            }
          }
        },
        "sum_wechat_price": {
          "filter": {
            "term": {
              "platform": {
                "value": "1",
                "boost": 1
              }
            }
          },
          "aggregations": {
            "sumWechatOrderPrice": {
              "sum": {
                "field": "price"
              }
            }
          }
        },
        "bucket_field": {
          "bucket_sort": {
            "sort": [
              {
                "sumOrderPrice": {
                  "order": "desc"
                }
              }
            ],
            "from": 0,
            "size": 100,
            "gap_policy": "SKIP"
          }
        }
      }
    }
  }
}
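
The `bucket_sort` entry at the end of the request is a pipeline aggregation: it reorders the buckets produced by the parent `terms` aggregation by `sumOrderPrice` and then keeps a page of them using `from` and `size`. A minimal in-memory sketch of that behaviour follows; `Bucket` and `page` are hypothetical names, not part of any Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class BucketSortSketch {
    /** Hypothetical stand-in for one terms bucket with its sumOrderPrice metric. */
    static final class Bucket {
        final String key;
        final double sumOrderPrice;

        Bucket(String key, double sumOrderPrice) {
            this.key = key;
            this.sumOrderPrice = sumOrderPrice;
        }
    }

    /** Sorts by sumOrderPrice descending, then keeps `size` buckets starting at offset `from`. */
    static List<Bucket> page(List<Bucket> buckets, int from, int size) {
        List<Bucket> sorted = new ArrayList<>(buckets);
        sorted.sort(Comparator.comparingDouble((Bucket b) -> b.sumOrderPrice).reversed());
        int start = Math.min(from, sorted.size());
        int end = Math.min(start + size, sorted.size());
        return new ArrayList<>(sorted.subList(start, end));
    }
}
```

Because the paging only sees buckets that the `terms` aggregation has already returned, the `terms` size has to be large enough to cover every area code, which is presumably why it is set so high in the request.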

DSL query result (screenshot: image.png)

High Level REST Client version

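// Imports assume the Elasticsearch 7.x package layout (6.x keeps some of these
// classes in sub-packages). StopWatch is assumed to be Spring's and JSONObject
// to be fastjson's. `client` (RestHighLevelClient) and `log` are defined elsewhere.
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.filter.ParsedFilter;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedSum;
import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.util.StopWatch;

StopWatch watch = new StopWatch();
watch.start();
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

// Filter: paid, non-deleted orders in the date range, excluding payWay 3, 4 and 20.
// termsQuery matches one field against a list of values; multiMatchQuery("payWay", "3", "4", "20")
// would instead search for the text "payWay" in fields named 3, 4 and 20.
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
        .mustNot(QueryBuilders.termsQuery("payWay", 3, 4, 20))
        .must(QueryBuilders.termQuery("payState", 1))
        .must(QueryBuilders.termQuery("isDel", 0))
        .must(QueryBuilders.rangeQuery("orderDate").from("2020-01-03").to("2020-01-03"));
searchSourceBuilder.query(queryBuilder);

// One bucket per area code (the GROUP BY of the SQL version).
TermsAggregationBuilder aggregation = AggregationBuilders.terms("branchOfficeAreaCodes")
        .field("branchOfficeAreaCode").size(10000);
// One sample document per bucket, used later to read the area names.
aggregation.subAggregation(AggregationBuilders.topHits("branchOfficeAreaCode").size(1));
aggregation.subAggregation(AggregationBuilders.count("countOrder").field("id"));
aggregation.subAggregation(AggregationBuilders.sum("sumOrderPrice").field("price"));
aggregation.subAggregation(AggregationBuilders.sum("sumRefundPrice").field("sumRefundPrice"));

// App orders (platform = 0): count and total price.
AggregationBuilder countAppOrder = AggregationBuilders.filter("countAppOrders", QueryBuilders.termQuery("platform", "0"));
countAppOrder.subAggregation(AggregationBuilders.count("countAppOrder").field("id"));
aggregation.subAggregation(countAppOrder);
AggregationBuilder sumAppPrice = AggregationBuilders.filter("sum_app_price", QueryBuilders.termQuery("platform", "0"));
sumAppPrice.subAggregation(AggregationBuilders.sum("sumAppOrderPrice").field("price"));
aggregation.subAggregation(sumAppPrice);

// WeChat orders (platform = 1): count and total price.
AggregationBuilder countWeChatOrder = AggregationBuilders.filter("countWechatOrders", QueryBuilders.termQuery("platform", "1"));
// Attach the count to the WeChat filter (the original attached it to countAppOrder by mistake).
countWeChatOrder.subAggregation(AggregationBuilders.count("countWechatOrder").field("id"));
aggregation.subAggregation(countWeChatOrder);
AggregationBuilder sumWeChatPrice = AggregationBuilders.filter("sum_wechat_price", QueryBuilders.termQuery("platform", "1"));
sumWeChatPrice.subAggregation(AggregationBuilders.sum("sumWechatOrderPrice").field("price"));
aggregation.subAggregation(sumWeChatPrice);

// Sort buckets by total price, descending, and return the second page of 10.
List<FieldSortBuilder> fieldSorts = new ArrayList<>();
fieldSorts.add(new FieldSortBuilder("sumOrderPrice").order(SortOrder.DESC));
aggregation.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", fieldSorts).from(10).size(10));

searchSourceBuilder.aggregation(aggregation);
searchSourceBuilder.size(0); // aggregations only, no hits
searchRequest.source(searchSourceBuilder);
searchRequest.indices("order");
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
watch.stop();
long millis = watch.getTotalTimeMillis();
log.info("Query took {} ms", millis);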
// Flatten each area bucket into one JSON row.
Aggregations aggregations = searchResponse.getAggregations();
List<JSONObject> list = new ArrayList<>();
ParsedStringTerms agg1 = aggregations.get("branchOfficeAreaCodes");
List<? extends Terms.Bucket> buckets = agg1.getBuckets();
for (Terms.Bucket bucket : buckets) {
    Map<String, Aggregation> mapAgg = bucket.getAggregations().getAsMap();
    JSONObject data = new JSONObject();
    data.put("areaCode", bucket.getKeyAsString());
    // Placeholder label; overwritten below once the top_hits document is read.
    data.put("areaCodeStr", "areaCodeStr" + bucket.getKeyAsString());
    for (Aggregation agg : mapAgg.values()) {
        if ("filter".equals(agg.getType())) {
            // Lift the metrics nested under a filter into the same flat row.
            Map<String, Aggregation> sub = ((ParsedFilter) agg).getAggregations().getAsMap();
            for (Aggregation subAgg : sub.values()) {
                if ("sum".equals(subAgg.getType())) {
                    ParsedSum parsedSum = (ParsedSum) subAgg;
                    data.put(parsedSum.getName(), parsedSum.getValue());
                } else if ("value_count".equals(subAgg.getType())) {
                    ParsedValueCount parsedValueCount = (ParsedValueCount) subAgg;
                    data.put(parsedValueCount.getName(), parsedValueCount.getValue());
                }
            }
        } else if ("sum".equals(agg.getType())) {
            ParsedSum parsedSum = (ParsedSum) agg;
            data.put(parsedSum.getName(), parsedSum.getValue());
        } else if ("value_count".equals(agg.getType())) {
            ParsedValueCount parsedValueCount = (ParsedValueCount) agg;
            data.put(parsedValueCount.getName(), parsedValueCount.getValue());
        } else if ("top_hits".equals(agg.getType())) {
            // Read the display names from the single sample document of this bucket.
            TopHits topHits = (TopHits) agg;
            SearchHits searchHits = topHits.getHits();
            SearchHit searchHit = searchHits.getAt(0);
            Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
            data.put("areaCodeStr", sourceAsMap.get("officeName") + " " + sourceAsMap.get("branchOfficeName"));
            data.put("city", sourceAsMap.get("cityName"));
        }
    }
    list.add(data);
}
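
The parsing loop above flattens one level of nesting: metrics directly under a bucket and metrics nested under a `filter` aggregation all end up in the same flat row, keyed by metric name. The sketch below shows that idea with plain maps standing in for the client's `Aggregation` objects; the map-based representation and the `flatten` name are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FlattenAggsSketch {
    /**
     * Copies numeric metrics into a flat row. A nested Map stands in for a
     * filter aggregation: its numeric entries are lifted into the same row
     * and the filter's own name is dropped.
     */
    static Map<String, Number> flatten(Map<String, Object> bucketAggs) {
        Map<String, Number> row = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : bucketAggs.entrySet()) {
            Object value = e.getValue();
            if (value instanceof Number) {
                row.put(e.getKey(), (Number) value);
            } else if (value instanceof Map) {
                for (Map.Entry<?, ?> sub : ((Map<?, ?>) value).entrySet()) {
                    if (sub.getValue() instanceof Number) {
                        row.put(String.valueOf(sub.getKey()), (Number) sub.getValue());
                    }
                }
            }
        }
        return row;
    }
}
```

One consequence of this design is that metric names must be unique across all filters of a bucket, since a repeated name would overwrite the earlier value in the row.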

ES JPA version

Extend the repository interface. Everything else works the same way as with JPA,
and there is no need to write any DSL.

public interface AreaDetailRepository extends ElasticsearchRepository<EsAreaDetail, Integer> {
}