"""Stacked-LSTM time-series model plus data-loading and plotting helpers."""

import time

import matplotlib.pyplot as plt
import numpy as np
from numpy import newaxis
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class LSTMTimeSeries(nn.Module):
    """Two stacked single-layer LSTMs followed by a linear read-out.

    Args:
        layers: Sequence of four sizes
            ``[input_features, hidden_dim1, hidden_dim2, out_features]``.
            ``forward`` flattens its result to one value per sample, which
            only works when ``out_features`` is 1.
    """

    def __init__(self, layers):
        super().__init__()
        self.lstm1 = nn.LSTM(input_size=layers[0], hidden_size=layers[1])
        self.lstm2 = nn.LSTM(input_size=layers[1], hidden_size=layers[2])
        self.linear = nn.Linear(in_features=layers[2], out_features=layers[3])
        self.hidden_dim1 = layers[1]
        self.hidden_dim2 = layers[2]

    def _zero_state(self, batch_size, hidden_dim):
        """Return a zeroed ``(hidden, cell)`` pair for a one-layer LSTM."""
        # Allocate from an existing parameter so the state follows the
        # module's device and dtype (e.g. after ``model.to(device)``).
        # The axes semantics are (num_layers, minibatch_size, hidden_dim).
        ref = self.linear.weight
        return (
            ref.new_zeros(1, batch_size, hidden_dim),
            ref.new_zeros(1, batch_size, hidden_dim),
        )

    def _to_model_tensor(self, array):
        """Convert array-like input to a tensor matching the model's dtype/device."""
        ref = self.linear.weight
        return torch.as_tensor(array, dtype=ref.dtype, device=ref.device)

    def init_hidden1(self, batch_size):
        """Return zero initial ``(hidden, cell)`` states for the first LSTM."""
        return self._zero_state(batch_size, self.hidden_dim1)

    def init_hidden2(self, batch_size):
        """Return zero initial ``(hidden, cell)`` states for the second LSTM."""
        return self._zero_state(batch_size, self.hidden_dim2)

    def forward(self, batch):
        """Run the network on a batch and return one prediction per sample.

        Args:
            batch: Tensor laid out as (batch size, sequence length, features).

        Returns:
            1-D tensor with ``batch.shape[0]`` elements.
        """
        batch_size = batch.shape[0]

        # nn.LSTM (batch_first=False) expects
        # (sequence length x batch size x features).
        batch = batch.transpose(0, 1)

        # Each LSTM starts from a fresh zero state on every call.
        output1, _ = self.lstm1(batch, self.init_hidden1(batch_size))
        output2, _ = self.lstm2(output1, self.init_hidden2(batch_size))

        # Run the linear layer only on the last position in the sequence.
        output = self.linear(output2[-1])
        return output.reshape(output.shape[0])

    def predict_point_by_point(self, data):
        """Predict one step ahead for every window of true data.

        Args:
            data: Array-like of windows shaped
                (n_windows, sequence length, features).

        Returns:
            1-D ``numpy.ndarray`` holding one prediction per window.
        """
        windows = self._to_model_tensor(data)
        # Inference only: no autograd graph is needed, and a tensor that
        # requires grad cannot be converted to NumPy directly.
        with torch.no_grad():
            predicted = self(windows)
        return predicted.detach().cpu().numpy().reshape(-1)

    def predict_sequence_full(self, data, window_size):
        """Roll the model forward, feeding each prediction back as input.

        Starts from the first window in ``data`` and, ``len(data)`` times,
        predicts the next value, drops the oldest entry of the window and
        appends the new prediction at position ``window_size - 1``.

        Args:
            data: NumPy array of windows; only ``data[0]`` is used as seed.
            window_size: Length of each window.

        Returns:
            List of ``len(data)`` scalar predictions.
        """
        curr_frame = data[0]
        predicted = []
        with torch.no_grad():
            for _ in range(len(data)):
                t_frame = self._to_model_tensor(curr_frame[newaxis, :, :])
                t_pred = self(t_frame)
                predicted.append(t_pred.detach().cpu().numpy()[0])
                # Shift the window by one and append the new prediction.
                curr_frame = curr_frame[1:]
                curr_frame = np.insert(
                    curr_frame, [window_size - 1], predicted[-1], axis=0
                )
        return predicted


def load_data(filename, seq_len):
    """Load a one-value-per-line series and build train/test windows.

    Every window holds ``seq_len`` inputs followed by one label. The first
    90% of the windows (shuffled in place with ``np.random.shuffle``) form
    the training set; the remainder, in original order, form the test set.

    Args:
        filename: Path to a text file with one float per line. Blank lines
            (such as the one produced by a trailing newline) are skipped.
        seq_len: Number of input observations per window.

    Returns:
        ``[x_train, y_train, x_test, y_test]`` where the ``x`` arrays have
        shape (n, seq_len, 1) and the ``y`` arrays have shape (n,).

    Raises:
        ValueError: If a line is not a valid float, or if the file has too
            few values to form a single window.
    """
    # Context manager guarantees the handle is closed.
    with open(filename, 'rb') as handle:
        raw = handle.read()
    data = [float(line) for line in raw.decode().splitlines() if line.strip()]

    sequence_length = seq_len + 1
    if len(data) <= sequence_length:
        raise ValueError(
            "need more than %d values to build a window, got %d"
            % (sequence_length, len(data))
        )

    # NOTE: the upper bound leaves out the last possible window; this is
    # kept so the returned shapes stay the same for existing callers.
    result = np.array(
        [
            data[index: index + sequence_length]
            for index in range(len(data) - sequence_length)
        ]
    )

    # Use first 90% of the data for training.
    row = int(round(0.9 * result.shape[0]))
    train = result[:row, :]
    np.random.shuffle(train)
    x_train = train[:, :-1]  # first seq_len observations are the inputs.
    y_train = train[:, -1]   # the label is the last observation.

    # Use last 10% of the data for testing.
    x_test = result[row:, :-1]
    y_test = result[row:, -1]

    # Add a trailing feature axis of size 1.
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))

    return [x_train, y_train, x_test, y_test]


def plot_results(predicted_data, true_data, show=True):
    """Plot predictions against the true series.

    Args:
        predicted_data: Sequence of predicted values.
        true_data: Sequence of true values.
        show: When true, display the figure; otherwise save it to
            ``plot_results.jpg`` in the current working directory.
    """
    fig = plt.figure(facecolor='white')
    ax = fig.add_subplot(111)
    ax.plot(true_data, label='True Data')
    ax.plot(predicted_data, label='Prediction')
    ax.legend()
    ax.set_title("LSTM time series")
    if show:
        plt.show()
    else:
        plt.savefig('plot_results.jpg')