Chatbot Tutorial

Introduction

This is a walk-through of the PyTorch Chatbot Tutorial, which builds a chatbot using a recurrent sequence-to-sequence model trained on the Cornell Movie-Dialogs Corpus.

Set Up

Imports

Python

from collections import defaultdict, namedtuple
import codecs
from pathlib import Path
from typing import Dict, List, Union
from zipfile import ZipFile
import csv
import os
import subprocess

PyPI

from dotenv import load_dotenv
import requests
import torch

This Project

from neurotic.tangles.timer import Timer

Set Up the Timer

TIMER = Timer()

Load Dotenv

load_dotenv("../../.env")

Check CUDA

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using {}".format(device))
Using cuda

Some Type Hints

OptionalList = Union[list, None]

Some Constants

ENCODING = "iso-8859-1"

The Data

Download

class MovieData:
    """Dowload and ready the movie data
    Args:
     download_path: Path to the folder to store the data
     url: download url for the zip file
     chunk_size: bytes to read from stream during download
     clean_up: remove the extra downloaded files
    """
    def __init__(self,
                 download_path: Path,
                 url: str=("http://www.cs.cornell.edu/~cristian/data/"
                           "cornell_movie_dialogs_corpus.zip"),
                 chunk_size=1024,
                 clean_up: bool=True) -> None:
        self.download_path = download_path
        self.url = url
        self.chunk_size = chunk_size
        self.clean_up = clean_up
        self._zip_path = None
        self._data_path = None
        self._zip_file = None
        return

    @property
    def zip_path(self) -> Path:
        """Path to the downloaded zip file"""
        if self._zip_path is None:
            self._zip_path = self.download_path.joinpath(Path(self.url).name)
        return self._zip_path

    @property
    def data_path(self) -> Path:
        """Path to the unzipped file"""
        if self._data_path is None:
            self._data_path = self.download_path.joinpath(
                Path(self.zip_path).stem)
        return self._data_path

    @property
    def zip_file(self) -> ZipFile:
        """the Zip file for the zipped data"""
        if self._zip_file is None:
            self._zip_file = ZipFile(self.zip_path)
        return self._zip_file

    def clean(self) -> None:
        """remove the extra downloaded files"""
        os.remove(self.zip_path)
        return

    def __call__(self) -> None:
        """downloads and prepares the file if needed"""
        if not self.data_path.is_dir():
            if not self.zip_path.is_file():
                response = requests.get(self.url, stream=True)
                with self.zip_path.open("wb") as writer:
                    for chunk in response.iter_content(chunk_size=self.chunk_size):
                        if chunk:
                            writer.write(chunk)
            unpacked = []
            for name in self.zip_file.namelist():
                name = Path(name)
                # there's extra folders and hidden files in there that I'll avoid
                if name.suffix in (".pdf", ".txt") and not name.name.startswith("."):
                    self.zip_file.extract(str(name), path=self.data_path)
                    unpacked.append(name)
            assert self.data_path.is_dir()
            if self.clean_up:
                # there is a sub-folder in the unzipped folder so move the
                # files up one level
                for to_move in unpacked:
                    self.data_path.joinpath(to_move).rename(
                        self.data_path.joinpath(to_move.name))

                # now delete the temporary file
                os.remove(self.zip_path)
                if unpacked:
                    # now remove the sub-folder
                    self.data_path.joinpath(unpacked[0].parent).rmdir()
        return

Now let's download and unpack the data.

datasets = Path(os.environ.get("DATASETS")).expanduser()
assert datasets.is_dir()
movie_data = MovieData(datasets, clean_up=True)
movie_data()
for name in movie_data.data_path.iterdir():
    print(" - {}".format(name.name))
  • chameleons.pdf
  • conversation_line_pairs.tsv
  • movie_conversations.txt
  • movie_characters_metadata.txt
  • movie_lines.txt
  • movie_titles_metadata.txt
  • raw_script_urls.txt
  • README.txt
class MovieFile:
    urls = "raw_script_urls.txt"
    readme = "README.txt"
    lines = "movie_lines.txt"
    characters = "movie_characters_metadata.txt"
    conversations = "movie_conversations.txt"
    titles = "movie_titles_metadata.txt"

Movie Lines

Here's an excerpt from the README.txt file:

In all files the field separator is " +++$+++ "

  • movie_lines.txt
    • contains the actual text of each utterance
    • fields:
      • lineID
      • characterID (who uttered this phrase)
      • movieID
      • character name
      • text of the utterance

Movie Line Data

To load the lines I'm going to make a namedtuple.

MovieLine = namedtuple("MovieLine", ["line_id",
                                     "character_id",
                                     "movie_id",
                                     "character_name",
                                     "text"])

LineData = Dict[str, MovieLine]
LineFields = MovieLine(**{field: index
                          for index, field in enumerate(MovieLine._fields)})

A Line Loader

class MovieLines:
    """loads the movie dialog lines

    Args:
     path: path to the source file
     separator: column-separator
     encoding: the file encoding type (e.g. UTF-8)
    """
    def __init__(self, path: Path, separator: str=" +++$+++ ",
                 encoding="UTF-8") -> None:
        self.path = path
        self.separator = separator
        self.encoding = encoding
        self._lines = None
        return

    @property
    def lines(self) -> LineData:
        """Dictionary Of Lines in the Data"""
        if self._lines is None:
            self._lines = {}
            with self.path.open(encoding=self.encoding) as reader:
                for line in reader:
                    tokens = line.strip().split(self.separator)

                    text = tokens[LineFields.text] if len(tokens) == len(LineFields) else ""
                    movie_line = MovieLine(line_id=tokens[LineFields.line_id],
                                           character_id=tokens[LineFields.character_id],
                                           movie_id=tokens[LineFields.movie_id],
                                           character_name=tokens[LineFields.character_name],
                                           text=text,
                    )
                    self._lines[movie_line.line_id] = movie_line
        return self._lines

    def head(self, lines: int=5, get: bool=False) -> OptionalList:
        """show the first lines

        Args:
         lines: number of lines to read
         get: if true, return the lines
        """
        output = [] if get else None
        with self.path.open(encoding=self.encoding) as reader:
            for index, line in enumerate(reader):
                line = line.rstrip()
                print(line)
                if get:
                    output.append(line)
                if index + 1 >= lines:
                    break
        return output
movie_lines = MovieLines(movie_data.data_path.joinpath(MovieFile.lines), encoding=ENCODING)
output_lines = movie_lines.head(10)
L1045 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ They do not!
L1044 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ They do to!
L985 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ I hope so.
L984 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ She okay?
L925 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Let's go.
L924 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ Wow
L872 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Okay -- you're gonna need to learn how to lie.
L871 +++$+++ u2 +++$+++ m0 +++$+++ CAMERON +++$+++ No
L870 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ I'm kidding.  You know how sometimes you just become this "persona"?  And you don't know how to quit?
L869 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ Like my fear of wearing pastels?

As noted in the README.txt, those strange strings are how the columns are separated (I guess so that commas could be kept in the text). The line IDs seem to be in reverse order, and some lines appear to be missing - unless they're just out of order and looking at the head is misleading. For reference, the lines I showed (the dialog between Bianca and Cameron) are from 10 Things I Hate About You. For some reason the file both encodes the characters and gives their names - u0 is BIANCA.
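
Since the lines property maps each line ID to its MovieLine, we can check that mapping directly. This lookup isn't part of the original tutorial; it's just a quick sanity check using a line ID from the head output above.

# look up one of the lines shown above by its ID
line = movie_lines.lines["L1045"]
print(line.character_id, line.character_name)
print(line.text)

Based on the head output, this should show that u0 is BIANCA.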

If you poke around in the file you'll find that there's something peculiar about the characters in it.

output = subprocess.run(["file", "-i", str(movie_lines.path)], stdout=subprocess.PIPE)
print(output.stdout)
b'/home/athena/data/datasets/cornell_movie_dialogs_corpus/movie_lines.txt: text/plain; charset=unknown-8bit\n'

It doesn't look like standard ASCII, but I wonder if it matters. The PyTorch tutorial gives the encoding as iso-8859-1; I can't find any documentation confirming this, but since they gave it, I'll go with it.

ENCODING = "iso-8859-1"

Since I'm also using it for MovieLines, I defined ENCODING at the top of the notebook; this is just to show where it came from.
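
If you want a second opinion on the encoding, a detector like chardet can guess from a sample of the raw bytes. This is just a sketch and assumes the chardet package is installed - it isn't used anywhere else in this post.

import chardet

# read a chunk of raw bytes and let chardet guess the charset
with movie_lines.path.open("rb") as reader:
    sample = reader.read(100000)
print(chardet.detect(sample))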

Conversations

The movie-lines file has all of the lines lumped together, but we want conversations between characters. For that you need to group the lines using the movie_conversations.txt file.

  • movie_conversations.txt
    • the structure of the conversations
    • fields
      • characterID of the first character involved in the conversation
      • characterID of the second character involved in the conversation
      • movieID of the movie in which the conversation occurred
      • list of the utterances that make the conversation, in chronological order: ['lineID1','lineID2',É,'lineIDN'] has to be matched with movie_lines.txt to reconstruct the actual content

You can see that the README has some kind of funky character in it (in the last item of the field list). Weird.

A Conversation Holder

A conversation is a list of lines said by characters to each other. Although the dialog file is presumably in order, we want to be able to partition lines that are part of a single conversation - a verbal interaction between two characters.

ConversationIDs = namedtuple("ConversationIDs", ["character_id_1",
                                                 "character_id_2",
                                                 "movie_id",
                                                 "lines"])
ConversationFields = ConversationIDs(
    **{field: index
       for index, field in enumerate(ConversationIDs._fields)})
ConversationData = List[ConversationIDs]

A Conversations Builder

This is code to pull the lines out and group them by conversation.

class Conversations:
    """Holds the conversations

    Args:
     path: path to the conversations file
     movies: object with the movie lines
     encoding: the encoding for the file
     separator: the column separator
    """
    def __init__(self,
                 path: Path,
                 movies: MovieLines,
                 separator: str=" +++$+++ ",
                 encoding:str="UTF-8") -> None:
        self.path = path
        self.movies = movies
        self.separator = separator
        self.encoding = encoding
        self._conversations = None
        self._sentence_pairs = None
        return

    @property
    def conversations(self) -> ConversationData:
        """The list of conversation line data
        """
        if self._conversations is None:
            self._conversations = []
            with self.path.open(encoding=self.encoding) as reader:
                for line in reader:
                    tokens = line.strip().split(self.separator)
                    line_ids = eval(tokens[ConversationFields.lines])
                    lines = [self.movies.lines[line_id] for line_id in line_ids]
                    self._conversations.append(
                        ConversationIDs(
                            character_id_1=tokens[ConversationFields.character_id_1],
                            character_id_2=tokens[ConversationFields.character_id_2],
                            movie_id=tokens[ConversationFields.movie_id],
                            lines = lines,
                        ))
        return self._conversations

    @property
    def sentence_pairs(self) -> list:
        """paired-sentences from the conversations"""
        if self._sentence_pairs is None:
            self._sentence_pairs = []
            for conversation in self.conversations:
                for index in range(len(conversation.lines) - 1):
                    utterance = conversation.lines[index].text
                    response = conversation.lines[index + 1].text
                    # you might not always have pairs
                    if utterance and response:
                        self._sentence_pairs.append([utterance, response])
        return self._sentence_pairs

    def head(self, count: int=5) -> None:
        """Print the first lines

        Args:
         count: how many lines to print
        """
        with self.path.open(encoding=self.encoding) as reader:
            so_far = 0
            for line in reader:
                print(line.rstrip())
                so_far += 1
                if so_far >= count:
                    break
        return

Now I'll build the conversations from the file.

conversations_path = movie_data.data_path.joinpath(MovieFile.conversations)
conversations = Conversations(conversations_path, movie_lines, encoding=ENCODING)
conversations.head()
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L194', 'L195', 'L196', 'L197']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L198', 'L199']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L200', 'L201', 'L202', 'L203']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L204', 'L205', 'L206']
u0 +++$+++ u2 +++$+++ m0 +++$+++ ['L207', 'L208']
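
Before writing the pairs to disk we can peek at what the sentence_pairs property produces (a quick check, not in the original tutorial).

# each pair is (utterance, response) taken from consecutive lines of a conversation
for utterance, response in conversations.sentence_pairs[:2]:
    print("{} -> {}".format(utterance, response))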

Store the Processed Lines

Since we've transformed the data, we should store it so we don't have to transform it again later.

with TIMER:
    processed_path = movie_data.data_path.joinpath("conversation_line_pairs.tsv")
    delimiter = str(codecs.decode("\t", "unicode_escape"))
    NEWLINE = "\n"
    with processed_path.open("w", encoding="utf-8") as outputfile:
        writer = csv.writer(outputfile, delimiter=delimiter)
        for pair in conversations.sentence_pairs:
            writer.writerow(pair)
Started: 2019-02-18 18:44:01.624014
Ended: 2019-02-18 18:44:04.127445
Elapsed: 0:00:02.503431

Check Our Stored File

with processed_path.open() as reader:
    count = 0
    for line in reader:
        print(repr(line))
        count += 1
        if count == 5:
            break
"Can we make this quick?  Roxanne Korrine and Andrew Barrett are having an incredibly horrendous public break- up on the quad.  Again.\tWell, I thought we'd start with pronunciation, if that's okay with you.\n"
"Well, I thought we'd start with pronunciation, if that's okay with you.\tNot the hacking and gagging and spitting part.  Please.\n"
"Not the hacking and gagging and spitting part.  Please.\tOkay... then how 'bout we try out some French cuisine.  Saturday?  Night?\n"
"You're asking me out.  That's so cute. What's your name again?\tForget it.\n"
"No, no, it's my fault -- we didn't have a proper introduction ---\tCameron.\n"

A Vocabulary

PADDING, START_OF_SENTENCE, END_OF_SENTENCE = 0, 1, 2

class Vocabulary:
    """A class to hold words and sentences

    Args:
     name: name of the vocabulary
     token_delimiter: what to split sentences on
    """
    def __init__(self, name: str, token_delimiter: str=" ") -> None:
        self.name = name
        self.trimmed = False
        self.token_delimiter = token_delimiter
        self.word_to_index = {}
        self._word_to_count = None
        self._index_to_word = None
        return

    @property
    def word_to_count(self) -> defaultdict:
        """map of word to word count"""
        if self._word_to_count is None:
            self._word_to_count = defaultdict(lambda: 1)
        return self._word_to_count

    @property
    def index_to_word(self) -> dict:
        """map of word-index back to the word"""
        if self._index_to_word is None:
            # map the special-token indices (0, 1, 2) to their labels
            self._index_to_word = {
                PADDING: "PAD",
                START_OF_SENTENCE: "SOS",
                END_OF_SENTENCE: "EOS",
            }
        return self._index_to_word

    @property
    def word_count(self) -> int:
        """the number of words in our vocabulary"""
        return len(self.index_to_word)

    def add_sentence(self, sentence: str) -> None:
        """Adds the words in the sentence to our dictionary

        Args:
         sentence: string of words
        """
        for word in sentence.split(self.token_delimiter):
            self.add_word(word)
        return

    def add_word(self, word: str) -> None:
        """add the word to our vocabulary

        Args:
         word: word to add
        """
        if word not in self.word_to_index:
            self.word_to_index[word] = self.word_count
            self.index_to_word[self.word_count] = word
            # record the first sighting so the word shows up when iterating word_to_count
            self.word_to_count[word] = 1
        else:
            self.word_to_count[word] += 1
        return

    def trim(self, minimum: int) -> None:
        """Trim words below the minimum

        .. warning:: This will only work once, even if you change the
          minimum. set self.trimmed to False if you want to do it again

        Args:
         minimum: lowest acceptable count for a word
        """
        if self.trimmed:
            return
        self.trimmed = True
        keepers = []
        for word, count in self.word_to_count.items():
            if count >= minimum:
                keepers.append(word)
        print("Keep: {}/{} = {:.2f}".format(len(keepers),
                                            len(self.word_count),
                                            len(keepers)/len(self.word_count)))
        self.reset()
        for word in keepers:
            self.add_word(word)
        return

    def reset(self) -> None:
        """Resets the dictionaries"""
        self.word_to_index = {}
        self._word_to_count = None
        self._index_to_word = None
        return
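
As a quick illustration of how the class is meant to be used (not from the original tutorial), here it is fed the raw sentence pairs; the real preparation - normalizing, filtering, and trimming - comes in the next section.

# build a vocabulary from the un-normalized sentence pairs
vocabulary = Vocabulary("cornell movie-dialogs")
for utterance, response in conversations.sentence_pairs:
    vocabulary.add_sentence(utterance)
    vocabulary.add_sentence(response)
print("Words (including the special tokens): {}".format(vocabulary.word_count))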

Preparing the Data For Model-Training