Elasticsearch Ruby 客户端elasticsearch / elasticsearch-api
一、三个库各干什么
elasticsearch
:对外暴露的 Ruby 客户端,包装了传输层与 API;一般项目只装它即可。(rubydoc.info)elasticsearch-api
:把 ES REST API 封装为 Ruby 方法(如client.search
/client.index
等),不直接提供 HTTP 连接能力。(rubydoc.info)elastic-transport
:HTTP/连接池/重试等底层传输层,被elasticsearch
客户端复用。(GitHub)
二、安装与版本要求
# 推荐:安装对外客户端(已内置 API 与 Transport)gem install elasticsearch# 仅安装 API(通常不需要单独装)gem install elasticsearch-api
- 官方“Getting started | Ruby”要求 Ruby 3.0+ 或 JRuby 9.3+,示例也以
gem install elasticsearch
为主。(Elastic) elasticsearch-api
也可从 GitHub 主仓构建安装(适合追踪未发布改动)。(Elastic)
三、本地、Elastic Cloud(cloud_id + api_key)
本地默认:
require \'elasticsearch\'client = Elasticsearch::Client.new # => http://localhost:9200
Elastic Cloud:(推荐在生产使用)
client = Elasticsearch::Client.new( cloud_id: \'\', api_key: \'\')
Cloud 连接、API Key 生成与最小 CRUD 示例,官方入门文档有完整演示。(Elastic)
四、建索引、写文档、查、改、删
# 建索引client.indices.create(index: \'my_index\')# 写文档(自动生成 _id)res = client.index(index: \'my_index\', body: { title: \'Test\' })doc_id = res[\'_id\']# 读client.get(index: \'my_index\', id: doc_id)# 改(局部更新)client.update(index: \'my_index\', id: doc_id, body: { doc: { tags: %w[ruby es] } })# 删文档 / 删索引client.delete(index: \'my_index\', id: doc_id)client.indices.delete(index: \'my_index\')
client.index
/client.search
等方法由elasticsearch-api
提供,外层的elasticsearch
负责传输与连接。(rubydoc.info)
五、Bool/聚合、_source 过滤、排序
query = { query: { bool: { must: [{ match: { title: \'test\' } }], filter: [{ term: { status: \'published\' } }] } }, _source: %w[id title tags], # 只取必要字段,降低带宽 sort: [{ created_at: \'desc\' }] # 稳定排序}resp = client.search(index: \'my_index\', body: query)hits = resp.dig(\'hits\', \'hits\')
六、深分页与全量遍历 PIT + search_after(推荐)
Scroll API 不再推荐用于深分页;官方建议用 PIT(Point-in-Time)+ search_after 获得一致性与高性能。(rubydoc.info)
PIT + search_after 模板:
# 1) 打开 PITpit_id = client.open_point_in_time(index: \'my_index\', body: nil, params: { keep_alive: \'1m\' })[\'id\']# 2) 首次查询(注意:使用 pit,不再在 body 里指定 index)body = { size: 1000, sort: [{ created_at: \'asc\' }, { _shard_doc: \'asc\' }], # 使用稳定且唯一的排序键组合 pit: { id: pit_id, keep_alive: \'1m\' }}resp = client.search(body: body)# 3) 迭代分页loop do hits = resp.dig(\'hits\', \'hits\') break if hits.empty? # 处理 hits ... last_sort = hits.last[\'sort\'] resp = client.search(body: body.merge(search_after: last_sort))end# 4) 用完可显式关闭 PIT(也可等待 keep_alive 自动过期)client.close_point_in_time(body: { id: pit_id })
- PIT 背后语义、为何与
search_after
组合更一致,详见官方 API 文档。(Elastic)
经验小结:
keep_alive
只需覆盖到下一次请求即可,不必囤很久;- 排序字段应全局唯一且稳定(常见做法是业务字段 +
_shard_doc
)。(rubydoc.info)
七、Bulk 的安全与高吞吐实践
ops = []1000.times do |i| ops << { index: { _index: \'my_index\', _id: i } } ops << { title: \"Doc #{i}\", created_at: Time.now.utc.iso8601 }endresp = client.bulk(body: ops)raise \"bulk errors\" if resp[\'errors\']
建议:
- 控制单批大小(例如 5–15MB 或 1k–5k 条);
- 捕获并拆分“部分失败”;
- 合理设置
request_timeout
与重试(见第 9 节)。
八、响应对象与数据访问
elasticsearch
返回的是 Elasticsearch::API::Response
包装对象,既可当 Hash 访问(resp[\'hits\']
),也可读 status
/headers
等底层信息(来自 transport 层)。(rubydoc.info)
如需“面向对象式”访问,可选 Hashie::Mash
包装:
require \'hashie\'m = Hashie::Mash.new(resp)m.hits.hits.first._source.title
九、错误处理、超时与重试、日志追踪
client = Elasticsearch::Client.new( request_timeout: 30, # 单请求超时 retry_on_failure: 3, # 传输层重试 logger: Logger.new($stdout) # 调试阶段打开,生产可接入更完善日志)begin client.search(index: \'not_exist\', body: { query: { match_all: {} } })rescue => e # e 是 transport 层异常,包含 status/response warn \"ES error: #{e.class}: #{e.message}\"end
也可替换不同 HTTP 适配器(Net::HTTP、Faraday 等),Elastic 官方对多种 Ruby HTTP 库的对接有示例与对比。(Elastic)
十、使用 elasticsearch-api
自定义客户端:perform_request
契约
当你不想用官方 elasticsearch
客户端,而是将 API 混入自己的类时,你的类必须实现:
perform_request(method, path, params, body, headers = nil)
,返回具备status
/body
/headers
方法的对象。(rubydoc.info, GitHub)
最小 Faraday 示例:
require \'multi_json\'require \'faraday\'require \'elasticsearch/api\'class MySimpleClient include Elasticsearch::API CONN = Faraday.new(url: \'http://localhost:9200\') def perform_request(method, path, params, body, headers = nil) puts \"--> #{method.upcase} #{path} #{params} #{body} #{headers}\" resp = CONN.run_request( method.downcase.to_sym, build_url(path, params), body ? MultiJson.dump(body) : nil, { \'Content-Type\' => \'application/json\' }.merge(headers || {}) ) # Faraday::Response 已有 status/headers/body resp end private def build_url(path, params) return path if params.nil? || params.empty? q = params.is_a?(String) ? params : URI.encode_www_form(params) \"#{path}?#{q}\" endendc = MySimpleClient.newputs c.cluster.health.body # => 原始 JSON 字符串
十一、自定义 JSON 序列化、Jbuilder 构建查询
elasticsearch-api
默认通过 MultiJson 进行序列化,你也可以替换为自定义 JSON 库(需实现 load/dump
接口);亦可用 Jbuilder 生成复杂查询的字符串后传入 body
。(rubydoc.info)
# 自定义序列化器(示例)Elasticsearch::API.settings[:serializer] = JrJackson::Json# Jbuilder 构建查询require \'jbuilder\'query = Jbuilder.encode do |json| json.query do json.match do json.title do json.query \'test 1\' json.operator \'and\' end end endendclient.search(index: \'my_index\', body: query)
十二、性能与工程化建议
- HTTP 适配器选择:在吞吐敏感场景,评估 Net::HTTP(默认)、
http.rb
、Faraday 适配器等差异。(Elastic) - 连接与重试:合理配置
request_timeout
、retry_on_failure
,并在高并发下监控连接池与队列。 - 字段与映射:使用
_source
过滤与合适的stored_fields
,避免大文档传输。 - 查询稳定性:深分页改用 PIT + search_after;只在必要时使用 Scroll(导出型任务)。(rubydoc.info)
十三、常见坑与排查
- 认证失败:Elastic Cloud 请使用
cloud_id
+api_key
;自建集群确认basic
/bearer
配置与证书链。(Elastic) - 深分页性能差:不要用
from/size
拉大页,改为 PIT + search_after。(Elastic) - PIT 使用报错:确保查询体不再指定
index
、合理设置keep_alive
,且每次用最新 PIT id 继续翻页。(Elastic) - Scroll 一直占用资源:Scroll 适合短期导出,长期分页会占系统资源;能用 PIT 的尽量用 PIT。(rubydoc.info)
十四、速查表(Cheat Sheet)
# 连接client = Elasticsearch::Client.new # 本地client = Elasticsearch::Client.new(cloud_id: \'...\', api_key: \'...\') # 云端# CRUDclient.indices.create(index: \'idx\')res = client.index(index: \'idx\', body: {title: \'A\'})client.get(index: \'idx\', id: res[\'_id\'])client.update(index: \'idx\', id: res[\'_id\'], body: {doc: {title: \'B\'}})client.delete(index: \'idx\', id: res[\'_id\'])client.indices.delete(index: \'idx\')# 搜索client.search(index: \'idx\', body: {query: {match: {title: \'A\'}}})# Bulkclient.bulk(body: [{ index: { _index: \'idx\', _id: 1 } }, {title: \'A\'}])# PIT + search_after(伪代码骨架)pit = client.open_point_in_time(index: \'idx\', params: { keep_alive: \'1m\' })[\'id\']resp = client.search(body: { size: 1000, sort: [{ts: \'asc\'}, {_shard_doc: \'asc\'}], pit: { id: pit, keep_alive: \'1m\' } })while (hits = resp.dig(\'hits\',\'hits\')).any? # ... resp = client.search(body: { ... , search_after: hits.last[\'sort\'] })endclient.close_point_in_time(body: { id: pit })
十五、参考与延伸阅读
- 官方 Ruby 客户端总览与入门(安装、连接、CRUD 示例)。(Elastic)
elasticsearch-api
说明(API 方法集;不含传输层/客户端)。(Elastic, rubydoc.info)elasticsearch
封装与组件关系(API + Transport)。(rubydoc.info)- 传输层
elastic-transport
项目主页(适配器/连接层)。(GitHub) - 深分页:Scroll 不再推荐;使用 PIT + search_after。(rubydoc.info, Elastic)
- Ruby HTTP 生态与传输选型示例对比。(Elastic)