> 技术文档 > 智能AI医疗物资/耗材管理系统升级改造方案分析

智能AI医疗物资/耗材管理系统升级改造方案分析

基于AI技术的智能物资管理系统为各级医疗机构(包括三甲医院、社区诊所、药房等)提供了一套完整的数字化管理解决方案。系统通过物联网传感器实时监控库存状态,结合机器学习算法分析历史消耗数据、季节性因素和突发公共卫生事件影响,可提前90天预测物资需求波动,使库存周转率提升40%以上。

系统具备以下核心功能:

  1. 智能预警机制:当一次性医用口罩、防护服等关键物资低于安全库存阈值时,自动触发采购流程,并通过多供应商比价模块推荐最优采购方案
  2. 效期管理系统:采用\"先进先出\"原则,对近效期药品和耗材进行分级预警,避免物资过期造成的浪费
  3. 应急调配平台:在突发公共卫生事件期间,可实时显示区域内各医疗机构的物资储备情况,支持一键发起跨机构调拨

应用场景示例:

  • 某三甲医院在2025年流感季前,系统根据门诊量增长趋势预测到输液器需求将增加35%,提前完成备货避免了供应中断
  • 某社区卫生服务中心通过效期管理功能,将过期药品损耗率从8%降至2%以下

系统功能说明

这个智能AI医疗物资管理系统包含以下核心功能:

1. 库存管理

  • 实时监控所有医疗物资库存状态
  • 分类显示库存状态(紧急、不足、正常、充足)
  • 过期状态管理(已过期、即将过期、正常)
  • 详细的库存数据表格展示

2. 使用分析

  • 按科室统计物资使用量
  • 物资使用量排名(TOP 10)
  • 物资使用趋势分析(按周)

3. AI预测与采购建议

  • 基于机器学习(随机森林)的需求预测
  • 14天使用量预测
  • 智能采购建议(根据库存和预测)
  • 库存可维持天数计算

4. 预警系统

  • 侧边栏实时显示过期物资警告
  • 库存不足物资预警
  • 过期状态分类显示

5. 系统设置

  • 库存阈值配置
  • 过期预警天数设置
  • 数据导入/导出功能
  • 系统信息查看

技术特点

  1. 用户界面:使用Streamlit构建直观的Web界面
  2. 数据可视化:使用Plotly创建交互式图表
  3. 机器学习:使用Scikit-learn的随机森林算法进行需求预测
  4. 数据管理:模拟生成医疗物资数据,支持数据导入导出
  5. 预警系统:实时监控库存和过期状态并提供警告

使用方法

  1. 安装依赖库:
pip install streamlit pandas numpy plotly scikit-learn
  1. 运行应用:
streamlit run medical_inventory_system.py
  1. 在浏览器中访问应用(默认地址:http://localhost:8501)

系统会自动生成模拟数据,您可以通过侧边栏筛选器查看不同类别的物资,并在各标签页中查看库存状态、使用分析和预测结果。

这个系统为医疗机构的物资管理提供了智能化解决方案,帮助管理人员优化库存、减少浪费、确保关键医疗物资的供应。

import streamlit as stimport pandas as pdimport numpy as npimport plotly.express as pximport plotly.graph_objects as gofrom datetime import datetime, timedeltafrom sklearn.ensemble import RandomForestRegressorfrom sklearn.model_selection import train_test_splitfrom sklearn.metrics import mean_absolute_errorimport randomimport hashlibimport base64# 设置页面st.set_page_config( page_title=\"智能AI医疗物资管理系统\", page_icon=\"🏥\", layout=\"wide\", initial_sidebar_state=\"expanded\")# 初始化会话状态if \'authenticated\' not in st.session_state: st.session_state.authenticated = Falseif \'username\' not in st.session_state: st.session_state.username = None# 模拟用户数据库users = { \"admin\": hashlib.sha256(\"admin123\".encode()).hexdigest(), \"doctor\": hashlib.sha256(\"doctor123\".encode()).hexdigest(), \"nurse\": hashlib.sha256(\"nurse123\".encode()).hexdigest(), \"pharmacist\": hashlib.sha256(\"pharmacist123\".encode()).hexdigest()}# 用户权限permissions = { \"admin\": [\"view\", \"edit\", \"manage\", \"settings\"], \"doctor\": [\"view\", \"request\"], \"nurse\": [\"view\", \"request\"], \"pharmacist\": [\"view\", \"edit\", \"manage\"]}# 登录页面def login_page(): st.title(\"🏥 智能AI医疗物资管理系统\") st.subheader(\"用户登录\") username = st.text_input(\"用户名\") password = st.text_input(\"密码\", type=\"password\") if st.button(\"登录\"): if username in users and users[username] == hashlib.sha256(password.encode()).hexdigest(): st.session_state.authenticated = True st.session_state.username = username st.success(\"登录成功!正在进入系统...\") st.experimental_rerun() else: st.error(\"用户名或密码错误\")# 主应用def main_app(): # 标题和说明 st.title(f\"🏥 智能AI医疗物资/耗材管理系统\") st.markdown(f\"欢迎回来,{st.session_state.username}!\") st.markdown(\"\"\" 

系统功能:

  • 实时库存监控与预警
  • 物资过期自动提醒
  • AI驱动的需求预测
  • 智能采购建议
  • 物资使用分析与可视化
  • 物资出入库管理
\"\"\"
, unsafe_allow_html=True) # 模拟数据生成 @st.cache_data def generate_data(): # 医疗物资类别 categories = [ \'防护用品\', \'注射器械\', \'消毒用品\', \'手术器械\', \'诊断试剂\', \'医用敷料\', \'药品\', \'一次性用品\' ] # 具体物资名称 items = { \'防护用品\': [\'N95口罩\', \'防护服\', \'护目镜\', \'手套\', \'面屏\'], \'注射器械\': [\'注射器\', \'输液器\', \'针头\', \'输液袋\', \'输液管\'], \'消毒用品\': [\'酒精\', \'碘伏\', \'手消液\', \'消毒湿巾\', \'84消毒液\'], \'手术器械\': [\'手术刀\', \'镊子\', \'缝合针\', \'止血钳\', \'持针器\'], \'诊断试剂\': [\'血糖试纸\', \'新冠检测试剂\', \'血常规试剂\', \'尿检试纸\', \'PCR试剂\'], \'医用敷料\': [\'纱布\', \'绷带\', \'棉签\', \'创可贴\', \'医用胶带\'], \'药品\': [\'阿司匹林\', \'胰岛素\', \'抗生素\', \'止痛药\', \'降压药\'], \'一次性用品\': [\'医用帽\', \'鞋套\', \'床单\', \'尿杯\', \'采血管\'] } # 生成物资数据 medical_items = [] for cat in categories: for item in items[cat]: medical_items.append({ \'物资ID\': f\"{cat[:2]}{random.randint(1000,9999)}\", \'物资名称\': item, \'类别\': cat, \'单位\': random.choice([\'个\', \'盒\', \'瓶\', \'包\', \'箱\']), \'安全库存\': random.randint(20, 100), \'当前库存\': random.randint(0, 200), \'最近进货日期\': (datetime.now() - timedelta(days=random.randint(1, 60))).strftime(\'%Y-%m-%d\'), \'过期日期\': (datetime.now() + timedelta(days=random.randint(30, 730))).strftime(\'%Y-%m-%d\'), \'供应商\': random.choice([\'康泰医疗\', \'强生医疗\', \'美敦力\', \'3M医疗\', \'国药集团\', \'稳健医疗\']), \'单价\': round(random.uniform(1, 100), 2), \'总价值\': 0 # 初始化为0,后面计算 }) # 生成使用记录 usage_records = [] for _ in range(500): item = random.choice(medical_items) usage_date = datetime.now() - timedelta(days=random.randint(1, 90)) usage_records.append({ \'日期\': usage_date.strftime(\'%Y-%m-%d\'), \'物资ID\': item[\'物资ID\'], \'物资名称\': item[\'物资名称\'], \'类别\': item[\'类别\'], \'使用量\': random.randint(1, 20), \'使用科室\': random.choice([\'急诊科\', \'内科\', \'外科\', \'儿科\', \'妇产科\', \'ICU\', \'手术室\']), \'操作人\': random.choice(list(users.keys())) }) # 生成出入库记录 transaction_records = [] for item in medical_items: # 初始入库记录 initial_quantity = random.randint(50, 150) transaction_records.append({ \'日期\': (datetime.now() - timedelta(days=random.randint(30, 90))).strftime(\'%Y-%m-%d\'), \'物资ID\': item[\'物资ID\'], \'物资名称\': item[\'物资名称\'], \'类别\': item[\'类别\'], \'操作类型\': \'入库\', \'数量\': initial_quantity, \'单价\': item[\'单价\'], \'总金额\': initial_quantity * item[\'单价\'], \'供应商\': item[\'供应商\'], \'操作人\': \'system\', \'备注\': \'系统初始化\' }) # 随机添加一些出入库记录 for _ in range(random.randint(1, 10)): is_in = random.random() > 0.5 quantity = random.randint(1, 30) transaction_date = (datetime.now() - timedelta(days=random.randint(1, 30))).strftime(\'%Y-%m-%d\') transaction_records.append({ \'日期\': transaction_date, \'物资ID\': item[\'物资ID\'], \'物资名称\': item[\'物资名称\'], \'类别\': item[\'类别\'], \'操作类型\': \'入库\' if is_in else \'出库\', \'数量\': quantity, \'单价\': item[\'单价\'] if is_in else 0, # 出库不需要单价 \'总金额\': quantity * item[\'单价\'] if is_in else 0, \'供应商\': item[\'供应商\'] if is_in else \'\', \'操作人\': random.choice(list(users.keys())), \'备注\': \'采购入库\' if is_in else \'科室领用\' }) # 计算总价值 for item in medical_items: item[\'总价值\'] = item[\'当前库存\'] * item[\'单价\'] return pd.DataFrame(medical_items), pd.DataFrame(usage_records), pd.DataFrame(transaction_records) # 加载数据 inventory_df, usage_df, transaction_df = generate_data() # 添加库存状态列 def get_inventory_status(row): if row[\'当前库存\'] <= row[\'安全库存\'] * 0.3: return \'紧急\' elif row[\'当前库存\'] <= row[\'安全库存\']: return \'不足\' elif row[\'当前库存\'] <= row[\'安全库存\'] * 1.5: return \'正常\' else: return \'充足\' inventory_df[\'库存状态\'] = inventory_df.apply(get_inventory_status, axis=1) # 添加过期状态 def get_expiry_status(row): expiry_date = datetime.strptime(row[\'过期日期\'], \'%Y-%m-%d\') days_to_expire = (expiry_date - datetime.now()).days if days_to_expire < 0: return \'已过期\' elif days_to_expire < 30: return \'即将过期\' else: return \'正常\' inventory_df[\'过期状态\'] = inventory_df.apply(get_expiry_status, axis=1) # 侧边栏 - 筛选器 st.sidebar.header(\"🔍 筛选选项\") category_filter = st.sidebar.multiselect( \"选择物资类别\", options=inventory_df[\'类别\'].unique(), default=inventory_df[\'类别\'].unique() ) status_filter = st.sidebar.multiselect( \"选择库存状态\", options=inventory_df[\'库存状态\'].unique(), default=inventory_df[\'库存状态\'].unique() ) expiry_filter = st.sidebar.multiselect( \"选择过期状态\", options=inventory_df[\'过期状态\'].unique(), default=inventory_df[\'过期状态\'].unique() ) # 应用筛选 filtered_df = inventory_df[ (inventory_df[\'类别\'].isin(category_filter)) & (inventory_df[\'库存状态\'].isin(status_filter)) & (inventory_df[\'过期状态\'].isin(expiry_filter)) ] # 主界面布局 tabs = [\"📦 库存概览\", \"📊 使用分析\", \"🔮 智能预测\", \"📈 物资管理\", \"⚙️ 系统设置\"] # 根据用户权限显示不同的标签页 if st.session_state.username in permissions: user_permissions = permissions[st.session_state.username] if \"manage\" not in user_permissions: tabs.remove(\"📈 物资管理\") if \"settings\" not in user_permissions: tabs.remove(\"⚙️ 系统设置\") tab1, tab2, tab3, *other_tabs = st.tabs(tabs) with tab1: # 库存概览 st.subheader(\"医疗物资库存概览\") # KPI 指标 col1, col2, col3, col4 = st.columns(4) col1.metric(\"物资总数\", len(filtered_df)) col2.metric(\"库存不足物资\", len(filtered_df[filtered_df[\'库存状态\'] == \'不足\'])) col3.metric(\"库存紧急物资\", len(filtered_df[filtered_df[\'库存状态\'] == \'紧急\'])) col4.metric(\"即将过期物资\", len(filtered_df[filtered_df[\'过期状态\'] == \'即将过期\'])) # 库存价值 total_value = filtered_df[\'总价值\'].sum() st.metric(\"库存总价值\", f\"¥{total_value:.2f}\") # 库存状态分布 st.subheader(\"库存状态分布\") fig1 = px.pie( filtered_df, names=\'库存状态\', color=\'库存状态\', color_discrete_map={ \'紧急\': \'red\', \'不足\': \'orange\', \'正常\': \'green\', \'充足\': \'blue\' } ) st.plotly_chart(fig1, use_container_width=True) # 过期状态分布 st.subheader(\"过期状态分布\") fig2 = px.pie( filtered_df, names=\'过期状态\', color=\'过期状态\', color_discrete_map={ \'已过期\': \'red\', \'即将过期\': \'orange\', \'正常\': \'green\' } ) st.plotly_chart(fig2, use_container_width=True) # 库存明细表 st.subheader(\"库存明细\") st.dataframe(filtered_df[[\'物资ID\', \'物资名称\', \'类别\', \'单位\', \'安全库存\', \'当前库存\', \'库存状态\', \'过期日期\', \'过期状态\', \'供应商\', \'单价\', \'总价值\']], height=400, column_config={ \"当前库存\": st.column_config.ProgressColumn( \"当前库存\", help=\"当前库存水平\", format=\"%f\", min_value=0, max_value=200, ), \"总价值\": st.column_config.NumberColumn( \"总价值\", format=\"¥%f\", ) }) # 导出数据按钮 if st.button(\"导出库存数据\"): csv = filtered_df.to_csv(sep=\'\\t\', na_rep=\'nan\') b64 = base64.b64encode(csv.encode()).decode() href = f\'<a href=\"data:file/csv;base64,{b64}\" download=\"inventory_data.csv\">点击下载数据\' st.markdown(href, unsafe_allow_html=True) with tab2: # 使用分析 st.subheader(\"物资使用分析\") # 按类别使用量 st.subheader(\"各科室物资使用量\") usage_by_dept = usage_df.groupby(\'使用科室\')[\'使用量\'].sum().reset_index() fig3 = px.bar( usage_by_dept, x=\'使用科室\', y=\'使用量\', color=\'使用科室\' ) st.plotly_chart(fig3, use_container_width=True) # 按物资使用量 st.subheader(\"物资使用量TOP 10\") usage_by_item = usage_df.groupby(\'物资名称\')[\'使用量\'].sum().reset_index().sort_values(\'使用量\', ascending=False).head(10) fig4 = px.bar( usage_by_item, x=\'物资名称\', y=\'使用量\', color=\'物资名称\' ) st.plotly_chart(fig4, use_container_width=True) # 使用趋势分析 st.subheader(\"物资使用趋势\") usage_df[\'日期\'] = pd.to_datetime(usage_df[\'日期\']) usage_df[\'周\'] = usage_df[\'日期\'].dt.isocalendar().week usage_trend = usage_df.groupby([\'周\', \'类别\'])[\'使用量\'].sum().reset_index() fig5 = px.line( usage_trend, x=\'周\', y=\'使用量\', color=\'类别\', title=\"每周物资使用趋势\" ) st.plotly_chart(fig5, use_container_width=True) # 各科室使用物资类别分布 st.subheader(\"各科室使用物资类别分布\") dept_category = usage_df.groupby([\'使用科室\', \'类别\'])[\'使用量\'].sum().reset_index() fig7 = px.bar( dept_category, x=\'使用科室\', y=\'使用量\', color=\'类别\', barmode=\'stack\' ) st.plotly_chart(fig7, use_container_width=True) with tab3: # 智能预测 st.subheader(\"AI驱动的需求预测\") # 选择预测物资 selected_item = st.selectbox(\"选择要预测的物资\", inventory_df[\'物资名称\'].unique()) # 获取选定物资的数据 item_data = usage_df[usage_df[\'物资名称\'] == selected_item].copy() item_data[\'日期\'] = pd.to_datetime(item_data[\'日期\']) if len(item_data) > 10: # 准备数据 item_data = item_data.groupby(\'日期\')[\'使用量\'].sum().reset_index() item_data = item_data.set_index(\'日期\').asfreq(\'D\').fillna(0).reset_index() # 添加时间特征 item_data[\'day_of_week\'] = item_data[\'日期\'].dt.dayofweek item_data[\'day_of_month\'] = item_data[\'日期\'].dt.day item_data[\'month\'] = item_data[\'日期\'].dt.month item_data[\'is_weekend\'] = item_data[\'day_of_week\'].isin([5, 6]).astype(int) # 添加假期特征(简化版) holidays = [ # 这里可以添加更多假期日期 pd.to_datetime(\'2025-01-01\'), # 元旦 pd.to_datetime(\'2025-02-10\'), # 春节 pd.to_datetime(\'2025-04-05\'), # 清明节 pd.to_datetime(\'2025-05-01\'), # 劳动节 pd.to_datetime(\'2025-06-07\'), # 端午节 pd.to_datetime(\'2025-09-15\'), # 中秋节 pd.to_datetime(\'2025-10-01\'), # 国庆节 ] item_data[\'is_holiday\'] = item_data[\'日期\'].isin(holidays).astype(int) # 添加滞后特征 for i in range(1, 8): # 添加前7天的使用量作为特征 item_data[f\'lag_{i}\'] = item_data[\'使用量\'].shift(i) item_data = item_data.dropna() # 删除包含NaN的行 # 分割数据 X = item_data[[\'day_of_week\', \'day_of_month\', \'month\', \'is_weekend\', \'is_holiday\'] + [f\'lag_{i}\' for i in range(1, 8)]] y = item_data[\'使用量\'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) # 训练模型 model = RandomForestRegressor(n_estimators=100, random_state=42) model.fit(X_train, y_train) # 预测 y_pred = model.predict(X_test) # 评估 mae = mean_absolute_error(y_test, y_pred) # 创建预测日期范围 last_date = item_data[\'日期\'].max() future_dates = pd.date_range(start=last_date + timedelta(days=1), periods=30) # 预测未来30天 # 准备未来数据 future_data = pd.DataFrame({ \'日期\': future_dates, \'day_of_week\': future_dates.dayofweek, \'day_of_month\': future_dates.day, \'month\': future_dates.month, \'is_weekend\': future_dates.dayofweek.isin([5, 6]).astype(int), \'is_holiday\': future_dates.isin(holidays).astype(int) }) # 为了预测未来值,我们需要前7天的实际/预测值 # 这里我们使用最后7天的实际值作为初始值 last_7_days = item_data[\'使用量\'].tail(7).values # 逐步预测未来30天的值 future_pred = [] for i in range(len(future_dates)): # 创建当前预测所需的特征 features = [ future_data.iloc[i][\'day_of_week\'], future_data.iloc[i][\'day_of_month\'], future_data.iloc[i][\'month\'], future_data.iloc[i][\'is_weekend\'], future_data.iloc[i][\'is_holiday\'] ] features.extend(last_7_days) # 预测当前天的值 pred = model.predict([features])[0] future_pred.append(pred) # 更新最后7天的值(将最新预测添加到末尾,并移除最旧的值) last_7_days = np.append(last_7_days[1:], pred) # 创建结果DataFrame test_dates = item_data[\'日期\'].iloc[-len(y_test):] results = pd.DataFrame({ \'日期\': test_dates, \'实际使用量\': y_test, \'预测使用量\': y_pred }) future_results = pd.DataFrame({ \'日期\': future_dates, \'预测使用量\': future_pred }) # 显示评估结果 st.metric(\"预测平均绝对误差\", f\"{mae:.2f}\") # 显示预测图表 fig6 = go.Figure() fig6.add_trace(go.Scatter( x=results[\'日期\'], y=results[\'实际使用量\'], mode=\'lines+markers\', name=\'实际使用量\' )) fig6.add_trace(go.Scatter( x=results[\'日期\'], y=results[\'预测使用量\'], mode=\'lines+markers\', name=\'预测使用量\' )) fig6.add_trace(go.Scatter( x=future_results[\'日期\'], y=future_results[\'预测使用量\'], mode=\'lines+markers\', name=\'未来预测\', line=dict(dash=\'dash\') )) fig6.update_layout( title=f\"{selected_item} 使用量预测\", xaxis_title=\"日期\", yaxis_title=\"使用量\", legend_title=\"图例\" ) st.plotly_chart(fig6, use_container_width=True) # 计算平均每日使用量 avg_daily_usage = item_data[\'使用量\'].mean() # 获取当前库存 current_stock = inventory_df.loc[inventory_df[\'物资名称\'] == selected_item, \'当前库存\'].values[0] # 计算库存可维持天数 if avg_daily_usage > 0: days_left = current_stock / avg_daily_usage else: days_left = float(\'inf\') # 生成采购建议 st.subheader(\"智能采购建议\") if days_left < 7: st.error(f\"⚠️ 紧急采购建议:{selected_item}库存仅能维持{days_left:.1f}天,请立即采购!\") elif days_left < 14: st.warning(f\"⚠️ 采购建议:{selected_item}库存仅能维持{days_left:.1f}天,建议尽快采购\") else: st.success(f\"✅ {selected_item}库存充足,可维持{days_left:.1f}天\") # 计算建议采购量 safety_stock = inventory_df.loc[inventory_df[\'物资名称\'] == selected_item, \'安全库存\'].values[0] # 基于未来30天的预测使用量计算建议采购量 total_predicted_usage = future_results[\'预测使用量\'].sum() suggested_order = max(0, total_predicted_usage * 1.2 + safety_stock - current_stock) # 增加20%的缓冲 st.info(f\"建议采购量:{suggested_order:.0f} {inventory_df.loc[inventory_df[\'物资名称\'] == selected_item, \'单位\'].values[0]}\") # 预测图表 - 月度汇总 future_results[\'month\'] = future_results[\'日期\'].dt.month_name() monthly_prediction = future_results.groupby(\'month\')[\'预测使用量\'].sum().reset_index() fig8 = px.bar( monthly_prediction, x=\'month\', y=\'预测使用量\', title=f\"{selected_item} 月度使用量预测\" ) st.plotly_chart(fig8, use_container_width=True) else: st.warning(\"该物资使用数据不足,无法进行预测\") if \"📈 物资管理\" in tabs: with other_tabs[0]: # 物资管理 st.subheader(\"物资管理\") # 物资操作选项卡 manage_tabs = st.tabs([\"入库管理\", \"出库管理\", \"出入库记录\", \"物资信息管理\"]) with manage_tabs[0]: # 入库管理 st.header(\"物资入库\") # 选择要入库的物资 入库_物资 = st.selectbox(\"选择物资\", inventory_df[\'物资名称\'].unique()) # 获取物资信息 物资_info = inventory_df[inventory_df[\'物资名称\'] == 入库_物资].iloc[0] # 显示当前库存 st.write(f\"当前库存: {物资_info[\'当前库存\']} {物资_info[\'单位\']}\") # 入库数量 入库数量 = st.number_input(\"入库数量\", min_value=1, value=10) # 入库日期 入库日期 = st.date_input(\"入库日期\", datetime.now()).strftime(\'%Y-%m-%d\') # 供应商 供应商 = st.selectbox(\"供应商\", inventory_df[\'供应商\'].unique()) # 单价 单价 = st.number_input(\"单价\", min_value=0.01, value=物资_info[\'单价\'], step=0.01) # 备注 备注 = st.text_input(\"备注\", \"采购入库\") # 入库按钮 if st.button(\"确认入库\"): # 更新库存 inventory_df.loc[inventory_df[\'物资名称\'] == 入库_物资, \'当前库存\'] += 入库数量 inventory_df.loc[inventory_df[\'物资名称\'] == 入库_物资, \'最近进货日期\'] = 入库日期 inventory_df.loc[inventory_df[\'物资名称\'] == 入库_物资, \'单价\'] = 单价 inventory_df.loc[inventory_df[\'物资名称\'] == 入库_物资, \'总价值\'] = inventory_df.loc[inventory_df[\'物资名称\'] == 入库_物资, \'当前库存\'] * 单价 # 添加入库记录 new_transaction = pd.DataFrame({ \'日期\': [入库日期], \'物资ID\': [物资_info[\'物资ID\']], \'物资名称\': [入库_物资], \'类别\': [物资_info[\'类别\']], \'操作类型\': [\'入库\'], \'数量\': [入库数量], \'单价\': [单价], \'总金额\': [入库数量 * 单价], \'供应商\': [供应商], \'操作人\': [st.session_state.username], \'备注\': [备注] }) transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True) st.success(f\"成功入库 {入库数量} {物资_info[\'单位\']} {入库_物资}\") st.balloons() with manage_tabs[1]: # 出库管理 st.header(\"物资出库\") # 选择要出库的物资 出库_物资 = st.selectbox(\"选择物资\", inventory_df[\'物资名称\'].unique()) # 获取物资信息 物资_info = inventory_df[inventory_df[\'物资名称\'] == 出库_物资].iloc[0] # 显示当前库存 st.write(f\"当前库存: {物资_info[\'当前库存\']} {物资_info[\'单位\']}\") # 出库数量 出库数量 = st.number_input(\"出库数量\", min_value=1, value=1, max_value=int(物资_info[\'当前库存\'])) # 出库日期 出库日期 = st.date_input(\"出库日期\", datetime.now()).strftime(\'%Y-%m-%d\') # 使用科室 使用科室 = st.selectbox(\"使用科室\", usage_df[\'使用科室\'].unique()) # 备注 备注 = st.text_input(\"备注\", \"科室领用\") # 出库按钮 if st.button(\"确认出库\"): # 更新库存 inventory_df.loc[inventory_df[\'物资名称\'] == 出库_物资, \'当前库存\'] -= 出库数量 inventory_df.loc[inventory_df[\'物资名称\'] == 出库_物资, \'总价值\'] = inventory_df.loc[inventory_df[\'物资名称\'] == 出库_物资, \'当前库存\'] * 物资_info[\'单价\'] # 添加出库记录 new_transaction = pd.DataFrame({ \'日期\': [出库日期], \'物资ID\': [物资_info[\'物资ID\']], \'物资名称\': [出库_物资], \'类别\': [物资_info[\'类别\']], \'操作类型\': [\'出库\'], \'数量\': [出库数量], \'单价\': [0], # 出库不需要单价 \'总金额\': [0], \'供应商\': [\'\'], \'操作人\': [st.session_state.username], \'备注\': [备注] }) transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True) # 添加使用记录 new_usage = pd.DataFrame({ \'日期\': [出库日期], \'物资ID\': [物资_info[\'物资ID\']], \'物资名称\': [出库_物资], \'类别\': [物资_info[\'类别\']], \'使用量\': [出库数量], \'使用科室\': [使用科室], \'操作人\': [st.session_state.username] }) usage_df = pd.concat([usage_df, new_usage], ignore_index=True) st.success(f\"成功出库 {出库数量} {物资_info[\'单位\']} {出库_物资}\") st.balloons() with manage_tabs[2]: # 出入库记录 st.header(\"出入库记录\") # 筛选选项 col1, col2 = st.columns(2) with col1: 记录类型 = st.selectbox(\"记录类型\", [\"全部\", \"入库\", \"出库\"]) with col2: 日期范围 = st.date_input(\"日期范围\", [datetime.now() - timedelta(days=30), datetime.now()]) # 应用筛选 filtered_transactions = transaction_df.copy() if 记录类型 != \"全部\": filtered_transactions = filtered_transactions[filtered_transactions[\'操作类型\'] == 记录类型] if len(日期范围) == 2: start_date = 日期范围[0].strftime(\'%Y-%m-%d\') end_date = 日期范围[1].strftime(\'%Y-%m-%d\') filtered_transactions = filtered_transactions[(filtered_transactions[\'日期\'] >= start_date) & (filtered_transactions[\'日期\'] <= end_date)] # 显示记录 st.dataframe(filtered_transactions, height=500) # 导出记录 if st.button(\"导出记录\"): csv = filtered_transactions.to_csv(sep=\'\\t\', na_rep=\'nan\') b64 = base64.b64encode(csv.encode()).decode() href = f\'<a href=\"data:file/csv;base64,{b64}\" download=\"transaction_records.csv\">点击下载记录\' st.markdown(href, unsafe_allow_html=True) with manage_tabs[3]: # 物资信息管理 st.header(\"物资信息管理\") # 选择要管理的物资 管理_物资 = st.selectbox(\"选择物资\", inventory_df[\'物资名称\'].unique()) # 获取物资信息 物资_info = inventory_df[inventory_df[\'物资名称\'] == 管理_物资].iloc[0] # 显示当前信息 st.subheader(\"当前信息\") col1, col2 = st.columns(2) with col1: st.write(f\"**物资ID**: {物资_info[\'物资ID\']}\") st.write(f\"**类别**: {物资_info[\'类别\']}\") st.write(f\"**单位**: {物资_info[\'单位\']}\") st.write(f\"**安全库存**: {物资_info[\'安全库存\']}\") with col2: st.write(f\"**当前库存**: {物资_info[\'当前库存\']}\") st.write(f\"**最近进货日期**: {物资_info[\'最近进货日期\']}\") st.write(f\"**过期日期**: {物资_info[\'过期日期\']}\") st.write(f\"**供应商**: {物资_info[\'供应商\']}\") # 修改信息 st.subheader(\"修改信息\") # 安全库存 新安全库存 = st.number_input(\"安全库存\", min_value=0, value=int(物资_info[\'安全库存\'])) # 供应商 新供应商 = st.selectbox(\"供应商\", inventory_df[\'供应商\'].unique(), index=list(inventory_df[\'供应商\'].unique()).index(物资_info[\'供应商\'])) # 过期日期 新过期日期 = st.date_input(\"过期日期\", datetime.strptime(物资_info[\'过期日期\'], \'%Y-%m-%d\')).strftime(\'%Y-%m-%d\') # 单价 新单价 = st.number_input(\"单价\", min_value=0.01, value=float(物资_info[\'单价\']), step=0.01) # 备注 修改备注 = st.text_input(\"修改备注\", \"系统更新\") # 修改按钮 if st.button(\"确认修改\"): # 更新物资信息 inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'安全库存\'] = 新安全库存 inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'供应商\'] = 新供应商 inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'过期日期\'] = 新过期日期 inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'单价\'] = 新单价 inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'总价值\'] = inventory_df.loc[inventory_df[\'物资名称\'] == 管理_物资, \'当前库存\'] * 新单价 # 更新库存状态 inventory_df[\'库存状态\'] = inventory_df.apply(get_inventory_status, axis=1) inventory_df[\'过期状态\'] = inventory_df.apply(get_expiry_status, axis=1) st.success(f\"成功更新 {管理_物资} 的信息\") st.balloons() if \"⚙️ 系统设置\" in tabs: with other_tabs[-1]: # 系统设置 st.subheader(\"系统配置\") with st.expander(\"库存阈值设置\"): st.write(\"设置库存预警阈值\") # 获取当前设置 current_low_threshold = 70 current_critical_threshold = 30 low_threshold = st.slider(\"库存不足阈值(%)\", 30, 100, current_low_threshold) critical_threshold = st.slider(\"库存紧急阈值(%)\", 0, 50, current_critical_threshold) if st.button(\"保存库存阈值设置\"): # 这里可以添加保存设置的逻辑 st.success(f\"已更新库存阈值设置:不足({low_threshold}%),紧急({critical_threshold}%)\") with st.expander(\"过期预警设置\"): st.write(\"设置过期预警天数\") # 获取当前设置 current_expiry_warning = 30 expiry_warning = st.slider(\"即将过期预警(天)\", 1, 90, current_expiry_warning) if st.button(\"保存过期预警设置\"): # 这里可以添加保存设置的逻辑 st.success(f\"已更新过期预警设置:{expiry_warning}天\") with st.expander(\"数据管理\"): st.write(\"导入/导出数据\") col1, col2 = st.columns(2) with col1: uploaded_file = st.file_uploader(\"上传库存数据(CSV格式)\") if uploaded_file is not None: try: new_inventory = pd.read_csv(uploaded_file) # 这里可以添加数据合并的逻辑 st.success(\"文件上传成功!数据已合并\") except Exception as e: st.error(f\"文件上传失败:{str(e)}\") with col2: st.write(\"导出数据\") if st.button(\"导出库存数据\"): csv = inventory_df.to_csv(sep=\'\\t\', na_rep=\'nan\') b64 = base64.b64encode(csv.encode()).decode() href = f\'<a href=\"data:file/csv;base64,{b64}\" download=\"inventory_data.csv\">点击下载库存数据\' st.markdown(href, unsafe_allow_html=True) if st.button(\"导出使用记录\"): csv = usage_df.to_csv(sep=\'\\t\', na_rep=\'nan\') b64 = base64.b64encode(csv.encode()).decode() href = f\'<a href=\"data:file/csv;base64,{b64}\" download=\"usage_records.csv\">点击下载使用记录\' st.markdown(href, unsafe_allow_html=True) with st.expander(\"用户管理\"): st.write(\"添加/管理用户\") # 仅管理员可以管理用户 if st.session_state.username == \"admin\": new_username = st.text_input(\"新用户名\") new_password = st.text_input(\"新密码\", type=\"password\") new_role = st.selectbox(\"用户角色\", [\"doctor\", \"nurse\", \"pharmacist\", \"admin\"]) if st.button(\"添加用户\"): if new_username and new_password: # 这里可以添加用户添加的逻辑 st.success(f\"成功添加用户:{new_username},角色:{new_role}\") else: st.error(\"用户名和密码不能为空\") # 显示当前用户列表 st.write(\"当前用户列表\") user_list = pd.DataFrame({ \'用户名\': list(users.keys()), \'角色\': [permissions.get(user, [\"无权限\"])[0] for user in users.keys()] }) st.dataframe(user_list) with st.expander(\"系统信息\"): st.write(\"**版本信息**: v4.3.2\") st.write(\"**最后\") st.write(\"**最后更新**: 2025-07-30\") st.write(\"**开发者**: 医疗AI团队\") st.write(\"**技术支持**: tech-support@medai.com\") # 过期物资警告 expiring_items = inventory_df[inventory_df[\'过期状态\'].isin([\'已过期\', \'即将过期\'])] if not expiring_items.empty: st.sidebar.warning(\"⚠️ 有过期风险物资\") for _, row in expiring_items.iterrows(): days_left = (datetime.strptime(row[\'过期日期\'], \'%Y-%m-%d\') - datetime.now()).days status = \"已过期\" if days_left < 0 else f\"还剩{days_left}天\" st.sidebar.error(f\"{row[\'物资名称\']} - {status}\") # 库存不足警告 low_stock_items = inventory_df[inventory_df[\'库存状态\'].isin([\'不足\', \'紧急\'])] if not low_stock_items.empty: st.sidebar.warning(\"⚠️ 有库存不足物资\") for _, row in low_stock_items.iterrows(): st.sidebar.error(f\"{row[\'物资名称\']} - 当前库存: {row[\'当前库存\']} (安全库存: {row[\'安全库存\']})\") # 登出按钮 if st.sidebar.button(\"登出\"): st.session_state.authenticated = False st.session_state.username = None st.success(\"已成功登出\") st.experimental_rerun() # 页脚 st.markdown(\"---\") st.caption(\"© 2025 智能医疗物资管理系统 | 基于AI的医疗物资管理解决方案\")

运行应用

if name == “main”:
if st.session_state.authenticated:
main_app()
else:
login_page()

“增量升级”方案:

────────────────────

附录一、实时库存同步(对接 HIS/ERP)

────────────────────

  1. 目标
    让系统不再依赖“模拟数据”,而是实时读取 HIS/ERP 的出入库流水,并自动更新库存。

  2. 关键思路
    • 在 generate_data() 之前增加一个“数据源选择”开关:

    • “模拟数据”(默认)
    • “HIS/ERP 接口”
      • 新增 fetch_realtime_inventory()fetch_realtime_usage() 两个函数,利用 RESTful API 或数据库直连拉取数据。
      • 用 Streamlit 的 st.cache_data(ttl=300) 把结果缓存 5 分钟,既减轻源系统压力,又保证页面流畅。
  3. 代码片段

def fetch_realtime_inventory(): # 示例:调用 REST 接口 r = requests.get( st.secrets[\"his\"][\"base_url\"] + \"/api/v1/inventory\", headers={\"Authorization\": f\"Bearer {st.secrets[\'his\'][\'token\']}\"} ) r.raise_for_status() return pd.DataFrame(r.json())def fetch_realtime_usage(): # 示例:直连 SQL Server conn = pymssql.connect( server=st.secrets[\"erp\"][\"host\"], user=st.secrets[\"erp\"][\"user\"], password=st.secrets[\"erp\"][\"pwd\"], database=\"ERP\" ) sql = \"\"\" SELECT CONVERT(date, CreateTime) AS 日期,  ItemID AS 物资ID,  ItemName AS 物资名称,  Category AS 类别,  Quantity AS 使用量,  Dept  AS 使用科室 FROM t_Outbound WHERE CreateTime >= DATEADD(day, -90, GETDATE()) \"\"\" return pd.read_sql(sql, conn)
  1. 落地注意
    • 用 st.secrets 保存接口密钥,避免硬编码。
    • 若源系统无“过期日期”字段,可在本地维护一张映射表(物资ID→过期日期),再与实时库存 merge
    • 建议把“同步日志”写入侧边栏,异常时给出提示,方便运维。

────────────────────

附录二、移动端 PDA 扫码盘点

────────────────────

  1. 目标
    护士/库管员用手机或 PDA 扫条码即可快速盘点并回写库存。

  2. 关键思路
    • 用 Streamlit 的“camera_input”组件 + pyzbar/opencv 解析二维码/条形码。
    • 扫码后弹窗显示当前库存,可直接输入“盘点数量”,点击“确认”即回写后台(支持写回 HIS 或本地 SQLite)。
    • 盘点过程离线缓存盘点单,网络恢复后批量提交。

  3. 代码片段

st.subheader(\"PDA 扫码盘点\")img_file = st.camera_input(\"请对准物资条码\")if img_file: img = Image.open(img_file) barcodes = decode(img) if barcodes: code = barcodes[0].data.decode(\"utf-8\") item = inventory_df[inventory_df[\"物资ID\"] == code] if not item.empty: st.write(\"当前库存:\", item.iloc[0][\"当前库存\"]) new_qty = st.number_input(\"盘点后库存\", min_value=0, value=int(item.iloc[0][\"当前库存\"])) if st.button(\"确认更新\"): update_inventory(code, new_qty) # 调用接口或写本地 st.success(\"库存已更新\") else: st.error(\"未找到该物资\")
  1. 落地注意
    • 手机端部署:可用 Streamlit Cloud + 手机浏览器,或打包成 PWA。
    • 条码标准:若院内用 GS1-128,则解析时注意分隔符。
    • 并发冲突:盘点时给记录加乐观锁(版本号或时间戳字段)。

────────────────────

附录三、AI 预测再升级:加入“节假日/疫情”外生特征

────────────────────

  1. 目标
    让 AI 预测更准,尤其在春节、疫情爆发等特殊时期。

  2. 关键思路
    • 收集外部特征:

    • 中国法定节假日(可用 ChineseCalendar 库)
    • 本地新冠新增病例(卫健委公开接口)
    • 气温、湿度(气象局 API)
      • 把上述特征拼接到训练集,再训练 RandomForest 或 LightGBM。
      • 预测时,未来 14 天的外部特征可简单用“最近 7 天均值”填充,或调用同样接口拉取。
  3. 代码片段

from chinese_calendar import is_holidaydef add_external_features(df): df[\"是否节假日\"] = df[\"日期\"].apply(lambda x: int(is_holiday(x))) # 例:获取新增病例 df[\"新增病例\"] = fetch_covid_cases(df[\"日期\"]) return dfitem_data = add_external_features(item_data)X = item_data[[\"day_of_week\", \"是否节假日\", \"新增病例\", \"气温\"]]

动画片大全