## Preface

This article shows how to build a simple prediction model based on the Transformer and apply it to stock price prediction. The original code is available at the end of the article.

## 1. Transformer model

The Transformer is a classic NLP model proposed by a Google team in 2017, and the popular BERT is also based on it. The Transformer uses the self-attention mechanism instead of the sequential structure of an RNN, so the model can be trained in parallel and can access global information across the whole sequence. The main purpose of this article is to walk you through building a simple Transformer-based stock price prediction model with the PyTorch framework.

## Basic architecture of Transformer

Specifically, we use the closing price of the Shanghai Stock Index as an example and predict the closing price at time t+1. Note that this article only walks through a simple baseline model to lay out the process of data preprocessing, model construction, and model evaluation. The model leaves much room for improvement, such as selecting more meaningful features or making effective multi-step predictions.

## 2. Environment setup

Local environment:

- Python 3.7
- IDE: PyCharm

Library versions:

- numpy 1.18.1
- pandas 1.0.3
- sklearn 0.22.2
- matplotlib 3.2.1
- torch 1.10.1

## 3. Code implementation

## 3.1. Import libraries and define hyperparameters

First, import the libraries and set the hyperparameters of the model. `input_window` and `output_window` set the length of the input data and the output data respectively. These parameters can be modified to suit the actual application scenario.

    import torch
    import torch.nn as nn
    import numpy as np
    import time
    import math
    import matplotlib.pyplot as plt
    from sklearn.preprocessing import MinMaxScaler
    import pandas as pd

    # Fix the random seeds so that runs are repeatable
    torch.manual_seed(0)
    np.random.seed(0)

    input_window = 20   # length of each input sequence
    output_window = 1   # number of steps the label is shifted ahead
    batch_size = 64
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)

## 3.2. Model construction

A very important component of the Transformer is its positional encoding. A recurrent neural network is itself a sequential structure, so it naturally contains the position information of each word in the sequence. When we abandon the recurrent structure and rely entirely on attention, this word order information is lost, and the model has no way to know the relative or absolute position of each word in the sentence. It is therefore necessary to add a word order signal to the word vector to help the model learn this information. `PositionalEncoding` solves this problem: it adds sine and cosine values of different frequencies to the input sequence as position codes, so that the model can capture the relative positions of the input variables.

    class PositionalEncoding(nn.Module):
        def __init__(self, d_model, max_len=5000):
            super(PositionalEncoding, self).__init__()
            pe = torch.zeros(max_len, d_model)
            position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
            # One frequency per pair of (sin, cos) dimensions
            div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
            pe[:, 0::2] = torch.sin(position * div_term)  # even dimensions
            pe[:, 1::2] = torch.cos(position * div_term)  # odd dimensions
            # Reshape to (max_len, 1, d_model) so it broadcasts over the batch dimension
            pe = pe.unsqueeze(0).transpose(0, 1)
            self.register_buffer('pe', pe)

        def forward(self, x):
            # x has shape (seq_len, batch, features)
            return x + self.pe[:x.size(0), :]
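
To make the sine and cosine pattern concrete, here is a minimal pure-Python sketch of the same table that the class above builds with tensors. The function name `positional_encoding` and the tiny sizes are illustrative assumptions, not part of the original code.

```python
import math


def positional_encoding(max_len, d_model):
    """Return a max_len x d_model table of sinusoidal position codes."""
    table = [[0.0] * d_model for _ in range(max_len)]
    for pos in range(max_len):
        for k in range(0, d_model, 2):
            # Same frequency term as div_term in the class above
            freq = math.exp(k * (-math.log(10000.0) / d_model))
            table[pos][k] = math.sin(pos * freq)          # even dimension
            if k + 1 < d_model:
                table[pos][k + 1] = math.cos(pos * freq)  # odd dimension
    return table


pe = positional_encoding(max_len=4, d_model=6)
for pos, row in enumerate(pe):
    print(pos, [round(v, 3) for v in row])
```

At position 0 every sine entry is 0 and every cosine entry is 1. Later positions rotate each pair of dimensions at its own frequency, which is what lets the model tell positions apart.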

Next, build the basic structure of the Transformer. PyTorch already provides encapsulated Transformer components that can be easily called and modified. Note that the encoder-decoder architecture from the original paper is not used here; instead, a fully connected layer replaces the decoder and outputs the predicted value. In addition, a mask generated by `_generate_square_subsequent_mask` is applied to the input so that no future information is introduced.

    class TransAm(nn.Module):
        def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
            super(TransAm, self).__init__()
            self.model_type = 'Transformer'
            self.src_mask = None
            self.pos_encoder = PositionalEncoding(feature_size)
            self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
            self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
            # A linear layer replaces the Transformer decoder
            self.decoder = nn.Linear(feature_size, 1)
            self.init_weights()

        def init_weights(self):
            initrange = 0.1
            self.decoder.bias.data.zero_()
            self.decoder.weight.data.uniform_(-initrange, initrange)

        def forward(self, src):
            # Rebuild the mask only when the sequence length changes
            if self.src_mask is None or self.src_mask.size(0) != len(src):
                device = src.device
                mask = self._generate_square_subsequent_mask(len(src)).to(device)
                self.src_mask = mask

            src = self.pos_encoder(src)
            output = self.transformer_encoder(src, self.src_mask)
            output = self.decoder(output)
            return output

        def _generate_square_subsequent_mask(self, sz):
            # 0.0 where attention is allowed, -inf for future positions
            mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
            mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
            return mask
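
The mask returned by `_generate_square_subsequent_mask` is easier to understand when printed. The sketch below rebuilds the same pattern with plain Python lists; the helper name `causal_mask` is a hypothetical stand-in used only for illustration.

```python
def causal_mask(size):
    """Row i holds 0.0 for positions j <= i and -inf for positions j > i."""
    neg_inf = float("-inf")
    return [[0.0 if j <= i else neg_inf for j in range(size)]
            for i in range(size)]


mask = causal_mask(4)
for row in mask:
    print(row)
```

The mask is added to the attention scores before the softmax, so a `-inf` entry gives that position zero weight. Step i can therefore attend only to steps 0 through i.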

## 3.3. Data preprocessing

Next, we need to preprocess the data. First, define a window partition function, which divides the data into input sequences and their labels by delaying the input by `output_window` steps. This article performs one-step prediction, so if the input is steps 1 to 20, the label is steps 2 to 21, matching the seq2seq-style output of the Transformer.

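    def create_inout_sequences(input_data, tw):
        inout_seq = []
        L = len(input_data)
        for i in range(L - tw):
            train_seq = input_data[i:i + tw]
            # The label is the same window shifted forward by output_window steps
            train_label = input_data[i + output_window:i + tw + output_window]
            inout_seq.append((train_seq, train_label))
        # Convert to a single array first; building a tensor from a list of arrays is slow
        return torch.FloatTensor(np.array(inout_seq))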
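
As a quick check of the windowing logic, the following sketch applies the same slicing to a toy series of the integers 1 to 25, using plain lists instead of tensors. The name `make_windows` and the toy series are assumptions made for this example.

```python
def make_windows(series, input_window, output_window=1):
    """Pair each input window with the same window shifted by output_window."""
    pairs = []
    for i in range(len(series) - input_window):
        inputs = series[i:i + input_window]
        labels = series[i + output_window:i + input_window + output_window]
        pairs.append((inputs, labels))
    return pairs


series = list(range(1, 26))  # toy data: 1, 2, ..., 25
pairs = make_windows(series, input_window=20)
first_inputs, first_labels = pairs[0]
print(len(pairs))
print(first_inputs[0], first_inputs[-1])   # first input window spans 1 to 20
print(first_labels[0], first_labels[-1])   # its label window spans 2 to 21
```

Only the last element of each label window is a value the input has not seen; the other elements supervise the intermediate positions of the sequence output.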

Then split the data into a training set and a test set: the first 70% of the data is used for model training and the remainder for model testing. Specifically, we use the previous `input_window` closing prices to predict the closing price at the next time step.

    def get_data():
        series = pd.read_csv('./000001_Daily.csv', usecols=['Close'])
        # series = pd.read_csv('./daily-min-temperatures.csv', usecols=['Temp'])

        # Scale the closing prices to [-1, 1]; note the scaler is fitted on the full series
        scaler = MinMaxScaler(feature_range=(-1, 1))
        series = scaler.fit_transform(series.values.reshape(-1, 1)).reshape(-1)

        # First 70% for training, the rest for testing
        train_samples = int(0.7 * len(series))
        train_data = series[:train_samples]
        test_data = series[train_samples:]

        train_sequence = create_inout_sequences(train_data, input_window)
        train_sequence = train_sequence[:-output_window]

        test_data = create_inout_sequences(test_data, input_window)
        test_data = test_data[:-output_window]

        return train_sequence.to(device), test_data.to(device)
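
The scaling and splitting steps can be illustrated without the CSV file. The sketch below applies the standard min-max formula for the range [-1, 1] and a 70/30 chronological split to a short list of made-up prices. The helper names and the price values are assumptions for illustration only.

```python
def scale_to_range(values, low=-1.0, high=1.0):
    """Min-max scale values so the smallest maps to low and the largest to high."""
    v_min, v_max = min(values), max(values)
    span = v_max - v_min
    return [low + (high - low) * (v - v_min) / span for v in values]


def split_series(values, train_fraction=0.7):
    """Chronological split: the earliest part trains, the rest tests."""
    cut = int(train_fraction * len(values))
    return values[:cut], values[cut:]


# Made-up closing prices, not real market data
prices = [10.0, 12.0, 11.0, 15.0, 14.0, 18.0, 20.0, 19.0, 17.0, 16.0, 13.0]
scaled = scale_to_range(prices)
train_part, test_part = split_series(scaled)
print(scaled)
print(len(train_part), len(test_part))
```

The split keeps the time order intact, so the test portion always lies after the training portion.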

Next, implement a batch generator that reads the data in batches.

    def get_batch(source, i, batch_size):
        seq_len = min(batch_size, len(source) - 1 - i)
        data = source[i:i + seq_len]
        # Rearrange to (input_window, batch, 1), the layout the encoder expects
        input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
        target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
        return input, target
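
The nested `stack` and `chunk` calls in `get_batch` turn a batch of windows into a time-major layout of shape (window length, batch size, 1). A small sketch with plain lists shows the same rearrangement; `to_time_major` is a hypothetical helper, not part of the original code.

```python
def to_time_major(windows):
    """Turn B windows of length T into a T x B x 1 nested list."""
    return [[[value] for value in step] for step in zip(*windows)]


windows = [[1, 2, 3], [4, 5, 6]]  # B = 2 samples, T = 3 time steps
time_major = to_time_major(windows)
print(time_major)
```

Each outer entry now holds one time step for every sample in the batch, and each value sits in its own length-1 feature dimension.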

## 3.4. Model training and evaluation

The following is the model training code. It traverses the training set, computes the loss, and backpropagates to update the parameters. Gradient clipping is used to prevent exploding gradients, and the loss is printed at regular intervals.

    def train(train_data):
        model.train()
        total_loss = 0.
        start_time = time.time()
        # Log about five times per epoch; max() guards against a zero interval on small datasets
        log_interval = max(1, int(len(train_data) / batch_size / 5))

        for batch_index, i in enumerate(range(0, len(train_data) - 1, batch_size)):
            data, targets = get_batch(train_data, i, batch_size)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, targets)
            loss.backward()
            # Clip the gradient norm to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
            optimizer.step()

            total_loss += loss.item()
            if batch_index % log_interval == 0 and batch_index > 0:
                cur_loss = total_loss / log_interval
                elapsed = time.time() - start_time
                print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.6f} | {:5.2f} ms | loss {:5.5f} | ppl {:8.2f}'
                      .format(epoch, batch_index, len(train_data) // batch_size, scheduler.get_last_lr()[0],
                              elapsed * 1000 / log_interval, cur_loss, math.exp(cur_loss)))
                # Reset the running statistics for the next logging interval
                total_loss = 0.
                start_time = time.time()
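
Gradient clipping by norm rescales all gradients together when their combined L2 norm exceeds a threshold. The sketch below shows the idea on a flat list of numbers. The function name is hypothetical, and the small `eps` term is an assumption that mirrors common implementations rather than a statement about PyTorch internals.

```python
import math


def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale grads so their L2 norm is at most max_norm; return (clipped, old_norm)."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm


clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=0.7)
norm_after = math.sqrt(sum(g * g for g in clipped))
print(norm_before, norm_after)

# Gradients already inside the limit are left unchanged
small, _ = clip_by_global_norm([0.3, 0.4], max_norm=0.7)
print(small)
```

The direction of the gradient is preserved and only its length changes, so a single unusually large batch loss cannot produce an oversized parameter update.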

Next is the code to evaluate the model.

    def evaluate(eval_model, data_source):
        eval_model.eval()
        total_loss = 0
        eval_batch_size = 1000
        with torch.no_grad():
            for i in range(0, len(data_source) - 1, eval_batch_size):
                data, targets = get_batch(data_source, i, eval_batch_size)
                output = eval_model(data)
                # Weight each batch loss by the number of samples in the batch
                total_loss += len(data[0]) * criterion(output, targets).cpu().item()
        return total_loss / len(data_source)

Finally, a function that visualizes the model's predictions during the run.

    def plot_and_loss(eval_model, data_source, epoch):
        eval_model.eval()
        total_loss = 0.
        test_result = torch.Tensor(0)
        truth = torch.Tensor(0)
        with torch.no_grad():
            for i in range(0, len(data_source) - 1):
                data, target = get_batch(data_source, i, 1)
                output = eval_model(data)
                total_loss += criterion(output, target).item()
                # Keep only the last step of each window: the one-step-ahead prediction
                test_result = torch.cat((test_result, output[-1].view(-1).cpu()), 0)
                truth = torch.cat((truth, target[-1].view(-1).cpu()), 0)

        plt.plot(test_result, color="red")   # predictions
        plt.plot(truth, color="blue")        # ground truth
        plt.grid(True, which='both')
        plt.axhline(y=0, color='k')
        # The graph/ directory must already exist
        plt.savefig('graph/transformer-epoch%d.png' % epoch)
        plt.close()

        return total_loss / i

## 3.5. Running the model

Finally, run the model. MSE is used as the loss and AdamW as the optimizer, with a scheduler that decays the learning rate after every epoch. The model runs for 200 epochs. It is evaluated on the test set after each epoch, and every 10 epochs the predictions are also plotted.

    train_data, val_data = get_data()
    model = TransAm().to(device)
    criterion = nn.MSELoss()
    lr = 0.005
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    # Multiply the learning rate by 0.95 after every epoch
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.95)
    epochs = 200

    for epoch in range(1, epochs + 1):
        epoch_start_time = time.time()
        train(train_data)

        # Compare with ==, not "is": identity checks on integers are unreliable
        if epoch % 10 == 0:
            val_loss = plot_and_loss(model, val_data, epoch)
        else:
            val_loss = evaluate(model, val_data)

        print('-' * 89)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss {:5.5f} | valid ppl {:8.2f}'.format(
            epoch, (time.time() - epoch_start_time), val_loss, math.exp(val_loss)))
        print('-' * 89)
        scheduler.step()

The output of the run is shown below. The loss drops significantly over the course of training.

    cuda
    | epoch 1 | 2/ 10 batches | lr 0.005000 | 7.83 ms | loss 39.99368 | ppl 233902099994043520.00
    | epoch 1 | 4/ 10 batches | lr 0.005000 | 7.81 ms | loss 7.20889 | ppl 1351.39
    | epoch 1 | 6/ 10 batches | lr 0.005000 | 11.10 ms | loss 1.68758 | ppl 5.41
    | epoch 1 | 8/ 10 batches | lr 0.005000 | 9.35 ms | loss 0.00833 | ppl 1.01
    | epoch 1 | 10/ 10 batches | lr 0.005000 | 7.81 ms | loss 1.18041 | ppl 3.26
    -----------------------------------------------------------------------------------------
    | end of epoch 1 | time: 1.96s | valid loss 2.58557 | valid ppl 13.27
    ...
    | end of epoch 198 | time: 0.30s | valid loss 0.00032 | valid ppl 1.00
    -----------------------------------------------------------------------------------------
    | epoch 199 | 2/ 10 batches | lr 0.000000 | 15.62 ms | loss 0.00057 | ppl 1.00
    | epoch 199 | 4/ 10 batches | lr 0.000000 | 15.62 ms | loss 0.00184 | ppl 1.00
    | epoch 199 | 6/ 10 batches | lr 0.000000 | 15.62 ms | loss 0.00212 | ppl 1.00
    | epoch 199 | 8/ 10 batches | lr 0.000000 | 7.81 ms | loss 0.00073 | ppl 1.00
    | epoch 199 | 10/ 10 batches | lr 0.000000 | 7.81 ms | loss 0.00057 | ppl 1.00
    -----------------------------------------------------------------------------------------
    | end of epoch 199 | time: 0.30s | valid loss 0.00032 | valid ppl 1.00
    -----------------------------------------------------------------------------------------
    | epoch 200 | 2/ 10 batches | lr 0.000000 | 15.62 ms | loss 0.00053 | ppl 1.00
    | epoch 200 | 4/ 10 batches | lr 0.000000 | 7.81 ms | loss 0.00177 | ppl 1.00
    | epoch 200 | 6/ 10 batches | lr 0.000000 | 7.81 ms | loss 0.00224 | ppl 1.00
    | epoch 200 | 8/ 10 batches | lr 0.000000 | 15.62 ms | loss 0.00069 | ppl 1.00
    | epoch 200 | 10/ 10 batches | lr 0.000000 | 7.81 ms | loss 0.00049 | ppl 1.00
    -----------------------------------------------------------------------------------------
    | end of epoch 200 | time: 0.62s | valid loss 0.00032 | valid ppl 1.00
    -----------------------------------------------------------------------------------------
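
The `lr 0.000000` entries late in the log follow from the schedule itself: with a step size of 1 and a decay factor of 0.95, the learning rate shrinks geometrically every epoch. The sketch below reproduces that arithmetic. The helper name `step_lr` is an assumption, and it models only the closed-form decay, not the scheduler object.

```python
def step_lr(initial_lr, gamma, epochs_completed, step_size=1):
    """Learning rate after a number of completed epochs under step decay."""
    return initial_lr * gamma ** (epochs_completed // step_size)


for epoch in (1, 50, 100, 199):
    lr = step_lr(0.005, 0.95, epoch - 1)
    print("epoch {:3d} | lr {:02.6f} | exact {:.3e}".format(epoch, lr, lr))
```

By epoch 199 the rate is on the order of 1e-7, which prints as `0.000000` with six decimal places, so the last epochs barely change the weights. A larger decay factor or a larger step size would keep the model learning for longer.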

Finally, consider the fitting effect of the model. The experimental results show that the simple Transformer model we built fits the data relatively well.

## 4. Summary

In this article, we introduced how to build a Transformer-based stock prediction model with the PyTorch framework and tested it on real stock data. The results suggest that the Transformer model has some effect on stock price prediction. This article is only a simple demo and leaves much room for improvement, such as using more meaningful input data or optimizing some of the components. In addition, new Transformer-based models keep emerging, and many of them are worth studying. You can also experiment with more advanced Transformer models.
