# Base Libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')
import yfinance as yf
# Preprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split, GridSearchCV, TimeSeriesSplit, cross_val_score
# Classifier
from sklearn.neighbors import KNeighborsClassifier
# Metrics
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay, auc, roc_curve
# Ignore warnings
import warnings
warnings.filterwarnings('ignore')
# df = yf.download('SPY', start='2000-01-01', end = '2021-12-31', progress=False)
df = pd.read_csv('spy.csv', index_col=0, parse_dates=True)
df.head()
df.describe()
# Predictors
df['O-C'] = df['Open'] - df['Close']
df['H-L'] = df['High'] - df['Low']
# Target
'''
y = 1 if tomorrow's adjusted close is greater than 99.5% of today's adjusted close
y = -1 otherwise
'''
df['y'] = np.where(df['Adj Close'].shift(-1)>0.995*df['Adj Close'],1,-1)
df.head(10)
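# Quick illustration of the labelling rule on a toy price series (not part of the original workflow):
# y = 1 only when tomorrow's price exceeds 99.5% of today's; the final row gets -1 because shift(-1) is NaN there
toy = pd.Series([100.0, 99.8, 98.0, 99.0])
print(np.where(toy.shift(-1) > 0.995 * toy, 1, -1))  # [ 1 -1  1 -1]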
# Convert to numpy
data = df[['O-C', 'H-L', 'y']].to_numpy()
data[:5]
# Split the dataset into training and testing data.
# Keep shuffle=False for financial time series so the test set is strictly later in time than the training data
# (random_state is unnecessary when shuffle=False)
X_train, X_test, y_train, y_test = train_test_split(data[:, :-1], data[:, -1], test_size=0.2, shuffle=False)
# Output the train and test data size
print(f"Train and Test Size {len(X_train)}, {len(X_test)}")
# Scale and fit the model.
# Keeping the scaler inside the Pipeline means it is fit only on training data
# (and, during cross-validation, only on each training fold), avoiding look-ahead leakage.
pipe = Pipeline([
    ("scaler", MinMaxScaler()),
    ("classifier", KNeighborsClassifier())
])
pipe.fit(X_train, y_train)
y_train_pred = pipe.predict(X_train)
y_test_pred = pipe.predict(X_test)
acc_train = accuracy_score(y_train, y_train_pred)
acc_test = accuracy_score(y_test, y_test_pred)
print(f'Train Accuracy: {acc_train:0.4}, Test Accuracy: {acc_test:0.4}')
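# For context, a naive baseline that always predicts the training set's majority class
# (a sketch, not part of the original workflow)
majority_class = pd.Series(y_train).mode()[0]
baseline_acc = accuracy_score(y_test, np.full_like(y_test, majority_class))
print(f'Majority-class baseline accuracy: {baseline_acc:0.4}')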
# Demonstration of ravel(): it flattens values into a 1-D sequence that can be unpacked,
# which is how the confusion-matrix cells are extracted below
mySeries = pd.Series([11, 32, 53, 14])
a, b, c, d = mySeries.ravel()
print(a, b, c, d)
print()
print(mySeries)
tn, fp, fn, tp = confusion_matrix(y_test, y_test_pred).ravel()
print(tn, fp, fn, tp)
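# The headline metrics can be recomputed by hand from these four cells; the results should
# agree with the classification report below (treating y = 1, an up move, as the positive class)
precision_up = tp / (tp + fp)
recall_up = tp / (tp + fn)
print(f'Precision (up moves): {precision_up:0.4}, Recall (up moves): {recall_up:0.4}')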
# Plot confusion matrix (ConfusionMatrixDisplay.from_estimator supersedes the old plot_confusion_matrix helper)
ConfusionMatrixDisplay.from_estimator(pipe, X_test, y_test, values_format='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.grid(False)
# Classification Report
print(classification_report(y_test, y_test_pred))
# Predict class probabilities; the column order follows pipe.classes_, i.e. [-1, 1]
probs = pipe.predict_proba(X_test)
preds1 = probs[:, 0]  # probability of a down move (y = -1)
preds2 = probs[:, 1]  # probability of an up move (y = 1)
# display first 10 rows for probs
probs[:10]
y_pred = pipe.predict(X_test)
# display first 10 rows for y_pred
y_pred[:10]
# Note that the predicted 1s correspond exactly to the instances where the second column
# of probs (the probability of y = 1) is >= 0.5.
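# Quick check of that claim (with k = 5 neighbours the vote is never split exactly 50/50, so
# thresholding the up-move probability at 0.5 reproduces pipe.predict)
pred_check = np.where(probs[:, 1] >= 0.5, 1, -1)
print((pred_check == y_pred).all())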
# Now lower the decision threshold for predicting an up move from 0.5 to p
p = 0.2
probs = pipe.predict_proba(X_test)
# y_pred_1 = True if the second column of pipe.predict_proba(X_test) is >= p, False otherwise
y_pred_1 = (probs[:, 1] >= p)
# Display first 10 rows in y_pred_1
y_pred_1[:10]
# Map True to 1 and False to -1
y_pred_1 = np.where(y_pred_1, 1, -1)
# Getting the TP, TN, FP, FN values when threshold = p
tn, fp, fn, tp = confusion_matrix(y_test, y_pred_1).ravel()
print(tn, fp, fn, tp)
print(f'FPR = {(fp/(tn+fp)).round(6)}, TPR = {(tp/(tp+fn)).round(6)}')
# probs[:, 1] gives the probability of y = 1
fpr, tpr, threshold = roc_curve(y_test, probs[:, 1], pos_label=1)
fpr, tpr, threshold
# The FPR (0.990741) and TPR (0.986425) calculated manually with threshold = 0.2 match the
# second-to-last value in each of the arrays returned by roc_curve above.
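# The same point can be located programmatically from the arrays returned by roc_curve
idx = np.abs(threshold - 0.2).argmin()
print(f'threshold = {threshold[idx]}, FPR = {fpr[idx]:0.6f}, TPR = {tpr[idx]:0.6f}')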
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20,10))
fpr1, tpr1, threshold1 = roc_curve(y_test, preds1, pos_label=-1)
roc_auc1 = auc(fpr1, tpr1)
fpr2, tpr2, threshold2 = roc_curve(y_test, preds2, pos_label=1)
roc_auc2 = auc(fpr2, tpr2)
ax[0].plot([0, 1], [0, 1], 'r--')
ax[0].plot(fpr1, tpr1, 'cornflowerblue', label=f'AUC = {roc_auc1:0.2}', marker='o', ms = 15)
ax[0].set_title("Receiver Operating Characteristic for Down Moves")
ax[0].set_xlabel('False Positive Rate')
ax[0].set_ylabel('True Positive Rate')
ax[1].plot([0, 1], [0, 1], 'r--')
ax[1].plot(fpr2, tpr2, 'cornflowerblue', label=f'AUC = {roc_auc2:0.2}', marker='o', ms = 15)
ax[1].set_title("Receiver Operating Characteristic for Up Moves")
ax[1].set_xlabel('False Positive Rate')
ax[1].set_ylabel('True Positive Rate')
# Define legend
ax[0].legend(), ax[1].legend();
tscv = TimeSeriesSplit(n_splits=3)
for train, test in tscv.split(data[:, :-1]):
    print(train, test)
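# The same splits, summarised: each fold trains on an expanding window of past data and
# validates on the block that immediately follows it (never on earlier data)
for i, (train, test) in enumerate(tscv.split(data[:, :-1])):
    print(f'Fold {i}: train size = {len(train)}, test size = {len(test)}')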
# Perform Gridsearch and fit
param_grid = {"classifier__n_neighbors": np.arange(1,51,1)}
grid_search = GridSearchCV(pipe, param_grid, scoring='roc_auc', n_jobs=-1, cv=TimeSeriesSplit(n_splits=5), verbose=1)
grid_search.fit(X_train, y_train)
# Getting the best parameters and best score
# Best Params
print(f'Best Parameters = {grid_search.best_params_}')
# Best Score
print(f'Best AUC Score = {grid_search.best_score_}')
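# Optionally inspect how the mean cross-validated AUC varies with k, using GridSearchCV's cv_results_
cv_results = pd.DataFrame(grid_search.cv_results_)
print(cv_results[['param_classifier__n_neighbors', 'mean_test_score']]
      .sort_values('mean_test_score', ascending=False).head())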
# Use the tuned pipeline (scaler + best k) refit by GridSearchCV on the full training set
clf = grid_search.best_estimator_
# Fit the model
clf.fit(X_train, y_train)
# Predicting the test dataset
y_pred = clf.predict(X_test)
# Measure Accuracy
acc_train = accuracy_score(y_train, clf.predict(X_train))
acc_test = accuracy_score(y_test, y_pred)
# Print Accuracy
print(f'\n Training Accuracy \t: {acc_train :0.4} \n Test Accuracy \t\t: {acc_test :0.4}')
df.head()
# Create a new DataFrame from the X_test instances (copy so later column assignments don't warn)
btdf = df[-len(X_test):].copy()
btdf.head()
# Add a Signal Column
# Signal = predicted y values
btdf['Signal'] = clf.predict(X_test)
# Compute SPY's log returns
# log returns today = log of adj close today - log of adj close yesterday
btdf['Returns'] = np.log(btdf['Adj Close']).diff().fillna(0)
btdf.head()
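# Sanity check, not part of the original workflow: the diff-of-logs above equals the log of the price ratio
log_ret_check = np.log(btdf['Adj Close'] / btdf['Adj Close'].shift(1)).fillna(0)
print(np.allclose(log_ret_check, btdf['Returns']))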
# Compute log returns of the KNN strategy:
# today's strategy return = yesterday's signal * today's SPY return
# (the one-day shift ensures we only trade on a signal that was known before the return is realised)
btdf['Strategy'] = btdf['Returns'] * btdf['Signal'].shift(1).fillna(0)
btdf.tail(20)
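# Because the signal is always +1 or -1, each day's strategy return is simply +/- that day's
# SPY return (a quick check, not part of the original workflow)
print(np.allclose(btdf['Strategy'].abs(), btdf['Returns'].abs()))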
# Cumulative Returns
# = exp(ln(today's close) - ln(yesterday's close) + ln(yesterday's close) - ..... - ln(Day 1's close))
# = exp(ln(today's close/close of Day 1))
# = today's close/close of Day 1
btdf['cumretspy'] = btdf['Returns'].cumsum().apply(np.exp)
btdf['cumretknn'] = btdf['Strategy'].cumsum().apply(np.exp)
btdf.head()
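# The identity above can be verified directly: exp(sum of log returns) should equal
# today's adjusted close divided by the adjusted close on the first day of the test window
print(np.isclose(btdf['cumretspy'].iloc[-1],
                 btdf['Adj Close'].iloc[-1] / btdf['Adj Close'].iloc[0]))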
# Plot cumulative SPY vs. strategy returns for each year of the test period
fig, ax = plt.subplots(3, 2, figsize=(20, 20))
# 2017
ax[0,0].plot(btdf['cumretspy']['2017'], label ='SPY', color ='cornflowerblue')
ax[0,0].plot(btdf['cumretknn']['2017'], label ='Strategy', color ='crimson')
# 2018
ax[0,1].plot(btdf['cumretspy']['2018'], label ='SPY', color ='cornflowerblue')
ax[0,1].plot(btdf['cumretknn']['2018'], label ='Strategy', color ='crimson')
# 2019
ax[1,0].plot(btdf['cumretspy']['2019'], label ='SPY', color ='cornflowerblue')
ax[1,0].plot(btdf['cumretknn']['2019'], label ='Strategy', color ='crimson')
# 2020
ax[1,1].plot(btdf['cumretspy']['2020'], label ='SPY', color ='cornflowerblue')
ax[1,1].plot(btdf['cumretknn']['2020'], label ='Strategy', color ='crimson')
# 2021
ax[2,0].plot(btdf['cumretspy']['2021'], label ='SPY', color ='cornflowerblue')
ax[2,0].plot(btdf['cumretknn']['2021'], label ='Strategy', color ='crimson')
# Set axis title
ax[0,0].set_title('2017'), ax[0,1].set_title('2018'), ax[1,0].set_title('2019'), ax[1,1].set_title('2020')
ax[2,0].set_title('2021')
# Define legend
ax[0,0].legend(), ax[0,1].legend(), ax[1,0].legend(), ax[1,1].legend()
ax[2,0].legend()
fig.suptitle('Trading Strategy Performance', fontsize=14);
# Import pyfolio
# (pyfolio's tear sheets expect non-cumulative daily returns; daily log returns are a close approximation)
import pyfolio as pf
# Tear sheet for the KNN strategy
pf.create_simple_tear_sheet(btdf['Strategy'])
# Tear sheet for buy-and-hold SPY
pf.create_simple_tear_sheet(btdf['Returns'])