《利用Python进行数据分析》读书笔记。
第 11 章:金融和经济数据应用
自2005年开始,python在金融行业中的应用越来越多。
这主要得益于越来越成熟的函数库(NumPy和pandas)以及大量经验丰富的程序员。
许多机构发现python不仅非常适合成为交互式的分析环境,也非常适合开发文件的系统,所需的时间也比Java或C++少得多。
Python还是一种非常好的粘合层,可以非常轻松为C或C++编写的库构建Python接口。
金融分析领域的内容博大精深。在数据规整化方面所花费的精力常常会比解决核心建模和研究问题所花费的时间多得多。
在本章中,术语截面(cross-section)来表示某个时间点的数据。
例如标普500指数中所有成份股在特定日期的收盘价就形成了一个截面。
多个数据在多个时间点的截面数据就构成了一个面板(panel)。
面板数据既可以表示为层次化索引的DataFrame,也可以表示为三维的Panel pandas对象。
数据规整化方面的话题
%pylab inline
import pandas as pd
from pandas import Series, DataFrame
数据规整:Data munging
时间序列以及截面对齐¶
处理金融数据时,最费神的一个问题就是所谓的数据对齐(data alignment)。
两个时间序列的索引可能没有很好的对齐,或者两个DataFrame对象可能含有不匹配的行或者列。
MATLAB、R用户通常会耗费大量的时间来进行数据对对齐工作。
pandas可以在运算中自动对齐数据。这是极好的,会提高效率。
close_px = pd.read_csv('data/ch11/stock_px.csv', parse_dates=True, index_col=0)
volume = pd.read_csv('data/ch11/volume.csv', parse_dates=True, index_col=0)
prices = close_px.ix['2011-09-05':'2011-09-14', ['AAPL', 'JNJ', 'SPX', 'XOM']]
volume = volume.ix['2011-09-05':'2011-09-12', ['AAPL', 'JNJ', 'XOM']]
prices
volume
# 如果想计算一个基于成交量的加权平均价
# pandas在算术运算时会自动对齐数据
# sum函数自动忽略NaN值
prices * volume
# 计算基于成交量的加权平均价
vwap = (prices * volume).sum() / volume.sum()
vwap
vwap.dropna()
# 如果需要手工对齐,可以使用DataFrame的align方法
prices.align(volume, join='inner')
# 通过一组索引可能不同的Series构建DataFrame
s1 = Series(range(3), index=['a', 'b', 'c'])
s2 = Series(range(4), index=['d', 'b', 'c', 'e'])
s3 = Series(range(3), index=['f', 'a', 'c'])
DataFrame({'one': s1, 'two': s2, 'three': s3})
# 可以指定结果的索引(丢弃其余的数据)
DataFrame({'one': s1, 'two': s2, 'three': s3}, index=list('face'))
频率不同的时间按序列的运算¶
经济学时间序列常常按年月日等频率进行数据统计。但有些是无规律的。
频率转换和重对齐的主要工具是resample 和 reindex 方法:
- resample 用于将数据转换到固定频率
- reindex 用于使数据符合一个新索引
二者都支持插值逻辑。
# 一个简单的周时间序列
ts1 = Series(np.random.randn(3),
index=pd.date_range('2012-6-13', periods=3, freq='W-WED'))
ts1
# 重采样到工作日,就会有缺省值出现
ts1.resample('B').mean()
# 用前面的值填充 NaN
ts1.resample('B').ffill()
# 更一般的不规则时间序列
dates = pd.DatetimeIndex(['2012-6-12', '2012-6-17', '2012-6-18',
'2012-6-21', '2012-6-22', '2012-6-29'])
ts2 = Series(np.random.randn(6), index=dates)
ts2
# 如果想将处理过后的ts1加到ts2上,可以先将两个频率弄相同再相加
# 也可以用reindex方法,维持 ts2 的日期索引
ts1.reindex(ts2.index, method='ffill')
ts2 + ts1.reindex(ts2.index, method='ffill')
使用 Period¶
Period 提供了另一种处理不同频率时间序列的方法。
# 比如,一个公司可能会发布其以6月结尾的财年的每季度盈利报告,即频率为Q-JUN
gdp = Series([1.78, 1.94, 2.08, 2.01, 2.15, 2.31, 2.46],
index=pd.period_range('1984Q2', periods=7, freq='Q-SEP'))
infl = Series([0.025, 0.045, 0.037, 0.04],
index=pd.period_range('1982', periods=4, freq='A-DEC'))
gdp
infl
# 与Timestamp的时间序列不同,由period索引的不同频率的时间序列之间的运算必须进行显示转换
# 假设已知 infl 值是每年年末观测的,于是可以将其转换为 Q-SEP ,以得到改频率下的正确时期
infl_q = infl.asfreq('Q-SEP', how='end')
infl_q
# 显示转换以后,就可以被重新索引了(使用向前填充ffill ,以匹配gdp)
infl_q.reindex(gdp.index, method='ffill')
时间和“最当前”数据选取¶
假设有一个很长的盘中数据,希望抽取其中每天特定时间的价格数据。如果数据不规整该怎么办?
# Make an intraday date range and time series
rng = pd.date_range('2012-06-01 09:30', '2012-06-01 15:59', freq='T')
# Make a 5-day series of 9:30-15:59 values
rng = rng.append([rng + pd.offsets.BDay(i) for i in range(1, 4)])
ts = Series(np.arange(len(rng), dtype=float), index=rng)
ts.head()
# 只取10点钟的数据
from datetime import time
ts[time(10, 0)]
# 该操作实际上用了实例方法at_time(各时间序列以及类似的DataFrame对象都有)
ts.at_time(time(10, 0))
# 选取两个Time对象之间的值
ts.between_time(time(10, 0), time(10, 1))
# 可能刚好就没有任何数据落在某个具体的时间上(比如上午10点)。这时,可能会希望得到上午10点之前最后出现的值
#下面将该时间序列的大部分内容随机设置为NA
np.random.seed(12346)
indexer = np.sort(np.random.permutation(len(ts))[700:])
irr_ts = ts.copy()
irr_ts[indexer] = np.nan
irr_ts['2012-06-01 09:50':'2012-06-01 10:00']
#如果将一组Timestamp传入asof方法,就能得到这些时间点处(或其之前最近)的有效值(非NA)。
# 例如,构造一个日期范围(每天上午10点),然后将其传入asof
selection = pd.date_range('2012-06-01 10:00', periods=4, freq='B')
irr_ts.asof(selection)
拼接多个数据源¶
在第七章中曾经介绍了数据拼接的知识,在金融或经济中,还有另外几个经常出现的情况:
- 在一个特定的时间点上,从一个数据源切换到另一个数据源
- 用另一个时间序列对当前时间序列中的缺失值“打补丁”
- 将数据中的符号(国家、资产代码等)替换为实际数据
# 关于特定时间的数据源切换,就是用concat函数进行连接
data1 = DataFrame(np.ones((6, 3), dtype=float),
columns=['a', 'b', 'c'],
index=pd.date_range('6/12/2012', periods=6))
data2 = DataFrame(np.ones((6, 3), dtype=float) * 2,
columns=['a', 'b', 'c'],
index=pd.date_range('6/13/2012', periods=6))
spliced = pd.concat([data1.ix[:'2012-06-14'], data2.ix['2012-06-15':]])
spliced
# 假设data1缺失了data2中存在的某个时间序列
data2 = DataFrame(np.ones((6, 4), dtype=float) * 2,
columns=['a', 'b', 'c', 'd'],
index=pd.date_range('6/13/2012', periods=6))
spliced = pd.concat([data1.ix[:'2012-06-14'], data2.ix['2012-06-15':]])
spliced
# combine_first可以引入合并点之前的数据,这样也就扩展了'd'项的历史
spliced_filled = spliced.combine_first(data2)
spliced_filled
# DataFrame也有一个类似的方法update,它可以实现就地更新
# 如果只想填充空洞,则必须差U纳入overwrite = False才行
# 不传入overwrite会把整条数据都覆盖
spliced.update(data2, overwrite=False)
spliced
# 上面所讲的技术可以将数据中的符号替换为实际数据
# 但有时利用 DataFrame的索引机制直接进行设置会更简单一些
cp_spliced = spliced.copy()
cp_spliced[['a', 'c']] = data1[['a', 'c']]
cp_spliced
收益指数和累计收益¶
金融领域中,收益(return)通常指的是某资产价格的百分比变化。
# 2011到2012年苹果公司的股票价格数据
from pandas_datareader import data, yahoo
price =yahoo.daily.YahooDailyReader.read('AAPL')['Adj Close']
price[-5:]
# 计算两个时间点之间的累计百分比回报只需计算价格的百分比变化即可
price['2011-10-03'] / price['2011-3-01'] - 1
# 通常会先算出一个收益指数,它表示单位投资(比如1美元)收益的时间序列
# 从收益指数中可以得出许多假设。例如,人们可以决定是否进行利润再投资
# 可以用cumprod计算出一个简单的收益指数
returns = price.pct_change()
ret_index = (1 + returns).cumprod()
ret_index[0] = 1 # Set first value to 1
ret_index
# 得到收益指数之后,计算指定时期内的累计收益就很简单了
m_returns = ret_index.resample('BM', how='last').pct_change()
m_returns['2012']
#如果知道了股息的派发日和支付率,就可以将它们计入到每日总收益中
m_rets = (1 + returns).resample('M', how='prod', kind='period') - 1
m_rets['2012']
returns[dividend_dates] += dividend_pcts
分组变换和分析
%pylab inline
import pandas as pd
from pandas import Series, DataFrame
在第九章中,已经学习了分组统计的基础,还学习了如何对数据集的分组应用自定义的变换函数。
下面以一组假想的投资组合为例。
pd.options.display.max_rows = 100
pd.options.display.max_columns = 10
np.random.seed(12345)
import pytz
import random; random.seed(0)
import string
#首先生成1000个股票代码
N = 1000
def rands(n):
choices = string.ascii_uppercase
return ''.join([random.choice(choices) for _ in range(n)])
tickers = np.array([rands(5) for _ in range(N)])
# 创建一个含有3列的DataFrame来承载这些假想数据,不过只选择部分股票组成该投资组合
M = 500
df = DataFrame({'Momentum' : np.random.randn(M) / 200 + 0.03,
'Value' : np.random.randn(M) / 200 + 0.08,
'ShortInterest' : np.random.randn(M) / 200 - 0.02},
index=tickers[:M])
# 随机创建行业分类
ind_names = np.array(['FINANCIAL', 'TECH'])
sampler = np.random.randint(0, len(ind_names), N)
industries = Series(ind_names[sampler], index=tickers,
name='industry')
# 根据行业分类进行分组并执行分组聚合和变换
by_industry = df.groupby(industries)
by_industry.mean()
by_industry.describe()
# 自定义变换:行业内标准化处理(平均值为 0 ,标准差为 1 )
def zscore(group):
return (group - group.mean()) / group.std()
df_stand = by_industry.apply(zscore)
df_stand.groupby(industries).agg(['mean', 'std'])
# 内置变换函数(比如rank)的用法会更简洁一些
ind_rank = by_industry.rank(ascending=False)
ind_rank.groupby(industries).agg(['min', 'max'])
# 在股票投资组合的定量分析中,排名和标准化是一种常见的变换运算组合。
# 通过rank和zscore链接在一起即可完成整个过程
# 行业内排名和标准化,这是把排名进行了标准化
# Industry rank and standardize
by_industry.apply(lambda x: zscore(x.rank())).head()
分组因子暴露¶
因子分析(factor analysis)是投资组合定量管理中的一种技术。
投资组合的持有量和性能(收益与损失)可以被分解为一个或多个表示投资组合权重的因子(风险因子就是其中之一)。
例如,某只股票与某个基准(比如标普500指数)的协动性被称为其beta风险系数。
下面以一个人为构成的投资的投资组合为例进行讲解,它由三个随机生成的因子(通常称为因子载荷)和一些权重构成。
from numpy.random import rand
fac1, fac2, fac3 = np.random.rand(3, 1000)
ticker_subset = tickers.take(np.random.permutation(N)[:1000])
# 因子加权和,噪声
port = Series(0.7 * fac1 - 1.2 * fac2 + 0.3 * fac3 + rand(1000),
index=ticker_subset)
factors = DataFrame({'f1': fac1, 'f2': fac2, 'f3': fac3},
index=ticker_subset)
# 各因子与投资组合之间的矢量相关性可能说明不了什么问题
factors.corrwith(port)
#计算因子暴露的标准方式是最小二乘回归, 可以使用pandas.ols
pd.ols(y=port, x=factors).beta
#可以看出,由于没有给投资组合添加过多的随机噪声,所以原始因子基本恢复了。
# 还可以通过groupby计算各行业的暴露量
def beta_exposure(chunk, factors=None):
return pd.ols(y=chunk, x=factors).beta
# 根据行业进行分组,并应用该函数
by_ind = port.groupby(industries)
exposures = by_ind.apply(beta_exposure, factors=factors)
exposures.unstack()
十分位和四分位分析¶
基于样本分位数的分析是金融分析师们的另一个重要工具,
例如,股票投资组合的性能可以根据各股的市盈率被划分入四分位。
通过pandas.qcut和groupby可以轻松实现分位数分析。
import pandas.io.data as web
data = web.get_data_yahoo('SPY', '2006-01-01')
data.info()
# 计算日收益率,并编写一个用于将收益率转换为趋势信号的函数
px = data['Adj Close']
returns = px.pct_change()
def to_index(rets):
index = (1 + rets).cumprod()
first_loc = max(index.index.get_loc(index.idxmax()) - 1, 0)
index.values[first_loc] = 1
return index
def trend_signal(rets, lookback, lag):
signal = pd.rolling_sum(rets, lookback, min_periods=lookback - 5)
return signal.shift(lag)
# 通过该函数,我们可以单纯地创建和测试一种根据每周五动量信号进行交易的交易策略
signal = trend_signal(returns, 100, 3)
trade_friday = signal.resample('W-FRI').resample('B', fill_method='ffill')
trade_rets = trade_friday.shift(1) * returns
trade_rets = trade_rets[:len(returns)]
# 将该策略的收益率转换为一个收益指数,并绘制一张图表
to_index(trade_rets).plot()
# 假如希望将该策略的性能按不同大小的交易期波幅进行划分。
# 年度标准差是计算波幅的一种简单办法,可以通过计算夏普比率来观察不同波动机制下的风险收益率:
vol = pd.rolling_std(returns, 250, min_periods=200) * np.sqrt(250)
def sharpe(rets, ann=250):
return rets.mean() / rets.std() * np.sqrt(ann)
# 现在利用qcut将vol划分为4等份,并用sharpe进行聚合
cats = pd.qcut(vol, 4)
print('cats: %d, trade_rets: %d, vol: %d' % (len(cats), len(trade_rets), len(vol)))
trade_rets.groupby(cats).agg(sharpe)
更多示例应用
%pylab inline
import pandas as pd
from pandas import Series, DataFrame
信号前沿分析¶
本小节将介绍一种简化的截面动量投资组合,并得出如何得到模型参数化网格。
# 将几只股票做成一个投资组合,并假装历史价格数据
import pandas_datareader.data as web
names = ['AAPL', 'GOOG', 'MSFT', 'DELL', 'GS', 'MS', 'BAC', 'C']
def get_px(stock, start, end):
return web.DataReader(stock, 'yahoo',start, end)['Adj Close']
px = DataFrame({n: get_px(n, None, None) for n in names})
#px = pd.read_csv('data/ch11/stock_px.csv')
px.head()
# 绘制每只股票的累计收益
px = px.asfreq('B').fillna(method='pad')
rets = px.pct_change()
((1 + rets).cumprod() - 1).plot()
# 对于投资组合的构建,计算特定回顾期的动量,然后按降序排列,并标准化
def calc_mom(price, lookback, lag):
mom_ret = price.shift(lag).pct_change(lookback)
ranks = mom_ret.rank(axis=1, ascending=False)
demeaned = ranks.subtract(ranks.mean(axis=1), axis=0)
return demeaned.divide(demeaned.std(axis=1), axis=0)
# 编写检验函数,计算夏普比率
compound = lambda x : (1 + x).prod() - 1
daily_sr = lambda x: x.mean() / x.std()
def strat_sr(prices, lb, hold):
# Compute portfolio weights
freq = '%dB' % hold
port = calc_mom(prices, lb, lag=1)
daily_rets = prices.pct_change()
# 计算投资组合收益
port = port.shift(1).resample(freq, how='first')
returns = daily_rets.resample(freq, how=compound)
port_rets = (port * returns).sum(axis=1)
return daily_sr(port_rets) * np.sqrt(252 / hold)
# 得到一个标量值
strat_sr(px, 70, 30)
# 对参数网格(多对参数组合)应用strat_sr函数,结果报出到defaultdict中
# 最后将全部结果放到DataFrame
from collections import defaultdict
lookbacks = range(20, 90, 5)
holdings = range(20, 90, 5)
dd = defaultdict(dict)
for lb in lookbacks:
for hold in holdings:
dd[lb][hold] = strat_sr(px, lb, hold)
ddf = DataFrame(dd)
ddf.index.name = 'Holding Period'
ddf.columns.name = 'Lookback Period'
# 生成热力图
import matplotlib.pyplot as plt
def heatmap(df, cmap=plt.cm.gray_r):
fig = plt.figure()
ax = fig.add_subplot(111)
axim = ax.imshow(df.values, cmap=cmap, interpolation='nearest')
ax.set_xlabel(df.columns.name)
ax.set_xticks(np.arange(len(df.columns)))
ax.set_xticklabels(list(df.columns))
ax.set_ylabel(df.index.name)
ax.set_yticks(np.arange(len(df.index)))
ax.set_yticklabels(list(df.index))
plt.colorbar(axim)
heatmap(ddf)
期货合约转仓¶
pd.options.display.max_rows = 10
import pandas_datareader.data as web
px = web.DataReader('SPY', 'yahoo')['Adj Close']
#px = web.get_data_yahoo('')['Adj Close'] * 10
px
# 放入两份标普500指数合约及其到期日
from datetime import datetime
expiry = {'ESU2': datetime(2012, 9, 21),
'ESZ2': datetime(2012, 12, 21)}
expiry = Series(expiry).order()
expiry
# 用 雅虎的价格及随机漫步噪声模拟未来走势
np.random.seed(12347)
N = 200
walk = (np.random.randint(0, 200, size=N) - 100) * 0.25
perturb = (np.random.randint(0, 20, size=N) - 10) * 0.25
walk = walk.cumsum()
rng = pd.date_range(px.index[0], periods=len(px) + N, freq='B')
near = np.concatenate([px.values, px.values[-1] + walk])
far = np.concatenate([px.values, px.values[-1] + walk + perturb])
prices = DataFrame({'ESU2': near, 'ESZ2': far}, index=rng)
# prices: 关于这两个合约的时间序列
prices.tail()
# 将多个时间序列合并为单个连续序列。方法:构造一个加权矩阵。
# 其中活动合约的权重为1, 直到期满为止。
# 下面的函数计算加权矩阵,权重根据到期前的期数减少而线性衰减
def get_roll_weights(start, expiry, items, roll_periods=5):
# start : first date to compute weighting DataFrame
# expiry : Series of ticker -> expiration dates
# items : sequence of contract names
dates = pd.date_range(start, expiry[-1], freq='B')
weights = DataFrame(np.zeros((len(dates), len(items))),
index=dates, columns=items)
prev_date = weights.index[0]
for i, (item, ex_date) in enumerate(expiry.iteritems()):
if i < len(expiry) - 1:
weights.ix[prev_date:ex_date - pd.offsets.BDay(), item] = 1
roll_rng = pd.date_range(end=ex_date - pd.offsets.BDay(),
periods=roll_periods + 1, freq='B')
decay_weights = np.linspace(0, 1, roll_periods + 1)
weights.ix[roll_rng, item] = 1 - decay_weights
weights.ix[roll_rng, expiry.index[i + 1]] = decay_weights
else:
weights.ix[prev_date:, item] = 1
prev_date = ex_date
return weights
# 权重计算结果
weights = get_roll_weights('6/1/2012', expiry, prices.columns)
weights.ix['2012-09-12':'2012-09-21']
# 转仓收益就是合约收益的加权和
rolled_returns = (prices.pct_change() * weights).sum(1)
# 相关系数是观察两个资产时间序列变化的协动性的一种手段。
# pandas 的 rolling_corr函数可以根据两个收益序列,计算出移动窗口的相关系数
# 加载价格序列并计算每日收益率
aapl = web.get_data_yahoo('AAPL', '2000-01-01')['Adj Close']
msft = web.get_data_yahoo('MSFT', '2000-01-01')['Adj Close']
aapl_rets = aapl.pct_change()
msft_rets = msft.pct_change()
# 计算一年期移动相关系数并绘制图表
pd.rolling_corr(aapl_rets, msft_rets, 250).plot()
plt.figure()
# 两个资产直接的相关系数不能捕获波动性差异。
# 最小二乘回归,提供了一个变量与一个或多个其他预测变量之间动态关系的建模办法
model = pd.ols(y=aapl_rets, x={'MSFT': msft_rets}, window=250)
model.beta
model.beta['MSFT'].plot()