Dr Yves J Hilpisch | The AI Machine
import os
import math
import time
import random
import numpy as np
import pandas as pd
from pylab import plt
from IPython import display
plt.style.use('seaborn')
np.set_printoptions(precision=4, suppress=True)
os.environ['PYTHONHASHSEED'] = '0'
import warnings; warnings.simplefilter('ignore')
import gym
env = gym.make('CartPole-v0')
env.observation_space
env.observation_space.low.astype(np.float16)
env.observation_space.high.astype(np.float16)
state = env.reset()
state # [cart position, cart velocity, pole angle, pole angular velocity]
env.action_space
env.action_space.n
env.action_space.sample()
env.action_space.sample()
a = env.action_space.sample()
a
state, reward, done, info = env.step(a)
state, reward, done, info
env.reset()
for e in range(1, 200):
    a = env.action_space.sample()  # random action choice
    state, reward, done, info = env.step(a)  # take the action in the environment
    print(f'step={e:2d} | state={state} | action={a} | reward={reward}')
    if done and (e + 1) < 200:
        print('*** FAILED ***')
        break
done
env.reset()
img = plt.imshow(env.render(mode='rgb_array')) # initialize bitmap embedding
for e in range(100):
    img.set_data(env.render(mode='rgb_array'))  # update the rendered frame
    display.display(plt.gcf())
    display.clear_output(wait=True)
    a = env.action_space.sample()  # random action choice
    obs, rew, done, _ = env.step(a)  # take the action
    if done and (e + 1) < 200:
        print('*** FAILED ***')
        break
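# A purely random policy rarely keeps the pole balanced for long. As a simple
# point of reference (a sketch, not part of the original session), a hand-coded
# heuristic that pushes the cart towards the side the pole is falling to
# typically survives noticeably longer than random actions:
state = env.reset()
for e in range(1, 201):
    a = 1 if state[2] + state[3] > 0 else 0  # push right if the pole falls to the right
    state, reward, done, info = env.step(a)
    if done:
        print(f'heuristic policy survived {e} steps')
        break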
import tensorflow as tf
from keras.layers import Dense, Dropout
from keras.models import Sequential
from keras.optimizers import Adam
from sklearn.metrics import accuracy_score
def set_seeds(seed=100):
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    env.seed(seed)
from collections import deque
class DQLAgent:
    def __init__(self, gamma=0.95, lr=0.001, finish=1e10):
        self.finish = finish  # moving average reward at which learning stops
        self.epsilon = 1.0  # initial exploration rate
        self.epsilon_min = 0.01  # minimum exploration rate
        self.epsilon_decay = 0.995  # decay factor per replay
        self.gamma = gamma  # discount factor for future rewards
        self.batch_size = 32  # replay batch size
        self.lr = lr  # learning rate for the optimizer
        self.max_treward = 0
        self.averages = list()
        self.memory = deque(maxlen=2000)  # limited replay memory
        self.osn = env.observation_space.shape[0]  # size of the state vector
        self.model = self._build_model()
    def _build_model(self):
        # small dense network that estimates Q values per action
        model = Sequential()
        model.add(Dense(24, input_dim=self.osn,
                        activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(env.action_space.n, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(lr=self.lr))
        return model
    def act(self, state):
        # epsilon-greedy action selection
        if random.random() <= self.epsilon:
            return env.action_space.sample()  # explore
        action = self.model.predict(state)[0]
        return np.argmax(action)  # exploit: action with the highest Q value
    def replay(self):
        # retrain the network on a random batch from the replay memory
        batch = random.sample(self.memory, self.batch_size)
        for state, action, reward, next_state, done in batch:
            if not done:
                # Q-learning target: reward plus discounted best next Q value
                reward += self.gamma * np.amax(
                    self.model.predict(next_state)[0])
            target = self.model.predict(state)
            target[0, action] = reward
            self.model.fit(state, target, epochs=1,
                           verbose=False)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay  # reduce exploration over time
    def learn(self, episodes, max_iter=200):
        trewards = []
        for e in range(1, episodes + 1):
            state = env.reset()
            state = np.reshape(state, [1, self.osn])
            for _ in range(max_iter):
                action = self.act(state)
                next_state, reward, done, info = env.step(action)
                next_state = np.reshape(next_state,
                                        [1, self.osn])
                self.memory.append([state, action, reward,
                                    next_state, done])
                state = next_state
                if done:
                    treward = _ + 1  # total reward of the episode
                    trewards.append(treward)
                    av = sum(trewards[-25:]) / 25  # moving average over the last 25 episodes
                    self.averages.append(av)
                    self.max_treward = max(self.max_treward, treward)
                    templ = 'episode: {:4d}/{} | treward: {:4d} | '
                    templ += 'av: {:6.1f} | max: {:4d}'
                    print(templ.format(e, episodes, treward, av,
                                       self.max_treward), end='\r')
                    break
            if av > self.finish:
                break  # stop early once the moving average is high enough
            if len(self.memory) > self.batch_size:
                self.replay()  # update the network from replay memory
    def test(self, episodes, max_iter=200):
        # run the trained policy greedily (no exploration)
        trewards = []
        for e in range(1, episodes + 1):
            state = env.reset()
            for _ in range(max_iter):
                state = np.reshape(state, [1, self.osn])
                action = np.argmax(self.model.predict(state)[0])
                next_state, reward, done, info = env.step(action)
                state = next_state
                if done:
                    treward = _ + 1
                    trewards.append(treward)
                    print('episode: {:4d}/{} | treward: {:4d}'
                          .format(e, episodes, treward), end='\r')
                    time.sleep(0.05)
                    break
        return trewards
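# The replay() method above calls predict() separately for every sample in the
# batch. A functionally equivalent, batched variant (a sketch, not part of the
# original session) predicts once per batch, which is usually much faster:
class DQLAgentVectorized(DQLAgent):
    def replay(self):
        batch = random.sample(self.memory, self.batch_size)
        states = np.vstack([b[0] for b in batch])
        next_states = np.vstack([b[3] for b in batch])
        targets = self.model.predict(states)  # current Q value estimates
        next_q = np.amax(self.model.predict(next_states), axis=1)  # best next Q values
        for i, (state, action, reward, next_state, done) in enumerate(batch):
            targets[i, action] = reward if done else reward + self.gamma * next_q[i]
        self.model.fit(states, targets, epochs=1, verbose=False)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay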
set_seeds(100)
agent = DQLAgent(lr=0.001, finish=195)
episodes = 1000
agent.learn(episodes)
agent.epsilon
plt.figure(figsize=(10, 6))
x = range(len(agent.averages))
y = np.polyval(np.polyfit(x, agent.averages, deg=3), x)
plt.plot(agent.averages, label='moving average')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('total reward')
plt.legend();
trewards = agent.test(100)
sum(trewards) / len(trewards)
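# Not part of the original session: the agent's Q network is a plain Keras
# model, so it can be persisted and restored with the standard Keras calls
# (the file name is only an example):
from keras.models import load_model
agent.model.save('dqlagent.h5')
agent.model = load_model('dqlagent.h5')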
class observation_space:
    def __init__(self, n):
        self.shape = (n,)
class action_space:
    def __init__(self, n):
        self.n = n
    def sample(self):
        return random.randint(0, self.n - 1)
class Finance:
    url = 'http://hilpisch.com/aiif_eikon_eod_data.csv'
    def __init__(self, symbol, features):
        self.symbol = symbol
        self.features = features
        self.observation_space = observation_space(4)  # four lagged values define a state
        self.osn = self.observation_space.shape[0]
        self.action_space = action_space(2)  # two actions: 0 (down) and 1 (up)
        self.min_accuracy = 0.5  # minimum prediction accuracy required
        self._get_data()
        self._prepare_data()
    def _get_data(self):
        # retrieve the end-of-day data set
        self.raw = pd.read_csv(self.url, index_col=0,
                               parse_dates=True).dropna()
    def _prepare_data(self):
        self.data = pd.DataFrame(self.raw[self.symbol])
        self.data['r'] = np.log(self.data / self.data.shift(1))  # log returns
        self.data.dropna(inplace=True)
        self.data = (self.data - self.data.mean()) / self.data.std()  # Gaussian normalization
        self.data['d'] = np.where(self.data['r'] > 0, 1, 0)  # market direction as label
    def _get_state(self):
        # state = the last osn values of the feature column
        return self.data[self.features].iloc[
            self.bar - self.osn:self.bar].values
    def seed(self, seed=None):
        pass
    def reset(self):
        self.treward = 0
        self.accuracy = 0
        self.bar = self.osn
        state = self.data[self.features].iloc[
            self.bar - self.osn:self.bar]
        return state.values
    def step(self, action):
        correct = action == self.data['d'].iloc[self.bar]  # direction predicted correctly?
        reward = 1 if correct else 0
        self.treward += reward
        self.bar += 1
        self.accuracy = self.treward / (self.bar - self.osn)
        if self.bar >= len(self.data):
            done = True  # end of the data set reached
        elif reward == 1:
            done = False
        elif (self.accuracy < self.min_accuracy and
              self.bar > self.osn + 10):
            done = True  # accuracy too low after a burn-in of 10 steps
        else:
            done = False
        state = self._get_state()
        info = {}
        return state, reward, done, info
env = Finance('EUR=', 'r')
env.reset()
a = env.action_space.sample()
a
env.step(a)
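# A quick benchmark before training (not part of the original session): with
# purely random actions the accuracy hovers around 50%, so such an episode
# typically ends early because of the minimum accuracy requirement:
env.reset()
for _ in range(len(env.data)):
    a = env.action_space.sample()
    state, reward, done, info = env.step(a)
    if done:
        break
env.accuracy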
set_seeds(100)
agent = DQLAgent(lr=0.001, gamma=0.5, finish=2400)
episodes = 1000
agent.learn(episodes, max_iter=2600)
agent.epsilon
agent.test(3, max_iter=2600)
plt.figure(figsize=(10, 6))
x = range(len(agent.averages))
y = np.polyval(np.polyfit(x, agent.averages, deg=3), x)
plt.plot(agent.averages, label='moving average')
plt.plot(x, y, 'r--', label='regression')
plt.xlabel('episodes')
plt.ylabel('total reward')
plt.legend();
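# For perspective (not part of the original session): the maximum total reward
# per episode equals the number of steps through the data set, so the test
# rewards can roughly be read as prediction accuracies:
max_reward = len(env.data) - env.osn
trewards = agent.test(3, max_iter=2600)
[round(tr / max_reward, 4) for tr in trewards]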