© Dr. Yves J. Hilpisch
The Python Quants GmbH
import numpy as np
import pandas as pd
from pylab import plt
plt.style.use('seaborn')
from pandas_datareader import data as web
%matplotlib inline
# Read the 'data' node from the local HDF5 store (opened read-only).
# The context manager closes the file handle even if the read raises,
# which the manual open/close pair did not guarantee.
with pd.HDFStore('../live/data/equities.h5', 'r') as h5:
    df = h5['data']
# Print a summary of the loaded frame.
df.info()
# Isolate the '^GDAXI' column as a one-column frame labelled 'prices'.
raw = df['^GDAXI'].to_frame('prices')
raw.plot(figsize=(10, 6));
# Log return: log of each price divided by the previous row's price.
raw['log_rets'] = np.log(raw['prices'].div(raw['prices'].shift(1)))
# The first row has no predecessor, so its log return is NaN; drop it.
raw.dropna(inplace=True)
# Binary label: 1 where the log return is positive, 0 otherwise.
raw['returns'] = (raw['log_rets'] > 0).astype(int)
# Number of lagged log-return features.
lags = 3
# Feature column names lag_1 .. lag_<lags>.
cols = ['lag_%d' % lag for lag in range(1, lags + 1)]
for lag, col in enumerate(cols, start=1):
    # lag_k carries the log return from k rows earlier.
    raw[col] = raw['log_rets'].shift(lag)
# 10-row simple moving average of the price.
raw['SMA'] = raw['prices'].rolling(10).mean()
# Trend flag (1 if price minus SMA is positive, else 0), shifted by one
# row so every row carries the flag computed on the previous row.
raw['trend'] = raw['prices'].sub(raw['SMA']).gt(0).astype(int).shift(1)
raw.head()
# Mean and standard deviation of the log returns over all rows of raw;
# the buckets() function reads both as module-level names.
std = raw['log_rets'].std()
mean = raw['log_rets'].mean()
def buckets(x, bounds=None):
    """Return the index of the bucket that ``x`` falls into.

    The bounds are sorted ascending and ``x`` is assigned the index of the
    first bound it is strictly smaller than; a value that is not smaller
    than any bound gets ``len(bounds)``. With four bounds the result is
    therefore one of 0, 1, 2, 3, 4.

    Parameters
    ----------
    x : scalar
        Value to classify.
    bounds : iterable of scalars, optional
        Bucket boundaries. When omitted, the module-level ``mean`` and
        ``std`` are used to build ``[mean-std, 0, mean, mean+std]``.

    Returns
    -------
    int
        Bucket index in ``range(len(bounds) + 1)``.

    Notes
    -----
    NaN compares False against every bound, so a NaN input lands in the
    top bucket rather than staying NaN.
    """
    if bounds is None:
        bounds = [mean - std, 0, mean, mean + std]
    ordered = sorted(bounds)
    # Scan ALL bounds before giving up; only fall through to the top
    # bucket once x has been found not smaller than every bound.
    for i, b in enumerate(ordered):
        if x < b:
            return i
    return len(ordered)
# Replace every lagged log return by its integer bucket index.
for col in cols:
    raw[col] = raw[col].apply(buckets)
# The trend flag joins the feature set; bump the feature count to match,
# because lags is used below as the number of weight rows.
cols.append('trend')
lags += 1
raw.head()
# Keep the last 10 complete rows as a small training set. .iloc slices
# by position; the former .ix indexer was removed in pandas 1.0.
# NOTE(review): equivalent to .ix[-10:] as long as the index is not
# integer-labelled (the later date-string cutoff suggests dates) - confirm.
data = raw.dropna().iloc[-10:].copy()
data
# Feature matrix: one row per observation, one column per name in cols.
x = data[cols].values
x.astype(int)
# Labels reshaped to a column vector of shape (n, 1).
y = data['returns'].values.reshape(-1, 1)
y
# initialize weights randomly with mean 0
np.random.seed(1)
weights = 2 * np.random.random((lags, 1)) - 1
weights
def sigmoid(x, deriv=False):
    """Evaluate the logistic function or its first derivative.

    Parameters
    ----------
    x : scalar or numpy.ndarray
        Input value(s); arrays are processed element-wise.
    deriv : bool, optional
        When truthy, return the derivative ``s(x) * (1 - s(x))`` evaluated
        at ``x`` instead of the function value ``s(x)``.

    Returns
    -------
    Same shape as ``x``: ``1 / (1 + exp(-x))`` or its derivative.
    """
    # Compute the logistic value once; the derivative reuses it.
    s = 1 / (1 + np.exp(-x))
    if deriv:
        return s * (1 - s)
    return s
# Fit the single-layer network on the small sample with full-batch
# gradient steps (201 passes, step size 1).
for epoch in range(201):
    # forward propagation
    # layer 1
    l1 = sigmoid(np.dot(x, weights))
    # errors of layer 1
    e = y - l1
    if epoch % 200 == 0:
        print('\nafter %d iterations' % epoch)
        print('MSE: ', (e ** 2).mean())
    # multiply errors by the slope of the sigmoid. l1 is already the
    # sigmoid output, so the slope is l1 * (1 - l1); feeding l1 through
    # sigmoid(..., True) would apply the sigmoid a second time and
    # yield the wrong gradient.
    d = e * l1 * (1 - l1)
    # update weights
    weights += np.dot(x.T, d)
# Rounded predictions next to the true labels, then the element-wise match.
print(l1.round().astype(int).T)
print(y.T)
l1.round() == y
# Rows dated strictly before the cutoff form the training sample.
cutoff = '2009-1-1'
data = raw[raw.index < cutoff].dropna().copy()
# initialize weights randomly with mean 0
np.random.seed(1)
weights = 2 * np.random.random((lags, 1)) - 1
weights
# Feature matrix and label column vector for the training sample.
x = data[cols].values
y = data['returns'].values.reshape(-1, 1)
for epoch in range(1001):
    # forward propagation
    # layer 1
    l1 = sigmoid(np.dot(x, weights))
    # errors of layer 1
    e = y - l1
    if epoch % 200 == 0:
        print('\nafter %d iterations' % epoch)
        print('MSE: ', (e ** 2).mean())
    # multiply errors by the slope of the sigmoid. l1 is already the
    # sigmoid output, so the slope is l1 * (1 - l1); feeding l1 through
    # sigmoid(..., True) would apply the sigmoid a second time and
    # yield the wrong gradient.
    d = e * l1 * (1 - l1)
    # update weights (scaled by a step size of 0.001)
    weights += 0.001 * np.dot(x.T, d)
print(l1.round().astype(int).T)
print(y.T)
# Fraction of rows labelled 1.
y.sum(axis=0) / len(y)
# Fraction of rows whose rounded prediction equals the label.
(l1.round() == y).sum(axis=0) / len(data)
# Position: +1 where the rounded output is 1, otherwise -1. Flattened so
# the column is assigned from a 1-D array.
data['pred'] = np.where(l1.round() > 0, 1, -1).ravel()
# Strategy log return = market log return times the position.
data['strategy'] = data['log_rets'] * data['pred']
data.head()
# Cumulative gross performance of market vs. strategy.
data[['log_rets', 'strategy']].cumsum().apply(np.exp).plot(figsize=(10, 6))
weights
# Rows from the cutoff onwards form the test sample. The training sample
# uses `< cutoff`, so `>=` is needed here to avoid silently dropping a row
# dated exactly at the cutoff.
data = raw[raw.index >= cutoff].dropna().copy()
x = data[cols].values
# Forward pass only: the weights fitted on the training sample are reused.
l1 = sigmoid(np.dot(x, weights))
l1.round()
y = data['returns'].values.reshape(-1, 1)
y
# Fraction of rows labelled 1.
y.sum(axis=0) / len(y)
# Fraction of rows whose rounded prediction equals the label.
(l1.round() == y).sum(axis=0) / len(data)
# Position: +1 where the rounded output is 1, otherwise -1. Flattened so
# the column is assigned from a 1-D array.
data['pred'] = np.where(l1.round() > 0, 1, -1).ravel()
# Strategy log return = market log return times the position.
data['strategy'] = data['log_rets'] * data['pred']
data.head()
# Cumulative gross performance of market vs. strategy.
data[['log_rets', 'strategy']].cumsum().apply(np.exp).plot(figsize=(10, 6))