ElasticSearch的RestHighLevelClient高阶操作
作者:互联网
ElasticSearch的RestHighLevelClient高阶操作
文章目录
1. 导入 Elasticsearch 高阶客户端(RestHighLevelClient)的 Maven 依赖
<!-- Elasticsearch high-level REST client, pinned to version 7.4.2. -->
<!-- NOTE(review): presumably this version should match the target cluster version; confirm. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.2</version>
</dependency>
2. 向 Spring 容器中注入 RestHighLevelClient
@Configuration
public class ElasticsearchConfig {

    /**
     * Shared request options for outgoing calls, comparable to a common set of request headers.
     * Built from the library defaults without further customization.
     */
    public static final RequestOptions COMMON_OPTIONS = RequestOptions.DEFAULT.toBuilder().build();

    /**
     * Registers the high-level REST client as a bean.
     *
     * @return a client built for the node at 192.168.56.10:9200
     */
    @Bean
    public RestHighLevelClient elasticsearchRestHighLevelClient() {
        // HttpHost accepts (hostname, port, scheme); the scheme is omitted here
        // because it defaults to http.
        return new RestHighLevelClient(RestClient.builder(new HttpHost("192.168.56.10", 9200)));
    }
}
3. 自动注入高阶Client
// High-level REST client injected via @Autowired; used by the test methods in this file.
@Autowired
private RestHighLevelClient client;
4. 测试es中保存数据
/**
 * Indexes a single user document with id "1" into the "users" index and prints the response.
 *
 * @throws IOException if the index call fails
 */
@Test
void indexData() throws IOException {
    // Payload option 2: serialize an object to a JSON string.
    // (Option 1 would be key-value pairs passed straight to source(...), e.g.
    //  source("username", "张三", "age", 18, "gender", "男").)
    User doc = new User();
    doc.setUsername("张三");
    doc.setAge(18);
    doc.setGender("男");
    String payload = JSON.toJSONString(doc);

    // Target index, document id, and body; the content type must be given for a string source.
    IndexRequest request = new IndexRequest("users")
            .id("1")
            .source(payload, XContentType.JSON);

    // Execute and print the raw response.
    IndexResponse response = client.index(request, RequestOptions.DEFAULT);
    System.out.println(response);
}
5. 测试批量保存数据
/**
 * Bulk-indexes ten generated {@code Emp} documents into the "emp" index and prints the
 * per-item responses.
 *
 * @throws IOException if the bulk call itself fails
 * @throws IllegalStateException if any item of the bulk request was rejected
 */
@Test
public void bulkTest() throws IOException {
    // 1. Generate the sample employees.
    List<Emp> emps = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Emp emp = new Emp();
        emp.setId(i);
        emp.setName("张益达" + i + "号");
        emp.setGender(i % 2 == 0 ? "男" : "女");
        emp.setProfession(i % 3 == 0 ? "vue开发工程师" : "安卓开发工程师");
        emps.add(emp);
    }

    // 2. Wrap one index request per employee into a single bulk request.
    BulkRequest bulkRequest = new BulkRequest();
    for (Emp emp : emps) {
        IndexRequest indexRequest = new IndexRequest("emp");
        indexRequest.id(emp.getId().toString());
        indexRequest.source(JSON.toJSONString(emp), XContentType.JSON);
        bulkRequest.add(indexRequest);
    }

    // 3. Execute.
    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);

    // A bulk call returns normally even when individual items are rejected, and
    // getResponse() is null for a failed item. Fail loudly instead of printing nulls.
    if (bulk.hasFailures()) {
        throw new IllegalStateException("bulk indexing failed: " + bulk.buildFailureMessage());
    }

    BulkItemResponse[] items = bulk.getItems();
    List<Object> collect = Arrays.stream(items).map(BulkItemResponse::getResponse).collect(Collectors.toList());
    System.out.println("返回的结果集合:" + collect);
}
// Employee document used by bulkTest; serialized to JSON and indexed into "emp".
// Accessors come from Lombok's @Data.
@Data
class Emp {
// Numeric id; bulkTest also uses its string form as the document id.
private Integer id;
// Display name.
private String name;
// Gender label.
private String gender;
// Job title.
private String profession;
}
6. 测试检索数据 (简单查询)
/**
 * Runs a match query on the "bank" index with two aggregations (age distribution and
 * average balance), then prints the hits and the aggregation results.
 *
 * @throws IOException if the search call fails
 */
@Test
void searchData() throws IOException {
    // DSL: match "Mill" in address, first 4 hits, plus a terms aggregation on age
    // and an average over balance.
    SearchSourceBuilder dsl = new SearchSourceBuilder()
            .query(QueryBuilders.matchQuery("address", "Mill"))
            .from(0)
            .size(4)
            .aggregation(AggregationBuilders.terms("ageAgg").field("age"))
            .aggregation(AggregationBuilders.avg("balanceAvg").field("balance"));

    // Request against the "bank" index carrying the DSL above.
    SearchRequest request = new SearchRequest().indices("bank").source(dsl);

    // Execute with the shared request options.
    SearchResponse response = client.search(request, ElasticsearchConfig.COMMON_OPTIONS);
    System.out.println("检索条件 " + dsl.toString());
    System.out.println(response.toString());

    // Each hit carries _index, _type, _id, _score and _source; only _source is mapped here.
    for (SearchHit hit : response.getHits().getHits()) {
        Account parsed = JSON.parseObject(hit.getSourceAsString(), Account.class);
        System.out.println(parsed);
    }

    // Aggregation results are looked up by the names used when building the request.
    Aggregations aggs = response.getAggregations();
    Terms byAge = aggs.get("ageAgg");
    for (Terms.Bucket ageBucket : byAge.getBuckets()) {
        System.out.println("年龄分布" + ageBucket.getKeyAsString() + "===>有" + ageBucket.getDocCount() + "人");
    }
    Avg avgBalance = aggs.get("balanceAvg");
    System.out.println("平均薪资" + avgBalance.getValue());
}
// Target type for deserializing a hit's _source from the "bank" index.
// Field names (including the snake_case account_number) are used as-is for JSON mapping.
// NOTE(review): @Data already generates toString(), so @ToString looks redundant; confirm.
@Data
@ToString
static class Account {
private int account_number;
private int balance;
private String firstname;
private String lastname;
private int age;
private String gender;
private String address;
private String employer;
private String email;
private String city;
private String state;
}
7. 先按照年龄聚合, 再计算每个年龄段的人的平均薪资
/**
 * Aggregates the "bank" index by age, computes the average balance per age bucket as well
 * as the overall average balance, and prints hits and aggregation results.
 *
 * @throws IOException if the search call fails
 */
@Test
public void searchAgeWithBalance() throws IOException {
    // Bucket ordering: doc_count ascending first, then key descending on ties.
    // A single criterion could be set directly instead, e.g. order(BucketOrder.count(true))
    // or order(BucketOrder.key(false)); the list could also be passed to order(...) as-is.
    List<BucketOrder> bucketOrdering = new ArrayList<>();
    bucketOrdering.add(BucketOrder.count(true));
    bucketOrdering.add(BucketOrder.key(false));

    // Terms aggregation on age (up to 20 buckets) with a per-bucket average balance.
    TermsAggregationBuilder byAge = AggregationBuilders.terms("ageAgg")
            .field("age")
            .size(20)
            .order(BucketOrder.compound(bucketOrdering))
            .subAggregation(AggregationBuilders.avg("subBalanceAvg").field("balance"));

    // DSL: match everything, age aggregation, then the overall average balance.
    SearchSourceBuilder dsl = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .aggregation(byAge)
            .aggregation(AggregationBuilders.avg("balanceAvg").field("balance"));

    SearchRequest request = new SearchRequest().indices("bank").source(dsl);

    // Execute and dump the request DSL plus the raw response.
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    System.out.println("检索条件 " + dsl.toString());
    System.out.println(response.toString());

    for (SearchHit hit : response.getHits().getHits()) {
        Account parsed = JSON.parseObject(hit.getSourceAsString(), Account.class);
        System.out.println("account===>" + parsed);
    }
    System.out.println("---------------------------------");

    // Per-age buckets, each with its nested average.
    Aggregations aggs = response.getAggregations();
    ParsedLongTerms ageTerms = aggs.get("ageAgg");
    for (Terms.Bucket ageBucket : ageTerms.getBuckets()) {
        ParsedAvg bucketAvg = ageBucket.getAggregations().get("subBalanceAvg");
        System.out.println(ageBucket.getKeyAsString() + "岁年龄人数: " + ageBucket.getDocCount()
                + " || 平均薪资:" + bucketAvg.getValue());
    }

    // Overall average.
    ParsedAvg overallAvg = aggs.get("balanceAvg");
    System.out.println("平均薪资" + overallAvg.getValue());
}
8. 查出所有年龄分布,以及这些年龄段中【男性】的平均薪资和【女性】的平均薪资以及这个年龄段的【总体平均薪资】
/**
 * Aggregates the "bank" index by age and, within each age bucket, by gender; prints the
 * average balance per gender and per age bucket.
 *
 * @throws IOException if the search call fails
 */
@Test
public void searchBalanceGroupByGender() throws IOException {
    // The same average-balance builder is attached at both the gender and the age level.
    AvgAggregationBuilder avgBalance = AggregationBuilders.avg("balanceAvg").field("balance");

    // Gender buckets (keyword sub-field), ordered by key ascending, each with an average.
    TermsAggregationBuilder byGender = AggregationBuilders.terms("genderAgg")
            .field("gender.keyword")
            .order(BucketOrder.key(true))
            .subAggregation(avgBalance);

    // Age buckets (up to 20), ordered by key ascending, holding gender buckets plus an average.
    TermsAggregationBuilder byAge = AggregationBuilders.terms("ageAgg")
            .field("age")
            .size(20)
            .order(BucketOrder.key(true))
            .subAggregation(byGender)
            .subAggregation(avgBalance);

    // DSL: match everything and aggregate by age.
    SearchSourceBuilder dsl = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .aggregation(byAge);

    SearchRequest request = new SearchRequest().indices("bank").source(dsl);

    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    System.out.println("检索条件 " + dsl.toString());

    ParsedLongTerms ageTerms = response.getAggregations().get("ageAgg");
    for (Terms.Bucket ageBucket : ageTerms.getBuckets()) {
        String ageKey = ageBucket.getKeyAsString();
        long ageCount = ageBucket.getDocCount();
        Aggregations ageLevel = ageBucket.getAggregations();

        // Gender breakdown within this age bucket.
        ParsedStringTerms genderTerms = ageLevel.get("genderAgg");
        for (Terms.Bucket genderBucket : genderTerms.getBuckets()) {
            // Key "M" is labelled male; any other key is labelled female.
            String genderLabel;
            if (genderBucket.getKeyAsString().equals("M")) {
                genderLabel = "男";
            } else {
                genderLabel = "女";
            }
            long genderCount = genderBucket.getDocCount();
            ParsedAvg genderAvg = genderBucket.getAggregations().get("balanceAvg");
            System.out.println(ageKey + "岁的人数中," + genderLabel + "性有" + genderCount
                    + "人,平均薪资是" + genderAvg.getValue());
        }

        // Average for the whole age bucket.
        ParsedAvg ageAvg = ageLevel.get("balanceAvg");
        System.out.println(ageKey + "岁的人数有: " + ageCount + " 平均薪资:" + ageAvg.getValue() + "\n");
    }
}
标签:String,sourceBuilder,private,System,ElasticSearch,println,ageAgg,高阶,RestHighLevelClient
来源:https://blog.csdn.net/lyh1024/article/details/118396490