> 技术文档 > Dify Python调用API_dify api调用

Dify Python调用API_dify api调用


前期准备

在调用API之前得先拿到API KEY,具体操作如下:

文件上传

如果你的工作流的参数涉及到文件上传以及图片上传的(音频、视频也类似),需要先调用upload方法拿到file_id,具体代码如下:

import osimport requestsimport logging# 假设 logger_info 已定义好logger_info = logging.getLogger(__name__)class YourUploaderClass: def __init__(self, base_url, your_api_key): self.base_url = base_url self.api_key = your_api_key #随意一条工作流的API_Key def upload_file(self, file_path): \"\"\" 文件上传,支持文档和图片 :param file_path: 本地文件路径 :return: 返回上传成功后的文件 ID,失败返回 None \"\"\" headers = { \"Authorization\": f\"Bearer {self.api_key}\" } # 获取文件扩展名并确定 MIME 类型 file_extension = os.path.splitext(file_path)[1].lower() # 定义文档类型的 MIME 映射 document_mime_map = { \'.txt\': \'text/plain\', \'.md\': \'text/markdown\', \'.markdown\': \'text/markdown\', \'.pdf\': \'application/pdf\', \'.html\': \'text/html\', \'.xlsx\': \'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\', \'.xls\': \'application/vnd.ms-excel\', \'.docx\': \'application/vnd.openxmlformats-officedocument.wordprocessingml.document\', \'.csv\': \'text/csv\', \'.eml\': \'message/rfc822\', \'.msg\': \'application/vnd.ms-outlook\', \'.pptx\': \'application/vnd.openxmlformats-officedocument.presentationml.presentation\', \'.ppt\': \'application/vnd.ms-powerpoint\', \'.xml\': \'application/xml\', \'.epub\': \'application/epub+zip\' } # 定义图片类型的扩展名映射到对应的 MIME 类型 image_ext_to_mime = { \'.png\': \'image/png\', \'.jpg\': \'image/jpeg\', \'.jpeg\': \'image/jpeg\', \'.webp\': \'image/webp\', \'.gif\': \'image/gif\' } # 判断是文档还是图片 mime_type = document_mime_map.get(file_extension) if not mime_type: mime_type = image_ext_to_mime.get(file_extension) # 如果文件类型不支持,返回 None if mime_type is None: return None current_file_path = os.path.abspath(__file__) dir = os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))) path = os.path.join(dir, file_path) try: with open(path, \"rb\") as f: files = {  \"file\": (os.path.basename(file_path), f, mime_type) } data = {  \"user\": \"abc_user\" } upload_url = f\"{self.base_url}/files/upload\" response = requests.post(upload_url, headers=headers, files=files, data=data) logger_info.info(f\"dify upload_file 请求结果:{response.json()}\") if response.status_code == 201:  result = response.json()  logger_info.info(f\"dify upload_file 成功:{result}\")  return result.get(\"id\") else:  return None except requests.exceptions.RequestException as e: logging.error(f\"dify upload_file 请求出错:{e}\") return None except Exception as e: logging.error(f\"上传文件时发生异常:{e}\") return None

流式输出 streaming

基于 SSE(Server-Sent Events)实现类似打字机输出方式的流式返回。注意,以下代码展示的是chatflow模式,所以参数inputs为空,如果是工作流,入参放在inputs即可,形式是:

\"inputs\": { \"outline\": outline}

 def user_intent(self, question, upload_file_id=\"\"): \"\"\" 第一步:用户意图分析 :param question: :param upload_file_id: :return: \"\"\" headers = { \"Authorization\": f\"Bearer {self.intent_api_key}\", \"Content-Type\": \"application/json\" } data = { \"inputs\": {}, \"query\": question, \"response_mode\": \"streaming\", # 根据需求选择 streaming 或 blocking \"user\": \"abc_user\" # 与上传文件时的用户标识一致 } if upload_file_id: data[\"inputs\"][\"file\"] = [{ \"type\": \"document\", # 根据实际文件类型修改 \"transfer_method\": \"local_file\", \"upload_file_id\": upload_file_id }] workflow_url = f\"{self.base_url}/chat-messages\" try: response = requests.post(workflow_url, headers=headers, json=data, stream=True) if response.status_code == 200: for line in response.iter_lines():  if line: # 过滤掉keep-alive新行 # 解码为字符串 line_str = line.decode(\'utf-8\') # 确保是有效的 JSON 行 if line_str.startswith(\"data:\"): json_str = line_str[5:].strip() # 去掉前缀并清理空格 if json_str: json_data = json.loads(json_str) #这里不一定是message,具体你得打印json_data你才知道 if json_data.get(\"event\") == \"message\": yield json_data logger_info.info(f\"user_intent 请求结果:{response.json()}\") return response.json() else: logger.error(f\"user_intent 请求出错:{response.json()}\") return {} except Exception as e: logger.error(f\"user_intent 请求出错:{e}\") return {}

阻塞模式 blocking

阻塞模式,等待执行完毕后返回结果。(请求若流程较长可能会被中断)。 由于 Cloudflare 限制,请求会在 100 秒超时无返回后中断。注意:以下代码是chatflow模式,不是工作流,但是代码是一样的,只不过取值不一样

 def judge_steps(self, question): \"\"\" 判断步骤 :param question: :return: \"\"\" headers = { \"Authorization\": f\"Bearer {self.judge_steps_api_key}\", \"Content-Type\": \"application/json\" } data = { \"inputs\": {}, \"query\": question, \"response_mode\": \"blocking\", # 根据需求选择 streaming 或 blocking \"user\": \"abc_user\" # 与上传文件时的用户标识一致 } #注意,我这里用的是CHATFLOW,不是工作流 workflow_url = f\"{self.base_url}/chat-messages\" try: response = requests.post(workflow_url, headers=headers, json=data) if response.status_code == 200: #由于不是工作流,所以取出来的不一定是[\"answer\"],方法都是一样的,到时候自己打印response.json()出来就知道了 answer = response.json()[\"answer\"] if answer:  logger_info.info(f\"dify judge_steps 请求结果:{response.json()}\")  return answer else:  return \"\" else: logging.error(f\"judge_steps 请求出错:{response.json()}\") return 0 except Exception as e: print(e) logging.error(f\"judge_steps 请求出错:{e}\") return 0