Python时间序列数据获取与处理全攻略
1. Python获取时间序列数据的完整指南在机器学习和数据分析项目中获取高质量的时间序列数据是至关重要的第一步。无论是测试新算法、建立基准模型还是研究特定领域的时间模式合适的数据集都能为项目奠定坚实基础。本文将全面介绍如何使用Python获取真实世界的时间序列数据并创建符合需求的合成数据。提示时间序列数据是按时间顺序排列的数据点集合广泛应用于金融、经济、气象、物联网等领域。掌握其获取方法能极大提升数据分析效率。1.1 为什么需要多种数据获取方式真实世界的数据获取通常面临三大挑战数据源分散且格式不统一API接口频繁变更特定场景数据难以获取为此我们需要掌握多种数据获取技术使用封装好的库如pandas-datareader快速获取标准数据直接调用Web API获取原始数据生成合成数据满足特定测试需求2. 使用pandas-datareader获取金融经济数据2.1 环境准备与安装首先确保已安装必要的库pip install pandas_datareader requests matplotlibpandas-datareader支持从多个权威数据源获取时间序列数据Yahoo Finance股票市场数据FRED美联储经济数据World Bank全球发展数据2.2 获取股票数据实战以获取苹果公司(AAPL)2021年股价数据为例import pandas_datareader as pdr # 从Yahoo Finance获取苹果股票数据 aapl_df pdr.DataReader(AAPL, yahoo, start2021-01-01, end2021-12-31) print(aapl_df.head())输出结果包含六个关键指标High Low Open Close Volume Adj Close Date 2021-01-04 133.610001 126.760002 133.520004 129.410004 143301900.0 128.453461 2021-01-05 131.740005 128.429993 128.889999 131.009995 97664900.0 130.0416112.3 多股票对比分析同时获取多家公司数据只需传入股票代码列表companies [AAPL, MSFT, GE] # 苹果、微软、通用电气 multi_df pdr.DataReader(companies, yahoo, start2021-01-01, end2021-12-31) # 绘制2021年4-6月收盘价对比 import matplotlib.pyplot as plt multi_df[Close].loc[2021-04-01:2021-06-30].plot(figsize(12,6)) plt.title(2021年Q2三大公司股价对比) plt.ylabel(股价(美元)) plt.show()2.4 获取经济指标数据从FRED获取消费者价格指数(CPI)数据# 获取CPI数据(CPIAUCSL)和核心CPI(CPILFESL) cpi_df pdr.DataReader([CPIAUCSL,CPILFESL], fred, 2010-01-01, 2021-12-31) # 绘制2019-2021年数据 cpi_df.loc[2019:].plot(figsize(12,6)) plt.title(消费者价格指数变化趋势) plt.xticks(rotation45) plt.show()2.5 世界银行数据获取技巧世界银行数据更为复杂需要处理国家维度from pandas_datareader import wb # 搜索人口相关指标 pop_indicators wb.search(population.*total) # 获取各国2020年人口数据 countries wb.get_countries() non_aggregates list(countries[countries.region ! Aggregates][iso2c]) pop_df wb.download(indicatorSP.POP.TOTL, countrynon_aggregates, start2020, end2020) # 显示人口最多的25国 top25 pop_df.dropna().sort_values(SP.POP.TOTL).iloc[-25:] top25.plot(kindbarh, figsize(10,8)) plt.title(2020年人口最多25国) plt.xlabel(人口数量) plt.show()3. 直接调用Web API获取原始数据3.1 世界银行API实战当pandas-datareader无法满足需求时可直接调用APIimport requests import pandas as pd # 1. 获取国家列表 countries_url http://api.worldbank.org/v2/country?formatjsonper_page500 response requests.get(countries_url) _, countries_data response.json() # 2. 筛选非聚合国家 valid_countries [c[id] for c in countries_data if c[region][value] ! Aggregates] # 3. 获取人口数据 pop_url http://api.worldbank.org/v2/country/all/indicator/SP.POP.TOTL?date2020formatjsonper_page500 _, pop_data requests.get(pop_url).json() # 4. 数据处理 population [] for item in pop_data: if item[countryiso3code] in valid_countries: pop { country: item[country][value], code: item[countryiso3code], population: item[value] } population.append(pop) # 5. 转换为DataFrame并分析 df pd.DataFrame(population).dropna()3.2 API调用最佳实践分页处理大多数API限制每页返回数量需要循环获取page 1 all_data [] while True: url fhttp://api.example.com/data?page{page} response requests.get(url) data response.json() if not data: break all_data.extend(data) page 1错误处理添加重试机制from time import sleep def safe_get(url, max_retries3): for _ in range(max_retries): try: response requests.get(url, timeout10) return response.json() except Exception as e: print(fError: {e}, retrying...) sleep(2) return None参数化请求使用字典管理查询参数params { indicator: SP.POP.TOTL, date: 2020:2020, format: json, per_page: 500 } response requests.get(http://api.worldbank.org/v2/country/all/indicator/SP.POP.TOTL, paramsparams)4. 生成合成时间序列数据4.1 自回归(AR)模型实现AR模型是生成时间序列的基础方法模拟当前值与历史值的关系import numpy as np import pandas as pd def generate_ar_series(coeffs, length200, noise0.1): 生成AR(n)时间序列 coeffs: 系数列表 [b_n, b_{n-1}, ..., b_1] length: 序列长度 noise: 噪声水平 order len(coeffs) series list(np.random.randn(order)) # 随机初始化 for _ in range(length - order): # 计算当前值 b1*x_{t-1} b2*x_{t-2} ... bn*x_{t-n} noise next_val np.dot(coeffs, series[-order:]) np.random.randn() * noise series.append(next_val) # 转换为带时间索引的DataFrame dates pd.date_range(start2022-01-01, periodslength, freqD) return pd.DataFrame(series, indexdates, columns[Value]) # 生成AR(3)序列x_t 0.6*x_{t-1} - 0.2*x_{t-2} 0.1*x_{t-3} ε ar_data generate_ar_series([0.6, -0.2, 0.1], length300) ar_data.plot(figsize(12,5), titleAR(3) Synthetic Time Series)4.2 更复杂的合成数据生成4.2.1 添加趋势和季节性def generate_complex_series(length365): # 基础趋势 trend np.linspace(0, 10, length) # 季节性成分 seasonal 5 * np.sin(np.arange(length) * 2 * np.pi / 30) # 随机噪声 noise np.random.normal(0, 1, length) # 组合所有成分 series trend seasonal noise # 创建时间索引 dates pd.date_range(start2022-01-01, periodslength) return pd.DataFrame(series, indexdates, columns[Value]) complex_data generate_complex_series() complex_data.plot(figsize(12,5), title合成数据(趋势季节性噪声))4.2.2 生成多元时间序列def generate_multivariate_series(n_vars3, length200): # 生成相关系数矩阵 corr_matrix np.array([[1.0, 0.8, 0.3], [0.8, 1.0, 0.5], [0.3, 0.5, 1.0]]) # Cholesky分解生成相关随机变量 L np.linalg.cholesky(corr_matrix) uncorrelated np.random.randn(length, n_vars) correlated uncorrelated L.T # 添加时间趋势 trends np.linspace(0, 5, length).reshape(-1,1) * np.arange(1, n_vars1) final_series correlated trends # 创建DataFrame dates pd.date_range(start2022-01-01, periodslength) columns [fVar_{i1} for i in range(n_vars)] return pd.DataFrame(final_series, indexdates, columnscolumns) multi_data generate_multivariate_series() multi_data.plot(subplotsTrue, figsize(12,8), layout(3,1))5. 数据处理与可视化技巧5.1 数据清洗实战处理获取到的原始数据常见问题# 1. 处理缺失值 df.fillna(methodffill, inplaceTrue) # 前向填充 df.interpolate(methodtime, inplaceTrue) # 时间插值 # 2. 处理异常值 def remove_outliers(df, threshold3): z_scores (df - df.mean()) / df.std() return df[(z_scores.abs() threshold).all(axis1)] # 3. 重采样 daily_to_monthly df.resample(M).mean() # 日数据转月平均5.2 高级可视化技巧import matplotlib.pyplot as plt import seaborn as sns # 1. 多图组合 fig, (ax1, ax2) plt.subplots(2, 1, figsize(12,8)) # 原始序列 df[Value].plot(axax1, title原始时间序列) ax1.set_ylabel(值) # 滚动平均 df[Value].rolling(window7).mean().plot(axax2, colorr, title7天滚动平均) ax2.set_ylabel(平滑值) plt.tight_layout() # 2. 季节分解图 from statsmodels.tsa.seasonal import seasonal_decompose result seasonal_decompose(df[Value], modeladditive, period30) result.plot() plt.show()6. 实际应用案例6.1 构建股票价格预测数据集def build_stock_dataset(tickers, start_date, end_date): # 获取原始数据 raw_data pdr.DataReader(tickers, yahoo, start_date, end_date) # 特征工程 features {} for ticker in tickers: # 计算日收益率 features[f{ticker}_Return] raw_data[Close][ticker].pct_change() # 计算5日移动平均 features[f{ticker}_MA5] raw_data[Close][ticker].rolling(5).mean() # 计算波动率 features[f{ticker}_Volatility] raw_data[Close][ticker].rolling(20).std() # 合并特征 feature_df pd.DataFrame(features) # 添加目标变量(次日收益率) for ticker in tickers: feature_df[f{ticker}_Target] feature_df[f{ticker}_Return].shift(-1) # 清理数据 final_df feature_df.dropna() return final_df # 构建苹果、微软、谷歌的预测数据集 stock_data build_stock_dataset([AAPL, MSFT, GOOG], 2020-01-01, 2022-12-31)6.2 经济指标相关性分析# 获取多种经济指标 indicators { GDP: GDPC1, # 实际GDP Unemployment: UNRATE, # 失业率 CPI: CPIAUCSL, # 消费者价格指数 Interest: FEDFUNDS # 联邦基金利率 } econ_data pdr.DataReader(list(indicators.values()), fred, 2000-01-01, 2022-12-31) econ_data.columns list(indicators.keys()) # 计算季度数据相关性 quarterly econ_data.resample(Q).mean() corr_matrix quarterly.corr() # 绘制热力图 sns.heatmap(corr_matrix, annotTrue, cmapcoolwarm) plt.title(经济指标相关性分析) plt.show()7. 性能优化与大规模数据处理7.1 并行获取数据from concurrent.futures import ThreadPoolExecutor def fetch_single_stock(ticker): try: return pdr.DataReader(ticker, yahoo, 2020-01-01, 2022-12-31) except Exception as e: print(fFailed to fetch {ticker}: {e}) return None tickers [AAPL, MSFT, GOOG, AMZN, FB, TSLA, NVDA, PYPL] with ThreadPoolExecutor(max_workers4) as executor: results list(executor.map(fetch_single_stock, tickers)) # 合并结果 valid_results [r for r in results if r is not None] combined_data pd.concat([df[Close] for df in valid_results], axis1) combined_data.columns tickers[:len(valid_results)]7.2 使用Dask处理大数据import dask.dataframe as dd # 假设我们有一个非常大的CSV文件 ddf dd.read_csv(large_dataset.csv, parse_dates[timestamp]) ddf ddf.set_index(timestamp) # 分布式计算滚动平均 rolling_mean ddf[value].rolling(30).mean().compute() # 实际计算发生在调用compute()时8. 常见问题与解决方案8.1 数据获取问题排查API返回空数据检查日期范围是否有效验证股票代码/指标代码是否正确确认数据源是否仍然可用(Yahoo Finance等可能变更API)数据缺失处理# 前向填充 df.fillna(methodffill, inplaceTrue) # 插值处理 df.interpolate(methodtime, inplaceTrue) # 删除缺失值 df.dropna(inplaceTrue)频率不一致问题# 统一转换为日数据 df.asfreq(D, methodpad) # 转换为周数据 df.resample(W).mean()8.2 合成数据常见问题生成的数据不稳定检查AR模型系数是否满足平稳性条件尝试减小系数绝对值或增加噪声缺乏现实感添加趋势和季节性成分考虑使用更复杂的模型(如ARIMA、GARCH)多变量相关性控制使用Cholesky分解确保变量间相关性考虑使用copula方法建模复杂依赖关系9. 扩展应用与进阶技巧9.1 使用yfinance替代pandas-datareader由于Yahoo Finance API变更可以使用更稳定的yfinance库import yfinance as yf # 获取苹果股票数据 aapl yf.Ticker(AAPL) hist aapl.history(start2020-01-01, end2022-12-31) # 获取股息数据 dividends aapl.dividends # 获取财务报表 financials aapl.financials9.2 使用Quandl获取专业数据集import quandl # 需要先设置API密钥 quandl.ApiConfig.api_key your_api_key # 获取WTI原油价格 oil quandl.get(EIA/PET_RWTC_D) # 获取比特币价格 btc quandl.get(BCHAIN/MKPRU)9.3 生成具有突变点的序列def generate_with_breakpoints(length200, n_breaks2): # 生成基础序列 base np.cumsum(np.random.randn(length)) # 随机选择突变点 break_points sorted(np.random.choice(range(50, length-50), n_breaks, replaceFalse)) # 添加突变 for bp in break_points: base[bp:] np.random.randn() * 5 # 随机跳跃 # 添加趋势变化 trends [] current_trend np.random.randn() * 0.05 for i in range(length): if i in break_points: current_trend np.random.randn() * 0.05 trends.append(current_trend) final_series base np.cumsum(trends) # 转换为DataFrame dates pd.date_range(start2022-01-01, periodslength) return pd.DataFrame(final_series, indexdates, columns[Value]) break_data generate_with_breakpoints() break_data.plot(figsize(12,5), title带有突变点的时间序列)10. 项目实战构建完整的数据管道10.1 数据获取与存储自动化import sqlite3 from datetime import datetime, timedelta def update_stock_database(tickers): # 连接数据库 conn sqlite3.connect(stocks.db) # 计算日期范围(最近2年) end_date datetime.now().strftime(%Y-%m-%d) start_date (datetime.now() - timedelta(days365*2)).strftime(%Y-%m-%d) # 获取数据 new_data pdr.DataReader(tickers, yahoo, start_date, end_date) # 存储数据 for ticker in tickers: ticker_data new_data[Close][ticker].reset_index() ticker_data[Ticker] ticker ticker_data.to_sql(stock_prices, conn, if_existsappend, indexFalse) conn.close() # 定时执行(实际项目中可使用APScheduler等) update_stock_database([AAPL, MSFT, GOOG])10.2 数据质量监控def check_data_quality(df): issues [] # 检查缺失值 missing df.isnull().sum() if missing.any(): issues.append(f缺失值发现: {missing[missing0].to_dict()}) # 检查异常值 z_scores (df - df.mean()) / df.std() outliers (z_scores.abs() 3).sum() if outliers.any(): issues.append(f异常值发现: {outliers[outliers0].to_dict()}) # 检查数据频率 freq_diff df.index.to_series().diff().value_counts() if len(freq_diff) 1: issues.append(f不一致的时间频率: {freq_diff.to_dict()}) return issues if issues else 数据质量良好 # 示例使用 quality_report check_data_quality(stock_data) print(quality_report)10.3 构建特征工程管道from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from sklearn.feature_selection import SelectKBest, f_regression def create_feature_pipeline(n_features10): return Pipeline([ (imputer, SimpleImputer(strategymean)), # 填充缺失值 (scaler, StandardScaler()), # 标准化 (selector, SelectKBest(score_funcf_regression, kn_features)) # 特征选择 ]) # 使用示例 pipeline create_feature_pipeline() X_transformed pipeline.fit_transform(X_train, y_train)掌握这些时间序列数据获取与处理方法后你将能够为各类数据分析项目快速准备高质量的数据集。无论是使用真实数据还是生成特定场景的合成数据这些技能都是现代数据分析师和机器学习工程师的核心竞争力。