Autocorrect: Minimum Edit Distance

Beginning

This is a post in the series of posts introduced here. Now that we've implemented auto-correct, how do we evaluate the similarity between two strings (e.g. 'waht' and 'what')?

Also, how do we efficiently find the minimum sequence of edits needed to go from the word 'waht' to the word 'what'?

We'll implement a dynamic programming algorithm that tells us the minimum number of edits required to convert one string into another.

Imports

# python
from pathlib import Path

import os

# from pypi
from dotenv import load_dotenv
import numpy
import pandas

# this repo
from neurotic.nlp.autocorrect.preprocessing import CorpusBuilder
from neurotic.nlp.autocorrect.suggestor import WordSuggestor

Set Up

load_dotenv("posts/nlp/.env", override=True)
path = Path(os.environ["SHAKESPEARE"])
corpus = CorpusBuilder(path)
suggestor = WordSuggestor(corpus=corpus, suggestions=2, want_switches=False)

Minimum Edit Distance

Dynamic Programming

Dynamic Programming breaks a problem down into subproblems which can be combined to form the final solution. Here, given a string \(source_{[0 \ldots i]}\) and a string \(target_{[0 \ldots j]}\), we will compute all the combinations of \(substrings_{[i, j]}\) and calculate their edit distance. To do this efficiently, we will use a table to maintain the previously computed substrings and use those to calculate larger substrings.

We'll create a matrix and update each element in the matrix as follows:

\[ \text{Initialization} \]

\begin{align} D[0,0] &= 0 \\ D[i,0] &= D[i-1,0] + del\_cost(source[i]) \tag{4}\\ D[0,j] &= D[0,j-1] + ins\_cost(target[j]) \\ \end{align}

\[ \text{Per Cell Operations} \]

\begin{align} D[i,j] =min \begin{cases} D[i-1,j] + del\_cost\\ D[i,j-1] + ins\_cost\\ D[i-1,j-1] + \left\{\begin{matrix} rep\_cost; & \textit{if src}[i]\neq tar[j]\\ 0 ; & \textit{if src}[i]=tar[j] \end{matrix}\right. \end{cases} \tag{5} \end{align}

So converting the source word play to the target word stay, using an insert cost of 1, a delete cost of 1, and a replace cost of 2, would give you the following table:

  # s t a y
# 0 1 2 3 4
p 1 2 3 4 5
l 2 3 4 5 6
a 3 4 5 4 5
y 4 5 6 5 4
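For example, the first interior cell, D[1,1] (comparing 'p' with 's'), works out to:

\begin{align} D[1,1] &= \min\left(D[0,1] + del\_cost,\ D[1,0] + ins\_cost,\ D[0,0] + rep\_cost\right)\\ &= \min(1 + 1,\ 1 + 1,\ 0 + 2) = 2 \end{align}

The replacement cost applies in the third option because 'p' and 's' differ.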

The operations used in this algorithm are 'insert', 'delete', and 'replace'. These correspond to the functions that you defined earlier: insert_letter(), delete_letter() and replace_letter(). switch_letter() is not used here.

Each entry D[i,j] represents the minimum cost of converting string source[0:i] to string target[0:j]. The first column is initialized to hold the cumulative cost of deleting the source characters (converting "eer" to ""), and the first row is initialized to hold the cumulative cost of inserting the target characters (converting "" to "near").
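With a deletion and insertion cost of one, that initialization is just the cumulative counts:

\begin{align} D[i,0] &= i \quad \text{for } i = 0 \ldots 3 \text{ (deleting 'e', 'e', 'r')}\\ D[0,j] &= j \quad \text{for } j = 0 \ldots 4 \text{ (inserting 'n', 'e', 'a', 'r')} \end{align}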

def min_edit_distance(source, target, ins_cost = 1, del_cost = 1, rep_cost = 2):
    '''
    Input: 
       source: a string corresponding to the string you are starting with
       target: a string corresponding to the string you want to end with
       ins_cost: an integer setting the insert cost
       del_cost: an integer setting the delete cost
       rep_cost: an integer setting the replace cost
    Output:
       D: a matrix of len(source)+1 by len(target)+1 containing minimum edit distances
       med: the minimum edit distance (med) required to convert the source string to the target
    '''
    m = len(source)
    n = len(target)
    # initialize the cost matrix with zeros, dimensions (m + 1, n + 1)
    D = numpy.zeros((m + 1, n + 1), dtype=int)

    # Fill in column 0, from row 1 to row m (cumulative deletion costs)
    for row in range(1, m + 1):
        D[row, 0] = D[row - 1, 0] + del_cost

    # Fill in row 0, for all columns from 1 to n (cumulative insertion costs)
    for col in range(1, n + 1):
        D[0, col] = D[0, col - 1] + ins_cost

    # Loop through row 1 to row m, both inclusive
    for row in range(1, m + 1): 

        # Loop through column 1 to column n, both inclusive
        for col in range(1, n + 1):
            r_cost = rep_cost

            if source[row - 1] == target[col - 1]:
                r_cost = 0
            D[row,col] = min((D[row-1, col] + del_cost),
                             D[row, col - 1] + ins_cost,
                             D[row-1, col-1] + r_cost)

    # Set the minimum edit distance with the cost found at row m, column n
    med = D[m, n]

    return D, med

Testing the Function

play to stay

source =  'play'
target = 'stay'
matrix, min_edits = min_edit_distance(source, target)
print("minimum edits: ",min_edits, "\n")
idx = list('#' + source)
cols = list('#' + target)
expected = pandas.DataFrame(numpy.array([
    [0,  1,  2,  3,  4],
    [1,  2,  3,  4,  5],
    [2,  3,  4,  5,  6],
    [3,  4,  5,  4,  5],
    [4,  5,  6,  5,  4],
]), index=idx, columns=cols)
actual = pandas.DataFrame(matrix, index=idx, columns= cols)
print(actual)
assert min_edits==4

assert (expected == actual).all().all()
minimum edits:  4 

   #  s  t  a  y
#  0  1  2  3  4
p  1  2  3  4  5
l  2  3  4  5  6
a  3  4  5  4  5
y  4  5  6  5  4

eer to near

source =  'eer'
target = 'near'
matrix, min_edits = min_edit_distance(source, target)
print("minimum edits: ",min_edits, "\n")
idx = list(source)
idx.insert(0, '#')
cols = list(target)
cols.insert(0, '#')
actual = pandas.DataFrame(matrix, index=idx, columns= cols)
print(actual)
expected = pandas.DataFrame([
    [0,  1,  2,  3,  4],
    [1,  2,  1,  2,  3],
    [2,  3,  2,  3,  4],
    [3,  4,  3,  4,  3],
    ], index=idx, columns=cols)
assert (expected == actual).all().all()
assert min_edits == 3
minimum edits:  3 

   #  n  e  a  r
#  0  1  2  3  4
e  1  2  1  2  3
e  2  3  2  3  4
r  3  4  3  4  3

intention to execution

source = "intention"
target = "execution"
matrix, min_edits = min_edit_distance(source, target)
print("minimum edits: ",min_edits, "\n")
index = list("#" + source)
columns = list("#" + target)
actual = pandas.DataFrame(matrix, index=index, columns=columns)
print(actual)
expected = pandas.DataFrame([
    [0, 1, 2,  3,  4,  5,  6,  7,  8,  9],
    [1, 2, 3,  4,  5,  6,  7,  6,  7,  8],
    [2, 3, 4,  5,  6,  7,  8,  7,  8,  7],
    [3, 4, 5,  6,  7,  8,  7,  8,  9,  8],
    [4, 3, 4,  5,  6,  7,  8,  9, 10,  9],
    [5, 4, 5,  6,  7,  8,  9, 10, 11, 10],
    [6, 5, 6,  7,  8,  9,  8,  9, 10, 11],
    [7, 6, 7,  8,  9, 10,  9,  8,  9, 10],
    [8, 7, 8,  9, 10, 11, 10,  9,  8,  9],
    [9, 8, 9, 10, 11, 12, 11, 10,  9,  8],
], index=index, columns=columns)

assert (expected == actual).all().all()
assert min_edits == 8
minimum edits:  8 

   #  e  x   e   c   u   t   i   o   n
#  0  1  2   3   4   5   6   7   8   9
i  1  2  3   4   5   6   7   6   7   8
n  2  3  4   5   6   7   8   7   8   7
t  3  4  5   6   7   8   7   8   9   8
e  4  3  4   5   6   7   8   9  10   9
n  5  4  5   6   7   8   9  10  11  10
t  6  5  6   7   8   9   8   9  10  11
i  7  6  7   8   9  10   9   8   9  10
o  8  7  8   9  10  11  10   9   8   9
n  9  8  9  10  11  12  11  10   9   8

Finding the Closest of Multiple Strings

One Letter Edits

source = "eer"
targets = suggestor.one_letter_edits(source)
rep_cost = 1
for t in targets:
    _, min_edits = min_edit_distance(source, t, rep_cost=rep_cost)  # set ins, del, sub costs all to one
    if min_edits != 1: print(source, t, min_edits)

Two Letter Edits

The replace edit tries every letter a-z, so a second round of edits can restore a letter that the first round changed. That's how the original word shows up again in the output below ('eer eer 0').

source = "eer"
targets = suggestor.two_letter_edits(source)
for t in targets:
    _, min_edits = min_edit_distance(source, t,rep_cost=rep_cost)
    if min_edits != 2 and min_edits != 1: print(source, t, min_edits)
eer eer 0

We have to allow single edits here because some two-edit sequences collapse back to a single edit (and, as the output shows, sometimes all the way back to the original word).
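To make that concrete, here is a minimal sketch (reusing the suggestor and min_edit_distance from above) of one two-edit sequence that ends up back at the starting word:

# 'eer' -> 'er' (one delete) -> 'eer' (one insert) is zero edits from where we started
assert "er" in suggestor.one_letter_edits("eer")
assert "eer" in suggestor.one_letter_edits("er")
assert "eer" in suggestor.two_letter_edits("eer")
assert min_edit_distance("eer", "eer", rep_cost=rep_cost)[1] == 0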

End

The next post will be about implementing backtrace to recover the sequence of edits behind the minimum edit distance.

A Class-Based Minimum Edit Distance

Imports

# pypi
from tabulate import tabulate

import attr
import numpy
import pandas

The Minimum Edit Distance

@attr.s(auto_attribs=True)
class MinimumEdits:
    """Calculates the minimum edit distance between two strings

    Uses the Levenshtein distance

    Args:
     source: the starting string
     target: what to transform the source to
     insertion_cost: how much inserting a character costs
     deletion_cost: how much deleting a character costs
     replacement_cost: how much swapping out a character costs
     table_format: tabulate table format for printing the table
    """
    source: str
    target: str
    insertion_cost: int=1
    deletion_cost: int=1
    replacement_cost: int=2
    table_format: str="orgtbl"
    _rows: int=None
    _columns: int=None
    _distance_table: numpy.ndarray=None
    _distance_frame: pandas.DataFrame=None
    _minimum_distance: int=None
    _backtrace: list=None

Rows

@property
def rows(self) -> int:
    """Rows in the table"""
    if self._rows is None:
        self._rows = len(self.source)
    return self._rows

Columns

@property
def columns(self) -> int:
    """Number of columns for the table"""
    if self._columns is None:
        self._columns = len(self.target)
    return self._columns

The Table

@property
def distance_table(self) -> numpy.ndarray:
    """Table of edit distances"""
    if self._distance_table is None:
        self._distance_table = numpy.zeros((self.rows + 1, self.columns + 1),
                                           dtype=int)
        # initialize the first column (cumulative deletion costs)
        for row in range(1, self.rows + 1):
            self._distance_table[row, 0] = (self._distance_table[row - 1, 0]
                                            + self.deletion_cost)
        # initialize the first row (cumulative insertion costs)
        for column in range(1, self.columns + 1):
            self._distance_table[0, column] = (self._distance_table[0, column-1]
                                               + self.insertion_cost)

        for row in range(1, self.rows + 1):
            one_row_back = row - 1
            for column in range(1, self.columns + 1):
                one_column_back = column - 1
                replacement_cost = (
                    0 if self.source[one_row_back] == self.target[one_column_back]
                    else self.replacement_cost)
                self._distance_table[row, column] = min(
                    (self._distance_table[one_row_back, column]
                     + self.deletion_cost),
                     (self._distance_table[row, one_column_back]
                      + self.insertion_cost),
                    (self._distance_table[one_row_back, one_column_back]
                     + replacement_cost))
    return self._distance_table

Distance Frame

@property
def distance_frame(self) -> pandas.DataFrame:
    """pandas dataframe of the distance table"""
    if self._distance_frame is None:
        self._distance_frame = pandas.DataFrame(
            self.distance_table,
            index= list("#" + self.source),
            columns = list("#" + self.target),
        )
    return self._distance_frame

Minimum Distance

@property
def minimum_distance(self) -> int:
    """The minimum edit distance from source to target"""
    if self._minimum_distance is None:
        self._minimum_distance = self.distance_table[
            self.rows, self.columns]
    return self._minimum_distance

Distance String

def __str__(self) -> str:
    """tabulate version of the distance frame

    Returns:
     table formatted string of distance table
    """
    return tabulate(self.distance_frame, headers="keys", tablefmt=self.table_format)

Test Out the Minimum Distance

from neurotic.nlp.autocorrect.distance import MinimumEdits

SOURCE, TARGET = "cow", "dog"
editor = MinimumEdits(source=SOURCE, target=TARGET)
assert editor.rows == len(SOURCE)
assert editor.columns == len(TARGET)

assert editor.distance_table.shape == (len(SOURCE) + 1, len(TARGET) + 1)
assert (editor.distance_table[:, 0] == numpy.arange(editor.rows + 1, dtype=int)).all()
assert (editor.distance_table[0, :] == numpy.arange(editor.columns + 1, dtype=int)).all()
assert (editor.distance_table == numpy.array([[0, 1, 2, 3],
                                              [1, 2, 3, 4],
                                              [2, 3, 2, 3],
                                              [3, 4, 3, 4]])).all()
assert editor.minimum_distance == 4
editor = MinimumEdits(source="play", target="stay")
assert editor.minimum_distance == 4
editor = MinimumEdits(source="eer", target="near")
assert editor.minimum_distance == 3
editor = MinimumEdits(source="intention", target="execution")
assert editor.minimum_distance == 8
print(editor.distance_frame)
   #  d  o  g
#  0  1  2  3
c  1  2  3  4
o  2  3  2  3
w  3  4  3  4
print(str(editor))
  # d o g
# 0 1 2 3
c 1 2 3 4
o 2 3 2 3
w 3 4 3 4

Autocorrect System: Combining the Edits

Beginning

This is a continuation of a series of posts that were introduced here. Now that we've implemented the string manipulations, we'll create two functions that, given a string, will return all the possible single and double edits on that string. These will be edit_one_letter() and edit_two_letters().

Imports

# python
from pathlib import Path

import math
import os

# pypi
from dotenv import load_dotenv

# this repo
from neurotic.nlp.autocorrect.edits import TheEditor
from neurotic.nlp.autocorrect.preprocessing import CorpusBuilder

Set Up

The CorpusBuilder has both the vocabulary and the probability for each word in the vocabulary. It gets the path to the text file with the vocabulary in it from an environment variable named SHAKESPEARE, so we have to load that first.

load_dotenv("posts/nlp/.env", override=True)
path = Path(os.environ["SHAKESPEARE"])

And then build the corpus.

corpus = CorpusBuilder(path)

Middle

Combining the edits

Edit one letter

Instructions: Implement the edit_one_letter function to get all the possible edits that are one edit away from a word. The edits consist of the replace, insert, delete, and optionally the switch operation. You should use the previous functions you have already implemented to complete this function. The 'switch' function is a less common edit function, so its use will be selected by an "allow_switches" input argument.

Note that those functions return lists while this function should return a python set. Utilizing a set eliminates any duplicate entries.
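As a quick sanity check, here's a sketch using TheEditor (whose lists the function below combines) showing that the concatenated lists contain a couple of duplicates that the set removes:

editor = TheEditor("at")
as_list = editor.replaced + editor.inserted + editor.deleted + editor.switched
# 50 replacements + 78 insertions + 2 deletions + 1 switch
assert len(as_list) == 131
# 'att' and 'aat' each show up twice among the insertions, so the set is smaller
assert len(set(as_list)) == 129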

def edit_one_letter(word: str, allow_switches: bool=True) -> set:
    """Get all possible words one edit away from the original

    Args:
      word: word for which we will generate all possible words one edit away.

    Returns:
      edit_one_set: a set of words with one possible edit.
    """

    edit_one_set = set()

    editor = TheEditor(word)
    edits = editor.replaced + editor.inserted + editor.deleted
    if allow_switches:
        edits += editor.switched
    edit_one_set = set(edits)

    return edit_one_set
tmp_word = "at"
tmp_edit_one_set = edit_one_letter(tmp_word)
# turn this into a list to sort it, in order to view it
tmp_edit_one_l = sorted(list(tmp_edit_one_set))

print(f"input word {tmp_word} \nedit_one_l \n{tmp_edit_one_l}\n")
print(f"The type of the returned object should be a set {type(tmp_edit_one_set)}")
print(f"Number of outputs from edit_one_letter('at') is {len(edit_one_letter('at'))}")

expected = ['a', 'aa', 'aat', 'ab', 'abt', 'ac', 'act', 'ad', 'adt', 'ae', 'aet', 'af', 'aft', 'ag', 'agt', 'ah', 'aht', 'ai', 'ait', 'aj', 'ajt', 'ak', 'akt', 'al', 'alt', 'am', 'amt', 'an', 'ant', 'ao', 'aot', 'ap', 'apt', 'aq', 'aqt', 'ar', 'art', 'as', 'ast', 'ata', 'atb', 'atc', 'atd', 'ate', 'atf', 'atg', 'ath', 'ati', 'atj', 'atk', 'atl', 'atm', 'atn', 'ato', 'atp', 'atq', 'atr', 'ats', 'att', 'atu', 'atv', 'atw', 'atx', 'aty', 'atz', 'au', 'aut', 'av', 'avt', 'aw', 'awt', 'ax', 'axt', 'ay', 'ayt', 'az', 'azt', 'bat', 'bt', 'cat', 'ct', 'dat', 'dt', 'eat', 'et', 'fat', 'ft', 'gat', 'gt', 'hat', 'ht', 'iat', 'it', 'jat', 'jt', 'kat', 'kt', 'lat', 'lt', 'mat', 'mt', 'nat', 'nt', 'oat', 'ot', 'pat', 'pt', 'qat', 'qt', 'rat', 'rt', 'sat', 'st', 't', 'ta', 'tat', 'tt', 'uat', 'ut', 'vat', 'vt', 'wat', 'wt', 'xat', 'xt', 'yat', 'yt', 'zat', 'zt']

assert tmp_edit_one_l == expected
assert len(edit_one_letter("at")) == 129
input word at 
edit_one_l 
['a', 'aa', 'aat', 'ab', 'abt', 'ac', 'act', 'ad', 'adt', 'ae', 'aet', 'af', 'aft', 'ag', 'agt', 'ah', 'aht', 'ai', 'ait', 'aj', 'ajt', 'ak', 'akt', 'al', 'alt', 'am', 'amt', 'an', 'ant', 'ao', 'aot', 'ap', 'apt', 'aq', 'aqt', 'ar', 'art', 'as', 'ast', 'ata', 'atb', 'atc', 'atd', 'ate', 'atf', 'atg', 'ath', 'ati', 'atj', 'atk', 'atl', 'atm', 'atn', 'ato', 'atp', 'atq', 'atr', 'ats', 'att', 'atu', 'atv', 'atw', 'atx', 'aty', 'atz', 'au', 'aut', 'av', 'avt', 'aw', 'awt', 'ax', 'axt', 'ay', 'ayt', 'az', 'azt', 'bat', 'bt', 'cat', 'ct', 'dat', 'dt', 'eat', 'et', 'fat', 'ft', 'gat', 'gt', 'hat', 'ht', 'iat', 'it', 'jat', 'jt', 'kat', 'kt', 'lat', 'lt', 'mat', 'mt', 'nat', 'nt', 'oat', 'ot', 'pat', 'pt', 'qat', 'qt', 'rat', 'rt', 'sat', 'st', 't', 'ta', 'tat', 'tt', 'uat', 'ut', 'vat', 'vt', 'wat', 'wt', 'xat', 'xt', 'yat', 'yt', 'zat', 'zt']

The type of the returned object should be a set <class 'set'>
Number of outputs from edit_one_letter('at') is 129

Edit two letters

Now you can generalize this to get two edits on a word. To do so, you would get all the possible edits on a single word and then, for each modified word, modify it again.

Instructions: Implement the edit_two_letters function that returns a set of words that are two edits away. Note that creating additional edits based on the edit_one_letter function may 'restore' some one-edits to zero or one edits. That is allowed here, and it is accounted for in get_corrections.

def edit_two_letters(word: str, allow_switches: bool=True) -> set:
    """Make two-letter edits

    Args:
      word: the input string/word 

    Returns:
       edit_two_set: a set of strings with all possible two edits
    """

    edit_two_set = set()

    ones = edit_one_letter(word, allow_switches)
    for edited in ones:
        edit_two_set = edit_two_set.union(edit_one_letter(edited, allow_switches))

    return edit_two_set
tmp_edit_two_set = edit_two_letters("a")
tmp_edit_two_l = sorted(list(tmp_edit_two_set))
twos = len(tmp_edit_two_l)

assert twos == 2654, twos
print(f"Number of strings with edit distance of two: {twos}")

first_ten = tmp_edit_two_l[:10]
assert first_ten == ['', 'a', 'aa', 'aaa', 'aab', 'aac', 'aad', 'aae', 'aaf', 'aag']
print(f"First 10 strings {first_ten}")

last_ten = tmp_edit_two_l[-10:]
assert last_ten == ['zv', 'zva', 'zw', 'zwa', 'zx', 'zxa', 'zy', 'zya', 'zz', 'zza']
print(f"Last 10 strings {last_ten}")
print(f"The data type of the returned object should be a set {type(tmp_edit_two_set)}")

actual = len(edit_two_letters('at'))
expected = 7154
assert expected == actual, actual
print(f"Number of strings that are 2 edit distances from 'at' is {actual}")
Number of strings with edit distance of two: 2654
First 10 strings ['', 'a', 'aa', 'aaa', 'aab', 'aac', 'aad', 'aae', 'aaf', 'aag']
Last 10 strings ['zv', 'zva', 'zw', 'zwa', 'zx', 'zxa', 'zy', 'zya', 'zz', 'zza']
The data type of the returned object should be a set <class 'set'>
Number of strings that are 2 edit distances from 'at' is 7154

Suggest Spelling Corrections

Now you will use your edit_two_letters function to get a set of all the possible two edits on your word. You will then use those strings to get the most probable word you meant to type, a.k.a. your typing suggestion.

Instructions: Implement get_corrections, which returns a list of zero to n possible suggestion tuples of the form (word, probability_of_word).

  • Step 1: Generate suggestions for a supplied word: You'll use the edit functions you have developed. The 'suggestion algorithm' should follow this logic:
    • If the word is in the vocabulary, suggest the word.
    • Otherwise, if there are suggestions from edit_one_letter that are in the vocabulary, use those.
    • Otherwise, if there are suggestions from edit_two_letters that are in the vocabulary, use those.
    • Otherwise, suggest the input word.
    • The idea is that words generated from fewer edits are more likely than words with more edits.

Note:

  • Edits of one or two letters may 'restore' strings to either zero or one edit. This algorithm accounts for this by preferentially selecting lower distance edits first.
  • Short circuit

    In Python, logical operations such as and and or have two useful properties. They can operate on lists and they have 'short-circuit' behavior (https://docs.python.org/3/library/stdtypes.html). Try these:

    Example of logical operation on lists or sets.

    print( [] and ["a","b"] )
    print( [] or ["a","b"] )
    #example of Short circuit behavior
    val1 =  ["Most","Likely"] or ["Less","so"] or ["least","of","all"]  # selects first, does not evaluate remainder
    print(val1)
    val2 =  [] or [] or ["least","of","all"] # continues evaluation until there is a non-empty list
    print(val2)
    
    []
    ['a', 'b']
    ['Most', 'Likely']
    ['least', 'of', 'all']
    

    The logical or could be used to implement the suggestion algorithm very compactly (there's a sketch of this after the tests below). Alternatively, if/then constructs could be used.

    Step 2: Create a 'best_words' dictionary where the 'key' is a suggestion and the 'value' is the probability of that word in your vocabulary. If the word is not in the vocabulary, assign it a probability of 0.

    Step 3: Select the n best suggestions. There may be fewer than n.

    • edit_one_letter and edit_two_letters return python sets.
    • Sets have a handy set.intersection feature
    • To find the keys that have the highest values in a dictionary, you can use the Counter dictionary to create a Counter object from a regular dictionary. Then you can use Counter.most_common(n) to get the n most common keys.
    • To find the intersection of two sets, you can use set.intersection or the & operator.
    • If you are not as familiar with short circuit syntax (as shown above), feel free to use if else statements instead.
    • To use an if statement to check if a set is empty, use 'if not x:' syntax
    def get_corrections(word: str, probs: dict, vocab: set, n: int=2, verbose: bool=False) -> list:
        """Gets corrections within n edits
    
        Args: 
           word: a user entered string to check for suggestions
           probs: a dictionary that maps each word to its probability in the corpus
           vocab: a set containing all the vocabulary
           n: number of possible word corrections you want returned in the dictionary
    
        Returns: 
           n_best: a list of tuples with the most probable n corrected words and their probabilities.
        """
    
        suggestions = []
        n_best = []
    
        if word in vocab:
            n_best = [(word, probs[word])]
        else:
            suggestions = vocab.intersection(edit_one_letter(word))
            if not suggestions:
                suggestions = vocab.intersection(edit_two_letters(word))
            if suggestions:
                probabilities = list(reversed(sorted([(probs.get(suggestion, 0), suggestion)
                                    for suggestion in suggestions])))
                n_best = [(word, probability) for (probability, word) in probabilities[:n]]
    
        if verbose:
            print("entered word = ", word, "\nsuggestions = ", suggestions)
    
        return n_best
    
    word = "dbadd"
    test = get_corrections(word, probs=corpus.probabilities, vocab=corpus.vocabulary, n=2, verbose=True)
    print(test)
    
    entered word =  dbadd 
    suggestions =  {'bade', 'band', 'add', 'dead', 'bad'}
    [('dead', 0.0006341627186928787), ('bad', 0.0002051702913418137)]
    
    word = "days"
    test = get_corrections(word, probs=corpus.probabilities, vocab=corpus.vocabulary, n=2, verbose=True)
    assert len(test) == 1, test
    print(test)
    
    entered word =  days 
    suggestions =  []
    [('days', 0.0004103405826836274)]
    
    # Test your implementation - feel free to try other words
    my_word = 'dys'
    tmp_corrections = get_corrections(my_word, corpus.probabilities, set(corpus.words), 2, verbose=True) # keep verbose=True
    for i, word_prob in enumerate(tmp_corrections):
        print(f"word {i}: {word_prob[0]}, probability {word_prob[1]:.6f}")
    
    print(f"data type of corrections {type(tmp_corrections)}")
    
    expected = 0.000410
    actual = tmp_corrections[0][1]
    assert math.isclose(expected, actual, abs_tol=1e-6), actual
    
    expected = 0.000019
    actual = tmp_corrections[1][1]
    assert math.isclose(expected, actual, abs_tol=1e-6), actual
    
    entered word =  dys 
    suggestions =  {'days', 'dye'}
    word 0: days, probability 0.000410
    word 1: dye, probability 0.000019
    data type of corrections <class 'list'>
    
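As mentioned in the notes above, the short-circuiting or makes for a very compact version of the same algorithm. Here's a minimal sketch (it follows the listed suggestion logic, including falling back to the input word when nothing within two edits is in the vocabulary):

from collections import Counter

def get_corrections_compact(word, probs, vocab, n=2):
    """The same suggestion logic written with short-circuiting `or`"""
    suggestions = ((word in vocab and [word])
                   or list(vocab.intersection(edit_one_letter(word)))
                   or list(vocab.intersection(edit_two_letters(word)))
                   or [word])
    best = Counter({suggestion: probs.get(suggestion, 0)
                    for suggestion in suggestions})
    return best.most_common(n)

assert get_corrections_compact("days", corpus.probabilities, corpus.vocabulary) == [("days", corpus.probabilities["days"])]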

End

The next step is to write some code to find the Minimum Edit Distance needed to transform one word into another word.

A Suggestor

Imports

# pypi
import attr

# this repository
from neurotic.nlp.autocorrect.edits import TheEditor

The Suggestor

@attr.s(auto_attribs=True)
class WordSuggestor:
    """Suggests Words for Autocorrection

    Args:
     corpus: a Corpus Builder object
     suggestions: number of suggestions to return for each word
     want_switches: also do the 'switch' edit
    """
    corpus: object
    suggestions: int=2
    want_switches: bool=True

Edit One Letter

def one_letter_edits(self, word: str) -> set:
    """Get all possible words one edit away from the original

    Args:
      word: word for which we will generate all possible words one edit away.

    Returns:
      set of words with one possible edit.
    """    
    editor = TheEditor(word)
    edits = editor.replaced + editor.inserted + editor.deleted
    if self.want_switches:
        edits += editor.switched
    return set(edits)

Two-Letter Edits

def two_letter_edits(self, word: str) -> set:
    """Make two-letter edits

    Args:
      word: the input string/word 

    Returns:
      set of strings with all possible two-letter edits
    """
    ones = self.one_letter_edits(word)
    return set.union(*(self.one_letter_edits(one) for one in ones))

The Call

def __call__(self, word: str) -> list:
    """Finds the closest words to the word

    If the word is in our corpus then it just returns the word

    Args:
     word: potential word to correct

    Returns:
     list of (word, probability) tuples
    """
    if word in self.corpus.vocabulary:
        best = [(word, self.corpus.probabilities[word])]
    else:
        suggestions = self.corpus.vocabulary.intersection(self.one_letter_edits(word))
        if not suggestions:
            suggestions = self.corpus.vocabulary.intersection(self.two_letter_edits(word))
        if suggestions:
            probabilities = list(reversed(sorted(
                [(self.corpus.probabilities.get(suggestion, 0), suggestion)
                 for suggestion in suggestions])))
            best = [(word, probability)
                    for (probability, word) in probabilities[
                            :self.suggestions]]
        else:
            best = [(word, 0)]
    return best

Test the Suggestor

from neurotic.nlp.autocorrect.suggestor import WordSuggestor
suggestor = WordSuggestor(corpus=corpus, suggestions=2)
# this doesn't have any one-letter-edits in the corpus so it won't return anything
# unless the two-letter-edits is working
word = "dbadd"
test = suggestor(word)
print(test)
[('dead', 0.0006341627186928787), ('bad', 0.0002051702913418137)]
word = "days"
test = suggestor(word)
assert len(test) == 1, test
assert test[0][0] == word
print(test)
[('days', 0.0004103405826836274)]
word = 'dys'
tmp_corrections = suggestor(word)
for i, word_prob in enumerate(tmp_corrections):
    print(f"word {i}: {word_prob[0]}, probability {word_prob[1]:.6f}")

print(f"data type of corrections {type(tmp_corrections)}")

expected = 0.000410
actual = tmp_corrections[0][1]
assert math.isclose(expected, actual, abs_tol=1e-6), actual

expected = 0.000019
actual = tmp_corrections[1][1]
assert math.isclose(expected, actual, abs_tol=1e-6), actual
word 0: days, probability 0.000410
word 1: dye, probability 0.000019
data type of corrections <class 'list'>

Autocorrect System: Edits

Beginning

This is part of a series that's introduced in Autocorrect: The System. In the previous post of this series we computed \(P(w_i)\) for all the words in the corpus, so now we'll write a few functions to manipulate strings so that we can edit the erroneous strings and return the right spellings of the words. In this section, we'll implement four functions:

  • delete_letter: given a word, it returns all the possible strings that have one character removed.
  • switch_letter: given a word, it returns all the possible strings that have two adjacent letters switched.
  • replace_letter: given a word, it returns all the possible strings that have one character replaced by a different letter.
  • insert_letter: given a word, it returns all the possible strings that have an additional character inserted.

Middle

Delete Letter

We're going to implement a function that, given a word, returns a list of strings with one character deleted.

def delete_letter(word: str, verbose: bool=False) -> list:
    """Delete one letter at a time

    Args:
      word: the string/word for which you will generate all possible words 
               in the vocabulary which have 1 missing character
    Returns:
      delete_l: a list of all possible strings obtained by deleting 1 character from word
    """

    delete_l = []
    split_l = []

    split_l = [(word[:index], word[index:]) for index in range(len(word) + 1)]
    delete_l = [left + right[1:] for left, right in split_l if right]

    if verbose:
        print(f"input word {word}, \nsplit_l = {split_l}, \ndelete_l = {delete_l}")

    return delete_l
delete_word_l = delete_letter(word="cans",
                        verbose=True)

assert delete_word_l == ['ans', 'cns', 'cas', 'can']
input word cans, 
split_l = [('', 'cans'), ('c', 'ans'), ('ca', 'ns'), ('can', 's'), ('cans', '')], 
delete_l = ['ans', 'cns', 'cas', 'can']
deleted = len(delete_letter('at'))
print(f"Number of outputs of delete_letter('at') is {deleted}")
assert deleted == 2
Number of outputs of delete_letter('at') is 2

Switch Letter

Now implement a function that switches two letters in a word. It takes in a word and returns a list of all the possible switches of two letters that are adjacent to each other.

  • For example, given the word 'eta', it returns {'eat', 'tea'}, but does not return 'ate'.
def switch_letter(word: str, verbose: bool=False) -> list:
    """Switches pairs of adjacent letters

    Args:
      word: input string
    Returns:
      switches: a list of all possible strings with one adjacent character switched
    """

    switch_l = []
    split_l = []

    split_l = [(word[:index], word[index:]) for index in range(len(word) + 1)]
    switch_l = [left[:-1] + right[0] + left[-1] + right[1:]
                for left, right in split_l
                if left and right]

    if verbose:
        print(f"Input word = {word} \nsplit_l = {split_l} \nswitch_l = {switch_l}") 

    return switch_l
switch_word_l = switch_letter(word="eta",
                         verbose=True)
assert switch_word_l == ['tea', 'eat']
Input word = eta 
split_l = [('', 'eta'), ('e', 'ta'), ('et', 'a'), ('eta', '')] 
switch_l = ['tea', 'eat']
switches = len(switch_letter('at'))
print(f"Number of outputs of switch_letter('at') is {switches}")
assert switches == 1
Number of outputs of switch_letter('at') is 1

Replace Letter

Now implement a function that takes in a word and returns a list of strings with one replaced letter from the original word.

  • Step 1 is the same as in delete_letter().
  • Step 2: A list comprehension or for loop which forms strings by replacing letters. This can be of the form:

[f(a,b,c) for a, b in splits if condition for c in string] Note the use of the second for loop. It is expected in this routine that one or more of the replacements will include the original word. For example, replacing the first letter of 'ear' with 'e' will return 'ear'.

  • Step 3: Remove the original input word from the output.
def replace_letter(word: str, verbose: bool=False) -> list:
    """Replace each letter in the string with another letter in the alphabet

    Args:
      word: the input string/word 

    Returns:
      replaces: a list of all possible strings where we replaced one letter from the original word. 
    """

    letters = 'abcdefghijklmnopqrstuvwxyz'
    replace_l = []
    split_l = []

    split_l = [(word[:index], word[index:]) for index in range(len(word) + 1)]
    replace_l = [left + letter + right[1:] for left, right in split_l if right
                for letter in letters]
    replace_set = set(replace_l)
    replace_set.discard(word)

    replace_l = sorted(list(replace_set))

    if verbose:
        print(f"Input word = {word} \nsplit_l = {split_l} \nreplace_l {replace_l}")   

    return replace_l
word = "can"
replace_l = replace_letter(word=word,
                              verbose=True)
expected_replacements = (len(word) * 26) - len(word)
assert len(replace_l) == expected_replacements
print(f"Replacements: {len(replace_l)}")
expected = ['aan', 'ban', 'caa', 'cab', 'cac', 'cad', 'cae', 'caf', 'cag', 'cah', 'cai', 'caj', 'cak', 'cal', 'cam', 'cao', 'cap', 'caq', 'car', 'cas', 'cat', 'cau', 'cav', 'caw', 'cax', 'cay', 'caz', 'cbn', 'ccn', 'cdn', 'cen', 'cfn', 'cgn', 'chn', 'cin', 'cjn', 'ckn', 'cln', 'cmn', 'cnn', 'con', 'cpn', 'cqn', 'crn', 'csn', 'ctn', 'cun', 'cvn', 'cwn', 'cxn', 'cyn', 'czn', 'dan', 'ean', 'fan', 'gan', 'han', 'ian', 'jan', 'kan', 'lan', 'man', 'nan', 'oan', 'pan', 'qan', 'ran', 'san', 'tan', 'uan', 'van', 'wan', 'xan', 'yan', 'zan']
assert replace_l == expected
Input word = can 
split_l = [('', 'can'), ('c', 'an'), ('ca', 'n'), ('can', '')] 
replace_l ['aan', 'ban', 'caa', 'cab', 'cac', 'cad', 'cae', 'caf', 'cag', 'cah', 'cai', 'caj', 'cak', 'cal', 'cam', 'cao', 'cap', 'caq', 'car', 'cas', 'cat', 'cau', 'cav', 'caw', 'cax', 'cay', 'caz', 'cbn', 'ccn', 'cdn', 'cen', 'cfn', 'cgn', 'chn', 'cin', 'cjn', 'ckn', 'cln', 'cmn', 'cnn', 'con', 'cpn', 'cqn', 'crn', 'csn', 'ctn', 'cun', 'cvn', 'cwn', 'cxn', 'cyn', 'czn', 'dan', 'ean', 'fan', 'gan', 'han', 'ian', 'jan', 'kan', 'lan', 'man', 'nan', 'oan', 'pan', 'qan', 'ran', 'san', 'tan', 'uan', 'van', 'wan', 'xan', 'yan', 'zan']
Replacements: 75
word = "at"
replacements = len(replace_letter(word))
print(f"Number of outputs of replace_letter('at') is {replacements}")

expected = (len(word) * 26) - len(word)
assert expected == replacements
Number of outputs of replace_letter('at') is 50

Insert Letter

Now implement a function that takes in a word and returns a list with a letter inserted at every offset.

  • Step 1 is the same as in delete_letter().
  • Step 2: This can be a list comprehension of the form: [f(a,b,c) for a, b in splits if condition for c in string]
def insert_letter(word: str, verbose: bool=False) -> list:
    """Stick a letter before and after each letter in the word

    Args:
      word: the input string/word 

    Returns:
      inserts: a list of all possible strings with one new letter inserted at every offset
    """
    letters = 'abcdefghijklmnopqrstuvwxyz'
    insert_l = []
    split_l = []

    split_l = [(word[:index], word[index:]) for index in range(len(word) + 1)]
    insert_l = [left + letter + right for left, right in split_l for letter in letters]

    if verbose:
        print(f"Input word {word} \nsplit_l = {split_l} \ninsert_l = {insert_l}")

    return insert_l
word = "at"
insert_l = insert_letter(word, True)
inserted = len(insert_l)
print(f"Number of strings output by insert_letter('at') is {inserted}")

assert inserted == (len(word) + 1) * 26

expected = ['aat', 'bat', 'cat', 'dat', 'eat', 'fat', 'gat', 'hat', 'iat', 'jat', 'kat', 'lat', 'mat', 'nat', 'oat', 'pat', 'qat', 'rat', 'sat', 'tat', 'uat', 'vat', 'wat', 'xat', 'yat', 'zat', 'aat', 'abt', 'act', 'adt', 'aet', 'aft', 'agt', 'aht', 'ait', 'ajt', 'akt', 'alt', 'amt', 'ant', 'aot', 'apt', 'aqt', 'art', 'ast', 'att', 'aut', 'avt', 'awt', 'axt', 'ayt', 'azt', 'ata', 'atb', 'atc', 'atd', 'ate', 'atf', 'atg', 'ath', 'ati', 'atj', 'atk', 'atl', 'atm', 'atn', 'ato', 'atp', 'atq', 'atr', 'ats', 'att', 'atu', 'atv', 'atw', 'atx', 'aty', 'atz']

assert expected == insert_l
Input word at 
split_l = [('', 'at'), ('a', 't'), ('at', '')] 
insert_l = ['aat', 'bat', 'cat', 'dat', 'eat', 'fat', 'gat', 'hat', 'iat', 'jat', 'kat', 'lat', 'mat', 'nat', 'oat', 'pat', 'qat', 'rat', 'sat', 'tat', 'uat', 'vat', 'wat', 'xat', 'yat', 'zat', 'aat', 'abt', 'act', 'adt', 'aet', 'aft', 'agt', 'aht', 'ait', 'ajt', 'akt', 'alt', 'amt', 'ant', 'aot', 'apt', 'aqt', 'art', 'ast', 'att', 'aut', 'avt', 'awt', 'axt', 'ayt', 'azt', 'ata', 'atb', 'atc', 'atd', 'ate', 'atf', 'atg', 'ath', 'ati', 'atj', 'atk', 'atl', 'atm', 'atn', 'ato', 'atp', 'atq', 'atr', 'ats', 'att', 'atu', 'atv', 'atw', 'atx', 'aty', 'atz']
Number of strings output by insert_letter('at') is 78
word = "at"
inserted = len(insert_letter(word))
print(f"Number of outputs of insert_letter('at') is {inserted}")

expected = (len(word) + 1) * 26
assert expected == inserted
Number of outputs of insert_letter('at') is 78

End

The Editor

Now to bundle it up for later.

Imports

# python
from string import ascii_lowercase
# from pypi
import attr

The Editor Class

@attr.s(auto_attribs=True)
class TheEditor:
    """Does various edits to words

    Args:
     word: string to edit
    """
    word: str
    _splits: list=None
    _deleted: list=None
    _switched: list=None
    _replaced: list=None
    _inserted: list=None

Splits

A list of splits.

@property
def splits(self) -> list:
    """Tuples of splits for word"""
    if self._splits is None:
        self._splits = [(self.word[:index], self.word[index:])
                        for index in range(len(self.word) + 1)]
    return self._splits

Deleted

@property
def deleted(self) -> list:
    """Deletes one letter at a time from the word

    Returns:
     list of all possible strings created by deleting one letter
    """
    if self._deleted is None:
        self._deleted = [left + right[1:]
                         for left, right in self.splits if right]
    return self._deleted

Switched

@property
def switched(self) -> list:
    """switches one letter pair at a time

    Returns:
     all possible strings with one adjacent character switched
    """
    if self._switched is None:
        self._switched = [left[:-1] + right[0] + left[-1] + right[1:]
                          for left, right in self.splits
                          if left and right]
    return self._switched

Replace a Letter

@property
def replaced(self) -> list:
    """Replace each letter with every other letter of the alphabet

    Returns:
     replacements in alphabetical order (doesn't include original word)
    """
    if self._replaced is None:
        self._replaced = set([left + letter + right[1:]
                              for left, right in self.splits if right
                              for letter in ascii_lowercase])
        self._replaced.discard(self.word)
        self._replaced = sorted(list(self._replaced))
    return self._replaced

Insert Letters

@property
def inserted(self) -> list:
    """Adds letters before and after each letter

    Returns:
      all possible strings with one new letter inserted at every offset
    """
    if self._inserted is None:
        self._inserted = [left + letter + right
                          for left, right in self.splits
                          for letter in ascii_lowercase]
    return self._inserted

Checking the Editor

from neurotic.nlp.autocorrect.edits import TheEditor

editor = TheEditor(word="cans")

# splits
expected = [('', 'cans'), ('c', 'ans'), ('ca', 'ns'), ('can', 's'), ('cans', '')]
assert editor.splits == expected, editor.splits

# deletions
expected = ['ans', 'cns', 'cas', 'can']

assert editor.deleted == expected

# switches
word = "eta"
editor = TheEditor(word=word)
expected = ['tea', 'eat']
assert editor.switched == expected

editor = TheEditor(word="at")
switches = len(editor.switched)
print(f"Number of outputs of switch_letter('at') is {switches}")
assert switches == 1

# replacements
word = "can"
editor = TheEditor(word)
replacements = editor.replaced
expected = (len(word) * 26) - len(word)
assert len(replacements) == expected, f"expected: {expected} actual: {len(replacements)}"

expected = ['aan', 'ban', 'caa', 'cab', 'cac', 'cad', 'cae', 'caf', 'cag', 'cah', 'cai', 'caj', 'cak', 'cal', 'cam', 'cao', 'cap', 'caq', 'car', 'cas', 'cat', 'cau', 'cav', 'caw', 'cax', 'cay', 'caz', 'cbn', 'ccn', 'cdn', 'cen', 'cfn', 'cgn', 'chn', 'cin', 'cjn', 'ckn', 'cln', 'cmn', 'cnn', 'con', 'cpn', 'cqn', 'crn', 'csn', 'ctn', 'cun', 'cvn', 'cwn', 'cxn', 'cyn', 'czn', 'dan', 'ean', 'fan', 'gan', 'han', 'ian', 'jan', 'kan', 'lan', 'man', 'nan', 'oan', 'pan', 'qan', 'ran', 'san', 'tan', 'uan', 'van', 'wan', 'xan', 'yan', 'zan']
assert replacements == expected

word = "at"
editor = TheEditor(word)
expected = (len(word) * 26) - len(word)
assert expected == len(editor.replaced)

# Insertions
inserted = len(editor.inserted)
assert inserted == (len(word) + 1) * 26

expected = ['aat', 'bat', 'cat', 'dat', 'eat', 'fat', 'gat', 'hat', 'iat', 'jat', 'kat', 'lat', 'mat', 'nat', 'oat', 'pat', 'qat', 'rat', 'sat', 'tat', 'uat', 'vat', 'wat', 'xat', 'yat', 'zat', 'aat', 'abt', 'act', 'adt', 'aet', 'aft', 'agt', 'aht', 'ait', 'ajt', 'akt', 'alt', 'amt', 'ant', 'aot', 'apt', 'aqt', 'art', 'ast', 'att', 'aut', 'avt', 'awt', 'axt', 'ayt', 'azt', 'ata', 'atb', 'atc', 'atd', 'ate', 'atf', 'atg', 'ath', 'ati', 'atj', 'atk', 'atl', 'atm', 'atn', 'ato', 'atp', 'atq', 'atr', 'ats', 'att', 'atu', 'atv', 'atw', 'atx', 'aty', 'atz']

assert expected == editor.inserted

word = "at"
editor = TheEditor(word)
inserted = len(editor.inserted)
print(f"Number of outputs of insert_letter('at') is {inserted}")

expected = (len(word) + 1) * 26
assert expected == inserted

Autocorrect System: Data Preprocessing

Beginning

This is part of a series that builds an autocorrect system. The introduction is this post.

Imports

# python
from collections import Counter
from pathlib import Path

import math
import os
import re

# pypi
from dotenv import load_dotenv

Set Up

The Environment

This loads our environment variables.

load_dotenv("posts/nlp/.env", override=True)

Middle

Our corpus is going to come from a text file with some plays of Shakespeare in it.

The Process Data Function

def process_data(file_name: str) -> list:
    """
    pre-processes the text file

    Note:
     The original assignment assumes the file will be in the same directory as 
    the code - so it's called file_name for now but it's really the path to 
    the file

    Args: 
       file_name: a path to the text file

    Returns: 
       words: list of all words in the corpus (text file you read) lower-cased
    """
    words = []

    with open(file_name) as lines:
        for line in lines:
            tokens = re.findall(r"\w+", line)
            words += [token.strip().lower() for token in tokens]

    return words
words = process_data(os.environ["SHAKESPEARE"])
vocabulary = set(words)  # this will be your new vocabulary
first_ten = words[:10]
print(f"The first ten words in the text are: \n{first_ten}")
print(f"There are {len(vocabulary)} unique words in the vocabulary.")

expected = "o for a muse of fire that would ascend the"
actual = " ".join(first_ten)
assert expected == actual, actual
assert len(vocabulary) == 6116
The first ten words in the text are: 
['o', 'for', 'a', 'muse', 'of', 'fire', 'that', 'would', 'ascend', 'the']
There are 6116 unique words in the vocabulary.

Get Count

This creates a dictionary of word: count pairs.

def get_count(word_l: list) -> Counter:
    """Creates a counter for the words

    Args:
       word_l: a list of words representing the corpus. 

    Returns:
       word_counter: word-frequency dictionary
    """
    word_count_dict = Counter(word_l)
    return word_count_dict
word_counter = get_count(words)
print(f"There are {len(word_counter)} key value pairs")
print(f"The count for the word 'thee' is {word_counter['thee']}")
assert len(word_counter) == 6116
assert word_counter['thee'] == 240
There are 6116 key value pairs
The count for the word 'thee' is 240

Get Probability

Given the dictionary of word counts, compute the probability that each word will appear if randomly selected from the corpus of words.

\[ P(w_i) = \frac{C(w_i)}{M} \tag{Equation-2} \]

where

\(C(w_i)\) is the total number of times \(w_i\) appears in the corpus.

M is the total number of words in the corpus.

For example, the probability of the word 'am' in the sentence 'I am happy because I am learning' is:

\[ P(am) = \frac{C(am)}{M} = \frac{2}{7} \tag{Equation-3} \]
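As a quick check of that example, here's a small sketch using a Counter on the sentence above:

from collections import Counter

sentence = "I am happy because I am learning".lower().split()
counts = Counter(sentence)
assert counts["am"] / len(sentence) == 2/7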

def get_probs(word_count_dict: Counter) -> dict:
    """Calculates the probability of each word

    Args:
      word_count_dict: word:frequency dictionary

    Returns:
      probs: word:probability of word dictionary
    """
    total = sum(word_count_dict.values())
    probs = {word: word_count_dict[word]/total for word in word_count_dict}
    return probs
probabilities = get_probs(word_counter)
print(f"Length of probabilities is {len(probabilities)}")
thee_probability = probabilities["thee"]
print(f"P('thee') is {thee_probability:.4f}")
assert len(probabilities) == 6116
assert math.isclose(thee_probability, 0.0045, abs_tol=1e-04), thee_probability
Length of probabilities is 6116
P('thee') is 0.0045

End

Now that we have the skeleton I'll put it all into a class to make it easier to use it in another notebook.

Imports

# python
from collections import Counter
from pathlib import Path

import math
import os
import re

# pypi
import attr

Corpus Builder

@attr.s(auto_attribs=True)
class CorpusBuilder:
    """Builds the autocorrect corpus counts

    Args:
     path: Path to the corpus source file
    """
    path: Path
    _words: list=None
    _counts: Counter=None
    _probabilities: dict=None
    _vocabulary: set=None

Corpus Words

@property
def words(self) -> list:
    """
    The processed words from the source file

    Returns: 
      words: list of all words in the corpus lower-cased
    """
    if self._words is None:
        with self.path.open() as lines:
            tokenized = (re.findall(r"\w+", line) for line in lines)
            self._words = [word.strip().lower() for sublist in tokenized for word in sublist]
    return self._words

Corpus Word Counts

@property
def counts(self) -> Counter:
    """The counter for the words in the corpus

    Returns:
     word: word-frequency counter
    """
    if self._counts is None:
        self._counts = Counter(self.words)
    return self._counts

Corpus Word Probabilities

@property
def probabilities(self) -> dict:
    """The probability for each word in the corpus

    Returns:
     word:probability dictionary
    """
    if self._probabilities is None:
        total = sum(self.counts.values())
        self._probabilities = {word: self.counts[word]/total
                               for word in self.counts}
    return self._probabilities

Vocabulary

The final code is going to use set operations so for convenience I'll duplicate the words as a set.

@property
def vocabulary(self) -> set:
    """The set of vocabulary words"""
    if self._vocabulary is None:
        self._vocabulary = set(self.words)
    return self._vocabulary

Testing the Corpus

from neurotic.nlp.autocorrect.preprocessing import CorpusBuilder

path = Path(os.environ["SHAKESPEARE"])
assert path.is_file()

corpus = CorpusBuilder(path)

words = corpus.words
vocabulary = corpus.vocabulary  # this will be your new vocabulary
first_ten = words[:10]
print(f"The first ten words in the text are: \n{first_ten}")
print(f"There are {len(vocabulary)} unique words in the vocabulary.")

expected = "o for a muse of fire that would ascend the"
actual = " ".join(first_ten)
assert expected == actual, actual
assert len(vocabulary) == 6116
The first ten words in the text are: 
['o', 'for', 'a', 'muse', 'of', 'fire', 'that', 'would', 'ascend', 'the']
There are 6116 unique words in the vocabulary.
word_counter = corpus.counts
print(f"There are {len(word_counter)} key value pairs")
print(f"The count for the word 'thee' is {word_counter['thee']}")
assert len(word_counter) == 6116
assert word_counter['thee'] == 240
There are 6116 key value pairs
The count for the word 'thee' is 240
probabilities = corpus.probabilities
print(f"Length of probabilities is {len(probabilities)}")
thee_probability = probabilities["thee"]
print(f"P('thee') is {thee_probability:.4f}")
assert len(probabilities) == 6116
assert math.isclose(thee_probability, 0.0045, abs_tol=1e-04), thee_probability
Length of probabilities is 6116
P('thee') is 0.0045

So, now we have a corpus builder. In the next part - Autocorrect System: Edits - we'll implement some functions to help with creating candidate replacements using edits.

Autocorrect: The System

Edit Distance

In this series of posts we'll implement models that correct words that are 1 and 2 edit distances away.

  • We say two words are n edit distance away from each other when we need n edits to change one word into another.

An edit could consist of one of the following options:

  • Delete (remove a letter): ‘hat’ => ‘at, ha, ht’
  • Switch (swap 2 adjacent letters): ‘eta’ => ‘eat, tea,…’
  • Replace (change 1 letter to another): ‘jat’ => ‘hat, rat, cat, mat, …’
  • Insert (add a letter): ‘te’ => ‘the, ten, ate, …’

We'll be using the four methods above to implement an Auto-correct system by computing the probabilities that a certain word is correct given an input word.
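For instance, the misspelling 'waht' is a single switch away from 'what'. Here's a minimal sketch using the split-at-every-offset pattern that the later posts in this series build on:

word = "waht"
splits = [(word[:index], word[index:]) for index in range(len(word) + 1)]
# swap the two characters on either side of each split point
switches = [left[:-1] + right[0] + left[-1] + right[1:]
            for left, right in splits
            if left and right]
print(switches)
assert "what" in switches

['awht', 'what', 'wath']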

This auto-correct system was first created by Peter Norvig in 2007 in this article.

The goal of our spell check model is to compute the following probability:

\[ P(c|w) = \frac{P(w|c)\times P(c)}{P(w)} \tag{Equation 1} \]

The equation above is Bayes' Rule. It says that the probability that the correct word is c given the typed word w, \(P(c|w)\), is equal to the probability of typing w given that the intended word was c, \(P(w|c)\), multiplied by the probability of the word c appearing in general, \(P(c)\), divided by the probability of the typed word w appearing in general, \(P(w)\).
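Since \(P(w)\) is the same for every candidate c, candidates can be ranked by the numerator \(P(w|c) \times P(c)\) alone. The later posts in this series simplify further and rank candidate corrections by \(P(c)\), the word's relative frequency in the corpus, but the full form would look something like this sketch (the probabilities here are made up, purely for illustration):

# hypothetical candidates for the typo w = "waht", with made-up probabilities
candidates = {
    # candidate c: (P(w|c), P(c))
    "what": (0.7, 0.005),
    "wait": (0.1, 0.002),
}
ranked = sorted(candidates,
                key=lambda c: candidates[c][0] * candidates[c][1],
                reverse=True)
print(ranked)

['what', 'wait']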

Autocorrect: Finding Candidates Using Edits

Beginning

Middle

Our Data

This is the word we are going to autocorrect.

word = "dearz"

Splits

This finds all the splits in our word.

splits = [(word[:index], word[index:]) for index in range(len(word) + 1)]
for split in splits:
    print(split)
('', 'dearz')
('d', 'earz')
('de', 'arz')
('dea', 'rz')
('dear', 'z')
('dearz', '')

Delete

Starting with the splits, delete the first letter of the second element of each tuple. This means that each letter gets deleted exactly once.

deletes = [left + right[1:] for left, right in splits if right]
for deleted in deletes:
    print(f" - {deleted}")
- earz
- darz
- derz
- deaz
- dear

These are now the candidates to use for the correction.

Filter Out

Since not all the candidates are real words, you need to filter out all but the real words using a pre-defined vocabulary.

vocabulary = ['dean','deer','dear','fries','and','coke']
candidates = set(vocabulary).intersection(set(deletes))
print(candidates)
{'dear'}

End

This doesn't demonstrate all the edits (nor the use of edit distance) but the remaining types of edits are done in a similar manner to these two.
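For example, here's a sketch of the insert edit that reuses the splits from above (the replace and switch edits use the same left/right split idea):

from string import ascii_lowercase

# put one letter at every offset in 'dearz'
inserts = [left + letter + right
           for left, right in splits
           for letter in ascii_lowercase]
assert len(inserts) == (len(word) + 1) * 26
# the results are all six letters long, so none of them are in our tiny vocabulary
assert not set(vocabulary).intersection(inserts)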

Autocorrect: Building the Vocabulary

Beginning

# python
from collections import Counter
from functools import partial

import re

# pypi
import holoviews
import hvplot.pandas
import pandas

# my stuff
from graeae import EmbedHoloviews

Set Up

Plotting

SLUG = "autocorrect-building-the-vocabulary"
Embed = partial(EmbedHoloviews,
                folder_path=f"files/posts/nlp/{SLUG}/")

Middle

Set Up the Corpus

text = 'red pink pink blue blue yellow ORANGE BLUE BLUE PINK'
print(text)
print(f"String Length: {len(text)}")
red pink pink blue blue yellow ORANGE BLUE BLUE PINK
String Length: 52

Preprocessing

Lowercasing

text_lowercased = text.lower()
print(text_lowercased)
red pink pink blue blue yellow orange blue blue pink

Tokenizing

ALPHANUMERIC_UNDERSCORE = r"\w"
ONE_OR_MORE = r"+"

TOKEN = ALPHANUMERIC_UNDERSCORE + ONE_OR_MORE
tokens = re.findall(TOKEN, text_lowercased)
print(f"Tokens: {len(tokens)}")
Tokens: 10

Create the Vocabulary

First Way: Distinct Words

vocabulary = set(tokens)
print(vocabulary)
print(f"Count: {len(vocabulary)}")
{'pink', 'red', 'orange', 'blue', 'yellow'}
Count: 5

Second Way: Add Word Counts

  • With a Dictionary
    counts_from_dict = {token: tokens.count(token) for token in tokens}
    print(counts_from_dict)
    print(f"Unique: {len(counts_from_dict)}")
    
    {'red': 1, 'pink': 3, 'blue': 4, 'yellow': 1, 'orange': 1}
    Unique: 5
    
  • With a Counter
    counts_from_counter = Counter(tokens)
    print(counts_from_counter)
    print(f"Unique: {len(counts_from_counter)}")
    for key, count in counts_from_counter.items():
        assert count == counts_from_dict[key]
    
    Counter({'blue': 4, 'pink': 3, 'red': 1, 'yellow': 1, 'orange': 1})
    Unique: 5
    

Plot the Vocabulary

keys = list(counts_from_counter.keys())
colors = holoviews.Cycle(values=keys)
data = pandas.DataFrame(dict(
    Count=list(counts_from_counter.values()),
    Token=keys)
                        )
plot = data.hvplot.bar(x="Token", y="Count").opts(
    title="Token Counts",
    width=990,
    height=780,
    fontscale=2,
    color=colors,
    color_index="Token"
)

outcome = Embed(plot=plot, file_name="token_counts")()
print(outcome)

Figure Missing

End

This is the basic way that we'll be creating a vocabulary for the autocorrect feature.

Locality-Sensitive Hashing (LSH) for Machine Translation

Beginning

This is a continuation of the post in which we implemented k-Nearest Neighbors. It's part of a series of posts on building an English to French translator whose links are gathered in this post.

Imports

# python
import math

# pypi
from dotenv import load_dotenv
from nltk.corpus import twitter_samples

import numpy

# my code
from graeae import Timer
from neurotic.nlp.word_embeddings.embeddings import EmbeddingsLoader
from neurotic.nlp.twitter.processor import TwitterProcessor

Set Up

The Timer

TIMER = Timer()

The Environment

load_dotenv("posts/nlp/.env")

The Tweets

positive_tweets = twitter_samples.strings("positive_tweets.json")
negative_tweets = twitter_samples.strings("negative_tweets.json")
tweets = positive_tweets + negative_tweets

The Twitter Processor

process_tweet = TwitterProcessor()

The Embeddings Loader

embeddings = EmbeddingsLoader()

Middle

Locality-Sensitive Hashing (LSH)

In this part of the assignment, you will implement a more efficient version of k-nearest neighbors using locality sensitive hashing. You will then apply this to document search.

  • Process the tweets and represent each tweet as a vector (represent a document with a vector embedding).
  • Use locality sensitive hashing and k nearest neighbors to find tweets that are similar to a given tweet.

3.1 Getting the document embeddings

  • Bag-of-words (BOW) document models

    Text documents are sequences of words.

    • The ordering of words makes a difference. For example, sentences "Apple pie is better than pepperoni pizza." and "Pepperoni pizza is better than apple pie" have opposite meanings due to the word ordering.
    • However, for some applications, ignoring the order of words can allow us to train an efficient and still effective model.
    • This approach is called the Bag-of-words document model.

Document embeddings

  • The document embedding is created by summing up the embeddings of all words in the document.
  • If we don't know the embedding of some word, we can ignore that word.

Exercise 07: Complete the get_document_embedding() function.

  • The function get_document_embedding() encodes the entire document as a "document" embedding.
  • It takes in a document (as a string) and a dictionary, en_embeddings
  • It processes the document, and looks up the corresponding embedding of each word.
    • It then sums them up and returns the sum of all word vectors of that processed tweet.
  • Hints
    • You can handle missing words more easily by using the get() method of the python dictionary instead of the bracket notation (i.e. "[ ]"). See more about it here
    • The default value for a missing word can be the scalar 0; numpy will broadcast it into a vector of zeros during the summation.
    • Alternatively, skip the addition if a word is not in the dictionary.
    • You can use your process_tweet() function which allows you to process the tweet. The function just takes in a tweet and returns a list of words.
    # UNQ_C12 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
    def get_document_embedding(tweet, en_embeddings): 
        '''
        Input:
           - tweet: a string
           - en_embeddings: a dictionary of word embeddings
        Output:
           - doc_embedding: sum of all word embeddings in the tweet
        '''
        doc_embedding = numpy.zeros(300)
    
        ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
        # process the document into a list of words (process the tweet)
        processed_doc = process_tweet(tweet)
        for word in processed_doc:
            # add the word embedding to the running total for the document embedding
            doc_embedding = doc_embedding + en_embeddings.get(word, 0)
        ### END CODE HERE ###
        return doc_embedding
    

    You do not have to input any code in this cell, but it is relevant to grading, so please do not change anything.

    # testing your function
    custom_tweet = "RT @Twitter @chapagain Hello There! Have a great day. :) #good #morning http://chapagain.com.np"
    #tweet_embedding = get_document_embedding(custom_tweet, en_embeddings_subset)
    tweet_embedding = get_document_embedding(custom_tweet, embeddings.english_subset)
    
    actual = tweet_embedding[-5:]
    expected = [-0.00268555, -0.15378189, -0.55761719, -0.07216644, -0.32263184]
    assert numpy.allclose(actual, expected)
    print(actual)
    
    [-0.00268555 -0.15378189 -0.55761719 -0.07216644 -0.32263184]
    

Exercise 08

  • Store all document vectors into a dictionary

    Now, let's store all the tweet embeddings into a dictionary. Implement get_document_vecs().

    def get_document_vecs(all_docs, en_embeddings):
        '''
        Input:
           - all_docs: list of strings - all tweets in our dataset.
           - en_embeddings: dictionary with words as the keys and their embeddings as the values.
        Output:
           - document_vec_matrix: matrix of tweet embeddings.
           - ind2Doc_dict: dictionary with indices of tweets in vecs as keys and their embeddings as the values.
        '''
    
        # the dictionary's key is an index (integer) that identifies a specific tweet
        # the value is the document embedding for that document
        ind2Doc_dict = {}
    
        # this is the list that will store the document vectors
        document_vec_l = []
    
        for i, doc in enumerate(all_docs):
    
            ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
            # get the document embedding of the tweet
            doc_embedding = get_document_embedding(doc, en_embeddings)
    
            # save the document embedding into the ind2Tweet dictionary at index i
            ind2Doc_dict[i] = doc_embedding
    
            # append the document embedding to the list of document vectors
            document_vec_l.append(doc_embedding)
    
            ### END CODE HERE ###
    
        # convert the list of document vectors into a 2D array (each row is a document vector)
        document_vec_matrix = numpy.vstack(document_vec_l)
    
        return document_vec_matrix, ind2Doc_dict
    
    document_vecs, ind2Tweet = get_document_vecs(tweets, embeddings.english_subset)
    
    dict_length = len(ind2Tweet)
    expected = len(tweets)
    assert dict_length == expected
    print(f"length of dictionary {dict_length:,}")
    rows, columns = document_vecs.shape
    print(f"shape of document_vecs ({rows:,}, {columns})")
    assert rows == expected
    assert columns == 300
    

3.2 Looking up the tweets

Now you have a matrix of dimension (m, d), where m is the number of tweets (10,000) and d is the dimension of the embeddings (300). You will input a tweet and use cosine similarity to see which tweet in our corpus is most similar to it.

my_tweet = 'i am sad'
process_tweet(my_tweet)
tweet_embedding = get_document_embedding(my_tweet, embeddings.english_subset)

The code below finds the tweet most similar to your input.

def cosine_similarity(vector_1: numpy.ndarray, vector_2: numpy.ndarray) -> float:
    """Calculates the similarity between two vectors

    Args:
     vector_1: array to compare
     vector_2: array to compare to vector_1

    Returns:
     cosine similarity between the two vectors
    """
    return numpy.dot(vector_1, vector_2)/(numpy.linalg.norm(vector_1) *
                                          numpy.linalg.norm(vector_2))
idx = numpy.argmax(cosine_similarity(document_vecs, tweet_embedding))
print(tweets[idx])

3.3 Finding the most similar tweets with LSH

You will now implement locality sensitive hashing (LSH) to identify the most similar tweet. Instead of looking at all 10,000 vectors, you can just search a subset to find its nearest neighbors.

Let's say you have a set of data points. You can divide the vector space into regions and search within one region for the nearest neighbors of a given vector.

N_VECS = len(tweets)       # This many vectors.
N_DIMS = document_vecs.shape[1]     # Vector dimensionality.
print(f"There are {N_VECS:,} vectors and each has {N_DIMS} dimensions.")

Choosing the number of planes

  • Each plane divides the space to 2 parts.
  • So n planes divide the space into \(2^{n}\) hash buckets.
  • We want to organize 10,000 document vectors into buckets so that every bucket has about ~16 vectors.
  • For that we need \(\frac{10000}{16}=625\) buckets.
  • We're interested in n, number of planes, so that \(2^{n}= 625\). Now, we can calculate \(n=\log_{2}625 = 9.29 \approx 10\).

We round \(\log_2(625)\) up to get the number of planes, which gives us ~16 vectors per bucket.

buckets = 10000/16
print(buckets)
planes = math.ceil(numpy.log2(buckets))
print(planes)
625.0
10
N_PLANES = planes

Number of times to repeat the hashing to improve the search.

N_UNIVERSES = 25

3.4 Getting the hash number for a vector

For each vector, we need to get a unique number associated to that vector in order to assign it to a "hash bucket".

Hyperplanes in vector spaces

  • In a 3-dimensional vector space, a hyperplane is a regular plane. In a 2-dimensional vector space, a hyperplane is a line.
  • Generally, a hyperplane is a subspace whose dimension is 1 lower than that of the original vector space.
  • A hyperplane is uniquely defined by its normal vector.
  • The normal vector n of the plane \(\pi\) is the vector to which all vectors in the plane \(\pi\) are orthogonal (perpendicular in the 3-dimensional case).

Using Hyperplanes to split the vector space

We can use a hyperplane to split the vector space into 2 parts.

  • All vectors whose dot product with a plane's normal vector is positive are on one side of the plane.
  • All vectors whose dot product with the plane's normal vector is negative are on the other side of the plane.

Encoding hash buckets

  • For a vector, we can take its dot product with all the planes, then encode this information to assign the vector to a single hash bucket.
  • When the vector points to the opposite side of the hyperplane from the normal, encode it as 0.
  • Otherwise, if the vector is on the same side as the normal vector, encode it as 1.
  • If you calculate the dot product with each plane in the same order for every vector, you've encoded each vector's unique hash ID as a binary number, like [0, 1, 1, … 0], as in the small sketch below.
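
As a quick illustration (a minimal sketch with made-up 2D vectors and two hypothetical planes, not part of the assignment; the assignment's version is hash_value_of_vector below), the signs of the dot products map directly to the bits of the bucket number:

normals = numpy.array([[1.0, 0.0],    # the normal vectors are the columns:
                       [0.0, 1.0]])   # plane 1 has normal (1, 0), plane 2 has normal (0, 1)
vectors = numpy.array([[ 2.0,  3.0],
                       [-1.0,  4.0],
                       [-2.0, -5.0]])

signs = numpy.dot(vectors, normals) >= 0               # True where a vector is on the normal's side
buckets = signs.dot(2**numpy.arange(signs.shape[1]))   # read each row of bits as a binary number
print(buckets)                                         # prints: [3 2 0]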

Exercise 09: Implementing hash buckets

We've initialized the hash table hashes for you. It is a list of N_UNIVERSES matrices, each describing its own hash table. Each matrix has N_DIMS rows and N_PLANES columns. Every column of that matrix is an N_DIMS-dimensional normal vector for one of the N_PLANES hyperplanes used to create the buckets of that particular hash table.

Exercise: Your task is to complete the function hash_value_of_vector which places vector v in the correct hash bucket.

  • First multiply your vector v with the matrix of planes. This will give you a vector of dimension \((1,\text{N_planes})\).
  • You will then convert every element in that vector to 0 or 1.
  • You create a hash vector by doing the following: if the element is negative, it becomes a 0, otherwise you change it to a 1.
  • You then compute the unique number for the vector by iterating over the N_PLANES bits:
  • For each plane i, multiply \(2^i\) by the corresponding bit (0 or 1) and add it to a running sum.
  • You will then store that sum in the variable hash_value.

Instructions: Create a hash for the vector in the function below. Use this formula:

\[ hash = \sum_{i=0}^{N-1} \left( 2^{i} \times h_{i} \right) \]

  • Create the sets of planes
    • Create multiple (25) sets of planes (the planes that divide up the region).
    • You can think of these as 25 separate ways of dividing up the vector space with a different set of planes.
    • Each element of this list contains a matrix with 300 rows (the word vectors have 300 dimensions), and 10 columns (there are 10 planes in each "universe").
    numpy.random.seed(0)
    planes_l = [numpy.random.normal(size=(N_DIMS, N_PLANES))
                for _ in range(N_UNIVERSES)]
    
    • Hints
      • numpy.squeeze() removes unused dimensions from an array; for instance, it converts a (10,1) 2D array into a (10,) 1D array
      def hash_value_of_vector(v, planes):
          """Create a hash for a vector; hash_id says which random hash to use.
      
          Input:
             - v:  vector of tweet. Its dimension is (1, N_DIMS)
             - planes: matrix of dimension (N_DIMS, N_PLANES) - the set of planes that divide up the region
          Output:
             - res: a number which is used as a hash for your vector
      
          """
          ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
          # for the set of planes,
          # calculate the dot product between the vector and the matrix containing the planes
          # remember that planes has shape (300, 10)
          # The dot product will have the shape (1,10)
          assert planes.shape == (300, 10)
          assert v.shape == (1, 300)
          dot_product = numpy.dot(v, planes)
          assert dot_product.shape == (1, 10), dot_product.shape
      
          # get the sign of the dot product (1,10) shaped vector
          sign_of_dot_product = numpy.sign(dot_product)
      
          # set h to be false (equivalent to 0 when used in operations) if the sign is negative,
          # and true (equivalent to 1) if the sign is positive (1,10) shaped vector
          h = sign_of_dot_product >= 0
          assert h.shape == (1, 10)
      
          # remove extra un-used dimensions (convert this from a 2D to a 1D array)
          h = numpy.squeeze(h)
      
          # initialize the hash value to 0
          hash_value = 0
      
          n_planes = planes.shape[1]
          for i in range(n_planes):
              # increment the hash value by 2^i * h_i
              hash_value += 2**i * h[i]
          ### END CODE HERE ###
      
          # cast hash_value as an integer
          hash_value = int(hash_value)
      
          return hash_value
      
      numpy.random.seed(0)
      idx = 0
      planes = planes_l[idx]  # get one 'universe' of planes to test the function
      vec = numpy.random.rand(1, 300)
      expected = 768
      actual = hash_value_of_vector(vec, planes)
      assert expected == actual
      print(f" The hash value for this vector,",
            f"and the set of planes at index {idx},",
            f"is {actual}")
      

3.5 Creating a hash table

Exercise 10

Given that you have a unique number for each vector (or tweet), you now want to create a hash table. With a hash table, given a hash_id, you can quickly look up the corresponding vectors, which reduces your search time significantly.

We have given you the make_hash_table function, which maps the tweet vectors to a bucket and stores the vector there. It returns the hash_table and the id_table. The id_table tells you which vector in a certain bucket corresponds to what tweet.

  • Hints
    • a dictionary comprehension, similar to a list comprehension, looks like this: `{i:0 for i in range(10)}`, where the key is 'i' and the value is zero for all key-value pairs.
    def make_hash_table(vecs, planes):
        """
        Input:
           - vecs: list of vectors to be hashed.
           - planes: the matrix of planes in a single "universe", with shape (embedding dimensions, number of planes).
        Output:
           - hash_table: dictionary - keys are hashes, values are lists of vectors (hash buckets)
           - id_table: dictionary - keys are hashes, values are list of vectors id's
                               (it's used to know which tweet corresponds to the hashed vector)
        """
        ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
    
        # number of planes is the number of columns in the planes matrix
        num_of_planes = planes.shape[1]
    
        # number of buckets is 2^(number of planes)
        num_buckets = 2**num_of_planes
    
        # create the hash table as a dictionary.
        # Keys are integers (0,1,2.. number of buckets)
        # Values are empty lists
        hash_table = {index: [] for index in range(num_buckets)}
    
        # create the id table as a dictionary.
        # Keys are integers (0,1,2... number of buckets)
        # Values are empty lists
        id_table = {index: [] for index in range(num_buckets)}
    
        # for each vector in 'vecs'
        for i, v in enumerate(vecs):
            # calculate the hash value for the vector
            h = hash_value_of_vector(v.reshape(1, 300), planes)
    
            # store the vector into hash_table at key h,
            # by appending the vector v to the list at key h
            hash_table[h].append(v)
    
            # store the vector's index 'i' (each document is given a unique integer 0,1,2...)
            # the key is the h, and the 'i' is appended to the list at key h
            id_table[h].append(i)
    
        ### END CODE HERE ###
    
        return hash_table, id_table
    
    numpy.random.seed(0)
    planes = planes_l[0]  # get one 'universe' of planes to test the function
    vec = numpy.random.rand(1, 300)
    tmp_hash_table, tmp_id_table = make_hash_table(document_vecs, planes)
    # tmp_hash_table, tmp_id_table = make_hash_table(vec.reshape(1, 300), planes)
    
    index = 2
    print(f"The hash table at key {index} has {len(tmp_hash_table[index])} document vectors")
    print(f"The id table at key {index} has {len(tmp_id_table[index])}")
    print(f"The first 5 document indices stored at key {index} of are {tmp_id_table[index][0:5]}")
    

    Expected output

    The hash table at key 0 has 3 document vectors
    The id table at key 0 has 3
    The first 5 document indices stored at key 0 of are [3276, 3281, 3282]

    I get a hash of 2 for document 3276, not 1…

3.6 Creating all hash tables

You can now hash your vectors and store them in a hash table that would allow you to quickly look up and search for similar vectors. Run the cell below to create the hashes. By doing so, you end up having several tables which have all the vectors. Given a vector, you then identify the buckets in all the tables. You can then iterate over the buckets and consider much fewer vectors. The more buckets you use, the more accurate your lookup will be, but also the longer it will take.

  • Creating the hashtables
    hash_tables = []
    id_tables = []
    with TIMER:
        for universe_id in range(N_UNIVERSES):  # there are 25 hashes
            print('working on hash universe #:', universe_id)
            planes = planes_l[universe_id]
            hash_table, id_table = make_hash_table(document_vecs, planes)
            hash_tables.append(hash_table)
            id_tables.append(id_table)
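
To make the payoff concrete, here is a minimal sketch (my own illustration, not part of the assignment; the real approximate k-NN search comes in the next post) of how the tables we just built narrow a search: hash the query vector once per universe and gather only the candidate indices that share its bucket.

def candidate_ids(query_vector, planes_l, id_tables):
    """Collect the document indices that share a bucket with the query in any universe

    A sketch that assumes the planes_l and id_tables built above.
    """
    candidates = set()
    for planes, id_table in zip(planes_l, id_tables):
        bucket = hash_value_of_vector(query_vector.reshape(1, -1), planes)
        candidates.update(id_table[bucket])
    return candidates

# e.g. the candidates for the 'i am sad' tweet embedding from earlier
# print(len(candidate_ids(tweet_embedding, planes_l, id_tables)))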
    

Bundling It Up

<<imports>>


<<planes-universe>>

    <<planes-plane-count>>

    <<planes-planes>>


<<documents-embeddings-builder>>

    <<document-embedding>>

    <<documents-embeddings>>

    <<document-index-to-embedding>>


<<hash-table>>

    <<hash-value-of-vector>>

    <<hash-hashes>>

    <<hash-table-table>>

    <<hash-index-table>>



<<hash-tables>>

    <<hash-tables-hash>>

    <<hash-tables-index>>

Imports

# python
import math

# pypi
import attr
import numpy

Planes Universe

@attr.s(auto_attribs=True)
class PlanesUniverse:
    """Creates set of planes with a random mormal distribution of points


    Args:
     vector_count: number of vectors that will be hashed
     dimensions: number of columns per vector
     universes: number of universes to create
     vectors_per_bucket: how many vectors we want in each bucket
     random_seed: value to seed the random number generator
    """
    vector_count: int
    dimensions: int
    universes: int
    vectors_per_bucket: int
    random_seed: int=0
    _plane_count: int=None
    _planes: list=None
  • Plane Count
    @property
    def plane_count(self) -> int:
        """The number of planes to create
    
        Uses the number of vectors and desired vectors per bucket
        """
        if self._plane_count is None:
            buckets = self.vector_count/self.vectors_per_bucket
            self._plane_count = math.ceil(numpy.log2(buckets))
        return self._plane_count
    
  • Planes

    The list of planes.

    @property
    def planes(self) -> list:
        """The list of planes"""
        if self._planes is None:
            numpy.random.seed(self.random_seed)
            self._planes = [numpy.random.normal(size=(self.dimensions,
                                                      self.plane_count))
                            for _ in range(self.universes)]
        return self._planes
    

Documents Embeddings Builder

@attr.s(auto_attribs=True)
class DocumentsEmbeddings:
    """Builds embeddings for documents from their words

    Args:
     embeddings: word-embeddings for the documents
     process: callable to pre-process documents
     documents: documents (strings) to hash
    """
    embeddings: dict
    process: object
    documents: list
    _documents_embeddings: numpy.ndarray=None
    _document_index_to_embedding: dict=None
  • Getting the Document Embeddings
    def document_embedding(self, document: str) -> numpy.ndarray: 
        """sums the embeddings for words in the document
    
        Args:
          - document: string to tokenize and build embedding for
    
        Returns:
          - embedding: sum of all word embeddings in the document
        """
        rows = len(next(iter(self.embeddings.values())))
        embedding = numpy.zeros(rows)
        words = self.process(document)
        # adding the zeros means you always return an array, not just the number 0
        # if none of the words in the document are in the embeddings
        return embedding + sum((self.embeddings.get(word, 0) for word in words))
    
  • Documents Embeddings
    @property
    def documents_embeddings(self) -> numpy.ndarray:
        """array of embeddings for each document in documents"""
        if self._documents_embeddings is None:
            self._documents_embeddings = numpy.vstack(
                [self.document_embedding(document) for document in self.documents])
        return self._documents_embeddings
    
  • Document Index to Embeddings
    @property
    def document_index_to_embedding(self) -> dict:
        """maps document index (from self.documents) to embedding"""
        if self._document_index_to_embedding is None:
            self._document_index_to_embedding = {
                index: embedding for index, embedding in enumerate(
                    self.documents_embeddings)}
        return self._document_index_to_embedding
    

Hash Table Builder

@attr.s(auto_attribs=True)
class HashTable:
    """Builds the hash-table for embeddings

    Args:
     planes: matrix of planes to divide into hash table
     vectors: vectors to be hashed
    """
    planes: numpy.ndarray
    vectors: numpy.ndarray
    _hashes: list=None
    _hash_table: dict=None
    _index_table: dict=None
  • Vector Hash Value
    def hash_value(self, vector: numpy.ndarray) -> int:
        """
        Create a hash for a vector
    
        Args:
     - vector:  vector of tweet. Its dimension is (1, N_DIMS)
    
        Returns:
          - res: a number which is used as a hash for your vector
        """
        rows, columns = self.planes.shape
        # assert vector.shape == (1, rows), vector.shape
        dot_product = numpy.dot(vector, self.planes)
        #assert dot_product.shape == (1, columns), dot_product.shape
    
        sign_of_dot_product = numpy.sign(dot_product)
        hashes = sign_of_dot_product >= 0
        assert hashes.shape == dot_product.shape
    
        # remove extra un-used dimensions (convert this from a 2D to a 1D array)
        hashes = numpy.squeeze(hashes)
        hash_value = 0
    
        for column in range(columns):
            hash_value += 2**column * hashes[column]
        return int(hash_value)
    
  • Hashes
    @property
    def hashes(self) -> list:
        """Vector hashes"""
        if self._hashes is None:
            self._hashes = [self.hash_value(vector) for vector in self.vectors]
        return self._hashes
    
  • Hash Table Build
    @property
    def hash_table(self) -> dict:
        """Hash table of vectors
    
        Returns:
          hash_table: dictionary - keys are hashes, values are lists of vectors (hash buckets)
        """
        if self._hash_table is None:
            number_of_planes = self.planes.shape[1]
            number_of_buckets = 2**number_of_planes
    
            self._hash_table = {index: [] for index in range(number_of_buckets)}
    
            for index, hash_ in enumerate(self.hashes):
                self._hash_table[hash_].append(self.vectors[index])
        return self._hash_table
    
  • Index Table
    @property
    def index_table(self) -> dict:
        """Tabel of document hash to index"""
        if self._index_table is None:
            number_of_planes = self.planes.shape[1]
            number_of_buckets = 2**number_of_planes
    
            self._index_table = {index: [] for index in range(number_of_buckets)}
    
            for index, hash_ in enumerate(self.hashes):            
                self._index_table[hash_].append(index)
        return self._index_table
    
  • Build The Tables

    The code that uses the tables doesn't actually pull them at the same time, so I'm going to keep them separate.

    def build_tables(self) -> None:
        """Builds the hash and index table properties"""
        number_of_planes = self.planes.shape[1]
        number_of_buckets = 2**number_of_planes
    
        self._hash_table = {index: [] for index in range(number_of_buckets)}
        self._index_table = {index: [] for index in range(number_of_buckets)}
    
        for index, hash_ in enumerate(self.hashes):
            self._hash_table[hash_].append(self.vectors[index])
            self._index_table[hash_].append(index)
        return
    

Hash Tables

@attr.s(auto_attribs=True)
class HashTables:
    """Builds the universes of hash tables

    Args:
     universes: how many universes
     planes: planes to hash vectors into
     vectors: vectors to hash
    """
    universes: int
    planes: list
    vectors: numpy.ndarray
    _hash_tables: list=None
    _id_tables: list=None
  • Hash Tables
    @property
    def hash_tables(self) -> list:
        """Builds the list of hash tables"""
        if self._hash_tables is None: 
            self._hash_tables = [
                HashTable(vectors=self.vectors,
                          planes=self.planes[universe]).hash_table
                for universe in range(self.universes)
            ]
        return self._hash_tables
    
  • ID Tables
    @property
    def id_tables(self) -> list:
        """Builds the list of id tables"""
        if self._id_tables is None: 
            self._id_tables = [
                HashTable(vectors=self.vectors,
                          planes=self.planes[universe]).index_table
                for universe in range(self.universes)
            ]
        return self._id_tables
    

Testing the Classes

PlanesUniverse

from neurotic.nlp.word_embeddings.hashing import PlanesUniverse
universes = PlanesUniverse(vector_count=len(tweets),
                        dimensions=N_DIMS,
                        universes=N_UNIVERSES,
                        vectors_per_bucket=16)

assert universes.plane_count==10

Documents Embeddings Builder

from neurotic.nlp.word_embeddings.hashing import DocumentsEmbeddings

table = DocumentsEmbeddings(embeddings=embeddings.english_subset,
                            process=process_tweet, documents=tweets)

custom_tweet = "RT @Twitter @chapagain Hello There! Have a great day. :) #good #morning http://chapagain.com.np"

tweet_embedding = table.document_embedding(custom_tweet)

actual = tweet_embedding[-5:]
expected = [-0.00268555, -0.15378189, -0.55761719, -0.07216644, -0.32263184]
assert numpy.allclose(actual, expected)
print(actual)

dict_length = len(table.document_index_to_embedding)
expected = len(tweets)
assert dict_length == expected
print(f"length of dictionary {dict_length:,}")
rows, columns = table.documents_embeddings.shape
print(f"shape of document_vecs ({rows:,}, {columns})")
assert rows == expected
assert columns == 300

my_tweet = 'i am sad'
tweet_embedding = table.document_embedding(my_tweet)

idx = numpy.argmax(cosine_similarity(table.documents_embeddings, tweet_embedding))
print(tweets[idx])
[-0.00268555 -0.15378189 -0.55761719 -0.07216644 -0.32263184]
length of dictionary 10,000
shape of document_vecs (10,000, 300)
@zoeeylim sad sad sad kid :( it's ok I help you watch the match HAHAHAHAHA

Hash Table Builder

from neurotic.nlp.word_embeddings.hashing import HashTable

numpy.random.seed(0)
idx = 0
planes = universes.planes[idx]  # get one 'universe' of planes to test the function
vec = numpy.random.rand(1, 300)
expected = 768

hasher = HashTable(planes=planes, vectors=None)
actual = hasher.hash_value(vec)

assert expected == actual, f"expected: {expected}, Actual: {actual}"
print(f" The hash value for this vector,",
      f"and the set of planes at index {idx},",
      f"is {actual}")
The hash value for this vector, and the set of planes at index 0, is 768
numpy.random.seed(0)
planes = universes.planes[0]  # get one 'universe' of planes to test the function
vec = numpy.random.rand(1, 300)

hasher = HashTable(planes=planes, vectors=document_vecs)

tmp_hash_table = hasher.hash_table
tmp_id_table = hasher.index_table

index = 2
print(f"The hash table at key {index} has {len(tmp_hash_table[index])} document vectors")
print(f"The id table at key {index} has {len(tmp_id_table[index])}")
print(f"The first 5 document indices stored at key {index} of are {tmp_id_table[index][0:5]}")
The hash table at key 2 has 21 document vectors
The id table at key 2 has 21
The first 5 document indices stored at key 2 of are [356, 529, 976, 1754, 1779]

Hash Tables

from neurotic.nlp.word_embeddings.hashing import HashTables
tables = HashTables(universes=25, planes=universes.planes, vectors=table.documents_embeddings)
with TIMER:    
    hash_tables_2 = tables.hash_tables
    id_tables_2 = tables.id_tables
2020-10-29 19:06:32,191 graeae.timers.timer start: Started: 2020-10-29 19:06:32.191271
2020-10-29 19:06:56,635 graeae.timers.timer end: Ended: 2020-10-29 19:06:56.635738
2020-10-29 19:06:56,637 graeae.timers.timer end: Elapsed: 0:00:24.444467
assert len(hash_tables_2) == universes.universes
assert len(id_tables_2) == universes.universes

id_tables_ = zip(id_tables, id_tables_2)
for table, table_2 in id_tables_:
    assert len(table_2) == 2**universes.plane_count
    for bucket, ids in table.items():
        assert ids == table_2[bucket], f"[{bucket}]: {ids}, {table_2[bucket]}"

Testing Objects

Variable       Class
embeddings     EmbeddingsLoader
process_tweet  TwitterProcessor
table          DocumentsEmbeddings
hasher         HashTable
tables         HashTables

The Data

This is a summary of the data that was loaded since this was such a long post and I can't remember what's what without looking around.

Variable       Type             Description
tweets         list of strings  All the tweets (10,000)
document_vecs  numpy.ndarray    Document embeddings for the tweets (10,000, 300)
ind2Tweet      dict             Maps the index of a tweet (in tweets or document_vecs) to its document embedding
planes_l       list of arrays   Random planes for hashing (25 total, each 300 x 10)
hash_tables    list             Bucket-index to document-embedding maps (one per universe, each with 1,024 buckets, i.e. 2^number of planes)
id_tables      list             Bucket-index to document-index maps (one per universe, each with 1,024 buckets)

End

The next step is to use this to implement approximate k-Nearest Neighbors to complete our application.

Implementing k-Nearest Neighbors for Machine Translation

Beginning

This continues from the post where we found the transformation matrix. It's part of a series of posts whose links are gathered in the Machine Translation post.

Imports

# pypi
import numpy

# my stuff
from graeae import Timer
from neurotic.nlp.word_embeddings.english_french import TrainingData
from neurotic.nlp.word_embeddings.training import TheTrainer
TIMER = Timer()

Middle

Testing the translation

k-Nearest neighbors algorithm

  • The k-Nearest neighbors algorithm is a method which takes a vector as input and finds the other vectors in the dataset that are closest to it.
  • The 'k' is the number of "nearest neighbors" to find (e.g. k=2 finds the closest two neighbors).

Searching for the translation embedding

Since we're approximating the translation function from English to French embeddings with a linear transformation matrix \(\mathbf{R}\), most of the time we won't get the exact embedding of a French word when we transform embedding \(\mathbf{e}\) of some particular English word into the French embedding space.

This is where k-NN becomes really useful! By using 1-NN with \(\mathbf{eR}\) as input, we can search for an embedding \(\mathbf{f}\) (as a row) in the matrix \(\mathbf{Y}\) which is the closest to the transformed vector \(\mathbf{eR}\).

Cosine similarity

The cosine similarity between vectors u and v is calculated as the cosine of the angle between them.

The formula is:

\[ \cos(u,v)=\frac{u\cdot v}{\left\|u\right\|\left\|v\right\|} \]

  • \(\cos(u,v) = 1\) when u and v lie on the same line and have the same direction.
  • \(\cos(u,v) = -1\) when they have exactly opposite directions.
  • \(\cos(u,v) = 0\) when the vectors are orthogonal (perpendicular) to each other.

Note: Distance and similarity are pretty much opposite things.

  • We can obtain a distance metric from the cosine similarity, but the cosine similarity itself can't be used directly as a distance metric.
  • When the cosine similarity increases (towards 1), the "distance" between the two vectors decreases (towards 0).
  • We can define the cosine distance between u and v as

\[ d_{\text{cos}}(u,v)=1-\cos(u,v) \]

The Cosine Similarity

def cosine_similarity(vector_1: numpy.ndarray, vector_2: numpy.ndarray) -> float:
    """Calculates the similarity between two vectors

    Args:
     vector_1: array to compare
     vector_2: array to compare to vector_1

    Returns:
     cosine similarity between the two vectors
    """
    return numpy.dot(vector_1, vector_2)/(numpy.linalg.norm(vector_1) *
                                          numpy.linalg.norm(vector_2))
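
As a quick check of that relationship (a small illustrative example using the function just defined, not part of the assignment), vectors pointing the same way have a cosine distance of 0 and vectors pointing in opposite directions have a distance of 2:

u = numpy.array([1.0, 2.0])
same_direction = numpy.array([2.0, 4.0])
opposite = numpy.array([-1.0, -2.0])

for v in (same_direction, opposite):
    similarity = cosine_similarity(u, v)
    print(f"similarity: {similarity: .1f}, distance: {1 - similarity: .1f}")

# prints:
# similarity:  1.0, distance:  0.0
# similarity: -1.0, distance:  2.0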

the nearest_neighbor() function

Inputs:

  • Vector v
  • A set of possible nearest neighbors candidates
  • k nearest neighbors to find.
  • The distance metric should be based on cosine similarity.
  • The cosine_similarity function is already implemented for you (above). Its arguments are two vectors and it returns the cosine of the angle between them.
  • Iterate over the rows in candidates, and save the result of the similarities between the current row and vector v in a python list. Take care that the similarities are in the same order as the row vectors of candidates.
  • Now you can use numpy argsort to sort the indices for the rows of candidates.
  • Hints
    • numpy.argsort sorts values from most negative to most positive (smallest to largest)
    • The candidates that are nearest to v should have the highest cosine similarity
    • To get the last element of a list tmp, the notation is tmp[-1:]
    # UNQ_C8 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
    def nearest_neighbor(v, candidates, k=1):
        """
        Input:
          - v, the vector you are going to find the nearest neighbor for
          - candidates: a set of vectors where we will find the neighbors
          - k: top k nearest neighbors to find
        Output:
          - k_idx: the indices of the top k closest vectors in sorted form
        """
        # cosine_similarities = [cosine_similarity(v, row) for row in candidates]
    
        # for each candidate vector...
        #for row in candidates:
        #    # get the cosine similarity
        #    cos_similarity = cosine_similarity(v, row)
        #
        #    # append the similarity to the list
        #    similarity_l.append(cos_similarity)
    
        # sort the similarity list and get the indices of the sorted list
        # sorted_ids = numpy.argsort(similarity_l)
    
        # get the indices of the k most similar candidate vectors
        # k_idx = sorted_ids[-k:]
        ### END CODE HERE ###
        return numpy.argsort([cosine_similarity(v, row) for row in candidates])[-k:]
    

Test your implementation:

v = numpy.array([1, 0, 1], dtype="float64")
candidates = numpy.array([[1, 0, 5], [-2, 5, 3], [2, 0, 1], [6, -9, 5], [9, 9, 9]])
expected = numpy.array([
    [9, 9, 9],
    [1, 0, 5],
    [2, 0, 1]])
actual = candidates[nearest_neighbor(v, candidates, 3)]
print(actual)
assert (actual == expected).all()
[[9 9 9]
 [1 0 5]
 [2 0 1]]

Test your translation and compute its accuracy

Complete the function test_vocabulary which takes in English embedding matrix X, French embedding matrix Y and the R matrix and returns the accuracy of translations from X to Y by R.

  • Iterate over the transformed English word embeddings and check whether the closest French word vector belongs to the French word that is the actual translation.
  • Obtain an index of the closest French embedding by using nearest_neighbor (with argument k=1), and compare it to the index of the English embedding you have just transformed.
  • Keep track of the number of times you get the correct translation.
  • Calculate accuracy as

    \[ \text{accuracy}=\frac{\#(\text{correct predictions})}{\#(\text{total predictions})} \]

# UNQ_C10 (UNIQUE CELL IDENTIFIER, DO NOT EDIT)
def test_vocabulary(X, Y, R):
    '''
    Input:
       X: a matrix where the columns are the English embeddings.
       Y: a matrix where the columns correspond to the French embeddings.
       R: the transform matrix which translates word embeddings from
       English to French word vector space.
    Output:
       accuracy: the fraction of words that were correctly translated from English to French
    '''

    ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
    # The prediction is X times R
    pred = numpy.dot(X, R)

    # initialize the number correct to zero
    #num_correct = 0
    #predictions = (nearest_neighbor(row, Y) == index for index, row in enumerate(pred))
    # accuracy = sum(predictions)/len(red)
    # loop through each row in pred (each transformed embedding)
    #for index, row_vector in enumerate(pred):
    #    # get the index of the nearest neighbor of pred at row 'i'; also pass in the candidates in Y
    #    pred_idx = nearest_neighbor(row_vector, Y)
    #
    #    # if the index of the nearest neighbor equals the row of i... \
    #    if pred_idx == index:
    #        # increment the number correct by 1.
    #        num_correct += 1
    #
    ## accuracy is the number correct divided by the number of rows in 'pred' (also number of rows in X)
    #accuracy = num_correct/len(pred)
    #
    #### END CODE HERE ###

    return sum([nearest_neighbor(row, Y) == index for index, row in enumerate(pred)])/len(pred)

Let's see how your translation mechanism works on the unseen data:

# X_val, Y_val = get_matrices(en_fr_test, fr_embeddings_subset, en_embeddings_subset)
data = TrainingData()
trainer = TheTrainer(data.x_train, data.y_train)
r = trainer.fit()

You do not have to input any code in this cell, but it is relevant to grading, so please do not change anything.

with TIMER:
    acc = test_vocabulary(data.x_train, data.y_train, trainer.transformation)  # this might take a minute or two
2020-10-24 19:57:36,633 graeae.timers.timer start: Started: 2020-10-24 19:57:36.632998
2020-10-24 20:05:48,225 graeae.timers.timer end: Ended: 2020-10-24 20:05:48.225526
2020-10-24 20:05:48,226 graeae.timers.timer end: Elapsed: 0:08:11.592528
print(f"accuracy on test set is {acc[0]:.3f}")
accuracy on test set is 0.552

Expected Output:

0.557

You managed to translate words from one language to another, without ever having seen them, with almost 56% accuracy, by using some basic linear algebra to learn a mapping of words from one language to another!

Bundling It Up

<<imports>>

<<nearest-neighbor>>

    <<nearest-cosine-similarity>>

    <<nearest-nearest-neighbor>>

    <<nearest-call>>

Imports

# pypi
import attr
import numpy

Nearest Neighbor

@attr.s(auto_attribs=True)
class NearestNeighbors:
    """Finds the nearest neighbor(s) to a vector

    Args:
     candidates: set of vectors that are potential neighbors
     k: number of neighbors to find
    """
    candidates: numpy.ndarray    
    k: int=1

Cosine Similarity Method

def cosine_similarity(self, vector_1: numpy.ndarray, vector_2: numpy.ndarray) -> float:
    """Calculates the similarity between two vectors

    Args:
     vector_1: array to compare
     vector_2: array to compare to vector_1

    Returns:
     cosine similarity between the two vectors
    """
    return numpy.dot(vector_1, vector_2)/(numpy.linalg.norm(vector_1) *
                                          numpy.linalg.norm(vector_2))

Nearest Neighbor Method

def nearest_neighbors(self, vector: numpy.ndarray) -> numpy.ndarray:
    """Find the nearest neghbor(s) to a vector

    Args:
      - vector, the vector you are going find the nearest neighbor for

    Returns:
      - k_idx: the indices of the top k closest vectors in sorted form
    """
    return numpy.argsort([self.cosine_similarity(vector, row)
                          for row in self.candidates])[-self.k:]

Nearest Neighbor Call

def __call__(self, vector: numpy.ndarray) -> numpy.ndarray:
    """Alias for the `nearest_neighbors` method

    Args:
      - vector, the vector you are going to find the nearest neighbor for

    Returns:
      - k_idx: the indices of the top k closest vectors in sorted form
    """
    return self.nearest_neighbors(vector)

Testing It

from neurotic.nlp.word_embeddings.nearest_neighbors import NearestNeighbors


v = numpy.array([1, 0, 1], dtype="float64")
candidates = numpy.array([[1, 0, 5], [-2, 5, 3], [2, 0, 1], [6, -9, 5], [9, 9, 9]])

testing = NearestNeighbors(candidates=candidates, k=3)

expected = numpy.array([
    [9, 9, 9],
    [1, 0, 5],
    [2, 0, 1]])
actual = candidates[testing.nearest_neighbors(v)]
print(actual)
assert (actual == expected).all()
[[9 9 9]
 [1 0 5]
 [2 0 1]]
with TIMER:
    data = TrainingData()
    trainer = TheTrainer(data.x_train, data.y_train)
    r = trainer.fit()
    predictions = numpy.dot(data.x_train, trainer.transformation)
with TIMER:
    tester = NearestNeighbors(k=1, candidates=data.y_train)
    accuracy = sum([tester(row) == index
                    for index, row in enumerate(predictions)])/len(predictions)
2020-10-31 19:35:14,133 graeae.timers.timer start: Started: 2020-10-31 19:35:14.133884
2020-10-31 19:43:29,695 graeae.timers.timer end: Ended: 2020-10-31 19:43:29.695745
2020-10-31 19:43:29,697 graeae.timers.timer end: Elapsed: 0:08:15.561861
print(f"Accuracy: {100 * accuracy[0]: 0.2f} %")
Accuracy:  55.23 %

End

Training the Machine Translation Transformation Matrix

Beginning

In a prior post we built the translation training set. In this post we'll find the Transformation Matrix that maps our English Embeddings to the French ones.

Imports

# python
from functools import partial
# pypi
from dotenv import load_dotenv

import hvplot.pandas
import numpy
import pandas

# My Stuff
from graeae import EmbedHoloviews
from neurotic.nlp.word_embeddings.english_french import TrainingData

Set Up

load_dotenv("posts/nlp/.env")
slug = "machine-translation-transformation-matrix"
Embed = partial(EmbedHoloviews,
                folder_path=f"files/posts/nlp/{slug}")

Middle

Translation As Linear Transformation of Embeddings

The problem we're working on is creating a translator that converts an English word to a French one. To do this we're using Word Embeddings, which lets us re-state the problem: instead of translating directly, we look for the transformation matrix that produces a new embedding close enough to the French translation that we can use some kind of nearest-neighbor algorithm to find the French embedding that is closest to it.

  • Given dictionaries of English and French word embeddings we'll create a transformation matrix R.
  • Given an English word embedding, \(\mathbf{e}\), we can multiply it by \(\mathbf{R}\) (that is, compute \(\mathbf{eR}\)) to get a new word embedding \(\mathbf{f}\).
  • Both \(\mathbf{e}\) and \(\mathbf{f}\) are row vectors.
  • We can then compute the nearest neighbors to \(\mathbf{f}\) in the French embeddings and recommend the word that is most similar to the transformed word embedding.

Note: e was called X_train in the original assignment and corresponds to the TrainingData.x_train property that we built in the previous post.
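
As a toy illustration of that pipeline (a minimal sketch with made-up 3-dimensional embeddings and a stand-in R, not the assignment's code; the real R is learned below, and the nearest-neighbor search, which uses cosine similarity rather than the Euclidean distance shown here, has its own post):

e = numpy.array([1.0, 0.0, 1.0])       # a hypothetical English embedding (row vector)
R = numpy.eye(3)                       # stand-in transformation matrix
F = numpy.array([[0.9, 0.1, 1.1],      # hypothetical French embeddings, one per row
                 [5.0, 5.0, 5.0]])

f = numpy.dot(e, R)                              # the transformed embedding eR
distances = numpy.linalg.norm(F - f, axis=1)     # distance from eR to each French embedding
print(f"closest French row: {numpy.argmin(distances)}")   # prints: closest French row: 0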

Rethinking Translation as the Minimization Problem

Find a matrix R that minimizes the following equation.

\[ \arg \min _{\mathbf{R}}\| \mathbf{X R} - \mathbf{Y}\|_{F} \]

So we're trying to find the transformation matrix that minimizes the distance between the transformed English embeddings and their corresponding French embeddings. The subscript for the norm (F) means that we're using the Frobenius norm.

Frobenius norm

The Frobenius Norm of a matrix A (assuming it is of dimension m, n) is defined as the square root of the sum of the absolute squares of its elements:

\[ \|\mathbf{A}\|_{F} \equiv \sqrt{\sum_{i=1}^{m} \sum_{j=1}^{n}\left|a_{i j}\right|^{2}} \]
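
As a quick numerical check (my own example, not part of the assignment), the definition agrees with numpy.linalg.norm, whose default for a matrix is the Frobenius norm:

A = numpy.array([[2.0, -2.0],
                 [1.0,  3.0]])
by_hand = numpy.sqrt((numpy.abs(A)**2).sum())   # square root of the sum of the squared elements
print(by_hand, numpy.linalg.norm(A))            # both print 4.242640687119285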

Actual loss function

In real-world applications, the Frobenius norm loss:

\[ \| \mathbf{XR} - \mathbf{Y}\|_{F} \]

is often replaced by its squared value divided by m:

\[ \frac{1}{m} \| \mathbf{X R} - \mathbf{Y} \|_{F}^{2} \]

where m is the number of examples (rows in \(\mathbf{X}\)).

  • The same R is found when using this loss function versus the original Frobenius norm.
  • The reason for taking the square is that it's easier to compute the gradient of the squared Frobenius.
  • The reason for dividing by m is that we're more interested in the average loss per embedding than the loss for the entire training set.
  • The total loss increases with more words (training examples), so taking the average helps us track the loss regardless of the size of the training set.

Implementing the Translation Mechanism Described

  • Step 1: Computing the loss
    • The loss function will be the squared Frobenius norm of the difference between the matrix and its approximation, divided by the number of training examples m.
    • Its formula is:

    \[ L(X, Y, R)=\frac{1}{m}\sum_{i=1}^{m} \sum_{j=1}^{n}\left( a_{i j} \right)^{2} \]

    where \(a_{i j}\) is the value in the \(i^{th}\) row and \(j^{th}\) column of the matrix \(\mathbf{XR}-\mathbf{Y}\).

    Instructions: complete the compute_loss() function.

    • Compute the approximation of Y by matrix multiplying X and R
    • Compute the difference XR - Y
    • Compute the squared Frobenius norm of the difference and divide it by m.

    Use matrix operations instead of the numpy.norm function.

    def compute_loss(X, Y, R):
        '''
        Inputs: 
           X: a matrix of dimension (m,n) where the columns are the English embeddings.
           Y: a matrix of dimension (m,n) where the columns correspond to the French embeddings.
           R: a matrix of dimension (n,n) - transformation matrix from English to French vector space embeddings.
        Outputs:
           L: a scalar - the value of the loss function for the given X, Y and R.
        '''
        ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
        # m is the number of rows in X
        m = len(X)
    
        # diff is XR - Y
        diff = numpy.dot(X, R) - Y
    
        # diff_squared is the element-wise square of the difference
        diff_squared = diff**2
    
        # sum_diff_squared is the sum of the squared elements
        sum_diff_squared = diff_squared.sum()
    
        # loss is the sum_diff_squared divided by the number of examples (m)
        loss = sum_diff_squared/m
        ### END CODE HERE ###
        return loss
    

Step 2: Computing the gradient of the loss with respect to the transform matrix R

  • Calculate the gradient of the loss with respect to transform matrix R.
  • The gradient is a matrix that encodes how much a small change in R affects the change in the loss function.
  • The gradient gives us the direction in which we should decrease R to minimize the loss.
  • \(m\) is the number of training examples (number of rows in X).
  • The formula for the gradient of the loss function \(L(X, Y, R)\) is:

\[ \frac{d}{dR}𝐿(𝑋,𝑌,𝑅)=\frac{d}{dR}\Big(\frac{1}{m}\| X R -Y\|_{F}^{2}\Big) = \frac{2}{m}X^{T} (X R - Y) \]

  • Instructions: Complete the `compute_gradient` function below.
    • Hints
      def compute_gradient(X, Y, R):
          '''
          Inputs: 
             X: a matrix of dimension (m,n) where the columns are the English embeddings.
             Y: a matrix of dimension (m,n) where the columns correspond to the French embeddings.
             R: a matrix of dimension (n,n) - transformation matrix from English to French vector space embeddings.
          Outputs:
             g: a matrix of dimension (n,n) - gradient of the loss function L for given X, Y and R.
          '''
          ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
          # m is the number of rows in X
          rows, columns = X.shape
      
          # gradient is X^T(XR - Y) * 2/m
          gradient = (numpy.dot(X.T, numpy.dot(X, R) - Y) * 2)/rows
          assert gradient.shape == (columns, columns)
          ### END CODE HERE ###
          return gradient
      

Step 3: Finding the optimal R with the gradient descent algorithm

  • Gradient descent

    Gradient descent is an iterative algorithm which is used to search for the optimum of a function.

    • Earlier, we mentioned that the gradient of the loss with respect to the matrix encodes how much a tiny change in some coordinate of that matrix affects the loss function.
    • Gradient descent uses that information to iteratively change matrix R until we reach a point where the loss is minimized.
    • Training with a fixed number of iterations

      Most of the time we iterate for a fixed number of training steps rather than iterating until the loss falls below a threshold.

      Pseudocode:

      1. Calculate gradient g of the loss with respect to the matrix R.
      2. Update R with the formula:

      \[ R_{\text{new}}= R_{\text{old}}-\alpha g \]

      Where \(\alpha\) is the learning rate, which is a scalar.

    • Learning rate
      • The learning rate or "step size" \(\alpha\) is a coefficient which decides how much we want to change R in each step.
      • If we change R too much, we could skip the optimum by taking too large of a step.
      • If we make only small changes to R, we will need many steps to reach the optimum.
      • Learning rate \(\alpha\) is used to control those changes.
      • Values of \(\alpha\) are chosen depending on the problem, and we'll use learning_rate =0.0003 as the default value for our algorithm.
    • Exercise 04

      Instructions: Implement align_embeddings()

       def align_embeddings(X: numpy.ndarray, Y: numpy.ndarray,
                            train_steps: int=100,
                            learning_rate: float=0.0003,
                            seed: int=129) -> numpy.ndarray:
           '''
           Inputs:
              X: a matrix of dimension (m,n) where the columns are the English embeddings.
              Y: a matrix of dimension (m,n) where the columns correspond to the French embeddings.
              train_steps: positive int - how many steps the gradient descent algorithm will take.
              learning_rate: positive float - how big a step the gradient descent algorithm takes.
           Outputs:
              R: a matrix of dimension (n,n) - the projection matrix that minimizes the F norm ||X R -Y||^2
           '''
           # the number of columns in X is the number of dimensions for a word vector (e.g. 300)
           # R is a square matrix with length equal to the number of dimensions in the word embedding
           R = numpy.random.rand(X.shape[1], X.shape[1])
      
           for i in range(train_steps):
               if i % 25 == 0:
                   print(f"loss at iteration {i} is: {compute_loss(X, Y, R):.4f}")
               ### START CODE HERE (REPLACE INSTANCES OF 'None' with your code) ###
               # use the function that you defined to compute the gradient
               gradient = compute_gradient(X, Y, R)
      
               # update R by subtracting the learning rate times gradient
               R -= learning_rate * gradient
               ### END CODE HERE ###
           return R
      
  • Testing Your Implementation.
     numpy.random.seed(129)
     m = 10
     n = 5
     X = numpy.random.rand(m, n)
     Y = numpy.random.rand(m, n) * .1
     R = align_embeddings(X, Y)
    
    loss at iteration 0 is: 3.4697
    loss at iteration 25 is: 3.3795
    loss at iteration 50 is: 3.2918
    loss at iteration 75 is: 3.2064
    

    Expected Output:

    loss at iteration 0 is: 3.7242
    loss at iteration 25 is: 3.6283
    loss at iteration 50 is: 3.5350
    loss at iteration 75 is: 3.4442

  • Calculate transformation matrix

    Using the training set, find the transformation matrix \(\mathbf{R}\) by calling the function align_embeddings().

    NOTE: The code cell below will take a few minutes to fully execute (~3 mins)

     data = TrainingData()
     R_train = align_embeddings(data.x_train, data.y_train, train_steps=400, learning_rate=0.8)
    
    loss at iteration 0 is: 968.1416
    loss at iteration 25 is: 97.6094
    loss at iteration 50 is: 26.7949
    loss at iteration 75 is: 9.7902
    loss at iteration 100 is: 4.3831
    loss at iteration 125 is: 2.3324
    loss at iteration 150 is: 1.4509
    loss at iteration 175 is: 1.0356
    loss at iteration 200 is: 0.8263
    loss at iteration 225 is: 0.7152
    loss at iteration 250 is: 0.6539
    loss at iteration 275 is: 0.6188
    loss at iteration 300 is: 0.5983
    loss at iteration 325 is: 0.5859
    loss at iteration 350 is: 0.5783
    loss at iteration 375 is: 0.5736
    

    Expected Output

    loss at iteration 0 is: 963.0146
    loss at iteration 25 is: 97.8292
    loss at iteration 50 is: 26.8329
    loss at iteration 75 is: 9.7893
    loss at iteration 100 is: 4.3776
    loss at iteration 125 is: 2.3281
    loss at iteration 150 is: 1.4480
    loss at iteration 175 is: 1.0338
    loss at iteration 200 is: 0.8251
    loss at iteration 225 is: 0.7145
    loss at iteration 250 is: 0.6534
    loss at iteration 275 is: 0.6185
    loss at iteration 300 is: 0.5981
    loss at iteration 325 is: 0.5858
    loss at iteration 350 is: 0.5782
    loss at iteration 375 is: 0.5735

Saving It For Later

<<trainer-imports>>


<<the-trainer>>

    <<trainer-timer>>

    <<trainer-loss>>

    <<trainer-gradient>>

    <<trainer-align-embeddings>>

Imports

# pypi
import attr
import numpy

# my stuff
from graeae import Timer

The Trainer Class

We could keep it as just functions like it is here, but, nah.

@attr.s(auto_attribs=True)
class TheTrainer:
    """Trains the word-embeddings data

    Args:
     x_train: the training input
     y_train: the training labels
     training_steps: number of times to run the training loop
     learning_rate: multiplier for the gradient (alpha)
     seed: random-seed for numpy
     loss_every: if verbose, how often to show loss during fitting
     verbose: whether to emit messages
    """
    x_train: numpy.ndarray
    y_train: numpy.ndarray
    _timer: Timer=None
    training_steps: int=400
    learning_rate: float=0.8
    seed: int=129
    loss_every: int=25
    verbose: bool=True

A Timer

Just something to keep track of how long things take.

@property
def timer(self) -> Timer:
    """A timer"""
    if self._timer is None:
        self._timer = Timer(emit=self.verbose)
    return self._timer

The Loss Method

def loss(self, transformation: numpy.ndarray) -> numpy.float:
    """
    Calculates the loss between XR and Y as the average sum of difference squared

    Args: 
       transformation: a matrix of dimension (n,n) - transformation matrix.

    Returns:
       loss: value of loss function for X, Y and R
    """
    rows, columns = self.x_train.shape

    difference = numpy.dot(self.x_train, transformation) - self.y_train
    difference_squared = difference**2
    sum_of_difference_squared = difference_squared.sum()
    return sum_of_difference_squared/rows

The Gradient

def gradient(self, transformation: numpy.ndarray) -> numpy.ndarray:
    """computes the gradient (slope) of the loss

    Args: 
       transformation: transformation matrix of dimension (n,n)

    Returns:
       gradient: a matrix of dimension (n,n)
    """
    rows, columns = self.x_train.shape

    gradient = (
        numpy.dot(self.x_train.T,
                  numpy.dot(self.x_train, transformation) - self.y_train) * 2
    )/rows
    assert gradient.shape == (columns, columns)
    return gradient

The Embeddings Aligner

def fit(self) -> numpy.ndarray:
    """Fits the transformation matrix to the data

    Side Effect:
     sets self.transformation  and self.losses

    Returns:
     the projection matrix that minimizes the F norm ||X R -Y||^2
    """
    numpy.random.seed(self.seed)
    assert self.x_train.shape == self.y_train.shape
    rows, columns = self.x_train.shape
    self.transformation = numpy.random.rand(columns, columns)
    self.losses = []
    if self.verbose:
        print("Step\tLoss")
    with self.timer:
        for step in range(self.training_steps):
            loss = self.loss(self.transformation)
            if self.verbose and step % 25 == 0:
                print(f"{step}\t{loss:0.4f}")
            self.transformation -= self.learning_rate * self.gradient(
                self.transformation)
            self.losses.append(loss)
    assert self.transformation.shape == (columns, columns)
    return self.transformation

Check the Tester

Sanity Check

from neurotic.nlp.word_embeddings.training import TheTrainer
numpy.random.seed(129)
m = 10
n = 5
X = numpy.random.rand(m, n)
Y = numpy.random.rand(m, n) * .1
trainer = TheTrainer(X, Y, training_steps=100, learning_rate=0.003)
R = trainer.fit()
2020-10-23 18:27:50,195 graeae.timers.timer start: Started: 2020-10-23 18:27:50.195052
2020-10-23 18:27:50,201 graeae.timers.timer end: Ended: 2020-10-23 18:27:50.201767
2020-10-23 18:27:50,203 graeae.timers.timer end: Elapsed: 0:00:00.006715
The loss at step 0 is: 3.7242
The loss at step 25 is: 2.8709
The loss at step 50 is: 2.2201
The loss at step 75 is: 1.7235

The Real Data

trainer = TheTrainer(data.x_train, data.y_train)
r = trainer.fit()
2020-10-23 18:30:45,558 graeae.timers.timer start: Started: 2020-10-23 18:30:45.558693
Step    Loss
0       963.0146
25      97.8292
50      26.8329
75      9.7893
100     4.3776
125     2.3281
150     1.4480
175     1.0338
200     0.8251
225     0.7145
250     0.6534
275     0.6185
300     0.5981
325     0.5858
350     0.5782
375     0.5735
2020-10-23 18:31:16,471 graeae.timers.timer end: Ended: 2020-10-23 18:31:16.471708
2020-10-23 18:31:16,473 graeae.timers.timer end: Elapsed: 0:00:30.913015

Plotting the Losses

losses = pandas.DataFrame(dict(Step=list(range(len(trainer.losses))),
                               Loss=trainer.losses))
plot = losses.hvplot(x="Step", y="Loss").opts(
    title="Training Losses",
    width=990,
    height=780,
    fontscale=2,
    color="#4687b7",
)

outcome = Embed(plot=plot, file_name="train_loss")()
print(outcome)

Figure Missing

Although the losses continue to go down, it looks like most of the gains come in the first 100 rounds of training.

End