> 技术文档 > Elasticsearch 常用任务管理命令及实战应用

Elasticsearch 常用任务管理命令及实战应用


常用任务管理命令

  • 列出所有任务
curl -X GET \"http://:/_tasks?detailed=true&pretty\" -H \'Content-Type: application/json\'
  • 获取特定类型的任务
curl -X GET \"http://:/_tasks?actions=\" -H \'Content-Type: application/json\'
  • 列出所有查询任务
curl -X GET \"http://:/_tasks?detailed=true&actions=*search\" -H \'Content-Type: application/json\'
  • 取消所有查询任务
    如果 es 查询因大任务而卡住,可以临时采取此措施
curl -X POST \"http://:/_tasks/_cancel?actions=*search\" -H \'Content-Type: application/json\'
curl -X GET \"http://:/_tasks/\" -H \'Content-Type: application/json\'
  • 取消特定任务
curl -X POST \"http://:/_tasks/_cancel?task_id=\" -H \'Content-Type: application/json\'
  • 获取特定节点上的任务
curl -X GET \"http://:/_tasks?nodes=\" -H \'Content-Type: application/json\'

实战

定时检测 Elasticsearch 后台运行的查询任务,如果任务运行时间超过 59 秒,则进行企业微信群告警通知

import requestsimport time# Elasticsearch节点的URLes_url = \"http://:@:/_tasks?detailed=true\"# 获取任务信息response = requests.get(es_url)tasks_data = response.json()# 遍历节点和任务for node_id, node_info in tasks_data.get(\'nodes\', {}).items(): for task_id, task_info in node_info.get(\'tasks\', {}).items(): running_time_seconds = task_info.get(\'running_time_in_nanos\', 0) / 1e9 description = task_info.get(\'description\', \'\') if running_time_seconds > 59 and description: running_time_formatted = f\"{running_time_seconds:.2f}\" # 准备单个任务的Markdown内容 content = ( f\"# 有大任务在 Elasticsearch 上运行\\n\" f\"- **任务 ID**: {task_id}\\n\" f\" **查询语句**: {description}\\n\" f\" **运行时间**: {running_time_formatted} seconds\\n\" ) # 发送到Webhook QYWX_BODY = { \"msgtype\": \"markdown\", \"markdown\": {  \"content\": content } } BOT_KEY = \"xxxxxxxxxxxxxxxxx\" # 企业微信群 bot key webhook_url = f\"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={BOT_KEY}\" headers = {\'Content-Type\': \'application/json; charset=utf-8\'} response = requests.post(webhook_url, json=QYWX_BODY, headers=headers) # 检查响应状态 if response.status_code == 200: print(f\"Notification for Task ID {task_id} sent successfully.\") else: print(f\"Failed to send notification for Task ID {task_id}. Status code: {response.status_code}, Response: {response.text}\") # 等待 2 秒 time.sleep(2)print(\"Processing completed.\")