> 技术文档 > Springboot中使用Elasticsearch(部署+使用+讲解 最完整)_springboot elasticsearch

Springboot中使用Elasticsearch(部署+使用+讲解 最完整)_springboot elasticsearch

目录

引言

一、docker中安装Elasticsearch

1、创建es专有的网络

2、开放端口

3、在es-net网络上安装es和kibana

4、可能出现的问题

5、测试

6、安装IK分词器

7、测试IK分词器

二、结合业务实战

1、准备依赖

2、配置yml

3、读取yml配置

4、准备es配置类

5、编写测试代码

6、使用mq异步修改es的表数据

7、实现搜索功能

三、简单介绍Elasticsearch

1、表结构与Mysql的对比

2、Mapping映射属性

3、索引库的CRUD

创建索引库和映射( * ):

查询索引库:

修改索引库:

删除索引库:

4、文档操作的CRUD

新增文档:

查询文档:

删除文档:

修改文档:

批处理:

四、RestAPI

1、初始化RestClient

(1)引入es的RestHighLevelClient依赖

(2)初始化RestHighLevelClient

2、在kibana的客户端准备创建索引库

3、Java客户端创建索引库

五、RestClient操作文档(重在方法理解)

1、准备实体类

2、Java实现CRUD(重点)

(1)增:

(2)删:

(3)改:

(4)查:

注意:

3、批量导入文档:

六、JavaRestClient查询

基本步骤(重点)

1、叶子查询

2、复合查询

3、排序和分页

4、高亮

七、数据聚合

八、自己封装的一个Utils

九、亿级别数据查询

1、线程池配置文件类

2、线程池工具类

3、线程池配置注入bean

4、游标查询代码


引言

Elasticsearch:用于数据存储、计算和搜索

  • Mysql:擅长事务类型操作,可以确保数据的安全和一致性

  • Elasticsearch:擅长海量数据的搜索、分析、计算

基于这个特点我打算改造用户方面的功能,基于用户量比较多,可能达到一万以上甚至更多,需要对用户进行搜索或者各种操作,我相信es也比较适合。

在这篇文章前面是实战后面是具体讲解,对于某些方法可以在后面讲解中对应查找来使用

一、docker中安装Elasticsearch

先说命令,后面再说可能会出现的问题。

1、创建es专有的网络

因为测试需要部署kibana容器作为一个图形化界面,创建一个网络方便让es和kibana容器互联。

docker network create es-net

2、开放端口

宝塔:

腾讯云:

56019200 9300

3、在es-net网络上安装es和kibana

这里我安装7.12.1版本的es和kibana,因为之前学习有现有的镜像包安装更快

分别执行这两条指令:

docker run -d \\ --name es \\ -e \"ES_JAVA_OPTS=-Xms512m -Xmx512m\" \\ -e \"discovery.type=single-node\" \\ -v es-data:/usr/share/elasticsearch/data \\ -v es-plugins:/usr/share/elasticsearch/plugins \\ --privileged \\ --network es-net \\ -p 9200:9200 \\ -p 9300:9300 \\ elasticsearch:7.12.1 docker run -d \\--name kibana \\-e ELASTICSEARCH_HOSTS=http://es:9200 \\--network=es-net \\-p 5601:5601 \\kibana:7.12.1

4、可能出现的问题

这里我是在宝塔上部署的,由于我之前创建容器的时候没有开启防火墙的端口,应该先去开启防火墙再去安装docker容器,我这些流程出现混淆,导致出现下面这些类似的报错:

设置失败!500 Server Error for http+docker://localhost/v1.45/containers/1e013
Error response from daemon: Failed to Setup IP tables: Unable to enable SKIP DNAT rule: (iptables failed: iptables --wait -t nat -I DOCKER -i br-b649822bbcff -j RETURN: iptables: No chain/target/match by that name. (exit status 1))

解决办法是先去开放端口然后重启docker服务再去安装es和kibana

重启docker:

systemctl restart docker

然后再去重新安装就行

可以参考:【DockerCE】运行Docker的服务器报“Failed to Setup IP tables“的解决方法_error response from daemon: failed to setup ip tab-CSDN博客

5、测试

es:

服务器ip:9200

kibana:

服务器ip:5601

选择Explore on my own之后,进入主页面:

测试安装成功!

查看docker:

或者使用指令:

docker ps

记住kibana是用于你开发的时候测试使用,比较方便的图形化界面,实际开发也只是用es。

6、安装IK分词器

docker exec -it es ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

重启es容器使其生效:

docker restart es

还有其他的方式,可以参考下面的博客,个人也觉得离线的方式比较块一些:

【Elasticsearch】IK分词器的下载及使用_ik分词器下载-CSDN博客文章浏览阅读4.1k次,点赞10次,收藏25次。安装IK分词器_ik分词器下载 https://blog.csdn.net/qq_73639699/article/details/139347283?ops_request_misc=%257B%2522request%255Fid%2522%253A%25226344a75df117eed8ed196abb46ce022f%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=6344a75df117eed8ed196abb46ce022f&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-139347283-null-null.142^v101^pc_search_result_base6&utm_term=%E6%80%8E%E4%B9%88%E4%B8%8B%E8%BD%BDik%E5%88%86%E8%AF%8D%E5%99%A8&spm=1018.2226.3001.4187

7、测试IK分词器

IK分词器包含两种模式:

  • ik_smart:智能语义切分

  • ik_max_word:最细粒度切分

进入Dev tools:

先测试Elasticsearch官方提供的标准分词器:

POST /_analyze{ \"analyzer\": \"standard\", \"text\": \"在CSDN学习java太棒了\"}

测试IK分词器:

POST /_analyze{ \"analyzer\": \"ik_smart\", \"text\": \"在CSDN学习java太棒了\"}

测试成功,安装分词器成功!

二、结合业务实战

一般使用easy-es比较多,下面是基于原生es的操作,如果想看easy-es可以参考我照片博客:

SpringBoot中easy-es入门实战(结合官方文档版)-CSDN博客文章浏览阅读386次,点赞10次,收藏15次。本文主要是参考官方文档进行编写,记录一下自己一些比较常使用easy-es使用方法和内容,其实他的使用和MybatisPlus差不多的,之前我还写了一些关于es的博客可以参考一下:Springboot中使用Elasticsearch(部署+使用+讲解 最完整)_spring boot elasticsearch-CSDN博客最完整最详细的springboot中使用es,在前面有服务器部署es相关的东西,在后面有使用java的实战,对于实战的方法使用结合官网深度去研究和讲解。https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144790200?spm=1001.2014.3001.5501

有个工具类在最后面,应该是忘记加了,后面我还新增了一个亿级别数据量的优化查询,如果有大佬有更好的查询方案推荐,欢迎评论区说说。

数据同步原理:当mysql数据发生改变时发送消息到mq,es服务接收消息,进行更新,可以参考我下面两个博客:

ELK实战(最详细)-CSDN博客文章浏览阅读1.2k次,点赞29次,收藏19次。使用logstash对MySQL和ElasticSearch做数据同步_elkhttps://blog.csdn.net/qq_73440769/article/details/144984232?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144984232?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144984232?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144984232?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144984232?spm=1001.2014.3001.5501使用Canal将MySQL数据同步到ES(Linux)_canal 同步es-CSDN博客文章浏览阅读1.3k次,点赞24次,收藏12次。通过canal实现MySQL和ES数据同步,同时里面也讲到自己遇到的一些bug,有完整的实现测试的流程_canal 同步eshttps://blog.csdn.net/qq_73440769/article/details/144146890?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144146890?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144146890?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144146890?spm=1001.2014.3001.5501https://blog.csdn.net/qq_73440769/article/details/144146890?spm=1001.2014.3001.5501

es操作步骤:

1.创建Request2.准备请求参数3.聚合参数4.发送请求5.解析聚合结果 5.1.获取聚合 5.2.获取聚合中的桶 5.3.遍历桶内数据

1、准备依赖

  7.12.1     org.elasticsearch.client elasticsearch-rest-high-level-client ${elasticsearch.version}   

2、配置yml

quick: elasticsearch: host: ${quick.elasticsearch.host} # 服务器IP地址 port: ${quick.elasticsearch.port} # 服务器端口号

3、读取yml配置

import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;/* * @读取yml配置 */@Component@Data@ConfigurationProperties(prefix = \"quick.elasticsearch\")public class ElasticSearchProperties { // es地址 private String host; // es端口 private int port;}

4、准备es配置类

import com.quick.properties.ElasticSearchProperties;import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * es配置类 */@Configurationpublic class ElasticSearchConfig { @Bean(destroyMethod = \"close\") //程序开始时交给bean对象注入, 指定了当bean被销毁时应该调用其close方法 @ConditionalOnMissingBean//保证spring容器里面只有一个utils对象(当没有这个bean对象再去创建,有就没必要去创建了) public RestHighLevelClient client(ElasticSearchProperties elasticSearchProperties){ return new RestHighLevelClient(RestClient.builder( new HttpHost( elasticSearchProperties.getHost(), elasticSearchProperties.getPort(), \"http\" ) )); }}

5、编写测试代码

UserDoc:
/* User索引库实体类*/@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class UserDoc { // 这里设计为String类型,因为在发送消息的时候是以字符的形式 @Schema(description = \"用户ID\") private String id; @Schema(description = \"用户编号\") private String quickUserId; @Schema(description = \"姓名\") private String name; @Schema(description = \"手机号\") private String phone; @Schema(description = \"关注数\") private Long follow; @Schema(description = \"粉丝数\") private Long fan; @Schema(description = \"性别 0 女 1 男\") private String sex; @Schema(description = \"头像\") private String avatar; @JsonFormat(pattern = \"yyyy-MM-dd HH:mm:ss\") @Schema(description = \"注册时间\") private LocalDateTime createTime; @Schema(description = \"用龄,单位:年\") private Long useTime; @Schema(description = \"收藏数\") private Long collectNumber; @Schema(description = \"评分数\") private Long markNumber; @Schema(description = \"个人简介\") private String briefIntroduction;}
UserDocHandleResponseVO:
/** * 用户文档处理响应 */@Data@Builderpublic class UserDocHandleResponseVO { ListuserDocList; Long total;}

controller:

@RestController@RequestMapping(\"/user/es-user\")@Tag(name=\"C端-用户es相关接口\")@Slf4jpublic class EsUserController { @Resource private UserService userService; @Operation(summary = \"es查询所有用户\") @GetMapping(\"/query-all-user\") public Result queryAllUser() throws IOException { return Result.success(userService.queryAllUser()); }}

service:

public interface UserService extends IService { UserDocHandleResponseVO queryAllUser() throws IOException;}

impl:

@Service@Slf4jpublic class UserServiceImpl extends ServiceImpl implements UserService { @Resource private RestHighLevelClient restHighLevelClient; @Override public UserDocHandleResponseVO queryAllUser() throws IOException { // 1.创建Request SearchRequest request = new SearchRequest(\"user\"); // 2.组织请求参数 request.source().query(QueryBuilders.matchAllQuery()); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 return handleResponse(response); } private static UserDocHandleResponseVO handleResponse(SearchResponse response) { SearchHits searchHits = response.getHits(); // 4.1 获取总条数 long total = 0L; if (searchHits.getTotalHits() != null) { total = searchHits.getTotalHits().value; } // 4.2 获取命中的数据 SearchHit[] hits = searchHits.getHits(); List userDocList=new ArrayList(); for (SearchHit hit : hits) { // 4.2.1 获取source结果(结果是一个json对象) String json = hit.getSourceAsString(); // 4.2.2 转为实体对象 UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class); userDocList.add(userDoc); } System.out.println(\"userDocList = \" + userDocList); System.out.println(\"total = \" + total); return UserDocHandleResponseVO.builder() .userDocList(userDocList) .total(total) .build(); }}

 

测试:

测试成功!!!

这里我将解析es的代码封装成一个工具类的方法

import com.quick.es.GenericSearchResponseVO;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits; import com.alibaba.fastjson.JSON;import java.util.ArrayList; import java.util.List;/** * es处理搜索响应的工具类 */public class SearchResponseUtil { /** * 处理ES搜索响应 * * @param response ES搜索响应对象 * @param clazz 目标文档对象的类类型 * @return 封装后的搜索响应对象 * @param  泛型,表示文档的类型,用于封装返回对应类型文档的返回结果 */ public static  GenericSearchResponseVO handleResponse(SearchResponse response, Class clazz) { // 获取搜索命中的结果 SearchHits searchHits = response.getHits(); // 初始化总命中数为0 long total = 0L; // 如果总命中数不为空,则赋值 if (searchHits.getTotalHits() != null) { total = searchHits.getTotalHits().value; } // 初始化文档列表 List docList = new ArrayList(); // 获取所有命中的文档 SearchHit[] hits = searchHits.getHits(); // 遍历所有命中的文档 for (SearchHit hit : hits) { // 获取文档的JSON字符串 String json = hit.getSourceAsString(); // 将JSON字符串解析为目标类型的对象 //T doc = JSON.parseObject(json, clazz); 使用这个的话如果反序列化会报错 T doc = JSONUtil.toBean(json, clazz); // 将解析后的文档对象添加到列表中 docList.add(doc); } // 构建并返回封装后的搜索响应对象 return GenericSearchResponseVO.builder() .total(total) // 设置总命中数 .docList(docList) // 设置文档列表 .build(); }}

T doc = JSON.parseObject(json, clazz); 如果工具类用这个解析json的话反序列化会报错,具体怎么解决欢迎在评论区说一下。

 将返回的对象封装成一个目标返回对象

@Data@Builderpublic class GenericSearchResponseVO { private Long total; private List docList; }

 修改impl的代码

 @Override public GenericSearchResponseVO queryAllUser() throws IOException { // 1.创建Request SearchRequest request = new SearchRequest(\"user\"); // 2.组织请求参数 request.source().query(QueryBuilders.matchAllQuery()); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 /*return handleResponse(response);*/ return SearchResponseUtil.handleResponse(response, UserDoc.class); }

6、使用mq异步修改es的表数据

可以参考:五、2、(3) ===>修改操作

service:

public interface EsUserDocService { GenericSearchResponseVO queryAllUserDoc() throws IOException; // 修改UserDoc void updateUserDocByOne(UserDoc userDoc) throws IOException;}

impl:

import cn.hutool.json.JSONUtil;import com.quick.vo.GenericSearchResponseVO;import com.quick.entity.UserDoc;import com.quick.service.EsUserDocService;import com.quick.utils.ElasticsearchUtil;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.elasticsearch.action.search.SearchRequest;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.action.update.UpdateRequest;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;import org.elasticsearch.index.query.QueryBuilders;import org.springframework.stereotype.Service;import java.io.IOException;@Slf4j@Servicepublic class EsUserDocServiceImpl implements EsUserDocService { @Resource private RestHighLevelClient restHighLevelClient; @Override public GenericSearchResponseVO queryAllUserDoc() throws IOException { // 页码 int pageNumber = 2; // 每页数量 int pageSize = 10; // 计算起始位置 int from = ElasticsearchUtil.calculateFrom(pageNumber,pageSize); // 1.创建Request SearchRequest request = new SearchRequest(\"user\"); // 2.组织请求参数 request.source() .query(QueryBuilders.matchAllQuery()) .from(from) .size(pageSize); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 /*return handleResponse(response);*/ return ElasticsearchUtil.handleResponse(response, UserDoc.class,pageNumber); } @Override public void updateUserDocByOne(UserDoc userDoc) throws IOException { // 1.准备Request UpdateRequest request = new UpdateRequest(\"user\",userDoc.getId() ); // 2.准备请求参数 // 将UserDoc转json String doc = JSONUtil.toJsonStr(userDoc); // 准备Json文档,XContentType.JSON表示json格式 request.doc(doc, XContentType.JSON); // 3.发送请求 restHighLevelClient.update(request, RequestOptions.DEFAULT); log.info(\"更新用户在es中数据成功,修改后文档为:{}\",doc); }}

编写mq监听:

import com.quick.entity.UserDoc;import com.quick.service.EsUserDocService;import lombok.RequiredArgsConstructor;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * Es中UserDoc相关 接收消息 */@Component@RequiredArgsConstructorpublic class EsUserDocListener { public static final String UPDATE_USER_DOC_QUEUE_NAME = \"userDoc.updateUserDocByOne.queue\"; public static final String UPDATE_USER_DOC_EXCHANGE_NAME = \"updateUserDocByOne.direct\"; public static final String UPDATE_USER_DOC_ROUTING_KEY = \"updateUserDocByOne.success\"; private final EsUserDocService esUserDocService; @RabbitListener(bindings = @QueueBinding( value=@Queue(name = UPDATE_USER_DOC_QUEUE_NAME,durable = \"true\"), exchange = @Exchange(name = UPDATE_USER_DOC_EXCHANGE_NAME), key = UPDATE_USER_DOC_ROUTING_KEY ), // 在@RabbitListener注解中指定容器工厂 containerFactory = \"customContainerFactory\") public void listenUpdateUserDoc(UserDoc userDoc) throws IOException { esUserDocService.updateUserDocByOne(userDoc); }}

编写实现修改操作的发送消息端:

 @Override public void update(UserDTO userDTO) { User user=userMapper.selectById(userDTO.getUserId()); BeanUtils.copyProperties(userDTO,user); userMapper.updateById(user); UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class); //发送mq异步消息修改 try { rabbitTemplate.convertAndSend(  EsUserDocListener.UPDATE_USER_DOC_EXCHANGE_NAME, // 交换机名称  EsUserDocListener.UPDATE_USER_DOC_ROUTING_KEY, // 路由键  userDoc // 消息内容 ); } catch (AmqpException e) { log.error(\"发送消息失败\", e); } }

测试:

7、实现搜索功能

controller:

 @Operation(summary = \"搜索功能\") @GetMapping(\"/search\") public Result<GenericSearchResponseVO> search( @RequestParam(required = false) String searchKeyword, @RequestParam(required = false) Integer pageNumber, @RequestParam(required = false) Integer pageSize ) throws IOException { return Result.success(esUserDocService.search(searchKeyword,pageNumber,pageSize)); }

service:

GenericSearchResponseVO search(String searchKeyword,Integer pageNumber,Integer pageSize)throws IOException;

impl:

 @Override public GenericSearchResponseVO search(String searchKeyword,Integer pageNumber,Integer pageSize) throws IOException{ // 如果不传就是默认 if (pageNumber == null) { // 页码 pageNumber = 1; } if (pageSize == null) { // 每页数量 pageSize = 10; } // 计算起始位置 int from = ElasticsearchUtil.calculateFrom(pageNumber,pageSize); // 1.创建Request SearchRequest request=new SearchRequest(\"user\"); // 2.组织请求参数 BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); if (searchKeyword != null && !searchKeyword.isEmpty()) { boolQueryBuilder.must(QueryBuilders.multiMatchQuery(searchKeyword, \"name\", \"briefIntroduction\", \"phone\",\"quickUserId\")); } request.source() .query(boolQueryBuilder) // 查询条件 .from(from) .size(pageSize) .sort(\"fan\", SortOrder.DESC); // 3.发送请求 SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT); // 4.解析响应 return ElasticsearchUtil.handleResponse(response, UserDoc.class); }

测试:

quickUserId精确查询:

 

默认按粉丝最多排序:

 

名字词条查询:

其他的es业务逻辑也是差不多这两个实现, 可以参考后面的一些语法进行对应的操作,后续我还会持续更新一些es拓展和升级的操作。

三、简单介绍Elasticsearch

这里只做演示和介绍,如果只需要了解在Java中使用可跳过,去看第四部分,但是这些还是很有必要了解一下。

具体的DSL操作参考:Docs

1、表结构与Mysql的对比

MySQL

Elasticsearch

说明

Table

Index

索引(index),就是文档的集合,类似数据库的表(table)

Row

Document

文档(Document),就是一条条的数据,类似数据库中的行(Row),文档都是JSON格式

Column

Field

字段(Field),就是JSON文档中的字段,类似数据库中的列(Column)

Schema

Mapping

Mapping(映射)是索引中文档的约束,例如字段类型约束。类似数据库的表结构(Schema)

SQL

DSL

DSL是elasticsearch提供的JSON风格的请求语句,用来操作elasticsearch,实现CRUD

2、Mapping映射属性

Mapping是对索引库中文档的约束,常见的Mapping属性包括:

type:字段数据类型,常见的简单类型有:

(1)字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)

(2)数值:longintegershortbytedoublefloat

(3)布尔:boolean

(4)日期:date

(5)对象:object

index:是否创建索引,默认为true

analyzer:使用哪种分词器

properties:该字段的子字段

3、索引库的CRUD

因为到时候具体创建索引库还是需要使用这个语法操作,文档的CRUD可以使用Java代码替代,还是需要重视的

创建索引库和映射* ):

基本语法

  • 请求方式:PUT

  • 请求路径:/索引库名,可以自定义

  • 请求参数:mapping映射

示例:

PUT /索引库名称{ \"mappings\": { \"properties\": { \"字段名\":{ \"type\": \"text\", \"analyzer\": \"ik_smart\" }, \"字段名2\":{ \"type\": \"keyword\", \"index\": \"false\" }, \"字段名3\":{ \"properties\": { \"子字段\": { \"type\": \"keyword\" } } }, // ...略 } }}

索引库的其他CRUD如下:

查询索引库
GET /索引库名
修改索引库
PUT /索引库名/_mapping{ \"properties\": { \"新字段名\":{ \"type\": \"integer\" } }}
删除索引库
DELETE /索引库名

4、文档操作的CRUD

了解即可,毕竟是使用Java实现比较实际,但是语法的熟悉还是很重要的,就像Mysql有mybatisplus,但是还要了解sql。

新增文档:
POST /索引库名/_doc/文档id{ \"字段1\": \"值1\", \"字段2\": \"值2\", \"字段3\": { \"子属性1\": \"值3\", \"子属性2\": \"值4\" },}
查询文档:
GET /{索引库名称}/_doc/{id}
删除文档:
DELETE /{索引库名}/_doc/id值
修改文档:

全量修改(覆盖之前,如果改id不存在则为新增):

PUT /{索引库名}/_doc/文档id{ \"字段1\": \"值1\", \"字段2\": \"值2\", // ... 略}

局部修改(局部某个字段):

POST /{索引库名}/_update/文档id{ \"doc\": { \"字段名\": \"新的值\", }}
批处理:

批处理采用POST请求,基本语法如下:

POST _bulk{ \"index\" : { \"_index\" : \"test\", \"_id\" : \"1\" } }{ \"field1\" : \"value1\" }{ \"delete\" : { \"_index\" : \"test\", \"_id\" : \"2\" } }{ \"create\" : { \"_index\" : \"test\", \"_id\" : \"3\" } }{ \"field1\" : \"value3\" }{ \"update\" : {\"_id\" : \"1\", \"_index\" : \"test\"} }{ \"doc\" : {\"field2\" : \"value2\"} }

注意:批操作,顾名思义,几个操作一起执行,可以多种操作,也可以一个操作多条。

其中:

index代表新增操作

  • _id指定要操作的文档id

  • { \"field1\" : \"value1\" }:则是要新增的文档内容

delete代表删除操作

  • _index:指定索引库名

  • _id指定要操作的文档id

update代表更新操作

  • _index:指定索引库名

  • _id指定要操作的文档id

  • { \"doc\" : {\"field2\" : \"value2\"} }:要更新的文档字段

四、RestAPI

为什么要使用:

ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。

官方文档地址:Elasticsearch Clients | Elastic

针对我们的版本:

在这里有该版本的各种操作API,可以参考来写代码

1、初始化RestClient

(1)引入esRestHighLevelClient依赖

依赖:

  7.12.1     org.elasticsearch.client elasticsearch-rest-high-level-client ${elasticsearch.version}   
(2)初始化RestHighLevelClient

基本语法如下:

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( HttpHost.create(\"http://服务器IP地址:9200\")));

做一个测试类测试一下:

成功有输出,测试代码参考如下:

package com.quick.es;import org.apache.http.HttpHost;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;import org.junit.jupiter.api.AfterEach;import org.junit.jupiter.api.BeforeEach;import org.junit.jupiter.api.Test;import java.io.IOException;public class EsTest { private RestHighLevelClient client; // 初始化 @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create(\"http://你的服务器IP地址:9200\") )); } // 测试连接es @Test void testConnect() { System.out.println(\"client: \"+client); } // 销毁 @AfterEach void tearDown() throws IOException { this.client.close(); }}

2、在kibana的客户端准备创建索引库

下面为我对应我的用户表创建的索引库

注意:上面演示的图片中quick_user_id位置type后面多加了给逗号,当时不注意到时候记得注意这个错误,后面的Json没有问题。

PUT /user{ \"mappings\": { \"properties\": { \"id\": { \"type\": \"keyword\" }, \"quick_user_id\":{ \"type\": \"keyword\" }, \"name\":{ \"type\": \"text\", \"analyzer\": \"ik_max_word\" }, \"sex\":{ \"type\": \"keyword\" }, \"avatar\":{ \"type\": \"keyword\", \"index\": false }, \"phone\":{ \"type\": \"text\", \"analyzer\": \"ik_max_word\" }, \"follow\":{ \"type\": \"integer\" }, \"fan\":{ \"type\": \"integer\" }, \"use_time\":{ \"type\": \"integer\" }, \"collect_number\":{ \"type\": \"integer\", \"index\": false }, \"mark_number\":{ \"type\": \"integer\", \"index\": false }, \"brief_introduction\":{ \"type\": \"text\", \"index\": false }, \"create_time\":{ \"type\": \"date\" } } }}

拿着上面这些创建好的映射在Java客户端创建

3、Java客户端创建索引库

关于一些知识点,这里我拿之前在b站学习的PPT的内容展示一下,我觉得这个已经很直观的体现出创建索引库的一些解释:

下面给出测试类所有代码,记得服务器IP地址替换成自己的。

package com.quick.es;import org.apache.http.HttpHost;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.client.indices.CreateIndexRequest;import org.elasticsearch.common.xcontent.XContentType;import org.junit.jupiter.api.AfterEach;import org.junit.jupiter.api.BeforeEach;import org.junit.jupiter.api.Test;import java.io.IOException;public class EsTest { private RestHighLevelClient client; static final String USER_MAPPING_TEMPLATE =\"{\\n\" + \" \\\"mappings\\\": {\\n\" + \" \\\"properties\\\": {\\n\" + \" \\\"id\\\": {\\n\" + \" \\\"type\\\": \\\"keyword\\\"\\n\" + \" },\\n\" + \" \\\"quick_user_id\\\":{\\n\" + \" \\\"type\\\": \\\"keyword\\\"\\n\" + \" },\\n\" + \" \\\"name\\\":{\\n\" + \" \\\"type\\\": \\\"text\\\",\\n\" + \" \\\"analyzer\\\": \\\"ik_max_word\\\"\\n\" + \" },\\n\" + \" \\\"sex\\\":{\\n\" + \" \\\"type\\\": \\\"keyword\\\"\\n\" + \" },\\n\" + \" \\\"avatar\\\":{\\n\" + \" \\\"type\\\": \\\"keyword\\\",\\n\" + \" \\\"index\\\": false\\n\" + \" },\\n\" + \" \\\"phone\\\":{\\n\" + \" \\\"type\\\": \\\"text\\\",\\n\" + \" \\\"analyzer\\\": \\\"ik_max_word\\\"\\n\" + \" },\\n\" + \" \\\"follow\\\":{\\n\" + \" \\\"type\\\": \\\"integer\\\"\\n\" + \" },\\n\" + \" \\\"fan\\\":{\\n\" + \" \\\"type\\\": \\\"integer\\\"\\n\" + \" },\\n\" + \" \\\"use_time\\\":{\\n\" + \" \\\"type\\\": \\\"integer\\\"\\n\" + \" },\\n\" + \" \\\"collect_number\\\":{\\n\" + \" \\\"type\\\": \\\"integer\\\",\\n\" + \" \\\"index\\\": false\\n\" + \" },\\n\" + \" \\\"mark_number\\\":{\\n\" + \" \\\"type\\\": \\\"integer\\\",\\n\" + \" \\\"index\\\": false\\n\" + \" },\\n\" + \" \\\"brief_introduction\\\":{\\n\" + \" \\\"type\\\": \\\"text\\\",\\n\" + \" \\\"index\\\": false\\n\" + \" },\\n\" + \" \\\"create_time\\\":{\\n\" + \" \\\"type\\\": \\\"date\\\"\\n\" + \" }\\n\" + \" }\\n\" + \" }\\n\" + \"}\"; // 初始化 @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create(\"http://服务器IP地址:9200\") )); } // 测试连接es @Test void testConnect() { System.out.println(\"client: \"+client); } // 创建索引库 @Test void testCreateIndex() throws IOException { // 1.创建Request对象 CreateIndexRequest request = new CreateIndexRequest(\"user\"); // 2.准备请求参数 request.source(USER_MAPPING_TEMPLATE, XContentType.JSON); // 3.发送请求 client.indices().create(request, RequestOptions.DEFAULT); } // 销毁 @AfterEach void tearDown() throws IOException { this.client.close(); }}

测试:

去kibana客户端测试:

创建成功!

五、RestClient操作文档(重在方法理解)

1、准备实体类

准备一个对接索引库的es实体类

/* User索引库实体类*/@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class UserDoc { // 这里设计为String类型,因为在发送消息的时候是以字符的形式 @Schema(description = \"用户ID\") private String id; @Schema(description = \"用户编号\") private String quickUserId; @Schema(description = \"姓名\") private String name; @Schema(description = \"手机号\") private String phone; @Schema(description = \"关注数\") private Long follow; @Schema(description = \"粉丝数\") private Long fan; @Schema(description = \"性别 0 女 1 男\") private String sex; @Schema(description = \"头像\") private String avatar; @Schema(description = \"注册时间\") private LocalDateTime createTime; @Schema(description = \"用龄,单位:年\") private Long useTime; @Schema(description = \"收藏数\") private Long collectNumber; @Schema(description = \"评分数\") private Long markNumber; @Schema(description = \"个人简介\") private String briefIntroduction;}

这里的id用的是String类型,因为使用RestClient去根据id查,需要传过去的是字符类型的数据,所以在这里需要进行一个转变。

2、Java实现CRUD(重点)

下面讲解一下简单的crud的代码和需要注意的东西,在代码的后面会对注意的东西进行讲解。我会给出测试类全部代码,防止有些同学测试类跑不通,然后会对增删改查逐一给代码,也方便各位同学以后针对性的拿那些方法去改造自己的代码。

注意!注意!注意!===>重要的事情说三遍

代码:

package com.quick.es;import cn.hutool.core.bean.BeanUtil;import cn.hutool.json.JSONUtil;import com.quick.entity.User;import com.quick.service.UserService;import jakarta.annotation.Resource;import org.apache.http.HttpHost;import org.elasticsearch.action.delete.DeleteRequest;import org.elasticsearch.action.get.GetRequest;import org.elasticsearch.action.get.GetResponse;import org.elasticsearch.action.index.IndexRequest;import org.elasticsearch.action.update.UpdateRequest;import org.elasticsearch.client.RequestOptions;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestHighLevelClient;import org.elasticsearch.common.xcontent.XContentType;import org.junit.jupiter.api.AfterEach;import org.junit.jupiter.api.BeforeEach;import org.junit.jupiter.api.Test;import org.springframework.boot.test.context.SpringBootTest;import java.io.IOException;import java.util.HashMap;import java.util.Map;@SpringBootTest(properties = \"spring.profiles.active=dev\")public class EsDocTest { @Resource private UserService userService; private RestHighLevelClient client; // 初始化 @BeforeEach void setUp() { this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create(\"http://服务器IP地址:9200\") )); } // 测试连接es @Test void testConnect() { System.out.println(\"client: \"+client); } // 测试添加文档信息 @Test void testAddDocument() throws IOException { // 1.根据id查询商品数据 User user = userService.getById(1L); System.out.println(\"user = \" + user); // 2.转换为文档类型 UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class); System.out.println(\"userDoc = \" + userDoc); // 3.将UserDoc转json String doc = JSONUtil.toJsonStr(userDoc); // 1.准备Request对象 /*IndexRequest request = new IndexRequest(\"user\").id(String.valueOf(userDoc.getId()));*/ IndexRequest request = new IndexRequest(\"user\").id(userDoc.getId()); // 2.准备Json文档 request.source(doc, XContentType.JSON); // 3.发送请求 client.index(request, RequestOptions.DEFAULT); } // 测试删除文档 @Test void testDeleteDocument() throws IOException { // 1.准备Request,两个参数,第一个是索引库名,第二个是文档id DeleteRequest request = new DeleteRequest(\"user\", \"1\"); // 2.发送请求 client.delete(request, RequestOptions.DEFAULT); } // 测试更新文档 @Test void testUpdateDocument() throws IOException { // 1.准备Request UpdateRequest request = new UpdateRequest(\"user\", \"1\"); // 2.准备请求参数 // 方法一 /*request.doc( \"userTime\", 1, \"briefIntroduction\", \"hello world\" );*/ // 方法二 /*UserDoc userDoc=new UserDoc(); userDoc.setUseTime(1L); userDoc.setBriefIntroduction(\"hello world\"); // 构造参数 Map jsonMap = new HashMap(); jsonMap.put(\"userTime\", userDoc.getUseTime()); jsonMap.put(\"briefIntroduction\", userDoc.getBriefIntroduction()); // 将数据放入请求参数 request.doc(jsonMap);*/ // 方法三 UserDoc userDoc=new UserDoc(); //userDoc.setUseTime(1L); userDoc.setBriefIntroduction(\"hello world!\"); // 将UserDoc转json String doc = JSONUtil.toJsonStr(userDoc); // 准备Json文档,XContentType.JSON表示json格式 request.doc(doc, XContentType.JSON); // 3.发送请求 client.update(request, RequestOptions.DEFAULT); } // 测试根据id查询文档 @Test void testGetDocumentById() throws IOException { // 1.准备Request对象 GetRequest request = new GetRequest(\"user\").id(\"1\"); // 2.发送请求 GetResponse response = client.get(request, RequestOptions.DEFAULT); // 3.获取响应结果中的source String json = response.getSourceAsString(); UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class); System.out.println(\"userDoc= \" + userDoc); } // 销毁 @AfterEach void tearDown() throws IOException { this.client.close(); }}

其中:

(1)增:
// 测试添加文档信息 @Test void testAddDocument() throws IOException { // 1.根据id查询商品数据 User user = userService.getById(1L); System.out.println(\"user = \" + user); // 2.转换为文档类型 UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class); System.out.println(\"userDoc = \" + userDoc); // 3.将UserDoc转json String doc = JSONUtil.toJsonStr(userDoc); // 1.准备Request对象 /*IndexRequest request = new IndexRequest(\"user\").id(String.valueOf(userDoc.getId()));*/ IndexRequest request = new IndexRequest(\"user\").id(userDoc.getId()); // 2.准备Json文档 request.source(doc, XContentType.JSON); // 3.发送请求 client.index(request, RequestOptions.DEFAULT); }
(2)删:
// 测试删除文档 @Test void testDeleteDocument() throws IOException { // 1.准备Request,两个参数,第一个是索引库名,第二个是文档id DeleteRequest request = new DeleteRequest(\"user\", \"1\"); // 2.发送请求 client.delete(request, RequestOptions.DEFAULT); }
(3)改:
// 测试更新文档 @Test void testUpdateDocument() throws IOException { // 1.准备Request UpdateRequest request = new UpdateRequest(\"user\", \"1\"); // 2.准备请求参数 UserDoc userDoc=new UserDoc(); userDoc.setUseTime(1L); userDoc.setBriefIntroduction(\"hello world!\"); // 将UserDoc转json String doc = JSONUtil.toJsonStr(userDoc); // 准备Json文档,XContentType.JSON表示json格式 request.doc(doc, XContentType.JSON); // 3.发送请求 client.update(request, RequestOptions.DEFAULT); }
(4)查:
// 测试根据id查询文档 @Test void testGetDocumentById() throws IOException { // 1.准备Request对象 GetRequest request = new GetRequest(\"user\").id(\"1\"); // 2.发送请求 GetResponse response = client.get(request, RequestOptions.DEFAULT); // 3.获取响应结果中的source String json = response.getSourceAsString(); UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class); System.out.println(\"userDoc= \" + userDoc); }
注意:

可以看到在增加和修改那边会构造请求参数,我在改那边提供了三个方法,在上面测试类的完整代码中有那三种方法,其实添加的构造请求参数的实现也是一样的,下面我来逐一讲解一下构造的实现:

官方API文档位置:Update API | Java REST Client [7.12] | Elastic

在前面有教过怎么去找到对应版本文档

 方法一:

官网的链式编程也很推荐,下面就是浓缩的修改操作:

UpdateRequest request = new UpdateRequest(\"posts\", \"1\") .doc(\"updated\", new Date(), \"reason\", \"daily update\"); client.update(request, RequestOptions.DEFAULT);

方法二:

因为根据方法一可知那个数据的格式类似 Map 这样的格式,可以通过map来构造。官网示例如下:

方法三:

官网在这里也提到,可以先构造默认Json格式,然后再换一种类型的Json

此外官网还提供了一个方法我觉得也很优雅,当然还不只这个。

XContentBuilder builder = XContentFactory.jsonBuilder();builder.startObject();{ builder.timeField(\"updated\", new Date()); builder.field(\"reason\", \"daily update\");}builder.endObject();UpdateRequest request = new UpdateRequest(\"posts\", \"1\") .doc(builder); 

3、批量导入文档:

我们需要导入我们用户表里面的数据,非常多,不可能一个一个操作,基本上是批操作,这就需要我们学会批量导入文档

我们利用BulkRequest实现这个操作。BulkRequest本身其实并没有请求参数,其本质就是将多个普通的CRUD请求组合在一起发送,利用他的add方法来实现这个过程,BulkRequest中提供了add方法,用以添加其它CRUD的请求。

能添加的请求有:

  • IndexRequest,也就是新增

  • UpdateRequest,也就是修改

  • DeleteRequest,也就是删除

在我的理解add相当于加入你的请求到那里面,然后再根据具体请求的实现来执行各样的操作

基本语法如下:

@Testvoid testBulk() throws IOException { // 1.创建Request BulkRequest request = new BulkRequest(); // 2.准备请求参数 request.add(new IndexRequest(\"items\").id(\"1\").source(\"json doc1\", XContentType.JSON)); request.add(new IndexRequest(\"items\").id(\"2\").source(\"json doc2\", XContentType.JSON)); // 3.发送请求 client.bulk(request, RequestOptions.DEFAULT);}

下面是实战,用于添加我用户表信息:

在之前那个EsDocTest测试类里面加上这么一个测试方法:

 @Test void testLoadUserDocs() throws IOException { // 分页查询商品数据 int pageNo = 1; int size = 100; while (true) { Page page = userService.lambdaQuery().page(new Page(pageNo, size)); // 非空校验 List users = page.getRecords(); if (CollUtil.isEmpty(users)) { return; } log.info(\"加载第{}页数据,共{}条\", pageNo, users.size()); // 1.创建Request BulkRequest request = new BulkRequest(\"user\"); // 2.准备参数,添加多个新增的Request for (User user : users) { // 2.1.转换为文档类型ItemDTO UserDoc userDoc = BeanUtil.copyProperties(user, UserDoc.class); // 2.2.创建新增文档的Request对象 request.add(new IndexRequest() .id(userDoc.getId()) .source(JSONUtil.toJsonStr(userDoc), XContentType.JSON)); } // 3.发送请求 client.bulk(request, RequestOptions.DEFAULT); // 翻页 pageNo++; } }

运行:

再去随便搜一个id的用户:

来到kibana:

六、JavaRestClient查询

基本步骤(重点)

查询的基本步骤是:

1. 创建 SearchRequest 对象 2. 准备 Request.source () ,也就是 DSL QueryBuilders 来构建查询条件传入 Request.source () query() 方法 3. 发送请求,得到结果 4. 解析结果(参考 JSON 结果,从外到内,逐层解析)

 @Test void testSearch() throws IOException { // 1.创建Request SearchRequest request = new SearchRequest(\"user\"); // 2.组织请求参数 request.source().query(QueryBuilders.matchAllQuery()); // 3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析响应 handleResponse(response); } private static void handleResponse(SearchResponse response) { SearchHits searchHits = response.getHits(); // 4.1 获取总条数 long total = 0; if (searchHits.getTotalHits() != null) { total = searchHits.getTotalHits().value; } // 4.2 获取命中的数据 SearchHit[] hits = searchHits.getHits(); List userDocList=new ArrayList(); for (SearchHit hit : hits) { // 4.2.1 获取source结果(结果是一个json对象) String json = hit.getSourceAsString(); // 4.2.2 转为实体对象 UserDoc userDoc = JSONUtil.toBean(json, UserDoc.class); userDocList.add(userDoc); } System.out.println(\"userDocList = \" + userDocList); System.out.println(\"total = \" + total); }

下面是对一些查询的讲解,这里我用学习的资料总结展示一下,如果只想实战可以参考后面实战

1、叶子查询

全文检索查询(Full Text Queries):利用分词器对用户输入搜索条件先分词,得到词条,然后再利用倒排索引搜索词条。例如:

match全文检索查询的一种,会对用户输入内容分词,然后去倒排索引库检索

// 单字段查询QueryBuilders.matchQuery(\"name\", \"脱脂牛奶\");

multi_match( 与match查询类似,只不过允许同时查询多个字段)

// 多字段查询QueryBuilders.multiMatchQuery(\"脱脂牛奶\", \"name\", \"category\");

精确查询(Term-level queries):不对用户输入搜索条件分词,根据字段内容精确值匹配。但只能查找keyword、数值、日期、boolean类型的字段。例如

term (词条查询)

equest.source().query(QueryBuilders.termQuery(\"brand\", \"华为\"));

range(范围查询

request.source().query(QueryBuilders.rangeQuery(\"price\").gte(10000).lte(30000))

2、复合查询

bool查询(基于逻辑运算组合叶子查询,实现组合条件)

// 创建布尔查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();// 添加filter条件boolQuery.must(QueryBuilders.termQuery(\"brand\", \"华为\"));// 添加filter条件boolQuery.filter(QueryBuilders.rangeQuery(\"price\").lte(2500));

3、排序和分页

排序:elasticsearch默认是根据相关度算分(_score)来排序,但是也支持自定义方式对搜索结果排序。不过分词字段无法排序,能参与排序字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等。

分页:elasticsearch 默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。elasticsearch中通过修改fromsize参数来控制要返回的分页结果:

  • from:从第几个文档开始

  • size:总共查询几个文档

// 查询request.source().query(QueryBuilders.matchAllQuery());// 分页request.source().from(0).size(5);// 价格排序request.source().sort(\"price\", SortOrder.ASC);

4、高亮

 // 1.创建Request SearchRequest request = new SearchRequest(\"items\"); // 2.组织请求参数 // 2.1.query条件 request.source().query(QueryBuilders.matchQuery(\"name\", \"脱脂牛奶\")); // 2.2.高亮条件 request.source().highlighter( SearchSourceBuilder.highlight()  .field(\"name\")  .preTags(\"\")  .postTags(\"\") ); // 3.发送请求 SearchResponse response = client.search(request, RequestOptions.DEFAULT); // 4.解析响应 handleResponse(response);

 SearchHits searchHits = response.getHits(); // 1.获取总条数 long total = searchHits.getTotalHits().value; System.out.println(\"共搜索到\" + total + \"条数据\"); // 2.遍历结果数组 SearchHit[] hits = searchHits.getHits(); for (SearchHit hit : hits) { // 3.得到_source,也就是原始json文档 String source = hit.getSourceAsString(); // 4.反序列化 ItemDoc item = JSONUtil.toBean(source, ItemDoc.class); // 5.获取高亮结果 Map hfs = hit.getHighlightFields(); if (CollUtils.isNotEmpty(hfs)) { // 5.1.有高亮结果,获取name的高亮结果 HighlightField hf = hfs.get(\"name\"); if (hf != null) { // 5.2.获取第一个高亮结果片段,就是商品名称的高亮值 String hfName = hf.getFragments()[0].string(); item.setName(hfName); } } System.out.println(item); }

七、数据聚合

 聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。

request.source().size(0); // 分页request.source().aggregation( AggregationBuilders .terms(\"brand_agg\") // 聚合名称 .field(\"brand\") // 聚合字段 .size(20)); // 聚合结果条数

 // 解析聚合结果 Aggregations aggregations = response.getAggregations(); // 根据名称获取聚合结果 Terms brandTerms = aggregations.get(\"brand_agg\"); // 获取桶 List buckets = brandTerms.getBuckets(); // 遍历 for (Terms.Bucket bucket : buckets) { // 获取key,也就是品牌信 String brandName = bucket.getKeyAsString(); System.out.println(brandName); }

八、自己封装的一个Utils

下面给一下我自己封装的一个utils,可以减少很多代码量,由于后续项目我新增或者修改了一些功能,可能Utils有一点点改变,大家可以参考进行使用:

GenericSearchResponseVO:
import io.swagger.v3.oas.annotations.media.Schema;import lombok.Builder;import lombok.Data;import java.util.List; @Data@Builderpublic class GenericSearchResponseVO { @Schema(description = \"总记录数\") private Long total; @Schema(description = \"页码\") private Integer pageNumber; @Schema(description = \"数据集合\") private List docList;}
ElasticsearchUtil:
import cn.hutool.json.JSONUtil;import com.quick.vo.GenericSearchResponseVO;import org.elasticsearch.action.search.SearchResponse;import org.elasticsearch.search.SearchHit;import org.elasticsearch.search.SearchHits;import java.util.ArrayList; import java.util.List;/** * es的工具类 */public class ElasticsearchUtil { /** * 处理ES搜索响应 * 不设置pageNumber,默认第一页 * @param response ES搜索响应对象 * @param clazz 目标文档对象的类类型 * @return 封装后的搜索响应对象 * @param  泛型,表示文档的类型,用于封装返回对应类型文档的返回结果 */ public static  GenericSearchResponseVO handleResponse(SearchResponse response, Class clazz) { // 获取搜索命中的结果 SearchHits searchHits = response.getHits(); // 初始化总命中数为0 long total = 0L; // 如果总命中数不为空,则赋值 if (searchHits.getTotalHits() != null) { total = searchHits.getTotalHits().value; } // 初始化文档列表 List docList = new ArrayList(); // 获取所有命中的文档 SearchHit[] hits = searchHits.getHits(); // 遍历所有命中的文档 for (SearchHit hit : hits) { // 获取文档的JSON字符串 String json = hit.getSourceAsString(); System.out.println(\"json = \" + json); // 将JSON字符串解析为目标类型的对象 T doc = JSONUtil.toBean(json, clazz); // 将解析后的文档对象添加到列表中 docList.add(doc); } // 构建并返回封装后的搜索响应对象 return GenericSearchResponseVO.builder() .total(total) // 设置总命中数 .pageNumber(1) // 设置当前页码 .docList(docList) // 设置文档列表 .build(); } /** * 处理ES搜索响应 * * @param response ES搜索响应对象 * @param clazz 目标文档对象的类类型 * @return 封装后的搜索响应对象 * @param  泛型,表示文档的类型,用于封装返回对应类型文档的返回结果 */ public static  GenericSearchResponseVO handleResponse(SearchResponse response, Class clazz,Integer pageNumber) { // 如果 pageNumber 为空,则默认设置为第一页 if (pageNumber == null) { pageNumber = 1; } // 获取搜索命中的结果 SearchHits searchHits = response.getHits(); // 初始化总命中数为0 long total = 0L; // 如果总命中数不为空,则赋值 if (searchHits.getTotalHits() != null) { total = searchHits.getTotalHits().value; } // 初始化文档列表 List docList = new ArrayList(); // 获取所有命中的文档 SearchHit[] hits = searchHits.getHits(); // 遍历所有命中的文档 for (SearchHit hit : hits) { // 获取文档的JSON字符串 String json = hit.getSourceAsString(); System.out.println(\"json = \" + json); // 将JSON字符串解析为目标类型的对象 T doc = JSONUtil.toBean(json, clazz); // 将解析后的文档对象添加到列表中 docList.add(doc); } // 构建并返回封装后的搜索响应对象 return GenericSearchResponseVO.builder() .total(total) // 设置总命中数 .pageNumber(pageNumber) // 设置当前页码 .docList(docList) // 设置文档列表 .build(); } /** * 计算Elasticsearch查询的from参数值 * * @param pageNumber 当前页码,默认值为1 * @param pageSize 每页显示的数据条数,默认值为10 * @return 计算后的from值 */ public static Integer calculateFrom(Integer pageNumber,Integer pageSize) { if (pageNumber < 1) { pageNumber = 1; // 确保页码至少为1 } if (pageSize < 1) { pageSize = 10; // 默认每页显示10条数据 } return (pageNumber - 1) * pageSize; } /** * 计算Elasticsearch查询的from参数值 * * @param pageNumber 当前页码,默认值为1 * 只传页码默认size为10 * @return 计算后的from值 */ public static Integer calculateFrom(Integer pageNumber) { if (pageNumber < 1) { pageNumber = 1; // 确保页码至少为1 } int pageSize = 10; // 默认每页显示10条数据 return (pageNumber - 1) * pageSize; } /** * 计算Elasticsearch查询的from参数值 * 没有传就默认第一页,起始位置是0 * @return 计算后的from值 */ public static Integer calculateFrom() { int pageNumber = 1; // 默认第一页 int pageSize = 10; // 默认每页显示10条数据 return (pageNumber - 1) * pageSize; }}

九、亿级别数据查询

欢迎大佬给出更好的优化查询意见

1、线程池配置文件类

import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;/*** 

* description: 线程池配置文件类*

** @author: bluefoxyu* @date: 2025-01-16 22:41:22*/@Data@Component@ConfigurationProperties(prefix = \"quick.thread\")public class ThreadPoolConfigProperties { //核心线程数量 private Integer coreSize; //最大线程数 private Integer maxSize; //线程存活时间 private Integer keepAliveTime;}

由于这些我都配置在nacos了,所以仅供参考

2、线程池工具类

import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;/*** 

* description: 线程池工具类*

** @author: bluefoxyu* @date: 2025-01-17 11:21:56*/public class ThreadPoolUtil { /** *

* description: 数据切割 *

* * @param dataList 数据 * @param splitSize 切割长度 * @return: java.util.List<java.util.List> * @author: bluefoxyu * @date: 2025-01-17 11:22:06 */ public static List<List> slicingData(List dataList, Integer splitSize) { int dataSize = dataList.size(); int groupSize = dataSize / splitSize; if (dataSize % splitSize != 0) { groupSize = groupSize + 1; } List<List> chunks = new ArrayList(groupSize); for (int i = 0; i < groupSize; i++) { int fromIndex = i * splitSize; int toIndex = fromIndex + splitSize; List suppliesReadData; if (i == groupSize - 1) { suppliesReadData = dataList.subList(fromIndex, dataList.size()); } else { suppliesReadData = dataList.subList(fromIndex, toIndex); } chunks.add(suppliesReadData); } return chunks; } /** *

* description: 线程等待 *

* * @param completableFutures 异步线程返回对象 * @param flagList 每一个异步线程的数据是否正确 * @return: boolean 是否验证成功,true成功,false失败 * @author: bluefoxyu * @date: 2025-01-17 11:22:25 */ public static boolean allFuturesWait(List<CompletableFuture> completableFutures, List flagList) { // 等待所有异步任务完成后,可以继续进行下一步操作 allFuturesWait(completableFutures); boolean verifyFlag = true; for (Boolean flag : flagList) { if (StringUtil.notNull(flag)) { if (!flag) { verifyFlag = false; } } } return verifyFlag; } /** *

* description: 线程等待 *

* * @param completableFutures 异步线程返回对象 * @return: void * @author: bluefoxyu * @date: 2025-01-17 11:22:50 */ public static void allFuturesWait(List<CompletableFuture> completableFutures) { // 等待所有异步任务完成后,可以继续进行下一步操作 CompletableFuture allFutures = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])); allFutures.join(); }}

3、线程池配置注入bean

参考下面的配置

import cn.hutool.core.thread.NamedThreadFactory;import com.quick.properties.ThreadPoolConfigProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/*** 

* description: 线程池配置类*

** @author: bluefoxyu* @date: 2025-01-16 22:43:54*/@Configurationpublic class ThreadConfig { /** *

* description: 默认线程池 *

* * @param threadPoolConfigProperties 线程池配置文件类 * @return: java.util.concurrent.ThreadPoolExecutor 线程池执行对象 * @author: bluefoxyu * @date: 2025-01-16 22:44:38 */ @Bean public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties) { return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(), threadPoolConfigProperties.getMaxSize(), threadPoolConfigProperties.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque(10000), new NamedThreadFactory(\"default\", true), new ThreadPoolExecutor.AbortPolicy()); } /** *

* description: 删除店铺线程池 *

* * @param threadPoolConfigProperties 线程池配置文件类 * @return: java.util.concurrent.ThreadPoolExecutor 线程池执行对象 * @author: bluefoxyu * @date: 2025-01-16 22:52:37 */ @Bean public ThreadPoolExecutor deleteStoreThreadPoolExecutor(ThreadPoolConfigProperties threadPoolConfigProperties) { return new ThreadPoolExecutor(threadPoolConfigProperties.getCoreSize(), threadPoolConfigProperties.getMaxSize(), threadPoolConfigProperties.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingDeque(10000), new NamedThreadFactory(\"deleteStore\", true), new ThreadPoolExecutor.AbortPolicy()); }}

4、游标查询代码

private final ThreadPoolExecutor threadPoolExecutor;
/** * 

* description: 根据工作时间进行范围查询数据 *

* * @param month 月份 * @return: 通用响应结果 R */@GetMapping(\"/test/getDataByWorkDate\")public Map testGetDataByWorkDate(@RequestParam String month) { // 获取起始和结束时间 LocalDateTime startDate = getStartAndEndTimeFromParseStringMonth(month).get(\"start\"); LocalDateTime endDate = getStartAndEndTimeFromParseStringMonth(month).get(\"end\"); // 格式化日期为 Elasticsearch 的日期格式 DateTimeFormatter formatter = DateTimeFormatter.ofPattern(\"yyyy-MM-dd HH:mm:ss\"); List dataList = new ArrayList(); // 构建查询条件 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query( QueryBuilders.boolQuery() .must(QueryBuilders.termQuery(\"statusCode\", 1)) // 状态码 .must(QueryBuilders.rangeQuery(\"hourField\").gt(0.00)) // 小时 .must(QueryBuilders.boolQuery() .should(QueryBuilders.termsQuery(\"status\", Arrays.asList(1, 2, 3))) // 状态 .should(QueryBuilders.termQuery(\"flagField\", \"1\"))) // 标志字段 .must(QueryBuilders.rangeQuery(\"dateField\") // 日期 .gte(startDate.format(formatter)) .lte(endDate.format(formatter))) ); sourceBuilder.size(1000); // 每次查询最多指定条数数据 // 构建初始查询请求 SearchRequest searchRequest = new SearchRequest(\"index_name_placeholder\"); // 替换索引名称 searchRequest.source(sourceBuilder); searchRequest.scroll(TimeValue.timeValueMinutes(1)); // 设置滚动时间 String scrollId = null; try { SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); // 打印 DSL 语句和总命中数量 log.info(\"ES查询DSL语句:\\nGET {}\\n{}\", String.format(\"/%s/_search\", searchRequest.indices()[0]), searchRequest.source()); log.info(\"命中总数量:{}\", searchResponse.getHits().getTotalHits()); scrollId = searchResponse.getScrollId(); SearchHit[] searchHits = searchResponse.getHits().getHits(); while (searchHits != null && searchHits.length > 0) { // 异步处理查询结果 List<CompletableFuture> futures = new ArrayList(); for (SearchHit hit : searchHits) { CompletableFuture future = CompletableFuture.runAsync(() -> { EntityClass entity = JSONUtil.toBean(hit.getSourceAsString(), EntityClass.class); synchronized (dataList) { dataList.add(entity); } }, threadPoolExecutor); // 替换线程池名称 futures.add(future); } // 等待所有任务完成 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // 获取下一页 SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); scrollRequest.scroll(TimeValue.timeValueMinutes(1)); searchResponse = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT); scrollId = searchResponse.getScrollId(); searchHits = searchResponse.getHits().getHits(); } } catch (Exception e) { log.error(\"查询失败\", e); throw new RuntimeException(\"查询失败\", e); } finally { // 清除滚动上下文 if (scrollId != null) { try { ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); } catch (Exception e) { log.warn(\"清除滚动上下文失败\", e); } } } Map result = new HashMap(); result.put(\"size\", dataList.size()); result.put(\"dataList\", dataList); return result ;}