loading-insurance-data
Load and preprocess weekly insurance policy data, with intelligent week detection, multi-week loading, data validation, and cleaning. Use at the start of any insurance data analysis task.
When & Why to Use This Skill
This Claude skill automates the end-to-end process of loading and preprocessing weekly insurance policy data. It features intelligent week detection, multi-year support (2024/2025), and robust data validation to ensure high-quality datasets for insurance analytics and financial reporting. By standardizing CSV inputs and handling common data issues like encoding and type conversion, it significantly reduces the manual effort required for insurance data preparation.
Use Cases
- Automating the aggregation of weekly insurance premium reports from multiple CSV files across different time periods.
- Standardizing raw insurance datasets by filtering specific organizations and converting data types for consistent downstream analysis.
- Performing automated data quality checks to identify anomalies like negative premiums or missing mandatory fields before reporting.
- Preparing multi-year comparative datasets (2024-2025) for insurance trend analysis and business intelligence tasks.
| name | loading-insurance-data |
|---|---|
| description | Load and preprocess weekly insurance policy data, with intelligent week detection, multi-week loading, data validation, and cleaning. Use at the start of any insurance data analysis task. |
Insurance Data Loader
Core Features
A complete loading workflow for weekly insurance business CSV data:
- ✅ Intelligent detection of available weeks
- ✅ Flexible week-range configuration
- ✅ Multi-year data support (2024/2025)
- ✅ Data quality validation
- ✅ Standardized preprocessing
Quick Start
from pathlib import Path
import re
import pandas as pd

# 1. Detect available weeks
def detect_available_weeks(data_folder="2025年保单"):
    available = set()
    for file in Path(data_folder).glob("*保单第*周*.csv"):
        match = re.search(r'第(\d+)周', file.name)
        if match:
            available.add(int(match.group(1)))
    return sorted(available)

# 2. Load a single week's data
def load_week_data(week, data_folder="2025年保单"):
    pattern = f"*保单第{week}周*.csv"
    files = list(Path(data_folder).glob(pattern))
    if not files:
        return None
    df = pd.read_csv(files[0], encoding='utf-8-sig')
    df['week_number'] = week
    return df

# 3. Batch load
weeks = detect_available_weeks()
print(f"Available weeks: {weeks}")
data = {}
for week in weeks:
    df = load_week_data(week)
    if df is not None:
        data[week] = df
        print(f"✅ Week {week}: {len(df):,} rows")
Data File Structure
File Naming Convention
{YEAR}保单第{WEEK}周变动成本明细表.csv
(i.e. the variable-cost detail table for week {WEEK} of the {YEAR} policy year)
Examples:
- 2025保单第28周变动成本明细表.csv
- 2025保单第43周变动成本明细表.csv
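As a quick illustration of the convention, the year and week can be pulled out of a file name like this (the `parse_policy_filename` helper below is only a sketch, not part of the skill's scripts):

import re

def parse_policy_filename(filename):
    """Return (year, week) parsed from a weekly policy file name, or None if it does not match."""
    match = re.search(r'(\d{4})保单第(\d+)周', filename)
    if match:
        return int(match.group(1)), int(match.group(2))
    return None

# Prints (2025, 28)
print(parse_policy_filename("2025保单第28周变动成本明细表.csv"))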
Key Fields
| Field | Type | Description |
|---|---|---|
| policy_start_year | int | Policy year |
| third_level_organization | str | Third-level organization |
| business_type_category | str | Business type category |
| is_new_energy_vehicle | bool | New energy vehicle flag |
| signed_premium_yuan | float | Signed premium (yuan) |
| matured_premium_yuan | float | Matured premium (yuan) |
| reported_claim_payment_yuan | float | Reported claim payments (yuan) |
| expense_amount_yuan | float | Expense amount (yuan) |
| claim_case_count | int | Number of claim cases |
| policy_count | int | Number of policies |
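If you prefer to enforce these types at read time rather than in a later cleaning pass, one possible sketch (assuming the raw CSV columns match the table above) is:

import pandas as pd

# Text columns are read as strings; numeric columns are coerced afterwards,
# mirroring the preprocess_data() step below.
TEXT_COLS = ['third_level_organization', 'business_type_category']
NUMERIC_COLS = [
    'signed_premium_yuan', 'matured_premium_yuan',
    'reported_claim_payment_yuan', 'expense_amount_yuan',
    'claim_case_count', 'policy_count'
]

def read_typed_csv(path):
    df = pd.read_csv(path, encoding='utf-8-sig', dtype={c: str for c in TEXT_COLS})
    for col in NUMERIC_COLS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    return df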
Data Processing Workflow
Step 1: Week Detection
# Automatically scan the data directory
available_weeks = detect_available_weeks()
# Define the analysis period
start_week = 28
end_week = 43
analysis_weeks = list(range(start_week, end_week + 1))
# Check for missing weeks
missing = [w for w in analysis_weeks if w not in available_weeks]
if missing:
    print(f"⚠️ Missing weeks: {missing}")
Step 2: Data Loading
loaded_data = {}
for week in analysis_weeks:
    if week in available_weeks:
        df = load_week_data(week)
        if df is not None:
            loaded_data[week] = df
Step 3: Data Cleaning
def preprocess_data(df):
    """Standardize the dataset"""
    # Drop head-office ('本部') rows
    df = df[df['third_level_organization'] != '本部'].copy()
    # Extract the policy year (2024 or 2025)
    df['policy_year'] = df['policy_start_year'].astype(str).str.extract(r'(202[45])')[0]
    # Convert numeric fields
    numeric_cols = [
        'signed_premium_yuan',
        'matured_premium_yuan',
        'reported_claim_payment_yuan',
        'expense_amount_yuan',
        'claim_case_count',
        'policy_count'
    ]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    return df

# Apply cleaning
for week in loaded_data:
    loaded_data[week] = preprocess_data(loaded_data[week])
Step 4: Data Validation
def validate_data(df, week):
    """Data quality checks"""
    issues = []
    # Check for empty data
    if len(df) == 0:
        issues.append("empty dataset")
    # Check required fields
    required = ['signed_premium_yuan', 'third_level_organization']
    missing = [col for col in required if col not in df.columns]
    if missing:
        issues.append(f"missing fields: {missing}")
    # Check for negative premiums
    if 'signed_premium_yuan' in df.columns:
        if (df['signed_premium_yuan'] < 0).any():
            issues.append("negative premiums found")
    if issues:
        print(f"⚠️ Week {week} issues: {', '.join(issues)}")
        return False
    return True
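Applied to the data loaded above, validation can act as a filter (a short usage sketch):

# Keep only the weeks that pass the quality checks
valid_data = {
    week: df for week, df in loaded_data.items()
    if validate_data(df, week)
}
print(f"Weeks passing validation: {sorted(valid_data.keys())}")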
Data Output Structure
Group by Policy Year
def group_by_year(loaded_data):
    """Group data by policy year"""
    grouped = {'2024': {}, '2025': {}}
    for week, df in loaded_data.items():
        for year in ['2024', '2025']:
            year_df = df[df['policy_year'] == year]
            if len(year_df) > 0:
                grouped[year][week] = year_df
    return grouped

# Usage example
data_by_year = group_by_year(loaded_data)
print(f"2024 policy weeks: {list(data_by_year['2024'].keys())}")
print(f"2025 policy weeks: {list(data_by_year['2025'].keys())}")
Common Issues
Q1: File encoding errors
Problem: UnicodeDecodeError
Solution: read with encoding='utf-8-sig'
df = pd.read_csv(file, encoding='utf-8-sig')
Q2: Missing week files
Problem: files for some weeks (e.g. week 32, week 38) do not exist
Solution: skip them automatically and log a message
if week not in available_weeks:
    print(f"⚠️ Week {week}: file not found, skipping")
    continue
Q3: Data type errors
Problem: premium fields are parsed as strings
Solution: force numeric conversion
df['signed_premium_yuan'] = pd.to_numeric(
    df['signed_premium_yuan'],
    errors='coerce'
).fillna(0)
Q4: Excessive memory usage
Problem: loading many weeks of data exhausts memory
Solution: load on demand, or read only the columns you need
# Read only the required columns (raw files contain policy_start_year; policy_year is derived later)
usecols = [
    'policy_start_year',
    'third_level_organization',
    'signed_premium_yuan',
    'matured_premium_yuan',
    'reported_claim_payment_yuan'
]
df = pd.read_csv(file, usecols=usecols, encoding='utf-8-sig')
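If trimming columns is still not enough, reading in chunks is one way to process a large file on demand (a sketch; the 200,000-row chunk size is an arbitrary assumption):

# Aggregate signed premium per organization without holding the whole file in memory
totals = None
for chunk in pd.read_csv(file, usecols=usecols, encoding='utf-8-sig', chunksize=200_000):
    chunk['signed_premium_yuan'] = pd.to_numeric(
        chunk['signed_premium_yuan'], errors='coerce'
    ).fillna(0)
    part = chunk.groupby('third_level_organization')['signed_premium_yuan'].sum()
    totals = part if totals is None else totals.add(part, fill_value=0)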
Performance Optimization
Parallel Batch Loading
from concurrent.futures import ThreadPoolExecutor

def load_weeks_parallel(weeks, data_folder):
    """Load multiple weeks of data in parallel"""
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(load_week_data, week, data_folder): week
            for week in weeks
        }
        results = {}
        for future in futures:
            week = futures[future]
            try:
                df = future.result()
                if df is not None:
                    results[week] = df
            except Exception as e:
                print(f"❌ Week {week} failed to load: {e}")
    return results
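Usage mirrors the sequential loop from Quick Start:

weeks = detect_available_weeks()
loaded_data = load_weeks_parallel(weeks, data_folder="2025年保单")
print(f"✅ Loaded {len(loaded_data)} weeks in parallel")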
Memory Management
import gc

# Release memory after each week is loaded and processed
for week in weeks:
    df = load_week_data(week)
    if df is None:
        continue
    processed = preprocess_data(df)
    # ... use the data ...
    del df, processed
    gc.collect()
Reference Resources
Scripts:
- scripts/quick_load.py – quick data loading tool
- scripts/data_validator.py – data quality checker
Reference docs:
- reference/data_schema.md – full field descriptions
- reference/data_quality_rules.md – data quality standards
Changelog
- v1.0 (2025-11-04): initial release
  - Basic loading functionality
  - Intelligent week detection
  - Data validation and cleaning
  - Multi-year support