Chatbot Tutorial
Introduction
This is a walk-through of the PyTorch Chatbot Tutorial, which builds a chatbot using a recurrent sequence-to-sequence model trained on the Cornell Movie-Dialogs Corpus.
Set Up
Imports
Python
from collections import defaultdict, namedtuple
import ast
import codecs
from pathlib import Path
from typing import Dict, List, Union
from zipfile import ZipFile
import csv
import os
import subprocess
PyPI
from dotenv import load_dotenv
import requests
import torch
This Project
from neurotic.tangles.timer import Timer
Set Up the Timer
TIMER = Timer()
Load Dotenv
load_dotenv("../../.env")
Check CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using {}".format(device))
Using cuda
Some Type Hints
OptionalList = Union[list, None]
Some Constants
ENCODING = "iso-8859-1"
The Data
Download
class MovieData:
"""Dowload and ready the movie data
Args:
download_path: Path to the folder to store the data
url: download url for the zip file
chunk_size: bytes to read from stream during download
clean_up: remove the extra downloaded files
"""
def __init__(self,
download_path: Path,
url: str=("http://www.cs.cornell.edu/~cristian/data/"
"cornell_movie_dialogs_corpus.zip"),
chunk_size: int=1024,
clean_up: bool=True) -> None:
self.download_path = download_path
self.url = url
self.chunk_size = chunk_size
self.clean_up = clean_up
self._zip_path = None
self._data_path = None
self._zip_file = None
return
@property
def zip_path(self) -> Path:
"""Path to the downloaded zip file"""
if self._zip_path is None:
self._zip_path = self.download_path.joinpath(Path(self.url).name)
return self._zip_path
@property
def data_path(self) -> Path:
"""Path to the unzipped file"""
if self._data_path is None:
self._data_path = self.download_path.joinpath(
Path(self.zip_path).stem)
return self._data_path
@property
def zip_file(self) -> ZipFile:
"""the Zip file for the zipped data"""
if self._zip_file is None:
self._zip_file = ZipFile(self.zip_path)
return self._zip_file
def clean(self) -> None:
"""remove the extra downloaded files"""
os.remove(self.zip_path)
return
def __call__(self) -> None:
"""downloads and prepares the file if needed"""
if not self.data_path.is_dir():
if not self.zip_path.is_file():
response = requests.get(self.url, stream=True)
with self.zip_path.open("wb") as writer:
for chunk in response.iter_content(chunk_size=self.chunk_size):
if chunk:
writer.write(chunk)
unpacked = []
for name in self.zip_file.namelist():
name = Path(name)
# there are extra folders and hidden files in there that I'll avoid
if name.suffix in (".pdf", ".txt") and not name.name.startswith("."):
self.zip_file.extract(str(name), path=self.data_path)
unpacked.append(name)
assert self.data_path.is_dir()
if self.clean_up:
# there is a sub-folder in the unzipped folder so move the
# files up one
for to_move in unpacked:
self.data_path.joinpath(to_move).rename(
self.data_path.joinpath(to_move.name))
# now delete the zip file
self.clean()
if unpacked:
# now remove the sub-folder
self.data_path.joinpath(unpacked[0].parent).rmdir()
return
Now let's download and unpack the data.
datasets = Path(os.environ.get("DATASETS")).expanduser()
assert datasets.is_dir()
movie_data = MovieData(datasets, clean_up=True)
movie_data()
for name in movie_data.data_path.iterdir():
print(" - {}".format(name.name))
- chameleons.pdf
- conversation_line_pairs.tsv
- movie_conversations.txt
- movie_characters_metadata.txt
- movie_lines.txt
- movie_titles_metadata.txt
- raw_script_urls.txt
- README.txt
class MovieFile:
urls = "raw_script_urls.txt"
readme = "README.txt"
lines = "movie_lines.txt"
characters = "movie_characters_metadata.txt"
conversations = "movie_conversations.txt"
titles = "movie_titles_metadata.txt"
Movie Lines
Here's an excerpt from the README.txt file:
In all files the field separator is " +++$+++ "
- movie_lines.txt
- contains the actual text of each utterance
- fields:
- lineID
- characterID (who uttered this phrase)
- movieID
- character name
- text of the utterance
Movie Line Data
To load the lines I'm going to make a namedtuple.
MovieLine = namedtuple("MovieLine", ["line_id",
"character_id",
"movie_id",
"character_name",
"text"])
LineData = Dict[str, MovieLine]
LineFields = MovieLine(**{field: index
for index, field in enumerate(MovieLine._fields)})
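Since LineFields is a MovieLine whose values are the column indices, you can look tokens up by field name after splitting a row. A quick illustration using the first line from the head output shown further down:
example = ("L1045 +++$+++ u0 +++$+++ m0 +++$+++ "
           "BIANCA +++$+++ They do not!")
tokens = example.split(" +++$+++ ")
assert tokens[LineFields.line_id] == "L1045"
assert tokens[LineFields.character_name] == "BIANCA"
assert tokens[LineFields.text] == "They do not!"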
A Line Loader
class MovieLines:
"""loads the movie dialog lines
Args:
path: path to the source file
separator: column-separator
encoding: the file encoding type (e.g. UTF-8)
"""
def __init__(self, path: Path, separator: str=" +++$+++ ",
encoding="UTF-8") -> None:
self.path = path
self.separator = separator
self.encoding = encoding
self._lines = None
return
@property
def lines(self) -> LineData:
"""Dictionary Of Lines in the Data"""
if self._lines is None:
self._lines = {}
with self.path.open(encoding=self.encoding) as reader:
for line in reader:
tokens = line.strip().split(self.separator)
text = tokens[LineFields.text] if len(tokens) == len(LineFields) else ""
movie_line = MovieLine(line_id=tokens[LineFields.line_id],
character_id=tokens[LineFields.character_id],
movie_id=tokens[LineFields.movie_id],
character_name=tokens[LineFields.character_name],
text=text,
)
self._lines[movie_line.line_id] = movie_line
return self._lines
def head(self, lines: int=5, get: bool=False) -> OptionalList:
"""show the first lines
Args:
lines: number of lines to read
get: if true, return the lines
"""
output = [] if get else None
with self.path.open(encoding=self.encoding) as reader:
for index, line in enumerate(reader):
line = line.rstrip()
print(line)
if get:
output.append(line)
if index + 1 >= lines:
break
return output
movie_lines = MovieLines(movie_data.data_path.joinpath(MovieFile.lines), encoding=ENCODING)
output_lines = movie_lines.head(10)
L1045 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ They do not!
L1044 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ They do to!
L985 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ I hope so.
L984 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ She okay?
L925 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Let's go.
L924 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ Wow
L872 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Okay -- you're gonna need to learn how to lie.
L871 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ No
L870 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ I'm kidding. You know how sometimes you just become this "persona"? And you don't know how to quit?
L869 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Like my fear of wearing pastels?
As noted in the README.txt, those strange characters are how the columns are separated (I guess so that the commas could be kept in the text). The line IDs seem to be in reverse order, and don't seem to include all of the lines - unless they're out of order and just looking at the head is misleading. For reference, the movie these lines come from (the dialog between Bianca and Cameron) is 10 Things I Hate About You. For some reason they both encode the characters and give their names - u0 is BIANCA.
If you poke around in the file you'll find that there's something peculiar about the characters in it.
output = subprocess.run(["file", "-i", str(movie_lines.path)], stdout=subprocess.PIPE)
print(output.stdout)
b'/home/athena/data/datasets/cornell_movie_dialogs_corpus/movie_lines.txt: text/plain; charset=unknown-8bit\n'
It doesn't look like standard ASCII, but I wonder if it matters. The PyTorch tutorial gives the encoding as iso-8859-1. I can't find any documentation for this, but since they gave it, we may as well use it.
ENCODING = "iso-8859-1"
I'm using it in MovieLines too, so I defined ENCODING at the top of the notebook; this is just to show where it came from.
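As a sanity check that the encoding actually matters, here's a sketch (not from the tutorial) that hunts for the first line the default UTF-8 codec can't decode and decodes it with iso-8859-1 instead:
with movie_lines.path.open("rb") as reader:
    for index, line in enumerate(reader):
        try:
            line.decode("utf-8")
        except UnicodeDecodeError:
            # iso-8859-1 maps every byte to a code point so it can't fail
            print("Line {}: {}".format(index, line.decode(ENCODING).rstrip()))
            break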
Conversations
The movie-lines file has all of the lines lumped together, but we want conversations between characters. For that you need to group the lines using the movie_conversations.txt file.
- movie_conversations.txt
- the structure of the conversations
- fields
- characterID of the first character involved in the conversation
- characterID of the second character involved in the conversation
- movieID of the movie in which the conversation occurred
- list of the utterances that make the conversation, in chronological order: ['lineID1','lineID2',É,'lineIDN'] has to be matched with movie_lines.txt to reconstruct the actual content
You can see that the README itself has some kind of funky character in it (the É in the description of the last field). Weird.
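Note that the last field in each row is a python list stored as its string representation, so it has to be parsed rather than split. Here's a minimal sketch using ast.literal_eval, which is why ast got imported at the top - it's safer than eval on text that comes from a file:
import ast

raw = "['L194', 'L195', 'L196', 'L197']"
line_ids = ast.literal_eval(raw)
assert line_ids == ["L194", "L195", "L196", "L197"]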
A Conversation Holder
A conversation is a list of lines said by characters to each other. Although the dialog file is presumably in order, we want to be able to partition lines that are part of a single conversation - a verbal interaction between two characters.
ConversationIDs = namedtuple("ConversationIDs", ["character_id_1",
"character_id_2",
"movie_id",
"lines"])
ConversationFields = ConversationIDs(
**{field: index
for index, field in enumerate(ConversationIDs._fields)})
ConversationData = List[ConversationIDs]
A Conversations Builder
This is code to pull the lines out and group them by conversation.
class Conversations:
"""Holds the conversations
Args:
path: path to the conversations file
movies: object with the movie lines
encoding: the encoding for the file
separator: the column separator
"""
def __init__(self,
path: Path,
movies: MovieLines,
separator: str=" +++$+++ ",
encoding:str="UTF-8") -> None:
self.path = path
self.movies = movies
self.separator = separator
self.encoding = encoding
self._conversations = None
self._sentence_pairs = None
return
@property
def conversations(self) -> ConversationData:
"""The list of conversation line data
"""
if self._conversations is None:
self._conversations = []
with self.path.open(encoding=self.encoding) as reader:
for line in reader:
tokens = line.strip().split(self.separator)
# the lines field is the string representation of a python list
line_ids = ast.literal_eval(tokens[ConversationFields.lines])
lines = [self.movies.lines[line_id] for line_id in line_ids]
self._conversations.append(
ConversationIDs(
character_id_1=tokens[ConversationFields.character_id_1],
character_id_2=tokens[ConversationFields.character_id_2],
movie_id=tokens[ConversationFields.movie_id],
lines = lines,
))
return self._conversations
@property
def sentence_pairs(self) -> list:
"""paired-sentences from the conversations"""
if self._sentence_pairs is None:
self._sentence_pairs = []
for conversation in self.conversations:
for index in range(len(conversation.lines) - 1):
utterance = conversation.lines[index].text
response = conversation.lines[index + 1].text
# lines with missing text are empty strings, so skip incomplete pairs
if utterance and response:
self._sentence_pairs.append([utterance, response])
return self._sentence_pairs
def head(self, count: int=5) -> None:
"""Print the first lines
Args:
count: how many lines to print
"""
with self.path.open(encoding=self.encoding) as reader:
so_far = 0
for line in reader:
print(line.rstrip())
so_far += 1
if so_far >= count:
break
return
Now I'll build the conversations from the file.
conversations_path = movie_data.data_path.joinpath(MovieFile.conversations)
conversations = Conversations(conversations_path, movie_lines, encoding=ENCODING)
conversations.head()
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L194', 'L195', 'L196', 'L197']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L198', 'L199']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L200', 'L201', 'L202', 'L203']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L204', 'L205', 'L206']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L207', 'L208']
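Before storing the pairs, here's a peek at the first one (it should match the first row of the TSV checked below):
utterance, response = conversations.sentence_pairs[0]
print("> {}".format(utterance))
print("< {}".format(response))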
Store the Processed Lines
Since we've transformed the data we should store it to avoid needing to transform it again later.
with TIMER:
processed_path = movie_data.data_path.joinpath("conversation_line_pairs.tsv")
# decoding the escape sequence is a no-op here since "\t" is already a tab
delimiter = str(codecs.decode("\t", "unicode_escape"))
NEWLINE = "\n"
with processed_path.open("w", encoding="utf-8") as outputfile:
# use NEWLINE so rows end with \n instead of csv's default \r\n
writer = csv.writer(outputfile, delimiter=delimiter, lineterminator=NEWLINE)
for pair in conversations.sentence_pairs:
writer.writerow(pair)
Started: 2019-02-18 18:44:01.624014 Ended: 2019-02-18 18:44:04.127445 Elapsed: 0:00:02.503431
Check Our Stored File
with processed_path.open() as reader:
count = 0
for line in reader:
print(repr(line))
count += 1
if count == 5:
break
"Can we make this quick? Roxanne Korrine and Andrew Barrett are having an incredibly horrendous public break- up on the quad. Again.\tWell, I thought we'd start with pronunciation, if that's okay with you.\n" "Well, I thought we'd start with pronunciation, if that's okay with you.\tNot the hacking and gagging and spitting part. Please.\n" "Not the hacking and gagging and spitting part. Please.\tOkay... then how 'bout we try out some French cuisine. Saturday? Night?\n" "You're asking me out. That's so cute. What's your name again?\tForget it.\n" "No, no, it's my fault -- we didn't have a proper introduction ---\tCameron.\n"
A Vocabulary
PADDING, START_OF_SENTENCE, END_OF_SENTENCE = 0, 1, 2
class Vocabulary:
"""A class to hold words and sentences
Args:
name: name of the vocabulary
token_delimiter: what to split sentences on
"""
def __init__(self, name: str, token_delimiter: str=" ") -> None:
self.name = name
self.trimmed = False
self.token_delimiter = token_delimiter
self.word_to_index = {}
self._word_to_count = None
self._index_to_word = None
return
@property
def word_to_count(self) -> defaultdict:
"""map of word to word count"""
if self._word_to_count is None:
# default the count to 1 so that add_word's increment on a repeat
# sighting counts the first occurrence too (words seen only once
# never enter this dict)
self._word_to_count = defaultdict(lambda: 1)
return self._word_to_count
@property
def index_to_word(self) -> dict:
"""map of word-index back to the word"""
if self._index_to_word is None:
# dict(PADDING="PAD", ...) would make the string "PADDING" the
# key instead of 0, so build the dict with the constants as keys
self._index_to_word = {
PADDING: "PAD",
START_OF_SENTENCE: "SOS",
END_OF_SENTENCE: "EOS",
}
return self._index_to_word
@property
def word_count(self) -> int:
"""the number of words in our vocabulary"""
return len(self.index_to_word)
def add_sentence(self, sentence: str) -> None:
"""Adds the words in the sentence to our dictionary
Args:
sentence: string of words
"""
for word in sentence.split(self.token_delimiter):
self.add_word(word)
return
def add_word(self, word: str) -> None:
"""add the word to our vocabulary
Args:
word: word to add
"""
if word not in self.word_to_index:
self.word_to_index[word] = self.word_count
self.index_to_word[self.word_count] = word
else:
self.word_to_count[word] += 1
return
def trim(self, minimum: int) -> None:
"""Trim words below the minimum
.. warning:: This will only work once, even if you change the
minimum. Set self.trimmed to False if you want to do it again
Args:
minimum: lowest acceptable count for a word
"""
if self.trimmed:
return
self.trimmed = True
keepers = []
for word, count in self.word_to_count.items():
if count >= minimum:
keepers.append(word)
print("Keep: {}/{} = {:.2f}".format(len(keepers),
len(self.word_count),
len(keepers)/len(self.word_count)))
self.reset()
for word in keepers:
self.add_word(word)
return
def reset(self) -> None:
"""Resets the dictionaries"""
self.word_to_index = {}
self._word_to_count = None
self._index_to_word = None
return
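As a quick smoke test (the sentences are made up, not from the corpus), add a couple of sentences and then trim the words that only appear once:
vocabulary = Vocabulary("test")
vocabulary.add_sentence("they do to")
vocabulary.add_sentence("they do not")
# the three special tokens plus they, do, to, and not
assert vocabulary.word_count == 7
vocabulary.trim(minimum=2)
# only "they" and "do" appeared twice, so they join the three specials
assert vocabulary.word_count == 5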