Sequence-to-sequence (seq2seq) models are powerful architectures for tasks that transform one sequence into another, such as machine translation. These models employ an encoder-decoder architecture, where the encoder processes the input sequence and the decoder generates an output sequence based on the encoder’s output. The attention mechanism was developed for seq2seq models, and understanding how seq2seq works helps clarify the rationale behind attention. In this post, you will explore how to build and train a plain seq2seq model with LSTM for language translation. Specifically:
- How to implement an encoder-decoder architecture with LSTM cells in PyTorch
- How to train the model using sentence pairs from a dataset
- How to generate a variable-length sequence with a seq2seq model
Kick-start your project with my book Building Transformer Models From Scratch with PyTorch. It provides self-study tutorials with working code.
Let’s get started.

Building a Plain Seq2Seq Model for Language Translation
Photo by Pourya Gohari. Some rights reserved.
Overview
This post is divided into five parts; they are:
- Preparing the Dataset for Training
- Implementing the Seq2Seq Model with LSTM
- Training the Seq2Seq Model
- Using the Seq2Seq Model
- Improving the Seq2Seq Model
Preparing the Dataset for Training
In a previous post, you learned how to build a transformer model for translating French sentences to English. In this post, you will reuse the same dataset and build a seq2seq model for the same task.
The seq2seq model consists of two main components: an encoder and a decoder. The encoder processes the input sequence (French sentences) and generates a fixed-size representation, known as the context vector. The decoder then uses this context vector to generate the output sequence (English sentences) one token at a time.
To train such a model, you need a dataset of sentence pairs. The model learns how to translate from the example sentence pairs in the dataset. You can source your own dataset. In this post, you will use the Anki dataset, which can be downloaded from https://www.manythings.org/anki/; a copy is also hosted on Google Cloud Storage:
```python
import os
import requests

if not os.path.exists("fra-eng.zip"):
    url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip"
    response = requests.get(url)
    with open("fra-eng.zip", "wb") as f:
        f.write(response.content)
```
This is how you can use the requests library to download a file in Python. This zip file contains only one file, fra.txt, which is a plain text file. Each line consists of an English sentence, followed by a tab character, and then a corresponding sentence in French.
To make the data useful for training, it needs to be normalized. Firstly, French sentences are in Unicode, but some characters may have multiple representations. To help your model understand the sentence better, you want to normalize the Unicode representations, such as NFKC. You may also want to convert the alphabet to lowercase to reduce the size of the vocabulary (since the model will consider the same word in different cases as different words). You can read the sentence pairs and perform the normalization as follows:
```python
import unicodedata
import zipfile

def normalize(line):
    """Normalize a line of text and split into two at the tab character"""
    line = unicodedata.normalize("NFKC", line.strip().lower())
    eng, fra = line.split("\t")
    return eng.lower().strip(), fra.lower().strip()

text_pairs = []
with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref:
    for line in zip_ref.read("fra.txt").decode("utf-8").splitlines():
        eng, fra = normalize(line)
        text_pairs.append((eng, fra))
```
The model you will build is a seq2seq model using LSTM, a recurrent neural network that can handle variable-length sequences. It cannot process words directly; the sentences must first be tokenized and encoded into numerical form. You can create a dictionary as a tokenizer to map each word in the vocabulary to a unique integer. You can also use a more advanced technique, such as Byte Pair Encoding (BPE), which handles unknown words more effectively by recognizing subword units. Let's create a separate tokenizer for English and French, respectively:
```python
import os

import tokenizers

if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"):
    en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json")
    fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json")
else:
    en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())
    fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE())
    # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence
    en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)
    fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True)
    # Configure decoder: so that word boundary symbol "Ġ" will be removed
    en_tokenizer.decoder = tokenizers.decoders.ByteLevel()
    fr_tokenizer.decoder = tokenizers.decoders.ByteLevel()
    # Train BPE for English and French using the same trainer
    VOCAB_SIZE = 8000
    trainer = tokenizers.trainers.BpeTrainer(
        vocab_size=VOCAB_SIZE,
        special_tokens=["[start]", "[end]", "[pad]"],
        show_progress=True
    )
    en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer)
    fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer)
    en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]")
    fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]")
    # Save the trained tokenizers
    en_tokenizer.save("en_tokenizer.json", pretty=True)
    fr_tokenizer.save("fr_tokenizer.json", pretty=True)
```
Here, the BPE tokenizer is from the tokenizers library. The trained tokenizers are saved to en_tokenizer.json and fr_tokenizer.json for future use. To train the BPE, you need to specify the maximum vocabulary size. The code above sets it to 8000, which is a small number (consider that this dataset has around 15,000 unique words in English and 30,000 unique words in French). You can increase the vocabulary size if you find the model is not translating well. There is some special handling in the BPE setup above:
- The pre-tokenizer splits the text on whitespace and punctuation by default. But you also added a space at the beginning of the sentence so that all words are prefixed by a space. This helps to reuse vocabulary regardless of the word’s position in the sentence.
- Three special tokens are added to the vocabulary: [start], [end], and [pad]. These tokens are added before the tokenizer is trained. The [pad] token, in particular, is set as the padding token to fill shorter sentences up to a common sequence length.
The BPE tokenizers are trained from the dataset, as stored in the list of string pairs text_pairs. The same trainer is used for both languages in the code above but the tokenizers are separate.
After the tokenizers are trained, you can test them on a few sentences:
```python
import random

# Test the tokenizer
print("Sample tokenization:")
en_sample, fr_sample = random.choice(text_pairs)

encoded = en_tokenizer.encode(en_sample)
print(f"Original: {en_sample}")
print(f"Tokens: {encoded.tokens}")
print(f"IDs: {encoded.ids}")
print(f"Decoded: {en_tokenizer.decode(encoded.ids)}")
print()

encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]")
print(f"Original: {fr_sample}")
print(f"Tokens: {encoded.tokens}")
print(f"IDs: {encoded.ids}")
print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}")
print()
```
The output would be like the following:
```
Sample tokenization:
Original: it happens to all of us.
Tokens: ['Ġit', 'Ġhappens', 'Ġto', 'Ġall', 'Ġof', 'Ġus', '.']
IDs: [124, 1689, 80, 208, 128, 238, 12]
Decoded: it happens to all of us.

Original: ça nous arrive à tous.
Tokens: ['[start]', 'Ġça', 'Ġnous', 'Ġarrive', 'ĠÃł', 'Ġtous', '.', 'Ġ', '[end]']
IDs: [0, 220, 159, 1621, 123, 392, 14, 74, 1]
Decoded: ça nous arrive à tous.
```
Seq2Seq Architecture with LSTM
Traditionally, handling a sequence of arbitrary length using a neural network requires a recurrent neural network (RNN) architecture. It is a type of neural network where a module maintains a hidden state and updates it as it processes the sequence.
Several modules can be used to implement an RNN. LSTM is one of them. Building a simple LSTM encoder for the input sequence is straightforward:
```python
import torch
import torch.nn as nn

class EncoderLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell
```
LSTM is special because it has two hidden states, named hidden and cell in the code above. In PyTorch, you don’t need to implement the recurrent structure. The module nn.LSTM can handle this well.
In the implementation above, the encoder part of the seq2seq model is a class derived from nn.Module. It expects a 2D tensor of integer token IDs as a batch of input sequences. The embedding layer converts this input into a 3D tensor by replacing each token ID with an embedding vector. In the forward() function above, the variable embedded is a 3D tensor of shape (batch_size, seq_len, embedding_dim). This is then processed by the LSTM module, whose output is a 3D tensor of shape (batch_size, seq_len, hidden_dim), corresponding to the hidden states of the LSTM at each step while processing the input sequence. The final hidden state and cell state are also returned.
Note that you created the LSTM module with batch_first=True, which means the first dimension of the input tensor is the batch size. This is a common convention for language data. This module also sets num_layers in the LSTM to 1 by default. A multi-layer LSTM is believed to be more powerful, but it results in a larger model and a longer training time.
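As a quick sanity check, you can run a dummy batch through the encoder and confirm the tensor shapes. The sketch below restates the encoder with small illustrative sizes (vocabulary of 100, embedding dimension 32, hidden dimension 64) that are not the settings used later in the post:

```python
import torch
import torch.nn as nn

class EncoderLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):
        embedded = self.embedding(input_seq)          # (batch, seq_len, embedding_dim)
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell

encoder = EncoderLSTM(vocab_size=100, embedding_dim=32, hidden_dim=64)
dummy = torch.randint(0, 100, (4, 10))                # a batch of 4 sequences of length 10
outputs, hidden, cell = encoder(dummy)
print(outputs.shape)   # (4, 10, 64): one hidden state per time step
print(hidden.shape)    # (1, 4, 64): (num_layers, batch_size, hidden_dim)
print(cell.shape)      # (1, 4, 64): same shape as hidden
```

The per-step `outputs` tensor is not used by the plain seq2seq model; only the final `hidden` and `cell` states are passed to the decoder.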
Creating the decoder part of the seq2seq model is similar, except that you also need to produce the output:
```python
class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)
        # the LSTM output has hidden_dim features; project it to vocabulary logits
        self.out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, cell):
        embedded = self.embedding(input_seq)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.out(output)
        return prediction, hidden, cell
```
The decoder LSTM is similar to the encoder LSTM. In the forward() method, the input sequence is the partial target sequence, and the hidden and cell states are the last hidden and cell states from the encoder’s LSTM module. When the decoder’s LSTM module is called, the encoder’s hidden and cell states are used. If not provided, the hidden and cell states are initialized to zeros, as in the encoder.
The input to the forward method is a 2D tensor of token IDs. This needs to be converted into a 3D tensor by the embedding layer before the LSTM module can consume it. The output of the LSTM module is a sequence of hidden states. They should be converted into a logit vector by a linear layer to predict the next token.
The design of the decoder module expects you to pass in a partial target sequence of shape (batch_size, seq_len). The forward() method returns a predicted sequence of shape (batch_size, seq_len, vocab_size), which is the output from the LSTM module transformed by the linear layer. You take the last position along the sequence dimension as the prediction for the next token. You need to call the decoder module multiple times to generate the entire target sequence.
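You can verify these shapes with a small sketch. The decoder below is a stripped-down restatement with illustrative sizes, and the zero tensors stand in for the encoder's final hidden and cell states:

```python
import torch
import torch.nn as nn

class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.out = nn.Linear(hidden_dim, vocab_size)   # logits over the vocabulary

    def forward(self, input_seq, hidden, cell):
        embedded = self.embedding(input_seq)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        return self.out(output), hidden, cell

decoder = DecoderLSTM(vocab_size=100, embedding_dim=32, hidden_dim=64)
hidden = torch.zeros(1, 4, 64)                 # stand-in for the encoder's final hidden state
cell = torch.zeros(1, 4, 64)                   # stand-in for the encoder's final cell state
partial = torch.randint(0, 100, (4, 7))        # partial target sequence, (batch, seq_len)

logits, hidden, cell = decoder(partial, hidden, cell)
print(logits.shape)                            # (4, 7, 100): one logit vector per position
next_token = logits[:, -1, :].argmax(dim=-1)   # greedy choice of the next token
print(next_token.shape)                        # (4,): one predicted token ID per sequence
```

Only the last position of `logits` matters for next-token prediction; the earlier positions are predictions for tokens already in the partial sequence.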
To build a complete seq2seq model, you need to connect the encoder and decoder modules. This is how you can do it:
```python
class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        batch_size, target_len = target_seq.shape
        device = target_seq.device
        outputs = []
        _enc_out, hidden, cell = self.encoder(input_seq)
        dec_in = target_seq[:, :1]
        for t in range(target_len-1):
            pred, hidden, cell = self.decoder(dec_in, hidden, cell)
            pred = pred[:, -1:, :]
            outputs.append(pred)
            dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1)
        outputs = torch.cat(outputs, dim=1)
        return outputs
```
This module just connects the encoder and decoder modules. The forward() method is created to help train the model. It takes the input sequence (in English) and the target sequence (in French) as input. The English sentence will be converted into “context vectors” using the encoder. The encoder also outputs a processed sequence, but it is not used.
The decoder initializes its LSTM module with the context vector provided by the encoder, then processes a partial target sequence to produce the next-token prediction. Initially, the decoder begins with the special token [start]. It then produces one more token at a time until the length of the target sequence is filled.
Note that the model above does not read the content of the target sequence, but uses its length to control the number of iterations. Also, note that the same decoder is called multiple times within a single call to the forward() method.
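To see this behavior concretely, the sketch below wires minimal restatements of the three modules together with small, illustrative sizes. Note that for a target batch of length 6, the model produces exactly 5 predictions, regardless of what tokens the target contains beyond the first:

```python
import torch
import torch.nn as nn

class EncoderLSTM(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
    def forward(self, x):
        out, (h, c) = self.lstm(self.embedding(x))
        return out, h, c

class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.out = nn.Linear(hid_dim, vocab_size)
    def forward(self, x, h, c):
        out, (h, c) = self.lstm(self.embedding(x), (h, c))
        return self.out(out), h, c

class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
    def forward(self, input_seq, target_seq):
        _, hidden, cell = self.encoder(input_seq)
        dec_in = target_seq[:, :1]              # only the first target token is read
        outputs = []
        for _ in range(target_seq.shape[1] - 1):  # target length controls the iterations
            pred, hidden, cell = self.decoder(dec_in, hidden, cell)
            pred = pred[:, -1:, :]
            outputs.append(pred)
            dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1)
        return torch.cat(outputs, dim=1)

model = Seq2SeqLSTM(EncoderLSTM(100, 32, 64), DecoderLSTM(100, 32, 64))
src = torch.randint(0, 100, (2, 9))   # source batch: 2 sequences of length 9
tgt = torch.randint(0, 100, (2, 6))   # target batch: only tgt[:, :1] and the length matter
out = model(src, tgt)
print(out.shape)                      # (2, 5, 100): target_len - 1 logit vectors
```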
Training the Seq2Seq Model
To train the above model for English-to-French translation, you need to create a dataset object such that you can iterate over the dataset in batches and in random order. You already collected the data in the previous section and stored it as text_pairs. PyTorch provides a Dataset class to help you shuffle and batch the data. This is how you can create a dataset object:
```python
import torch
from torch.utils.data import Dataset, DataLoader

class TranslationDataset(Dataset):
    def __init__(self, text_pairs):
        self.text_pairs = text_pairs

    def __len__(self):
        return len(self.text_pairs)

    def __getitem__(self, idx):
        eng, fra = self.text_pairs[idx]
        return eng, "[start] " + fra + " [end]"

def collate_fn(batch):
    en_str, fr_str = zip(*batch)
    en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True)
    fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True)
    en_ids = [enc.ids for enc in en_enc]
    fr_ids = [enc.ids for enc in fr_enc]
    return torch.tensor(en_ids), torch.tensor(fr_ids)

BATCH_SIZE = 32
dataset = TranslationDataset(text_pairs)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
```
You can try to print one sample from the dataset:
```python
for en_ids, fr_ids in dataloader:
    print(f"English: {en_ids}")
    print(f"French: {fr_ids}")
    break
```
The dataloader object is an iterable that scans the entire dataset in a random order. It returns a tuple of two tensors, each of shape (batch_size, seq_len). You will see that the two tensors you printed are integers, as token IDs are represented by integers.
The dataloader is created with the collate_fn() function. PyTorch dataloader only collects elements from a dataset object as a list, and each element in this case is a tuple of two strings. The collate function converts the strings into token IDs using the BPE tokenizers and then creates a PyTorch tensor.
The next step is to create a model. It is straightforward:
```python
...
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

enc_vocab = len(en_tokenizer.get_vocab())
dec_vocab = len(fr_tokenizer.get_vocab())
emb_dim = 256
hidden_dim = 256
num_layers = 1
dropout = 0.1

encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
model = Seq2SeqLSTM(encoder, decoder).to(device)
print(model)
```
This will print:
```
Seq2SeqLSTM(
  (encoder): EncoderLSTM(
    (embedding): Embedding(8000, 256)
    (lstm): LSTM(256, 256, batch_first=True)
  )
  (decoder): DecoderLSTM(
    (embedding): Embedding(8000, 256)
    (lstm): LSTM(256, 256, batch_first=True)
    (out): Linear(in_features=256, out_features=8000, bias=True)
  )
)
```
So you can see that the model is quite simple. In fact, it has only about 7 million parameters, yet it is still large enough to require a sizable time to train.
The code for training is as follows:
```python
import torch.optim as optim

optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]"))

N_EPOCHS = 30
for epoch in range(N_EPOCHS):
    model.train()
    epoch_loss = 0
    for en_ids, fr_ids in dataloader:
        # Move the "sentences" to device
        en_ids = en_ids.to(device)
        fr_ids = fr_ids.to(device)
        # zero the grad, then forward pass
        optimizer.zero_grad()
        outputs = model(en_ids, fr_ids)
        # compute the loss: compare 3D logits to 2D targets
        loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")
    torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth")
    # Evaluate every 5 epochs
    if (epoch+1) % 5 != 0:
        continue
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for en_ids, fr_ids in dataloader:
            en_ids = en_ids.to(device)
            fr_ids = fr_ids.to(device)
            outputs = model(en_ids, fr_ids)
            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
            epoch_loss += loss.item()
    print(f"Eval loss: {epoch_loss/len(dataloader)}")
```
This is a simple training loop; many techniques for improved training are not implemented. For example, a train-test split of the data, early stopping, and gradient clipping are not used. It simply reads the dataset in batches, runs the model's forward and backward passes, and updates the model parameters.
The loss function used is cross-entropy, since the model predicts the next token from the vocabulary. When you call the overall model, it generates an output sequence that matches the length of the target sequence, so the loss can be computed over the whole sequence in one shot rather than token by token. However, in this application, the tensors are batches of sequences, and shorter sequences are padded to match the longest one. A sequence should be terminated with the [end] token, and the positions of padding tokens should be excluded from the overall loss calculation. That's why the ignore_index parameter is used when creating the loss function with nn.CrossEntropyLoss().
If you have a separate test set, you can use that for evaluation. In the above, you reused the training data for evaluation once every 5 epochs in the latter half of the for-loop. Remember to toggle the model between model.train() and model.eval() for the correct training/inference behavior.
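The effect of ignore_index can be demonstrated in isolation. In the sketch below, the pad ID of 2 and the tiny logit tensor are illustrative stand-ins; the loss over a sequence with padding equals the loss computed over the real tokens alone:

```python
import torch
import torch.nn as nn

PAD_ID = 2   # illustrative; in the post it is fr_tokenizer.token_to_id("[pad]")
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_ID)

logits = torch.randn(5, 10)                        # 5 positions, 10-token vocabulary
targets = torch.tensor([4, 7, 1, PAD_ID, PAD_ID])  # last two positions are padding

# With ignore_index, the mean is taken over the non-padding positions only
full = loss_fn(logits, targets)
manual = nn.CrossEntropyLoss()(logits[:3], targets[:3])
print(full.item(), manual.item())  # the two values are identical
```

Without ignore_index, the padding positions would pull the loss toward predicting [pad], teaching the model to emit padding instead of real tokens.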
Using the Model
In the code above, you saved the model at the end of each epoch using torch.save(). When you have the model file, you can load it back using:
```python
...
encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
model = Seq2SeqLSTM(encoder, decoder).to(device)
model.load_state_dict(torch.load("seq2seq.pth"))
```
With a trained model, you can use it to generate translations. However, you do not use the same forward() method as in the training. Instead, you use a loop to call the decoder multiple times until the target sequence is generated.
Below is an implementation showing how to translate a few random sentences from the original dataset:
```python
import random

model.eval()

N_SAMPLES = 5
MAX_LEN = 60
with torch.no_grad():
    start_token = fr_tokenizer.token_to_id("[start]")
    end_token = fr_tokenizer.token_to_id("[end]")
    for en, true_fr in random.sample(text_pairs, N_SAMPLES):
        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)
        _output, hidden, cell = model.encoder(en_ids)
        pred_ids = [start_token]
        for _ in range(MAX_LEN):
            decoder_input = torch.tensor(pred_ids).unsqueeze(0).to(device)
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            output = output[:, -1, :].argmax(dim=1)
            pred_ids.append(output.item())
            # stop if the predicted token is the end token
            if pred_ids[-1] == end_token:
                break
        # Decode the predicted IDs
        pred_fr = fr_tokenizer.decode(pred_ids)
        print(f"English: {en}")
        print(f"French: {true_fr}")
        print(f"Predicted: {pred_fr}")
        print()
```
First, switch the model into evaluation mode and run it under the torch.no_grad() context. This saves time and memory during inference.
You pick a few samples from the dataset using random.sample(). The input sentence (English) is tokenized and encoded into the tensor en_ids. It is a 2D tensor of shape (1, seq_len), as the model always expects a batch of sequences, even if the batch size is 1.
You run the English sentence through the model’s encoder to extract the context vector, which represents the final state of the LSTM module. Then, you start with the special token [start] and generate the French sentence in a loop.
This is a typical loop to use the seq2seq model. You expect the model to generate the [end] token eventually; otherwise, you will stop the generation when the length of the generated sequence reaches the maximum length. In each iteration of the loop, you create a new input tensor for the decoder. Then the decoder will generate one extra token, as the last token in the decoder’s output sequence. This output is a logit vector of the size of the vocabulary. You take the token with the highest probability as the next token, via the argmax() method in PyTorch.
The list pred_ids accumulates the list of token IDs. Each iteration of the loop generates the input tensor for the decoder based on this list. When the loop terminates, you run the tokenizer again to convert the token IDs into a string of sentences.
When you run the code above, you may see the following output:
```
English: it was his silence that made her angry.
French: ce fut son silence qui la mit en colère.
Predicted: ce fut son silence qui qui mit colère colère.

English: you're the teacher.
French: tu es le professeur.
Predicted: c'est professeur.
```
Improving the Model
The above outlines how to build a plain seq2seq model with LSTM for translation. As you can see, the output is not perfect. There are several ways to improve it:
- Improve the tokenizer: The vocabulary size used is small, which may limit the model’s ability to understand word meanings. You can improve the model by incorporating a larger vocabulary. But this may require more training data.
- Use a larger model: One layer of LSTM is used above, and you may see an improvement if you use more layers. You can also add dropout to the LSTM module to prevent overfitting when more layers are used.
- Improve the training: Split the dataset into training and test sets, and use the test set to evaluate the model. This makes it easier to determine which epoch produced the best model, allowing you to use it for inference or to stop training early. You can also tell whether the model has converged by monitoring the loss on the test set.
- Experiment with a different decoder design: The decoder above re-processes the entire partial target sequence at each step, starting from the encoder's state. Alternatively, you can pass only the last generated token to the decoder to produce the next one. The difference is that the latter applies the current state directly to generate the next token, while the former mutates the states by re-scanning the previously generated sequence. A recurrent neural network tends to "forget" the initial state (i.e., the context vector) when the sequence is long.
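The last improvement can be sketched as follows. Instead of re-feeding the whole partial sequence, only the most recent token is passed to the decoder, while the hidden and cell states carry the history. The decoder below is a minimal restatement with illustrative sizes, and the zero tensors stand in for the encoder's final states:

```python
import torch
import torch.nn as nn

class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.out = nn.Linear(hid_dim, vocab_size)
    def forward(self, x, h, c):
        out, (h, c) = self.lstm(self.embedding(x), (h, c))
        return self.out(out), h, c

decoder = DecoderLSTM(100, 32, 64)
hidden = torch.zeros(1, 1, 64)   # stand-in for the encoder's final hidden state
cell = torch.zeros(1, 1, 64)     # stand-in for the encoder's final cell state

START_ID, MAX_LEN = 0, 10        # illustrative start-token ID and length cap
pred_ids = [START_ID]
with torch.no_grad():
    for _ in range(MAX_LEN):
        # feed only the most recent token; hidden/cell carry the history
        dec_in = torch.tensor([[pred_ids[-1]]])
        logits, hidden, cell = decoder(dec_in, hidden, cell)
        pred_ids.append(logits[0, -1].argmax().item())

print(pred_ids)  # [start] ID followed by MAX_LEN generated token IDs
```

Each decoder call now processes a single token, so generation costs O(n) LSTM steps instead of O(n²) for a length-n output. In a real setting, you would also break out of the loop upon generating the [end] token, as in the main inference code.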
For completeness, below is the complete code you created in this post:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
import random import os import re import unicodedata import zipfile import requests import torch import torch.nn as nn import torch.optim as optim import tokenizers import tqdm # # Data preparation # # Download dataset provided by Anki: https://www.manythings.org/anki/ with requests if not os.path.exists("fra-eng.zip"): url = "http://storage.googleapis.com/download.tensorflow.org/data/fra-eng.zip" response = requests.get(url) with open("fra-eng.zip", "wb") as f: f.write(response.content) # Normalize text # each line of the file is in the format "<english>\t<french>" # We convert text to lowercasee, normalize unicode (UFKC) def normalize(line): """Normalize a line of text and split into two at the tab character""" line = unicodedata.normalize("NFKC", line.strip().lower()) eng, fra = line.split("\t") return eng.lower().strip(), fra.lower().strip() text_pairs = [] with zipfile.ZipFile("fra-eng.zip", "r") as zip_ref: for line in zip_ref.read("fra.txt").decode("utf-8").splitlines(): eng, fra = normalize(line) text_pairs.append((eng, fra)) # # Tokenization with BPE # if os.path.exists("en_tokenizer.json") and os.path.exists("fr_tokenizer.json"): en_tokenizer = tokenizers.Tokenizer.from_file("en_tokenizer.json") fr_tokenizer = tokenizers.Tokenizer.from_file("fr_tokenizer.json") else: en_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) fr_tokenizer = tokenizers.Tokenizer(tokenizers.models.BPE()) # Configure pre-tokenizer to split on whitespace and punctuation, add space at beginning of the sentence en_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) fr_tokenizer.pre_tokenizer = tokenizers.pre_tokenizers.ByteLevel(add_prefix_space=True) # Configure decoder: So that word boundary symbol "Ġ" will be removed en_tokenizer.decoder = tokenizers.decoders.ByteLevel() fr_tokenizer.decoder = tokenizers.decoders.ByteLevel() # Train BPE for English and French using the same trainer VOCAB_SIZE = 8000 trainer = tokenizers.trainers.BpeTrainer( 
vocab_size=VOCAB_SIZE, special_tokens=["[start]", "[end]", "[pad]"], show_progress=True ) en_tokenizer.train_from_iterator([x[0] for x in text_pairs], trainer=trainer) fr_tokenizer.train_from_iterator([x[1] for x in text_pairs], trainer=trainer) en_tokenizer.enable_padding(pad_id=en_tokenizer.token_to_id("[pad]"), pad_token="[pad]") fr_tokenizer.enable_padding(pad_id=fr_tokenizer.token_to_id("[pad]"), pad_token="[pad]") # Save the trained tokenizers en_tokenizer.save("en_tokenizer.json", pretty=True) fr_tokenizer.save("fr_tokenizer.json", pretty=True) # Test the tokenizer print("Sample tokenization:") en_sample, fr_sample = random.choice(text_pairs) encoded = en_tokenizer.encode(en_sample) print(f"Original: {en_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {en_tokenizer.decode(encoded.ids)}") print() encoded = fr_tokenizer.encode("[start] " + fr_sample + " [end]") print(f"Original: {fr_sample}") print(f"Tokens: {encoded.tokens}") print(f"IDs: {encoded.ids}") print(f"Decoded: {fr_tokenizer.decode(encoded.ids)}") print() # # Create PyTorch dataset for the BPE-encoded translation pairs # class TranslationDataset(torch.utils.data.Dataset): def __init__(self, text_pairs): self.text_pairs = text_pairs def __len__(self): return len(self.text_pairs) def __getitem__(self, idx): eng, fra = self.text_pairs[idx] return eng, "[start] " + fra + " [end]" def collate_fn(batch): en_str, fr_str = zip(*batch) en_enc = en_tokenizer.encode_batch(en_str, add_special_tokens=True) fr_enc = fr_tokenizer.encode_batch(fr_str, add_special_tokens=True) en_ids = [enc.ids for enc in en_enc] fr_ids = [enc.ids for enc in fr_enc] return torch.tensor(en_ids), torch.tensor(fr_ids) BATCH_SIZE = 32 dataset = TranslationDataset(text_pairs) dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn) # Test the dataset for en_ids, fr_ids in dataloader: print(f"English: {en_ids}") print(f"French: {fr_ids}") break 
```python
import os
import random

import torch
import torch.nn as nn
import torch.optim as optim
import tqdm

# en_tokenizer, fr_tokenizer, dataloader, and text_pairs are defined
# in the data preparation step

#
# Create LSTM seq2seq model for translation
#

class EncoderLSTM(nn.Module):
    """A stacked LSTM encoder with an embedding layer"""
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        """
        Plain LSTM is used. No bidirectional LSTM.

        Args:
            vocab_size: The size of the input vocabulary
            embedding_dim: The dimension of the embedding vector
            hidden_dim: The dimension of the hidden state
            num_layers: The number of recurrent layers (layers of stacked LSTM)
            dropout: The dropout rate, applied to all LSTM layers except the last one
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)

    def forward(self, input_seq):
        # input_seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim]
        embedded = self.embedding(input_seq)
        # outputs = [batch_size, seq_len, hidden_dim]
        # hidden = cell = [num_layers, batch_size, hidden_dim]
        outputs, (hidden, cell) = self.lstm(embedded)
        return outputs, hidden, cell


class DecoderLSTM(nn.Module):
    """A stacked LSTM decoder with an embedding layer and an output projection"""
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, dropout=0.1):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True,
                            dropout=dropout if num_layers > 1 else 0)
        # project from the LSTM's hidden dimension onto the vocabulary
        self.out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_seq, hidden, cell):
        # input_seq = [batch_size, seq_len] -> embedded = [batch_size, seq_len, embedding_dim]
        # hidden = cell = [num_layers, batch_size, hidden_dim]
        embedded = self.embedding(input_seq)
        # output = [batch_size, seq_len, hidden_dim]
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        # prediction = [batch_size, seq_len, vocab_size]
        prediction = self.out(output)
        return prediction, hidden, cell


class Seq2SeqLSTM(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, input_seq, target_seq):
        """Given the partial target sequence, predict the next token"""
        # input_seq = [batch_size, seq_len]
        # target_seq = [batch_size, seq_len]
        batch_size, target_len = target_seq.shape
        # storing output logits
        outputs = []
        # encoder forward pass
        _enc_out, hidden, cell = self.encoder(input_seq)
        dec_in = target_seq[:, :1]
        # decoder forward pass
        for t in range(target_len - 1):
            # last target token and hidden states -> next token
            pred, hidden, cell = self.decoder(dec_in, hidden, cell)
            # store the prediction at the last position
            pred = pred[:, -1:, :]
            outputs.append(pred)
            # use the predicted token as the next input
            dec_in = torch.cat([dec_in, pred.argmax(dim=2)], dim=1)
        outputs = torch.cat(outputs, dim=1)
        return outputs


# Initialize model parameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
enc_vocab = len(en_tokenizer.get_vocab())
dec_vocab = len(fr_tokenizer.get_vocab())
emb_dim = 256
hidden_dim = 256
num_layers = 2
dropout = 0.1

# Create model
encoder = EncoderLSTM(enc_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
decoder = DecoderLSTM(dec_vocab, emb_dim, hidden_dim, num_layers, dropout).to(device)
model = Seq2SeqLSTM(encoder, decoder).to(device)
print(model)
print("Model created with:")
print(f"  Input vocabulary size: {enc_vocab}")
print(f"  Output vocabulary size: {dec_vocab}")
print(f"  Embedding dimension: {emb_dim}")
print(f"  Hidden dimension: {hidden_dim}")
print(f"  Number of layers: {num_layers}")
print(f"  Dropout: {dropout}")
print(f"  Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}")

# Train unless seq2seq.pth exists
if os.path.exists("seq2seq.pth"):
    model.load_state_dict(torch.load("seq2seq.pth"))
else:
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    loss_fn = nn.CrossEntropyLoss(ignore_index=fr_tokenizer.token_to_id("[pad]"))
    N_EPOCHS = 30
    for epoch in range(N_EPOCHS):
        model.train()
        epoch_loss = 0
        for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Training"):
            # Move the "sentences" to device
            en_ids = en_ids.to(device)
            fr_ids = fr_ids.to(device)
            # zero the grad, then forward pass
            optimizer.zero_grad()
            outputs = model(en_ids, fr_ids)
            # compute the loss: flatten the 3D logits and 2D targets
            loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        print(f"Epoch {epoch+1}/{N_EPOCHS}; Avg loss {epoch_loss/len(dataloader)}; Latest loss {loss.item()}")
        torch.save(model.state_dict(), f"seq2seq-epoch-{epoch+1}.pth")
        # Test
        if (epoch+1) % 5 != 0:
            continue
        model.eval()
        epoch_loss = 0
        with torch.no_grad():
            for en_ids, fr_ids in tqdm.tqdm(dataloader, desc="Evaluating"):
                en_ids = en_ids.to(device)
                fr_ids = fr_ids.to(device)
                outputs = model(en_ids, fr_ids)
                loss = loss_fn(outputs.reshape(-1, dec_vocab), fr_ids[:, 1:].reshape(-1))
                epoch_loss += loss.item()
        print(f"Eval loss: {epoch_loss/len(dataloader)}")
    # Save the final model
    torch.save(model.state_dict(), "seq2seq.pth")

# Test for a few samples
model.eval()
N_SAMPLES = 5
MAX_LEN = 60
with torch.no_grad():
    start_token = fr_tokenizer.token_to_id("[start]")
    for en, true_fr in random.sample(text_pairs, N_SAMPLES):
        en_ids = torch.tensor(en_tokenizer.encode(en).ids).unsqueeze(0).to(device)
        _output, hidden, cell = model.encoder(en_ids)
        pred_ids = [start_token]
        for _ in range(MAX_LEN):
            decoder_input = torch.tensor(pred_ids).unsqueeze(0).to(device)
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            output = output[:, -1, :].argmax(dim=1)
            pred_ids.append(output.item())
            # early stop if the predicted token is the end token
            if pred_ids[-1] == fr_tokenizer.token_to_id("[end]"):
                break
        # Decode the predicted IDs
        pred_fr = fr_tokenizer.decode(pred_ids)
        print(f"English: {en}")
        print(f"French: {true_fr}")
        print(f"Predicted: {pred_fr}")
        print()
```
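To sanity-check the tensor shapes that flow through the encoder, here is a minimal, self-contained sketch with tiny, hypothetical dimensions (the numbers below are for illustration only, not the model's actual hyperparameters):

```python
import torch
import torch.nn as nn

# Tiny hypothetical dimensions, for shape-checking only
vocab_size, embedding_dim, hidden_dim = 10, 8, 6
num_layers, batch_size, seq_len = 2, 3, 5

embedding = nn.Embedding(vocab_size, embedding_dim)
lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)

tokens = torch.randint(0, vocab_size, (batch_size, seq_len))  # [batch_size, seq_len]
embedded = embedding(tokens)              # [batch_size, seq_len, embedding_dim]
outputs, (hidden, cell) = lstm(embedded)

print(outputs.shape)  # [batch_size, seq_len, hidden_dim] = [3, 5, 6]
print(hidden.shape)   # [num_layers, batch_size, hidden_dim] = [2, 3, 6]
print(cell.shape)     # [num_layers, batch_size, hidden_dim] = [2, 3, 6]
```

Note that the per-step outputs have the LSTM's hidden dimension, not the embedding dimension, while `hidden` and `cell` stack one state per layer; these are the context that the encoder hands to the decoder.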
Further Readings
Below are some resources that you may find useful:
- Sequence to Sequence Learning with Neural Networks
- Learning Phrase Representations using RNN Encoder-Decoder for Statistical Machine Translation
- The Unreasonable Effectiveness of Recurrent Neural Networks
- PyTorch Tutorial on Seq2Seq Translation
- nn.LSTM module in PyTorch
Summary
In this post, you learned about building and training a seq2seq model with LSTM for English to French translation. Specifically, you learned about:
- How encoder-decoder architectures work with LSTM cells
- How to prepare the dataset for training a seq2seq model
- How to implement and train the complete translation model in PyTorch
The implementation is minimal, but it illustrates the general mechanism of a seq2seq model.
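One common training refinement for such a model is teacher forcing: with some probability, feed the ground-truth token rather than the model's own prediction as the next decoder input, which tends to stabilize early training. Below is a hedged sketch of a decoding loop with a configurable teacher-forcing ratio; the function name and `ratio` parameter are illustrative, not from this post, and it assumes a decoder with the same `(input, hidden, cell)` interface as the one above:

```python
import random
import torch

def decode_with_teacher_forcing(decoder, target_seq, hidden, cell, ratio=0.5):
    """Step-by-step decoding loop with teacher forcing (illustrative sketch).

    With probability `ratio`, the ground-truth token is fed as the next
    decoder input instead of the model's own prediction. Only the latest
    token is fed each step, since `hidden` and `cell` carry the history.
    """
    dec_in = target_seq[:, :1]  # the [start] token
    outputs = []
    for t in range(target_seq.shape[1] - 1):
        pred, hidden, cell = decoder(dec_in, hidden, cell)
        outputs.append(pred[:, -1:, :])
        if random.random() < ratio:
            dec_in = target_seq[:, t+1:t+2]         # ground-truth token
        else:
            dec_in = pred[:, -1:, :].argmax(dim=2)  # model's own prediction
    return torch.cat(outputs, dim=1)
```

Setting `ratio=1.0` gives pure teacher forcing, while `ratio=0.0` recovers the free-running behavior used in this post; values in between are a form of scheduled sampling.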