backend-data-processor
Process vehicle insurance Excel data using Pandas - file handling, data cleaning, merging, validation. Use when processing Excel/CSV files, handling data imports, implementing business rules (negative premiums, zero commissions), debugging data pipelines, or optimizing Pandas performance. Keywords: data_processor.py, Excel, CSV, Pandas, merge, deduplication, date normalization.
When & Why to Use This Skill
This Claude skill provides a comprehensive backend implementation guide for automating the processing of vehicle insurance data using Python and Pandas. It streamlines the entire data lifecycle—from file scanning and ingestion to advanced cleaning, merging, and deduplication—ensuring high data integrity and optimized performance for large-scale Excel and CSV datasets.
Use Cases
- Automating the ingestion and standardization of monthly insurance policy reports from multiple Excel sources into a centralized system.
- Cleaning and normalizing inconsistent data formats, such as varying date strings and numeric values with thousand separators, for reliable downstream analysis.
- Implementing complex business logic for data validation, including handling negative premiums for cancellations and zero-commission scenarios.
- Optimizing data pipelines to handle large-scale spreadsheets efficiently using Pandas vectorization and memory management techniques.
- Building robust incremental update systems that deduplicate records based on unique identifiers like policy numbers and timestamps.
| name | backend-data-processor |
|---|---|
| description | Process vehicle insurance Excel data using Pandas - file handling, data cleaning, merging, validation. Use when processing Excel/CSV files, handling data imports, implementing business rules (negative premiums, zero commissions), debugging data pipelines, or optimizing Pandas performance. Keywords: data_processor.py, Excel, CSV, Pandas, merge, deduplication, date normalization. |
| allowed-tools | Read, Edit, Grep, Glob |
Backend Data Processor - 车险数据处理后端实现指南
版本: v1.0 最后更新: 2025-11-08 适用场景: Excel 数据处理、数据清洗、文件合并、数据验证
📌 Skill 概述
本 Skill 提供车险签单数据处理后端的完整实现指南,作为 analyzing-auto-insurance-data Skill 的后端补充。涵盖数据流转、核心处理逻辑、业务规则实现、性能优化和错误处理。
核心价值:
- ✅ 数据流转架构: Excel → CSV → 合并 → 存储全流程
- ✅ 核心处理函数: 4个关键方法详解(文件处理/清洗/合并/批量扫描)
- ✅ 业务规则实现: 负保费/零手续费/日期标准化/缺失值填充
- ✅ 性能优化技巧: Pandas 最佳实践、内存管理、大文件处理
- ✅ 错误处理与日志: 异常捕获机制、日志标准、用户友好提示
关键文件位置:
- backend/data_processor.py - 数据处理核心
- backend/api_server.py - Flask API 服务
📊 一、数据流转架构
1.1 整体流程图
┌─────────────────────────────────────────────────────────────────┐
│ Excel 文件上传到 data/ 目录 │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ scan_and_process_new_files() │
│ - 扫描 data/*.xlsx, data/*.xls │
│ - 批量处理所有新文件 │
└────────────────────────────┬────────────────────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ process_new_excel(excel_path) │
│ - 读取 Excel 文件 (pd.read_excel) │
│ - 返回原始 DataFrame │
└───────────────┬───────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ _clean_data(df) │
│ - 删除空行 │
│ - 日期格式标准化 │
│ - 数值类型转换 │
│ - 缺失值填充为空字符串 │
└───────────────┬───────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ merge_with_existing(new_df) │
│ - 读取现有 CSV (如存在) │
│ - pd.concat() 合并新旧数据 │
│ - 根据保单号+投保确认时间去重 │
└───────────────┬───────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ save_merged_data(df) │
│ - 保存为 车险清单_2025年10-11月_合并.csv │
│ - 编码: utf-8-sig (支持Excel打开) │
└───────────────┬───────────────────────┘
│
▼
┌───────────────────────────────────────┐
│ 移动到 data/processed/ 目录 │
│ - 重命名: 原文件名_processed_时间戳.xlsx │
│ - 避免重复处理 │
└───────────────────────────────────────┘
1.2 文件命名与版本管理
输入文件命名规范:
- 格式:
data/*.xlsx或data/*.xls - 示例:
车险清单_2025年10月.xlsx
处理后文件命名:
- 格式:
data/processed/{原文件名}_processed_{YYYYMMDD_HHMMSS}.xlsx - 示例:
data/processed/车险清单_2025年10月_processed_20251108_143025.xlsx
合并文件命名:
- 固定文件名:
车险清单_2025年10-11月_合并.csv - 位置: 项目根目录
- 编码:
utf-8-sig(带 BOM,Excel 可直接打开)
1.3 增量更新策略
去重逻辑 (data_processor.py:172-177):
# 根据保单号和投保确认时间去重,保留最新记录
merged_df = merged_df.drop_duplicates(
subset=['保单号', '投保确认时间'],
keep='last'
)
为什么选择这两个字段:
保单号: 业务唯一标识投保确认时间: 同一保单可能有批改记录,需要保留时间维度
keep='last' 的原因:
- 批改操作会更新保单信息,最新记录更准确
- 合并时新数据在后,
keep='last'优先保留新数据
🔧 二、核心处理函数详解
2.1 process_new_excel(excel_path) - Excel 文件处理
功能: 读取 Excel 文件并返回清洗后的 DataFrame
实现细节:
def process_new_excel(self, excel_path):
print(f"正在处理Excel文件: {excel_path}")
# 读取 Excel 文件
df = pd.read_excel(excel_path)
print(f" 读取成功: {len(df)} 行, {len(df.columns)} 列")
# 数据清洗
df = self._clean_data(df)
return df
注意事项:
- ❌ 不要使用
pd.read_excel(excel_path, dtype=str)- 会导致数值计算失败 - ✅ 推荐: 让 Pandas 自动推断类型,后续在
_clean_data()中转换 - ✅ 异常处理: 调用方
scan_and_process_new_files()已处理异常
常见问题:
- Excel 文件损坏: 会抛出
xlrd.biffh.XLRDError - 文件被占用: Windows 下可能抛出
PermissionError - 内存不足: 大文件(>100MB)可能触发
MemoryError
2.2 _clean_data(df) - 数据清洗
功能: 标准化数据格式,确保符合 CSV 规范
清洗步骤:
步骤 1: 删除完全为空的行
df = df.dropna(how='all')
- 移除 Excel 中的空白行
- 避免后续计算时出现 NaN 值污染
步骤 2: 日期格式标准化
date_columns = ['刷新时间', '投保确认时间', '保险起期']
for col in date_columns:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
errors='coerce': 无法解析的日期转为NaT(Not a Time)- 支持多种日期格式:
2025-11-08,2025/11/08,20251108等
常见日期问题:
- Excel 日期序列号:
44500→ 自动转换为2021-10-15 - 文本日期:
"2025年11月8日"→errors='coerce'会转为NaT - 空值: 保留为
NaT,后续填充为空字符串
步骤 3: 数值类型转换
numeric_columns = ['签单/批改保费', '签单数量', '手续费', '手续费含税', '增值税']
for col in numeric_columns:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
errors='coerce': 无法解析的数值转为NaN- 自动处理:
"1,234.56"→1234.56(移除千分位逗号)
常见数值问题:
- 负保费:
-5000.00(退保/批改) → 保留负数,不做修改 - 零手续费:
0.00→ 合法值,不做修改 - 文本数值:
"五万"→ 转为NaN
步骤 4: 缺失值填充
df = df.fillna('')
- 将所有
NaN/NaT填充为空字符串'' - 避免 JSON 序列化时出现
null值 - 前端可统一判断
value === ''来识别缺失
⚠️ 重要: 日期列在此步骤会被转为空字符串,需要在后续查询时重新转换:
df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce')
2.3 merge_with_existing(new_df) - 数据合并
功能: 将新数据与现有 CSV 合并,去重并统计
实现细节:
def merge_with_existing(self, new_df):
if self.merged_csv.exists():
print(f"读取现有数据: {self.merged_csv}")
existing_df = pd.read_csv(self.merged_csv, encoding='utf-8-sig')
# 合并数据
merged_df = pd.concat([existing_df, new_df], ignore_index=True)
# 去重 - 根据保单号和投保确认时间
if '保单号' in merged_df.columns and '投保确认时间' in merged_df.columns:
merged_df = merged_df.drop_duplicates(
subset=['保单号', '投保确认时间'],
keep='last'
)
print(f" 合并完成: {len(existing_df)} + {len(new_df)} = {len(merged_df)} 行")
else:
print(f" 未找到现有数据,创建新文件")
merged_df = new_df
return merged_df
关键参数解析:
encoding='utf-8-sig': 读取带 BOM 的 UTF-8 文件ignore_index=True: 重新生成 0-N 的连续索引keep='last': 保留重复记录中的最后一条
性能优化建议:
# ❌ 不推荐: 逐行追加(O(n²) 复杂度)
for _, row in new_df.iterrows():
existing_df = existing_df.append(row)
# ✅ 推荐: 一次性合并(O(n) 复杂度)
merged_df = pd.concat([existing_df, new_df], ignore_index=True)
内存优化:
- 大文件(>1GB)可使用
pd.read_csv(chunksize=10000)分块读取 - 合并后立即删除中间变量:
del existing_df, new_df
2.4 scan_and_process_new_files() - 批量处理
功能: 扫描 data 目录,批量处理所有新 Excel 文件
实现流程:
步骤 1: 扫描目录
excel_files = list(self.data_dir.glob('*.xlsx')) + list(self.data_dir.glob('*.xls'))
- 使用
pathlib.Path.glob()查找文件 - 支持
.xlsx和.xls两种格式
步骤 2: 批量处理
all_new_data = []
for excel_file in excel_files:
try:
# 处理单个文件
df = self.process_new_excel(excel_file)
all_new_data.append(df)
# 处理完成后移动到已处理目录
processed_dir = self.data_dir / 'processed'
processed_dir.mkdir(exist_ok=True)
new_path = processed_dir / f"{excel_file.stem}_processed_{datetime.now().strftime('%Y%m%d_%H%M%S')}{excel_file.suffix}"
excel_file.rename(new_path)
print(f" 文件已移动: {new_path}")
except Exception as e:
print(f" 处理失败: {e}")
异常处理策略:
- ✅ 单文件失败不中断: 使用
try-except捕获单文件异常 - ✅ 记录错误信息:
print(f"处理失败: {e}") - ✅ 继续处理下一个: 不抛出异常,保证其他文件正常处理
步骤 3: 合并与保存
if all_new_data:
# 合并所有新数据
combined_new = pd.concat(all_new_data, ignore_index=True)
# 与现有数据合并
final_df = self.merge_with_existing(combined_new)
# 保存
self.save_merged_data(final_df)
print(f"数据更新完成!")
触发方式:
- 手动触发:
python backend/data_processor.py - API 触发:
POST /api/refresh(见 api_server.py:21-37) - 定时任务: 可配置 cron 定时执行
🎯 三、业务规则实现
3.1 负保费处理(退保/批改)
业务场景:
- 退保:
签单/批改保费 < 0(例如:-5000.00) - 批改减保: 部分退保导致保费为负
处理策略:
# ✅ 保留负数,不做修改
df[col] = pd.to_numeric(df[col], errors='coerce')
查询时的处理:
- 保费统计: 直接
sum(),负数会自动抵消正数 - 保单件数: 需要过滤
>= 50的记录(见 data_processor.py:507-509)
# 保单件数: 过滤小额/负保费
period_data = period_data[period_data['签单/批改保费'] >= 50]
daily_stats = period_data.groupby('weekday_index').size()
注意事项:
- ❌ 不要用
abs()取绝对值 - 会导致退保也计入正保费 - ✅ 前端展示时可用红色标注负数
3.2 零手续费场景
业务场景:
- 新能源车险: 部分地区手续费为 0
- 活动期间: 免手续费促销
处理策略:
# ✅ 保留 0 值,不做修改
commission_day = sum_float(day_df['手续费含税']) # 0 也会被累加
验证逻辑:
- ❌ 不要判断
if commission > 0:- 会忽略合法的 0 值 - ✅ 允许 0 值存在,前端可特殊展示
3.3 日期格式标准化
支持的日期格式:
- ISO 8601:
2025-11-08(推荐) - 斜杠分隔:
2025/11/08 - 无分隔符:
20251108 - Excel 序列号:
44500(自动转换)
标准化代码:
df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce')
API 返回格式:
# 统一转换为 YYYY-MM-DD 格式
latest_date.strftime('%Y-%m-%d') # '2025-11-08'
时区处理:
- 当前不涉及时区转换(所有数据均为本地时间)
- 如需支持时区:
pd.to_datetime(..., utc=True)
3.4 缺失值填充策略
填充规则:
df = df.fillna('') # 统一填充为空字符串
各字段缺失值处理:
| 字段类型 | 缺失值处理 | 查询时处理 |
|---|---|---|
| 日期字段 | NaT → '' |
pd.to_datetime(..., errors='coerce') 重新转换 |
| 数值字段 | NaN → '' |
pd.to_numeric(..., errors='coerce') 重新转换 |
| 文本字段 | None → '' |
直接使用空字符串 |
查询时的重新转换 (必需):
# 读取 CSV 后必须重新转换日期
df = pd.read_csv(self.merged_csv, encoding='utf-8-sig')
df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce')
为什么需要重新转换:
- CSV 文件不保留数据类型信息
pd.read_csv()默认将日期读为字符串- 必须显式转换才能进行日期运算
⚡ 四、性能优化技巧
4.1 Pandas 优化模式
优化 1: 使用向量化操作
❌ 低效: 逐行迭代
for _, row in df.iterrows():
if row['签单/批改保费'] >= 50:
count += 1
✅ 高效: 向量化过滤
count = len(df[df['签单/批改保费'] >= 50])
性能提升: 100x - 1000x
优化 2: 避免多次 DataFrame 复制
❌ 低效: 链式过滤(每次创建新 DataFrame)
df = df[df['签单/批改保费'] >= 50]
df = df[df['是否续保'] == '是']
df = df[df['三级机构'] == '达州']
✅ 高效: 合并条件
mask = (df['签单/批改保费'] >= 50) & \
(df['是否续保'] == '是') & \
(df['三级机构'] == '达州')
df = df[mask]
内存节省: 避免创建 3 个中间 DataFrame
优化 3: 使用 low_memory=False 处理大文件
# ✅ 推荐: 大文件读取时指定
df = pd.read_csv(self.merged_csv, encoding='utf-8-sig', low_memory=False)
说明:
low_memory=True(默认): 分块读取,可能导致数据类型推断不一致low_memory=False: 一次性读取,确保类型一致(适用于 <1GB 文件)
4.2 内存管理
技巧 1: 显式删除中间变量
existing_df = pd.read_csv(...)
new_df = pd.read_excel(...)
merged_df = pd.concat([existing_df, new_df])
# ✅ 立即释放内存
del existing_df, new_df
技巧 2: 使用 dtype 指定数据类型(适用于已知列)
# ✅ 指定类型可减少内存占用
dtype_dict = {
'保单号': 'string',
'签单/批改保费': 'float32', # 而非 float64
'签单数量': 'int32' # 而非 int64
}
df = pd.read_csv(..., dtype=dtype_dict)
内存节省: float64 → float32 减少 50% 内存
技巧 3: 分块处理大文件
# ✅ 超大文件(>1GB)使用分块
chunk_size = 10000
chunks = []
for chunk in pd.read_csv(..., chunksize=chunk_size):
# 处理每个分块
chunk = chunk[chunk['签单/批改保费'] >= 50]
chunks.append(chunk)
df = pd.concat(chunks, ignore_index=True)
4.3 大文件处理策略
文件大小分级:
- 小文件 (<10MB): 直接
pd.read_csv()/pd.read_excel() - 中文件 (10MB - 100MB): 使用
low_memory=False - 大文件 (100MB - 1GB): 使用
dtype指定类型 - 超大文件 (>1GB): 使用
chunksize分块处理
当前项目实践:
- 车险清单合并 CSV: 约 20MB (中文件)
- 使用
low_memory=False确保类型一致
4.4 并发处理考虑
当前架构: 单进程顺序处理
可优化方向:
from concurrent.futures import ProcessPoolExecutor
def process_file(excel_file):
df = pd.read_excel(excel_file)
df = _clean_data(df)
return df
# ✅ 多进程并行处理
with ProcessPoolExecutor(max_workers=4) as executor:
results = executor.map(process_file, excel_files)
all_new_data = list(results)
注意事项:
- ⚠️ 多进程会增加内存占用(每个进程独立内存)
- ⚠️ Windows 下需要
if __name__ == '__main__':保护 - ✅ 适用于文件数量多(>10)且单文件小(<10MB)的场景
🔍 五、错误处理与日志
5.1 异常捕获机制
文件处理异常
try:
df = self.process_new_excel(excel_file)
all_new_data.append(df)
except Exception as e:
print(f" 处理失败: {e}")
# ✅ 不中断,继续处理下一个文件
捕获的异常类型:
FileNotFoundError: 文件不存在PermissionError: 文件被占用(Windows)xlrd.biffh.XLRDError: Excel 文件损坏MemoryError: 内存不足pd.errors.ParserError: CSV 解析失败
API 异常处理 (api_server.py:26-37)
@app.route('/api/refresh', methods=['POST'])
def refresh_data():
try:
processor.scan_and_process_new_files()
return jsonify({
'success': True,
'message': '数据刷新成功',
'latest_date': processor.get_latest_date()
})
except Exception as e:
return jsonify({
'success': False,
'message': f'数据刷新失败: {str(e)}'
}), 500
统一响应格式:
- 成功:
{ success: true, data: {...} } - 失败:
{ success: false, message: "错误信息" }, HTTP 500
5.2 日志记录标准
当前日志级别:
print(): 输出到 stdout (终端/日志文件)- 无结构化日志(未使用
logging模块)
日志内容:
print(f"正在处理Excel文件: {excel_path}") # INFO
print(f" 读取成功: {len(df)} 行, {len(df.columns)} 列") # INFO
print(f" 数据清洗完成") # INFO
print(f" 处理失败: {e}") # ERROR
改进建议 (使用 logging 模块):
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.FileHandler('backend/backend.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 使用
logger.info(f"正在处理Excel文件: {excel_path}")
logger.error(f"处理失败: {e}", exc_info=True) # 记录完整堆栈
5.3 用户友好的错误提示
API 错误提示设计:
| 错误场景 | HTTP 状态码 | 错误消息 | 用户操作建议 |
|---|---|---|---|
| 未找到数据 | 404 | "未找到数据" | 请先上传 Excel 文件 |
| 文件格式错误 | 500 | "Excel 文件格式不正确" | 请检查文件格式是否为 .xlsx/.xls |
| 必填列缺失 | 500 | "缺少必填列: 保单号" | 请确保 Excel 包含所有必填列 |
| 日期格式错误 | 500 | "投保确认时间格式不正确" | 请使用 YYYY-MM-DD 格式 |
| 内存不足 | 500 | "文件过大,内存不足" | 请分批上传较小的文件 |
前端展示建议:
if (!response.success) {
// ✅ 用户友好的提示
toast.error(response.message)
// ❌ 不友好的提示
console.error(response.message)
}
📖 六、实际代码示例与最佳实践
6.1 完整的文件处理流程
from pathlib import Path
from data_processor import DataProcessor
# 初始化处理器
processor = DataProcessor(
data_dir='data',
staff_mapping_file='业务员机构团队归属.json'
)
# 批量处理新文件
processor.scan_and_process_new_files()
# 输出示例:
# ==========================================
# 找到 2 个Excel文件
# 正在处理Excel文件: data/车险清单_2025年10月.xlsx
# 读取成功: 1234 行, 45 列
# 数据清洗完成
# 合并完成: 5678 + 1234 = 6912 行
# 文件已移动: data/processed/车险清单_2025年10月_processed_20251108_143025.xlsx
# 正在处理Excel文件: data/车险清单_2025年11月.xlsx
# 读取成功: 890 行, 45 列
# 数据清洗完成
# 合并完成: 6912 + 890 = 7802 行
# 文件已移动: data/processed/车险清单_2025年11月_processed_20251108_143030.xlsx
# 数据更新完成!
6.2 单文件处理(手动控制)
from pathlib import Path
import pandas as pd
from data_processor import DataProcessor
processor = DataProcessor()
# 步骤 1: 处理单个 Excel
excel_path = Path('data/车险清单_2025年10月.xlsx')
df = processor.process_new_excel(excel_path)
# 步骤 2: 检查数据
print(f"处理后数据: {len(df)} 行")
print(f"列名: {df.columns.tolist()}")
# 步骤 3: 合并到现有数据
merged_df = processor.merge_with_existing(df)
# 步骤 4: 保存
processor.save_merged_data(merged_df)
print("单文件处理完成!")
6.3 数据验证与质量检查
import pandas as pd
# 读取合并后的数据
df = pd.read_csv('车险清单_2025年10-11月_合并.csv', encoding='utf-8-sig', low_memory=False)
# ===== 数据质量检查 =====
# 1. 检查必填列是否存在
required_cols = ['保单号', '业务员', '投保确认时间', '签单/批改保费']
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
print(f"❌ 缺少必填列: {missing_cols}")
else:
print(f"✅ 必填列完整")
# 2. 检查保单号唯一性
df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce')
duplicates = df[df.duplicated(subset=['保单号', '投保确认时间'], keep=False)]
if len(duplicates) > 0:
print(f"⚠️ 发现 {len(duplicates)} 条重复记录")
else:
print(f"✅ 无重复记录")
# 3. 检查日期范围
latest_date = df['投保确认时间'].max()
earliest_date = df['投保确认时间'].min()
print(f"📅 日期范围: {earliest_date.date()} ~ {latest_date.date()}")
# 4. 检查保费分布
print(f"💰 保费统计:")
print(f" - 最小值: {df['签单/批改保费'].min():.2f}")
print(f" - 最大值: {df['签单/批改保费'].max():.2f}")
print(f" - 平均值: {df['签单/批改保费'].mean():.2f}")
print(f" - 负保费记录数: {len(df[df['签单/批改保费'] < 0])}")
# 5. 检查业务员映射覆盖率
staff_in_data = set(df['业务员'].dropna().unique())
staff_in_mapping = set() # 从映射文件中提取
coverage = len(staff_in_mapping & staff_in_data) / len(staff_in_data) * 100
print(f"👥 业务员映射覆盖率: {coverage:.1f}%")
6.4 最佳实践总结
✅ 推荐做法
使用 pathlib 处理路径
# ✅ 推荐 from pathlib import Path project_root = Path(__file__).parent.parent csv_path = project_root / '车险清单_2025年10-11月_合并.csv' # ❌ 不推荐 import os csv_path = os.path.join(os.getcwd(), '车险清单_2025年10-11月_合并.csv')读取 CSV 时始终指定 encoding
# ✅ 推荐 df = pd.read_csv(csv_path, encoding='utf-8-sig') # ❌ 不推荐(可能乱码) df = pd.read_csv(csv_path)查询前重新转换日期类型
# ✅ 推荐 df = pd.read_csv(csv_path, encoding='utf-8-sig') df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce') # ❌ 不推荐(日期运算会报错) df = pd.read_csv(csv_path, encoding='utf-8-sig') df['投保确认时间'].max() # TypeError: '>' not supported使用向量化操作
# ✅ 推荐 count = len(df[df['签单/批改保费'] >= 50]) # ❌ 不推荐 count = 0 for _, row in df.iterrows(): if row['签单/批改保费'] >= 50: count += 1异常捕获不中断批量处理
# ✅ 推荐 for file in files: try: process(file) except Exception as e: logger.error(f"处理失败: {e}") continue # 继续处理下一个 # ❌ 不推荐(一个文件失败导致全部中断) for file in files: process(file) # 未捕获异常
❌ 常见陷阱
链式赋值警告
# ❌ 会触发 SettingWithCopyWarning df[df['签单/批改保费'] >= 50]['是否续保'] = '是' # ✅ 使用 .loc df.loc[df['签单/批改保费'] >= 50, '是否续保'] = '是'日期比较前未转换
# ❌ 日期是字符串,比较结果错误 df = pd.read_csv(csv_path) df = df[df['投保确认时间'] > '2025-11-01'] # 字符串比较 # ✅ 先转换为日期 df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce') df = df[df['投保确认时间'] > pd.to_datetime('2025-11-01')]忽略 errors='coerce'
# ❌ 无效日期会抛出异常 df['投保确认时间'] = pd.to_datetime(df['投保确认时间']) # ✅ 无效日期转为 NaT df['投保确认时间'] = pd.to_datetime(df['投保确认时间'], errors='coerce')使用 append() 追加行 (Pandas 1.4+ 已弃用)
# ❌ 已弃用,且性能差 for _, row in df.iterrows(): new_df = new_df.append(row) # ✅ 使用 concat new_df = pd.concat([new_df, df], ignore_index=True)
🔗 七、相关资源
关键代码位置
- backend/data_processor.py - 数据处理核心逻辑
- backend/api_server.py - Flask API 服务
- L21-37: 数据刷新接口
相关 Skills
- analyzing-auto-insurance-data - 数据分析核心方法
- api-endpoint-design (待开发) - API 规范文档
文档资源
- ARCHITECTURE.md - 系统架构文档
- PRODUCT_SPEC.md - 产品需求文档
✅ 总结
核心要点
- 数据流转: Excel → 清洗 → 合并 → 去重 → CSV
- 去重策略: 保单号 + 投保确认时间,
keep='last' - 清洗步骤: 删除空行 → 日期转换 → 数值转换 → 填充空值
- 性能优化: 向量化操作、避免链式过滤、显式删除中间变量
- 异常处理: 单文件失败不中断、记录错误信息、用户友好提示
Token 节省估算
- 每次对话节省: 3000-5000 tokens
- 年使用次数: 约 30 次(数据处理问题)
- 年总节省: 90,000 - 150,000 tokens
适用场景
✅ 适用:
- Excel 文件上传与处理
- 数据清洗规则修改
- 合并逻辑调整
- 性能优化
- 异常排查
❌ 不适用:
- 业务逻辑查询(使用
analyzing-auto-insurance-data) - API 接口设计(使用
api-endpoint-design) - 前端组件开发(使用
vue-component-dev)
文档维护者: Claude Code AI Assistant 创建日期: 2025-11-08 下次审查: 2025-11-22