POS Tagging: Checking the Accuracy of the Model

Predicting on a data set

Compute the accuracy of your prediction by comparing it with the true `y` labels.

  • `pred` is a list of predicted POS tags corresponding to the words of the `test_corpus`.

Imports

# python
import math

# pypi
from dotenv import load_dotenv

# this stuff
from neurotic.nlp.parts_of_speech import DataLoader, HiddenMarkov, Matrices, TheTrainer

# some other stuff
from graeae import Timer

Set Up

The Timer

TIMER = Timer()

The Matrices

with TIMER:
    load_dotenv("posts/nlp/.env")
    loader = DataLoader()
    trainer = TheTrainer(loader.processed_training)
    matrices = Matrices(transition_counts=trainer.transition_counts,
                        emission_counts=trainer.emission_counts,
                        words=loader.vocabulary_words,
                        tag_counts=trainer.tag_counts,
                        alpha=0.001)
    model = HiddenMarkov(loader, trainer, matrices)
    model()
2020-11-30 20:41:51,497 graeae.timers.timer start: Started: 2020-11-30 20:41:51.497226
2020-11-30 20:43:32,311 graeae.timers.timer end: Ended: 2020-11-30 20:43:32.311608
2020-11-30 20:43:32,312 graeae.timers.timer end: Elapsed: 0:01:40.814382

These classes were defined in other posts:

Middle

Aliases

The original notebooks use a naming scheme that I don't really like, so some aliases are needed to make my stuff work with theirs.

prep = loader.test_words
pred = model.predictions

raw = loader.test_data_tuples
missing = [index for index, pair in enumerate(raw) if not pair]
for index in missing:
    raw[index] = tuple(("", "--n--"))

y = [label for word, label in raw]

print('The third word is:', prep[3])
print('Your prediction is:', pred[3])
print('Your corresponding label y is: ', y[3])

assert len(y) == len(prep)
assert len(y) == len(pred)
The third word is: temperature
Your prediction is: NN
Your corresponding label y is:  NN

Redo Y

Now that I look at their code, they expect the "y" list to be the un-split strings. Ugh.

y = loader.test_data_raw
print('Your corresponding label y is: ', y[3])
Your corresponding label y is:  temperature     NN

Compute Accuracy

def compute_accuracy(pred: list, y: list) -> float:
    """Calculate the accuracy of our model's predictions

    Args: 
      pred: a list of the predicted parts-of-speech 
      y: a list of lines where each word is separated by a '\t' (i.e. word \t tag)
    Returns: 
      accuracy of the predictions
    """
    num_correct = 0
    total = 0

    # Zip together the predictions and the labels
    for prediction, label in zip(pred, y):
        # Split the label into the word and the POS tag
        word_tag_tuple = label.split()

        # Check that there is actually a word and a tag
        # no more and no less than 2 items
        if len(word_tag_tuple) != 2: # complete this line
            continue 

        # store the word and tag separately
        word, tag = word_tag_tuple

        # Check if the POS tag label matches the prediction
        if tag == prediction: # complete this line

            # count the number of times that the prediction
            # and label match
            num_correct += 1

        # keep track of the total number of examples (that have valid labels)
        total += 1

    return num_correct/total
accuracy = compute_accuracy(pred, y)
print(f"Accuracy of the Viterbi algorithm is {accuracy:.4f}")
assert math.isclose(accuracy, 0.95, abs_tol=1e-2)
Accuracy of the Viterbi algorithm is 0.9545

Note: The original notebook accuracy was 0.9531. I don't really know what caused the difference - I suspect their preprocessing - but since this is better I'll keep it.

End

So, there you go, parts-of-speech tagging.

Parts-of-Speech: Viterbi Algorithm

Beginning

Imports

# python
from unittest.mock import call, MagicMock

import math

# pypi
from dotenv import load_dotenv
from expects import (equal, expect)

import numpy
np = numpy

# this stuff
from neurotic.nlp.parts_of_speech import DataLoader, Matrices, TheTrainer

# my other stuff
from graeae import Timer

Set Up

The Timer

TIMER = Timer()

The Matrices

load_dotenv("posts/nlp/.env")
loader = DataLoader()
trainer = TheTrainer(loader.processed_training)
matrices = Matrices(transition_counts=trainer.transition_counts,
                    emission_counts=trainer.emission_counts,
                    words=loader.vocabulary_words,
                    tag_counts=trainer.tag_counts,
                    alpha=0.001)

These classes were defined in other posts:

Middle

Initialization

Write a program below that initializes the best_probs and the best_paths matrix.

Both matrices will be initialized to zero except for column zero of best_probs.

  • Column zero of best_probs is initialized with the assumption that the first word of the corpus was preceded by a start token ("--s--").
  • This allows you to reference the A matrix for the transition probability

Here is how to initialize column 0 of best_probs:

  • The probability of the best path from the start token to a given POS tag (indexed by integer i) for the first word of the corpus is stored in \(\textrm{best\_probs}[i, 0]\).
  • This is estimated as the probability that the start tag transitions to the POS denoted by index i: \(\mathbf{A}[s_{idx}, i]\) AND that the POS tag denoted by i emits the first word of the given corpus, which is \(\mathbf{B}[i, vocab[corpus[0]]]\).
  • Note that vocab[corpus[0]] refers to the first word of the corpus (the word at position 0 of the corpus).
  • vocab is a dictionary that returns the unique integer that refers to that particular word.

Conceptually, it looks like this:

\[ \textrm{best\_probs}[i, 0] = \mathbf{A}[s_{idx}, i] \times \mathbf{B}[i, vocab[corpus[0]]] \]

In order to avoid multiplying and storing small values on the computer, we'll take the log of the product, which becomes the sum of two logs:

\[ best\_probs[i,0] = \log(A[s_{idx}, i]) + \log(B[i, vocab[corpus[0]]]) \]

Also, to avoid taking the log of 0 (which is defined as negative infinity), the code itself will just set \(best\_probs[i,0] = float('-inf')\) when \(A[s_{idx}, i] == 0\).

So the implementation to initialize best_probs looks like this:

\[ \textrm{if } A[s_{idx}, i] \neq 0 : best\_probs[i,0] = \log(A[s_{idx}, i]) + \log(B[i, vocab[corpus[0]]]) \]

\[ \textrm{if } A[s_{idx}, i] == 0 : best\_probs[i,0] = float('-inf') \]
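To see why working in log-space matters, here is a small demonstration of the underflow problem, using the math module imported above (the probabilities are arbitrary small numbers, not values from the model):

probabilities = [1e-5] * 100

# multiplying the raw probabilities underflows to 0.0
product = 1.0
for probability in probabilities:
    product *= probability
print(product)

# summing the logs stays representable (about -1151.3)
log_sum = sum(math.log(probability) for probability in probabilities)
print(log_sum)

The direct product underflows to exactly 0.0, while the sum of the logs is a perfectly ordinary float.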

Please use math.log to compute the natural logarithm.

Represent infinity and negative infinity like this:

float('inf')
float('-inf')
def initialize(states, tag_counts, A, B, corpus, vocab):
    """Initializes the ``best_probs`` and ``best_paths`` matrices

    Args: 
       states: a list of all possible parts-of-speech
       tag_counts: a dictionary mapping each tag to its respective count
       A: Transition Matrix of dimension (num_tags, num_tags)
       B: Emission Matrix of dimension (num_tags, len(vocab))
       corpus: a sequence of words whose POS is to be identified in a list 
       vocab: a dictionary where keys are words in vocabulary and value is an index

    Returns:
       best_probs: matrix of dimension (num_tags, len(corpus)) of floats
       best_paths: matrix of dimension (num_tags, len(corpus)) of integers
    """
    # Get the total number of unique POS tags
    num_tags = len(tag_counts)

    # Initialize best_probs matrix 
    # POS tags in the rows, number of words in the corpus as the columns
    best_probs = np.zeros((num_tags, len(corpus)))

    # Initialize best_paths matrix
    # POS tags in the rows, number of words in the corpus as columns
    best_paths = np.zeros((num_tags, len(corpus)), dtype=int)

    # Define the start token
    s_idx = states.index("--s--")
    ### START CODE HERE (Replace instances of 'None' with your code) ###

    # Go through each of the POS tags
    for i in range(len(states)): # complete this line
        # Handle the special case when the transition from start token to POS tag i is zero
        if A[s_idx, i] == 0: # complete this line

            # Initialize best_probs at POS tag 'i', column 0, to negative infinity
            best_probs[i,0] = float("-inf")
            print(f"{i}: negitive infinity")

        # For all other cases when transition from start token to POS tag i is non-zero:
        else:
            # Initialize best_probs at POS tag 'i', column 0
            # Check the formula in the instructions above
            best_probs[i,0] = math.log(A[s_idx, i]) + math.log(B[i, vocab[corpus[0]]])
    ### END CODE HERE ### 
    return best_probs, best_paths
states = matrices.tags
tag_counts = trainer.tag_counts
A = matrices.transition
B = matrices.emission
prep = loader.test_words
vocab = loader.vocabulary
best_probs, best_paths = initialize(states, tag_counts, A, B, prep, vocab)

Test the function

actual = best_probs[0,0]
expected = -22.6098
print(f"best_probs[0,0]: {actual:.4f}")

assert math.isclose(actual, expected, abs_tol=1e-4), (actual, expected)

actual = best_paths[2,3]
expected = 0.0000
print(f"best_paths[2,3]: {actual:.4f}")
assert math.isclose(actual, expected)
best_probs[0,0]: -22.6099
best_paths[2,3]: 0.0000

Viterbi Forward

In this part of the assignment, you will implement the viterbi_forward segment. In other words, you will populate your best_probs and best_paths matrices.

  • Walk forward through the corpus.
  • For each word, compute a probability for each possible tag.
  • Unlike the previous algorithm predict_pos (the 'warm-up' exercise), this will include the path up to that (word,tag) combination.

Here is an example with a three-word corpus "Loss tracks upward":

  • Note, in this example, only a subset of states (POS tags) are shown in the diagram below, for easier reading.
  • In the diagram below, the first word "Loss" is already initialized.
  • The algorithm will compute a probability for each of the potential tags in the second and future words.

Compute the probability that the tag of the second word ('tracks') is a verb, 3rd person singular present (VBZ).

  • In the best_probs matrix, go to the column of the second word ('tracks'), and row 40 (VBZ), this cell is highlighted in light orange in the diagram below.
  • Examine each of the paths from the tags of the first word ('Loss') and choose the most likely path.
  • An example of the calculation for one of those paths is the path from ('Loss', NN) to ('tracks', VBZ).
  • The log of the probability of the path up to and including the first word 'Loss' having POS tag NN is -14.32. The best_probs matrix contains this value -14.32 in the column for 'Loss' and row for 'NN'.
  • Find the probability that NN transitions to VBZ. To find this probability, go to the A transition matrix, and go to the row for 'NN' and the column for 'VBZ'. The value is 4.37e-02, which is circled in the diagram, so add \(-14.32 + \log(4.37e-02)\).
  • Find the log of the probability that the tag VBZ would 'emit' the word 'tracks'. To find this, look at the 'B' emission matrix in row 'VBZ' and the column for the word 'tracks'. The value 4.61e-04 is circled in the diagram below. So add \(-14.32 + \log(4.37e-02) + \log(4.61e-04)\).
  • The sum of \(-14.32 + \log(4.37e-02) + \log(4.61e-04)\) is -25.13. Store -25.13 in the best_probs matrix at row 'VBZ' and column 'tracks' (as seen in the cell that is highlighted in light orange in the diagram).
  • All other paths in best_probs are calculated. Notice that -25.13 is greater than all of the other values in column 'tracks' of matrix best_probs, and so the most likely path to 'VBZ' is from 'NN'. 'NN' is in row 20 of the best_probs matrix, so 20 is the most likely path.
  • Store the most likely path 20 in the best_paths table. This is highlighted in light orange in the diagram below.
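A quick check of that arithmetic, using the math module imported above (these are just the numbers from the walkthrough, not values read from the real matrices):

# the walkthrough's path probability for ('Loss', NN) -> ('tracks', VBZ)
path_probability = -14.32 + math.log(4.37e-02) + math.log(4.61e-04)
print(f"{path_probability:.2f}")

This prints -25.13, matching the value stored in best_probs for ('tracks', VBZ).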

The formula to compute the probability and path for the \(i^{th}\) word in the corpus, the prior word i-1 in the corpus, current POS tag j, and previous POS tag k is:

\[ \mathrm{prob} = \mathbf{best\_prob}_{k, i-1} + \mathrm{log}(\mathbf{A}_{k, j}) + \mathrm{log}(\mathbf{B}_{j, vocab(corpus_{i})}) \]

where \(corpus_{i}\) is the word in the corpus at index i, and vocab is the dictionary that gets the unique integer that represents a given word.

\[ \mathrm{path} = k \]

where k is the integer representing the previous POS tag.

Implement the `viterbi_forward` algorithm and store the best_path and best_prob for every possible tag for each word in the matrices `best_probs` and `best_paths` using the pseudo code below.


for each word in the corpus

    for each POS tag type that this word may be

        for each POS tag type that the previous word could be

            compute the probability that the previous word had a given POS tag, that the current word has a given POS tag, and that the POS tag would emit this current word.

            retain the highest probability computed for the current word

            set best_probs to this highest probability

            set best_paths to the index 'k', representing the POS tag of the previous word which produced the highest probability

Please use math.log to compute the natural logarithm.

  • Remember that when accessing emission matrix B, the column index is the unique integer ID associated with the word. It can be accessed by using the 'vocab' dictionary, where the key is the word, and the value is the unique integer ID for that word.
def viterbi_forward(A, B, test_corpus, best_probs, best_paths, vocab):
    """The forward training pass

    Args: 
       A, B: The transition and emission matrices respectively
       test_corpus: a list containing a preprocessed corpus
       best_probs: an initialized matrix of dimension (num_tags, len(corpus))
       best_paths: an initialized matrix of dimension (num_tags, len(corpus))
       vocab: a dictionary where keys are words in vocabulary and value is an index 
    Returns: 
       best_probs: a completed matrix of dimension (num_tags, len(corpus))
       best_paths: a completed matrix of dimension (num_tags, len(corpus))
    """
    # Get the number of unique POS tags (which is the num of rows in best_probs)
    num_tags = best_probs.shape[0]

    # Go through every word in the corpus starting from word 1
    # Recall that word 0 was initialized in `initialize()`
    for i in range(1, len(test_corpus)): 

        # Print number of words processed, every 5000 words
        if i % 5000 == 0:
            print("Words processed: {:>8}".format(i))

        ### START CODE HERE (Replace instances of 'None' with your code EXCEPT the first 'best_path_i = None') ###
        # For each unique POS tag that the current word can be
        for j in range(num_tags): # complete this line

            # Initialize best_prob for word i to negative infinity
            best_prob_i = float("-inf")

            # Initialize best_path for current word i to None
            best_path_i = None

            # For each POS tag that the previous word can be:
            for k in range(num_tags): # complete this line

                # Calculate the probability = 
                # best probs of POS tag k, previous word i-1 + 
                # log(prob of transition from POS k to POS j) + 
                # log(prob that emission of POS j is word i)
                prob = best_probs[k, i-1] + math.log(A[k, j]) + math.log(B[j, vocab[test_corpus[i]]])

                # check if this path's probability is greater than
                # the best probability up to and before this point
                if prob > best_prob_i: # complete this line

                    # Keep track of the best probability
                    best_prob_i = prob

                    # keep track of the POS tag of the previous word
                    # that is part of the best path.  
                    # Save the index (integer) associated with 
                    # that previous word's POS tag
                    best_path_i = k

            # Save the best probability for the 
            # given current word's POS tag
            # and the position of the current word inside the corpus
            best_probs[j,i] = best_prob_i

            # Save the unique integer ID of the previous POS tag
            # into best_paths matrix, for the POS tag of the current word
            # and the position of the current word inside the corpus.
            best_paths[j,i] = best_path_i

        ### END CODE HERE ###
    return best_probs, best_paths

Run the viterbi_forward function to fill in the best_probs and best_paths matrices.

Note that this will take a few minutes to run. There are about 30,000 words to process.

with TIMER:
    best_probs, best_paths = viterbi_forward(A, B,
                                             prep,
                                             best_probs,
                                             best_paths,
                                             vocab)
2020-11-30 19:35:42,383 graeae.timers.timer start: Started: 2020-11-30 19:35:42.383922
Words processed:     5000
Words processed:    10000
Words processed:    15000
Words processed:    20000
Words processed:    25000
Words processed:    30000
2020-11-30 19:37:56,143 graeae.timers.timer end: Ended: 2020-11-30 19:37:56.143551
2020-11-30 19:37:56,144 graeae.timers.timer end: Elapsed: 0:02:13.759629
expected = -24.7822
actual = best_probs[0,1]
print(f"best_probs[0,1]: {actual:.4f}")
assert math.isclose(expected, actual, abs_tol=1e-4)

actual = best_probs[0,4]
expected = -49.5601
print(f"best_probs[0,4]: {actual:.4f}")
assert math.isclose(actual, expected, abs_tol=1e-4)
best_probs[0,1]: -24.7822
best_probs[0,4]: -49.5602

Viterbi Backward

Now you will implement the Viterbi backward algorithm.

  • The Viterbi backward algorithm gets the predictions of the POS tags for each word in the corpus using the best_paths and the best_probs matrices.

The example below shows how to walk backwards through the best_paths matrix to get the POS tags of each word in the corpus. Recall that this example corpus has three words: "Loss tracks upward".

POS tag for 'upward' is RB

  • Select the most likely POS tag for the last word in the corpus, 'upward', in the best_probs table.
  • Look for the row in the column for 'upward' that has the largest probability.
  • Notice that in row 28 of best_probs, the estimated probability is -34.99, which is larger than the other values in the column. So the most likely POS tag for 'upward' is RB (an adverb), at row 28 of best_probs.
  • The variable z is an array that stores the unique integer ID of the predicted POS tags for each word in the corpus. In array z, at position 2, store the value 28 to indicate that the word 'upward' (at index 2 in the corpus), most likely has the POS tag associated with unique ID 28 (which is RB).
  • The variable pred contains the POS tags in string form. So pred at index 2 stores the string RB.

POS tag for 'tracks' is VBZ

  • The next step is to go backward one word in the corpus ('tracks'). Since the most likely POS tag for 'upward' is RB, which is uniquely identified by integer ID 28, go to the best_paths matrix in column 2, row 28. The value stored in best_paths, column 2, row 28 indicates the unique ID of the POS tag of the previous word. In this case, the value stored here is 40, which is the unique ID for POS tag VBZ (verb, 3rd person singular present).
  • So the previous word at index 1 of the corpus ('tracks'), most likely has the POS tag with unique ID 40, which is VBZ.
  • In array z, store the value 40 at position 1, and for array pred, store the string VBZ to indicate that the word 'tracks' most likely has POS tag VBZ.

POS tag for 'Loss' is NN

  • In best_paths at column 1, the unique ID stored at row 40 is 20. 20 is the unique ID for POS tag NN.
  • In array z at position 0, store 20. In array pred at position 0, store NN.
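Before writing the real implementation, here is a tiny sketch of that backward walk on made-up matrices for a three-word, three-tag example (the tags, probabilities, and paths here are fabricated just to show the indexing; they are not the 46-tag matrices from the assignment):

toy_states = ["NN", "VBZ", "RB"]
toy_best_probs = numpy.array([[-14.32, -30.00, -40.00],
                              [-20.00, -25.13, -38.00],
                              [-22.00, -29.00, -34.99]])
toy_best_paths = numpy.array([[0, 0, 0],
                              [0, 0, 0],
                              [0, 1, 1]])

m = toy_best_probs.shape[1]
z = [None] * m
toy_pred = [None] * m

# Step 1: the tag for the last word is the row with the highest
# probability in the last column of best_probs
z[m - 1] = int(toy_best_probs[:, m - 1].argmax())
toy_pred[m - 1] = toy_states[z[m - 1]]

# Step 2: walk backwards through best_paths to recover the earlier tags
for i in range(m - 1, 0, -1):
    z[i - 1] = int(toy_best_paths[z[i], i])
    toy_pred[i - 1] = toy_states[z[i - 1]]

print(toy_pred)

This prints ['NN', 'VBZ', 'RB'], mirroring the Loss/tracks/upward walkthrough.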

Implement the viterbi_backward algorithm, which returns a list of predicted POS tags for each word in the corpus.

  • Note that the numbering of the index positions starts at 0 and not 1.
  • m is the number of words in the corpus.
    • So the indexing into the corpus goes from 0 to m - 1.
    • Also, the columns in best_probs and best_paths are indexed from 0 to m - 1

In Step 1: Loop through all the rows (POS tags) in the last column of `best_probs` and find the row (POS tag) with the maximum value. Convert the unique integer ID to a tag (a string representation) using the list `states`.

Referring to the three-word corpus described above:

  • `z[2] = 28`: For the word 'upward' at position 2 in the corpus, the POS tag ID is 28. Store 28 in `z` at position 2.
  • `states[28]` is 'RB': The POS tag ID 28 refers to the POS tag 'RB'.
  • `pred[2] = 'RB'`: In array `pred`, store the POS tag for the word 'upward'.

In Step 2:

  • Starting at the last column of best_paths, use `best_probs` to find the most likely POS tag for the last word in the corpus.
  • Then use `best_paths` to find the most likely POS tag for the previous word.
  • Update the POS tag for each word in `z` and in `pred`.

Referring to the three-word example from above, read best_paths at column 2 and fill in z at position 1. `z[1] = best_paths[z[2],2]`

The small test following the routine prints the last few words of the corpus and their states to aid in debugging.

def viterbi_backward(best_probs: numpy.ndarray,
                     best_paths: numpy.ndarray,
                     corpus: list,
                     states: list) -> list:
    """
    This function returns the best path.
    """
    # Get the number of words in the corpus
    # which is also the number of columns in best_probs, best_paths
    m = best_paths.shape[1] 

    # Initialize array z, same length as the corpus
    z = [None] * m

    # Get the number of unique POS tags
    num_tags = best_probs.shape[0]

    # Initialize the best probability for the last word
    best_prob_for_last_word = float('-inf')

    # Initialize pred array, same length as corpus
    pred = [None] * m

    ### START CODE HERE (Replace instances of 'None' with your code) ###
    ## Step 1 ##

    # Go through each POS tag for the last word (last column of best_probs)
    # in order to find the row (POS tag integer ID) 
    # with highest probability for the last word
    for k in range(num_tags): # complete this line

        # If the probability of POS tag at row k 
        # is better than the previously best probability for the last word:
        if best_probs[k, -1] > best_prob_for_last_word: # complete this line

            # Store the new best probability for the last word
            best_prob_for_last_word = best_probs[k, -1]

            # Store the unique integer ID of the POS tag
            # which is also the row number in best_probs
            z[m - 1] = k

    # Convert the last word's predicted POS tag
    # from its unique integer ID into the string representation
    # using the 'states' list
    # store this in the 'pred' array for the last word
    pred[m - 1] = states[z[m - 1]]

    ## Step 2 ##
    # Find the best POS tags by walking backward through the best_paths
    # From the last word in the corpus to the 0th word in the corpus
    for i in range(m - 1, 0, -1): # complete this line

        # Retrieve the unique integer ID of
        # the POS tag for the word at position 'i' in the corpus
        pos_tag_for_word_i = z[i]

        # In best_paths, go to the row representing the POS tag of word i
        # and the column representing the word's position in the corpus
        # to retrieve the predicted POS for the word at position i-1 in the corpus
        z[i - 1] = best_paths[pos_tag_for_word_i, i]

        # Get the previous word's POS tag in string form
        # Use the 'states' list,
        # where the index is the unique integer ID of the POS tag,
        # and the value is the string representation of that POS tag
        pred[i - 1] = states[z[i - 1]]

    ### END CODE HERE ###
    return pred
states = matrices.tags
pred = viterbi_backward(best_probs, best_paths, corpus=prep, states=states)
m=len(pred)
actual_prep = prep[-7:m-1]
actual_pred = pred[-7:m-1]

expected_prep =  ['see', 'them', 'here', 'with', 'us', '.']  
print('The prediction for pred[-7:m-1] is: \n', actual_prep, "\n", actual_pred, "\n")
print('The prediction for pred[0:7] is: \n', pred[0:7], "\n", prep[0:7])
The prediction for pred[-7:m-1] is: 
 ['them', 'here', 'with', 'us', '.', '--n--'] 
 ['PRP', 'RB', 'IN', 'PRP', '.', '--s--'] 

The prediction for pred[0:7] is: 
 ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN'] 
 ['The', 'economy', "'s", 'temperature', 'will', 'be', 'taken']

End

Bundle it up.

Imports

# python
from collections import Counter

import math

# pypi
import attr
import numpy

# this project
from .preprocessing import DataLoader, Empty
from .training import TheTrainer
from .matrices import Matrices

An Exception

This is so that if the Viterbi methods are called out of order things will break.

class AlgorithmError(Exception):
    """Called when the methods are called out of order"""

The Model Class

@attr.s(auto_attribs=True)
class HiddenMarkov:
    """A Hidden Markov Model Class

    Args:
     loader: a DataLoader
     trainer: A TheTrainer object
     matrices: A Matrices object
    """
    loader: DataLoader
    trainer: TheTrainer
    matrices: Matrices
    best_probabilities: numpy.ndarray=None
    best_paths: numpy.ndarray=None
    _states: list=None
    _tag_counts: Counter=None
    _tag_count: int=None
    _transition_matrix: numpy.ndarray=None
    _emission_matrix: numpy.ndarray=None
    _test_words: list=None
    _test_word_count: int=None
    _vocabulary: dict=None
    _start_token_index: int=None
    _negative_infinity: float = None

The States List

@property
def states(self) -> list:
    """POS Tags representing nodes in the HMM graph

    Returns:
     list of POS tags found in the training set
    """
    if self._states is None:
        self._states = self.matrices.tags
    return self._states

The Tag Counts

@property
def tag_counts(self) -> Counter:
    """The number of times a POS tag was in the training set

    Returns:
     dict-like of POS: Count
    """
    if self._tag_counts is None:
        self._tag_counts = self.trainer.tag_counts
    return self._tag_counts

Tag Count

@property
def tag_count(self) -> int:
    """The Number of tags in the corpus"""
    if self._tag_count is None:
        self._tag_count = len(self.tag_counts)
    return self._tag_count

Transition Matrix (A)

@property
def transition_matrix(self) -> numpy.ndarray:
    """The 'A' Matrix with the transitions"""
    if self._transition_matrix is None:
        self._transition_matrix = self.matrices.transition
    return self._transition_matrix

Emission Matrix (B)

@property
def emission_matrix(self) -> numpy.ndarray:
    """The Emission matrix (B)"""
    if self._emission_matrix is None:
        self._emission_matrix = self.matrices.emission
    return self._emission_matrix

Test Words

@property
def test_words(self) -> list:
    """The preprocessed test-words"""
    if self._test_words is None:
        self._test_words = self.loader.test_words
    return self._test_words

Test Word Count

@property
def test_word_count(self) -> int:
    """Number of words in the test set"""
    if self._test_word_count is None:
        self._test_word_count = len(self.test_words)
    return self._test_word_count

Vocabulary

@property
def vocabulary(self) -> dict:
    """Training tokens mapped to index in the training corpus"""
    if self._vocabulary is None:
        self._vocabulary = self.loader.vocabulary
    return self._vocabulary

Start Token Index

@property
def start_token_index(self) -> int:
    """The index of the start token in the graph states"""
    if self._start_token_index is None:
        self._start_token_index = self.states.index(Empty.tag)
    return self._start_token_index

Negative Infinity

@property
def negative_infinity(self) -> float:
    """a value for no probability"""
    if self._negative_infinity is None:
        self._negative_infinity = float("-inf")
    return self._negative_infinity

Initialize the Matrices

def initialize_matrices(self):
    """Initializes the ``best_probs`` and ``best_paths`` matrices

    """
    self.best_probabilities = numpy.zeros((self.tag_count, self.test_word_count))
    self.best_paths = numpy.zeros((self.tag_count, self.test_word_count), dtype=int)

    for pos_tag in range(len(self.states)):
        if self.transition_matrix[self.start_token_index, pos_tag] == 0:
            self.best_probabilities[pos_tag, 0] = self.negative_infinity
        else:
            self.best_probabilities[pos_tag, 0] = (
                math.log(self.transition_matrix[self.start_token_index, pos_tag])
                + math.log(self.emission_matrix[
                    pos_tag, self.vocabulary[self.test_words[0]]]))
    return

Viterbi Forward

def viterbi_forward(self):
    """The forward training pass

    Raises:
      AlgorithmError: initialize_matrices wasn't run before this method
    """
    if self.best_probabilities is None:
        raise AlgorithmError("initialize_matrices must be called before viterbi_forward")
    for word in range(1, self.test_word_count): 
        for pos_tag in range(self.tag_count):
            best_probability_for_this_tag = self.negative_infinity
            best_path_for_this_tag = None
            for previous_possible_tag in range(self.tag_count):

                probability = (
                    self.best_probabilities[previous_possible_tag, word-1]
                    + math.log(self.transition_matrix[previous_possible_tag, pos_tag])
                    + math.log(self.emission_matrix[
                        pos_tag,
                        self.vocabulary[self.test_words[word]]]))

                if probability > best_probability_for_this_tag:
                    best_probability_for_this_tag = probability
                    best_path_for_this_tag = previous_possible_tag
            self.best_probabilities[pos_tag, word] = best_probability_for_this_tag
            self.best_paths[pos_tag, word] = best_path_for_this_tag
    return

Viterbi Backward

def viterbi_backward(self):
    """
    This function creates the best path.

    Raises:
     AlgorithmError: initialize or forward-pass not done
    """
    if self.best_probabilities is None:
        raise AlgorithmError("initialize and forward-pass not run")
    elif self.best_probabilities[:, 1:].sum() == 0:
        raise AlgorithmError("forward-pass not run")

    z = [None] * self.test_word_count

    best_probability_for_last_word = self.negative_infinity
    prediction = [None] * self.test_word_count
    last_column = self.test_word_count - 1
    for pos_tag in range(self.tag_count):
        if self.best_probabilities[pos_tag, last_column] > best_probability_for_last_word:
            best_probability_for_last_word = self.best_probabilities[pos_tag, last_column]
            z[last_column] = pos_tag
    prediction[last_column] = self.states[z[last_column]]

    for word in range(last_column, 0, -1):
        previous_word = word - 1
        pos_tag_for_word = z[word]
        z[previous_word] = self.best_paths[pos_tag_for_word, word]
        prediction[previous_word] = self.states[z[previous_word]]
    self.predictions = prediction    
    return

Call It

def __call__(self):
    """Calls the methods in order"""
    self.initialize_matrices()
    self.viterbi_forward()
    self.viterbi_backward()
    return

Test it out

from neurotic.nlp.parts_of_speech import HiddenMarkov

model = HiddenMarkov(loader, trainer, matrices)
assert all(original == other for original, other in zip(states, model.states))
assert all(value == model.tag_counts[key] for key, value in tag_counts.items())
assert numpy.array_equal(A, model.transition_matrix)
assert numpy.array_equal(B, model.emission_matrix)
assert all(original == new for original, new in zip(prep, model.test_words))
assert all(value == model.vocabulary[key] for key, value in vocab.items())

model.initialize_matrices()

assert model.best_probabilities.shape == best_probs.shape
assert model.best_paths.shape == best_paths.shape

actual = model.best_probabilities[0,0]
expected = -22.6098
assert math.isclose(actual, expected, abs_tol=1e-4), (actual, expected)

actual = model.best_paths[2,3]
expected = 0.0000
assert math.isclose(actual, expected)

model.viterbi_forward()
expected = -24.7822
actual = model.best_probabilities[0,1]
assert math.isclose(expected, actual, abs_tol=1e-4)

actual = model.best_probabilities[0,4]
expected = -49.5601
assert math.isclose(actual, expected, abs_tol=1e-4)

model.viterbi_backward()
actual = pred[0:7]
expected = ['DT', 'NN', 'POS', 'NN', 'MD', 'VB', 'VBN']
assert all((a==e for a,e in zip(actual, expected)))

Run The Methods In the correct order with a call

mock = MagicMock()
step_1 = MagicMock()
step_2 = MagicMock()
step_3 = MagicMock()

mock.initialize = step_1
mock.viterbi_forward = step_2
mock.viterbi_backward = step_3

HiddenMarkov.initialize_matrices = step_1
HiddenMarkov.viterbi_forward = step_2
HiddenMarkov.viterbi_backward = step_3
model = HiddenMarkov(loader, trainer, matrices)
model()
expect(mock.mock_calls).to(equal([call.initialize(),
                                  call.viterbi_forward(),
                                  call.viterbi_backward()]))

Parts-of-Speech Tagging: Hidden Markov Model

Beginning

Now you will build something more context-specific. Concretely, you will be implementing a Hidden Markov Model (HMM) with a Viterbi decoder.

  • The HMM is one of the most commonly used algorithms in Natural Language Processing, and is a foundation to many deep learning techniques you will see in this specialization.
  • In addition to parts-of-speech tagging, HMM is used in speech recognition, speech synthesis, etc.
  • By completing this part of the assignment you will get a 95% accuracy on the same dataset you used in Part 1.

The Markov Model contains a number of states and the probability of transition between those states.

  • In this case, the states are the parts-of-speech.
  • A Markov Model utilizes a transition matrix, A.
  • A Hidden Markov Model adds an observation or emission matrix B which describes the probability of a visible observation when we are in a particular state.
  • In this case, the emissions are the words in the corpus
  • The state, which is hidden, is the POS tag of that word.
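As a toy illustration of those two matrices (made-up numbers, not the assignment data), here is what A and B might look like for a two-tag model over a three-word vocabulary; each row is a probability distribution, so it sums to one:

import numpy

toy_tags = ["DT", "NN"]            # the hidden states
toy_words = ["the", "dog", "run"]  # the observable emissions

# A[i, j] = P(next tag is toy_tags[j] | current tag is toy_tags[i])
toy_A = numpy.array([[0.10, 0.90],
                     [0.60, 0.40]])

# B[i, k] = P(word is toy_words[k] | tag is toy_tags[i])
toy_B = numpy.array([[0.80, 0.10, 0.10],
                     [0.05, 0.70, 0.25]])

assert numpy.allclose(toy_A.sum(axis=1), 1)
assert numpy.allclose(toy_B.sum(axis=1), 1)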

Imports

# python
from argparse import Namespace
from functools import partial

import math

# pypi
from dotenv import load_dotenv

import holoviews
import hvplot.pandas
import numpy
import pandas

# this repository
from neurotic.nlp.parts_of_speech import DataLoader, TheTrainer

# my other stuff
from graeae import EmbedHoloviews

Set Up

The Data

load_dotenv("posts/nlp/.env")
loader = DataLoader()
trainer = TheTrainer(loader.processed_training)

Plotting

SLUG = "parts-of-speech-tagging-hidden-markov-model"
FOLDER = f"files/posts/nlp/{SLUG}/" 
Embed = partial(EmbedHoloviews, folder_path=FOLDER)

Plot = Namespace(
    width=990,
    height=780,
    fontscale=2,
    tan="#ddb377",
    blue="#4687b7",
    red="#ce7b6d",
    color_map="OrRd",
    path_color_map="blues",
)

Middle

Generating Matrices

Creating the 'A' transition probabilities matrix

Now that you have your `emission_counts`, `transition_counts`, and `tag_counts`, you will start implementing the Hidden Markov Model.

This will allow you to quickly construct:

  • the A transition probabilities matrix
  • the B emission probabilities matrix

You will also use some smoothing when computing these matrices.

Here is an example of what the A transition matrix would look like (it is simplified to 5 tags for viewing; it is a 46x46 matrix in this assignment):

| A   | … | RBS          | RP           | SYM          | TO       | UH           | … |
| RBS | … | 2.217069e-06 | 2.217069e-06 | 2.217069e-06 | 0.008870 | 2.217069e-06 | … |
| RP  | … | 3.756509e-07 | 7.516775e-04 | 3.756509e-07 | 0.051089 | 3.756509e-07 | … |
| SYM | … | 1.722772e-05 | 1.722772e-05 | 1.722772e-05 | 0.000017 | 1.722772e-05 | … |
| TO  | … | 4.477336e-05 | 4.472863e-08 | 4.472863e-08 | 0.000090 | 4.477336e-05 | … |
| UH  | … | 1.030439e-05 | 1.030439e-05 | 1.030439e-05 | 0.061837 | 3.092348e-02 | … |
| …   | … | …            | …            | …            | …        | …            | … |

Note that the matrix above was computed with smoothing.

Each cell gives you the probability to go from one part of speech to another.

  • In other words, there is a 4.47e-8 chance of going from parts-of-speech TO to RP.
  • The sum of each row has to equal 1, because we assume that the next POS tag must be one of the available columns in the table.

The smoothing was done as follows:

\[ P(t_i | t_{i-1}) = \frac{C(t_{i-1}, t_{i}) + \alpha }{C(t_{i-1}) +\alpha * N}\tag{3} \]

  • \(N\) is the total number of tags
  • \(C(t_{i-1}, t_{i})\) is the count of the tuple (previous POS, current POS) in `transition_counts` dictionary.
  • \(C(t_{i-1})\) is the count of the previous POS in the `tag_counts` dictionary.
  • \(\alpha\) is a smoothing parameter.
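As a quick worked example of that smoothing (the counts here are made up, not taken from the corpus): with \(\alpha = 0.001\), \(N = 46\) tags, a hypothetical previous-tag count of 1,000, and a transition count of 30, the smoothed probability works out to roughly 0.03:

alpha = 0.001
number_of_tags = 46
transition_count = 30        # hypothetical C(t_{i-1}, t_i)
previous_tag_count = 1000    # hypothetical C(t_{i-1})

probability = (transition_count + alpha)/(previous_tag_count + alpha * number_of_tags)
print(f"{probability:0.5f}")

This prints 0.03000; the smoothing barely moves a well-observed count but keeps unseen transitions from being exactly zero.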
def create_transition_matrix(alpha: float, tag_counts: dict,
                             transition_counts: dict) -> numpy.ndarray:
    """Transition Matrix for the Hidden Markov Model

    Args: 
      ``alpha``: number used for smoothing
      ``tag_counts``: a dictionary mapping each tag to its respective count
      ``transition_counts``: transition count for the previous word and tag

    Returns:
      ``A``: matrix of dimension (``num_tags``,``num_tags``)
    """
    # Get a sorted list of unique POS tags
    all_tags = sorted(tag_counts.keys())

    # Count the number of unique POS tags
    num_tags = len(all_tags)

    # Initialize the transition matrix 'A'
    A = numpy.zeros((num_tags,num_tags))

    # Get the unique transition tuples (previous POS, current POS)
    trans_keys = set(transition_counts.keys())

    # Go through each row of the transition matrix A
    for i in range(num_tags):

        # Go through each column of the transition matrix A
        for j in range(num_tags):

            # Initialize the count of the (prev POS, current POS) to zero
            count = 0

            # Define the tuple (prev POS, current POS)
            # Get the tag at position i and tag at position j (from the all_tags list)
            key = (all_tags[i], all_tags[j])

            # Check if the (prev POS, current POS) tuple 
            # exists in the transition counts dictionary
            if key in transition_counts: #complete this line

                # Get count from the transition_counts dictionary 
                # for the (prev POS, current POS) tuple
                count = transition_counts[key]

            # Get the count of the previous tag (index position i) from tag_counts
            count_prev_tag = tag_counts[all_tags[i]]

            # Apply smoothing using count of the tuple, alpha, 
            # count of previous tag, alpha, and total number of tags
            A[i,j] = (count + alpha)/(count_prev_tag + alpha * num_tags)
    return A
# setup some values
alpha = 0.001
states = sorted(trainer.tag_counts.keys())

A = create_transition_matrix(alpha,
                             trainer.tag_counts,
                             trainer.transition_counts)
# Testing your function
expected = 0.000007040
actual = A[0,0]

print(f"A at row 0, col 0: {actual:.9f}")
assert math.isclose(expected, actual, abs_tol=1e-6), (expected, actual)

expected = 0.1691
actual = A[3,1]
print(f"A at row 3, col 1: {actual:.4f}")
assert math.isclose(expected, actual, abs_tol=1e-4)

print("View a subset of transition matrix A")
actual = A[30:35,30:35]
A_sub = pandas.DataFrame(actual, index=states[30:35], columns = states[30:35] )
print(A_sub)

expected = numpy.array([
 [2.217069e-06, 2.217069e-06, 2.217069e-06, 0.008870, 2.217069e-06],
 [3.756509e-07, 7.516775e-04, 3.756509e-07, 0.051089, 3.756509e-07],
 [1.722772e-05, 1.722772e-05, 1.722772e-05, 0.000017, 1.722772e-05],
 [4.477336e-05, 4.472863e-08, 4.472863e-08, 0.000090, 4.477336e-05],
 [1.030439e-05, 1.030439e-05, 1.030439e-05, 0.061837, 3.092348e-02],
])

assert numpy.allclose(expected, actual, atol=1e-5)
A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02
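Since each row of A is a smoothed probability distribution over the next tag, the rows should all sum to (essentially) one. A quick sanity check on the matrix built above (the loose tolerance is just in case the counts at the very edges of the corpus don't line up exactly):

row_sums = A.sum(axis=1)
print(row_sums.min(), row_sums.max())
assert numpy.allclose(row_sums, 1, atol=1e-3)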
plotter = pandas.DataFrame(A, index=states, columns=states)[::-1]
plot = plotter.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Emission Matrix A",
    width=Plot.width, height=Plot.height, fontscale=Plot.fontscale,
    xrotation=90,
)
#plot *= holoviews.Labels(plot)
outcome = Embed(plot=plot, file_name="emission_matrix_a")()
print(outcome)

Figure Missing

Looking at the plot you can see that there are a few transitions that are very likely. Maybe we can look at them to see if they're helpful.

URL = "https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html"
data = pandas.read_html(URL, header=0)[0]

TRANSLATOR = {row.Tag:row.Description for row in data.itertuples()}
print("|Tag| Description|")
print("|-+-|")
for tag in states:
    print(f"|{tag}|{TRANSLATOR.get(tag, 'unknown')}|")
|Tag| Description|
|-+-|
|#|unknown|
|$|unknown|
|''|unknown|
|(|unknown|
|)|unknown|
|,|unknown|
|.|unknown|
|:|unknown|
|CC|Coordinating conjunction|
|CD|Cardinal number|
|DT|Determiner|
|EX|Existential there|
|FW|Foreign word|
|IN|Preposition or subordinating conjunction|
|JJ|Adjective|
|JJR|Adjective, comparative|
|JJS|Adjective, superlative|
|LS|List item marker|
|MD|Modal|
|NN|Noun, singular or mass|
|NNP|Proper noun, singular|
|NNPS|Proper noun, plural|
|NNS|Noun, plural|
|PDT|Predeterminer|
|POS|Possessive ending|
|PRP|Personal pronoun|
|PRP$|Possessive pronoun|
|RB|Adverb|
|RBR|Adverb, comparative|
|RBS|Adverb, superlative|
|RP|Particle|
|SYM|Symbol|
|TO|to|
|UH|Interjection|
|VB|Verb, base form|
|VBD|Verb, past tense|
|VBG|Verb, gerund or present participle|
|VBN|Verb, past participle|
|VBP|Verb, non-3rd person singular present|
|VBZ|Verb, 3rd person singular present|
|WDT|Wh-determiner|
|WP|Wh-pronoun|
|WP$|Possessive wh-pronoun|
|WRB|Wh-adverb|
|``|unknown|

The highest probabilities are at the bottom of the table (the red blocks) where it shows that there's a 99% probability that a $ or a # will be followed by a Cardinal Number, which seems to make sense, especially since the original source was the Wall Street Journal, and you might expect there to be references to dollars. The next highest probability is that a Predeterminer will be followed by a Determiner, which makes sense based on the names, although I have no idea what those things are. And then the notion that a . will be followed by a --s-- (a period will be followed by the start of a new statement). So, at least for the most common cases it looks fairly intuitive.

Create the 'B' emission probabilities matrix

Now you will create the B emission matrix, which holds the emission probabilities.

You will use smoothing as defined below:

\[ P(w_i | t_i) = \frac{C(t_i, word_i)+ \alpha}{C(t_{i}) +\alpha * N} \]

  • \(C(t_i, word_i)\) is the number of times \(word_i\) was associated with \(tag_i\) in the training data (stored in the emission_counts dictionary).
  • \(C(t_i)\) is the number of times \(tag_i\) was in the training data (stored in tag_counts dictionary).
  • \(N\) is the number of words in the vocabulary
  • \(\alpha\) is a smoothing parameter.

The matrix B is of dimension (num_tags, N), where num_tags is the number of possible parts-of-speech tags.

Here is an example of the matrix, only a subset of tags and words are shown:

| B   | … | 725          | adroitly     | engineers    | promoted     | synergy      | … |
| CD  | … | 8.201296e-05 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | 2.732854e-08 | … |
| NN  | … | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 7.521128e-09 | 2.257091e-05 | … |
| NNS | … | 1.670013e-08 | 1.670013e-08 | 4.676203e-04 | 1.670013e-08 | 1.670013e-08 | … |
| VB  | … | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | 3.779036e-08 | … |
| RB  | … | 3.226454e-08 | 6.456135e-05 | 3.226454e-08 | 3.226454e-08 | 3.226454e-08 | … |
| RP  | … | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | 3.723317e-07 | … |
| …   | … | …            | …            | …            | …            | …            | … |

We're now going to implement create_emission_matrix, which computes the B emission probabilities matrix. The function takes in \(\alpha\) (the smoothing parameter), tag_counts (a dictionary mapping each tag to its respective count), the emission_counts dictionary (where the keys are (tag, word) tuples and the values are the counts), and the vocab dictionary. Your task is to output a matrix that applies the smoothing formula above to each cell of matrix B.

Create Emission Matrix

def create_emission_matrix(alpha: float,
                           tag_counts: dict,
                           emission_counts: dict,
                           vocab: dict) -> numpy.ndarray:
    """Create Matrix B

    Args: 
       ``alpha``: tuning parameter used in smoothing 
       ``tag_counts``: a dictionary mapping each tag to its respective count
       ``emission_counts``: a dictionary where the keys are (tag, word) and the values are the counts
       ``vocab``: a dictionary where keys are words in vocabulary and value is an index.
              within the function it'll be treated as a list
    Returns:
       ``B``: a matrix of dimension ``(num_tags, len(vocab))``
    """

    # get the number of POS tags
    num_tags = len(tag_counts)

    # Get a list of all POS tags
    all_tags = sorted(tag_counts.keys())

    # Get the total number of unique words in the vocabulary
    num_words = len(vocab)

    # Initialize the emission matrix B with places for
    # tags in the rows and words in the columns
    B = numpy.zeros((num_tags, num_words))

    # Get a set of all (POS, word) tuples 
    # from the keys of the emission_counts dictionary
    emis_keys = set(list(emission_counts.keys()))

    ### START CODE HERE (Replace instances of 'None' with your code) ###

    # Go through each row (POS tags)
    for i in range(num_tags): # complete this line

        # Go through each column (words)
        for j in range(num_words): # complete this line

            # Initialize the emission count for the (POS tag, word) to zero
            count = 0

            # Define the (POS tag, word) tuple for this row and column
            key =  (all_tags[i], vocab[j])

            # check if the (POS tag, word) tuple exists as a key in emission counts
            if key in emission_counts: # complete this line

                # Get the count of (POS tag, word) from the emission_counts d
                count = emission_counts[key]

            # Get the count of the POS tag
            count_tag = tag_counts[all_tags[i]]

            # Apply smoothing and store the smoothed value 
            # into the emission matrix B for this row and column
            B[i,j] = (count + alpha)/(count_tag + alpha * num_words)

    ### END CODE HERE ###
    return B
# creating your emission probability matrix. this takes a few minutes to run.
vocab = loader.vocabulary
B = create_emission_matrix(alpha,
                           trainer.tag_counts,
                           trainer.emission_counts,
                           list(vocab))

actual = B[0,0]
expected = 0.000006032
print(f"View Matrix position at row 0, column 0: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-6)

actual = B[3,1]
expected = 0.000000720
print(f"View Matrix position at row 3, column 1: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-7)

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
actual = B[numpy.ix_(rows,cols)]
B_sub = pandas.DataFrame(actual, index=rvals, columns = cidx )
print(B_sub)
expected = ([
 [8.201296e-05, 2.732854e-08, 2.732854e-08, 2.732854e-08, 2.732854e-08],
 [7.521128e-09, 7.521128e-09, 7.521128e-09, 7.521128e-09, 2.257091e-05],
 [1.670013e-08, 1.670013e-08, 4.676203e-04, 1.670013e-08, 1.670013e-08],
 [3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08],
 [3.226454e-08, 6.456135e-05, 3.226454e-08, 3.226454e-08, 3.226454e-08],
 [3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07],
])

assert numpy.allclose(expected, actual, atol=1e-5)
View Matrix position at row 0, column 0: 0.000006032
View Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07
plotter = B_sub[::-1]
plot = plotter.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Emission Matrix B",
    width=Plot.width, height=Plot.height, fontscale=Plot.fontscale,
    xrotation=90,
)
outcome = Embed(plot=plot, file_name="emission_matrix_b")()
print(outcome)

Figure Missing

There are 23,777 words in the vocabulary so I'm not plotting the whole thing.

End

Bundle It Up

The Imports

# pypi
import attr
import numpy

The Matrices

@attr.s(auto_attribs=True)
class Matrices:
    """The matrices for the hidden markov model

    Args:
     ``transition_counts``: dictionary of counts of adjacent POS tags
     ``emission_counts``: dictionary of (word, POS) counts
     ``tag_counts``: dictionary of POS tag-counts
     ``words``: list of words in the vocabulary
     ``alpha``: The smoothing value
    """
    transition_counts: dict
    emission_counts: dict
    tag_counts: dict
    # all the lists have to be sorted for the matrices to match
    words: list=attr.ib(converter=sorted)
    alpha: float=0.001
    _tags: list=None
    _tag_count: int=None
    _word_count: int=None
    _transition: numpy.ndarray=None
    _emission: numpy.ndarray=None

The Tags

@property
def tags(self) -> list:
    """Sorted list of the POS tags"""
    if self._tags is None:
        self._tags = sorted(self.tag_counts)
    return self._tags

The Tag Count

@property
def tag_count(self) -> int:
    """Number of tags"""
    if self._tag_count is None:
        self._tag_count = len(self.tags)
    return self._tag_count

The Word Count

@property
def word_count(self) -> int:
    """Number of words in the vocabulary"""
    if self._word_count is None:
        self._word_count = len(self.words)
    return self._word_count

The Transition Matrix

@property
def transition(self) -> numpy.ndarray:
    """The Transition Matrix"""
    if self._transition is None:
        self._transition = numpy.zeros((self.tag_count, self.tag_count))
        for row in range(self.tag_count):
            for column in range(self.tag_count):
                key = (self.tags[row], self.tags[column])
                count = self.transition_counts[key] if key in self.transition_counts else 0
                previous_tag_count = self.tag_counts[self.tags[row]]
                self._transition[row, column] = (
                    (count + self.alpha)
                    /(previous_tag_count + self.alpha * self.tag_count))
    return self._transition

The Emission Matrix

@property
def emission(self) -> numpy.ndarray:
    """The Emission Matrix"""
    if self._emission is None:
        self._emission = numpy.zeros((self.tag_count, self.word_count))
        for row in range(self.tag_count):
            for column in range(self.word_count):
                key = (self.tags[row], self.words[column])
                emission_count = self.emission_counts[key] if key in self.emission_counts else 0
                tag_count = self.tag_counts[self.tags[row]]
                self._emission[row, column] = (
                    (emission_count + self.alpha)
                    /(tag_count + self.alpha * self.word_count)                
                )
    return self._emission

Test It Out

from neurotic.nlp.parts_of_speech.matrices import Matrices
load_dotenv("posts/nlp/.env")
loader = DataLoader()
trainer = TheTrainer(loader.processed_training)

matrices = Matrices(transition_counts=trainer.transition_counts,
                    emission_counts=trainer.emission_counts,
                    tag_counts=trainer.tag_counts,
                    words=loader.vocabulary_words,
                    alpha=0.001)

The Transition Matrix

transition_matrix = matrices.transition
expected = 0.000007040
actual = transition_matrix[0,0]

print(f"Transition Matrix at row 0, col 0: {actual:.9f}")
assert math.isclose(expected, actual, abs_tol=1e-6), (expected, actual)

expected = 0.1691
actual = transition_matrix[3,1]
print(f"Transition Matrix at row 3, col 1: {actual:.4f}")
assert math.isclose(expected, actual, abs_tol=1e-4)

print("View a subset of the transition matrix")
actual = transition_matrix[30:35,30:35]
A_sub = pandas.DataFrame(actual, index=states[30:35], columns = states[30:35] )
print(A_sub)

expected = numpy.array([
 [2.217069e-06, 2.217069e-06, 2.217069e-06, 0.008870, 2.217069e-06],
 [3.756509e-07, 7.516775e-04, 3.756509e-07, 0.051089, 3.756509e-07],
 [1.722772e-05, 1.722772e-05, 1.722772e-05, 0.000017, 1.722772e-05],
 [4.477336e-05, 4.472863e-08, 4.472863e-08, 0.000090, 4.477336e-05],
 [1.030439e-05, 1.030439e-05, 1.030439e-05, 0.061837, 3.092348e-02],
])

assert numpy.allclose(expected, actual, atol=1e-5)
Transition Matrix at row 0, col 0: 0.000007040
Transition Matrix at row 3, col 1: 0.1691
View a subset of the transition matrix
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02

The Emission Matrix

matrices = Matrices(transition_counts=trainer.transition_counts,
                    emission_counts=trainer.emission_counts,
                    words=loader.vocabulary_words,
                    tag_counts=trainer.tag_counts,
                    alpha=0.001)
emission = matrices.emission

actual = emission[0,0]
expected = 0.000006032
print(f"Emission Matrix position at row 0, column 0: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-6)

actual = emission[3,1]
expected = 0.000000720
print(f"Emission Matrix position at row 3, column 1: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-7)

# Try viewing emissions for a few words in a sample dataframe
columns  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
column_ids = [loader.vocabulary[column] for column in columns]

# Choose POS tags to show in a sample dataframe
rows =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
row_numbers = [matrices.tags.index(a) for a in rows]

# Get the emissions for the sample of words, and the sample of POS tags
actual = emission[numpy.ix_(row_numbers,column_ids)]
B_sub = pandas.DataFrame(actual, index=rows, columns = columns)
print(B_sub)
expected = ([
 [8.201296e-05, 2.732854e-08, 2.732854e-08, 2.732854e-08, 2.732854e-08],
 [7.521128e-09, 7.521128e-09, 7.521128e-09, 7.521128e-09, 2.257091e-05],
 [1.670013e-08, 1.670013e-08, 4.676203e-04, 1.670013e-08, 1.670013e-08],
 [3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08],
 [3.226454e-08, 6.456135e-05, 3.226454e-08, 3.226454e-08, 3.226454e-08],
 [3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07],
])

assert numpy.allclose(expected, actual, atol=1e-5)
Emission Matrix position at row 0, column 0: 0.000006032
Emission Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07

Note: I was getting the wrong values because I switched to the DataLoader.vocabulary_words list, which isn't sorted (because I was trying to follow the example). The vocabulary is sorted, but it's a dict so then you have to convert it to a list… in the future just sort everything.

Parts-of-Speech Tagging: Most Frequent Class Baseline

Begin

Imports

# python
from collections import Counter

# pypi
from dotenv import load_dotenv

# this repo
from neurotic.nlp.parts_of_speech.preprocessing import DataLoader
from neurotic.nlp.parts_of_speech.training import TheTrainer

Set Up

The Environment

load_dotenv("posts/nlp/.env")

The Data

loader = DataLoader()

The Trained Model

trainer = TheTrainer(loader.processed_training)

Middle

Most Frequent Class Baseline

The main purpose of part-of-speech tagging is to disambiguate words (Jurafsky) - most words only have one part-of-speech tag, but the most commonly used words have more than one. To decide what the correct part-of-speech tag is for a word, you have to know the context it appears in and how to interpret that context. Another way to choose a tag is to give each word the tag that it is most commonly associated with. This will then serve as a baseline for any more sophisticated algorithm. That's what we're going to do here.
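
To make the idea concrete, here's a toy sketch (the counts and variable names are made up for illustration, but the lookup mirrors what the baseline implementation below does with the real emission counts):

# made-up (tag, word) counts for an ambiguous word
toy_emission_counts = {("RB", "back"): 300, ("VB", "back"): 20, ("NN", "back"): 30}
toy_states = ["NN", "RB", "VB"]

# the baseline guess is just the tag with the largest count for the word
guess = max(toy_states, key=lambda tag: toy_emission_counts.get((tag, "back"), 0))
print(guess)
# RB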

Processed vs Vocabulary

Our preprocessed data is the words in the testing data and our vocabulary data is the words in the training set. We can't predict anything for words in the testing set that aren't in the training set, so let's see how much of an overlap there is.

preprocessed = set(loader.test_words)
vocabulary = set(loader.vocabulary)

in_common = preprocessed.intersection(vocabulary)
print(f"Test words that are in the training vocabulary: {100 * len(in_common)/len(preprocessed):0.3f} %")
Test words that are in the training vocabulary: 100.000 %

So, we shouldn't lose any words just because we never saw them during training.

print(loader.processed_training[:2])
[('In', 'IN'), ('an', 'DT')]

Our processed_training attribute is a list of <word>, <tag> tuples from the training set.

words_tags = set(loader.processed_training)
words = [word for word, tag in words_tags]
word_tag_counts = Counter(words)

word_tag_counts maps each word to the number of distinct tags it appeared with.
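
As a toy illustration (made-up pairs): because the (word, tag) pairs were de-duplicated with set, a word's count here is the number of distinct tags it was seen with, not how often it occurred.

toy_pairs = {("back", "RB"), ("back", "NN"), ("dog", "NN")}
toy_counts = Counter(word for word, tag in toy_pairs)
print(toy_counts["back"], toy_counts["dog"])
# 2 1 - so "back" would count as ambiguous and "dog" wouldn't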

unambiguous = [word for word,count in word_tag_counts.items() if count == 1]
print("Percent of unambiguous training words: "
      f"{100 * len(unambiguous)/len(word_tag_counts):.2f} %")
Percent of unambiguous training words: 75.11 %

And now for the subset that is in both the training and testing set.

common_unambiguous = set(unambiguous).intersection(in_common)
print("percent of words in common that are unambiguous:"
      f"{len(common_unambiguous)/len(in_common) * 100: .2f}")
percent of words in common that are unambiguous: 58.19

Sort of low.

This means that if we just labeled the unambiguous words we'd be right about 58% of the time. Let's just double-check.

guesses = {word: tag for word, tag in loader.test_data.items() if word in unambiguous}

total = len(preprocessed)
correct_guess = 0

for word in preprocessed:
    if word in guesses:
        guess = guesses[word]
        correct_guess += 1 if guess == loader.test_data[word] else 0
    else:
        continue

print(f"Correct: {100 * correct_guess/total: 0.2f}%")
Correct:  58.17%

So this is the amount we get if we just use the unambiguous words. Our baseline should be even better.

The Baseline Implementation

def predict_pos(prep, y, emission_counts, vocab, states):
    '''
    Input: 
       prep: a preprocessed version of 'y'. A list with the 'word' component of the tuples.
       y: a corpus composed of a list of tuples where each tuple consists of (word, POS)
       emission_counts: a dictionary where the keys are (tag,word) tuples and the value is the count
       vocab: a dictionary where keys are words in vocabulary and value is an index
       states: a sorted list of all possible tags for this assignment
    Output: 
       accuracy: the fraction of words whose predicted tag matches the true tag
    '''

    # Initialize the number of correct predictions to zero
    num_correct = 0

    # Get the (tag, word) tuples, stored as a set
    all_words = set(emission_counts.keys())

    # Get the number of (word, POS) tuples in the corpus 'y'
    total = len(y)
    for word, y_tup in zip(prep, y): 

        # Split the (word, POS) string into a list of two items
        y_tup_l = y_tup.split()

        # Verify that y_tup contain both word and POS
        if len(y_tup_l) == 2:

            # Set the true POS label for this word
            true_label = y_tup_l[1]

        else:
            # If the y_tup didn't contain word and POS, go to next word
            continue

        count_final = 0
        pos_final = ''

        # If the word is in the vocabulary...
        if word in vocab:
            for pos in states:

                # define the key as the tuple containing the POS and word
                key = (pos, word)

                # check if the (pos, word) key exists in the emission_counts dictionary
                if key in emission_counts: # complete this line
                    # get the emission count of the (pos, word) tuple
                    count = emission_counts[key]
                    # keep track of the POS with the largest count
                    if count > count_final: # complete this line
                        # update the final count (largest count)
                        count_final = count

                        # update the final POS
                        pos_final = pos
            # If the final POS (with the largest count) matches the true POS:
            if true_label == pos_final: # complete this line
                # Update the number of correct predictions
                num_correct += 1

    accuracy = num_correct / total

    return accuracy
states = sorted(trainer.tag_counts.keys())
accuracy_predict_pos = predict_pos(prep=loader.test_words, y=loader.test_data_raw,
                                   emission_counts=trainer.emission_counts,
                                   vocab=loader.vocabulary, states=states)
print(f"Accuracy of prediction using predict_pos is {accuracy_predict_pos:.4f}")
Accuracy of prediction using predict_pos is 0.8913

So our baseline prediction is 89% - any algorithm we create should do as well or better (but probably better for it to be worth doing).

Take Two

def predict(preprocessed: list, y: list, emission_counts: dict, vocabulary: dict, states: list):
    """Calculate the baseline accuracy
    Args: 
       preprocessed: a preprocessed version of 'y'. A list with the 'word' component of the tuples.
       y: a corpus composed of a list of tuples where each tuple consists of (word, POS)
       emission_counts: a dictionary where the keys are (tag,word) tuples and the value is the count
       vocabulary: a dictionary where keys are words in vocabulary and value is an index
       states: a sorted list of all possible tags for this assignment

    Returns: 
       accuracy: the fraction of words whose predicted tag matches the true tag
    """
    # filter using ``preprocessed`` rather than the raw words in ``y``:
    # the pre-processing replaces unknown words with special tags that are in the
    # vocabulary, so filtering on the raw word would throw away rows that were
    # tagged specifically so they could be kept
    inputs = [(word, tokens) for word, tokens in zip(preprocessed, y)
              if (len(tokens) == 2 and word in vocabulary)]

    # our final data sets    
    true_tags = (tag for word, (raw, tag) in inputs)
    words = (word for word, (raw, tag) in inputs)

    # each guess is the POS tag for the word with the highest occurrence in the training data
    guesses = (
        max((emission_counts.get((pos, word), 0), pos) for pos in states)
        for word in words)

    # accuracy is sum of correct guesses/total rows in y
    return (sum(
        actual == guess for actual, (count, guess) in zip(true_tags, guesses))
            /len(y))
states = sorted(trainer.tag_counts.keys())
accuracy = predict(preprocessed=loader.test_words,
                   y=loader.test_data_tuples,
                   emission_counts=trainer.emission_counts,
                   vocabulary=loader.vocabulary,
                   states=states)
print(f"Accuracy of prediction using predict_pos is {accuracy:.4f}")
Accuracy of prediction using predict_pos is 0.8921

Parts-of-Speech Tagging: Training

Beginning

Imports

# python
from collections import defaultdict

# pypi
from dotenv import load_dotenv

# this repository
from neurotic.nlp.parts_of_speech.preprocessing import Environment, DataLoader

Set Up

The Environment

load_dotenv("posts/nlp/.env")

The Data

loader = DataLoader()

Training

In this section, you will find the words that are not ambiguous.

  • For example, the word "is" is a verb and it is not ambiguous.
  • In the WSJ corpus, 86% of the tokens are unambiguous (meaning they have only one tag)
  • About 14% are ambiguous (meaning that they have more than one tag)

Before you start predicting the tags of each word, you will need to compute a few dictionaries that will help you to generate the tables.

Preprocessing

# replace the next three code blocks once the assignment is done
import string

punct = set(string.punctuation)
noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]
def get_word_tag(line: str, vocab: dict):
    """splits line and handles unknowns and empty lines


    Args:
     line: whitespace separated string with word and tag
     vocab: hashable that holds the known words
    """
    if not line.split():
        word = "--n--"
        tag = "--s--"
        return word, tag
    else:
        word, tag = line.split()
        if word not in vocab: 
            # Handle unknown words
            word = assign_unk(word)
        return word, tag
def assign_unk(tok):
    """
    Assign unknown word tokens
    """
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"

Transition counts

  • The first dictionary is the transition_counts dictionary which computes the number of times each tag happened next to another tag.

This dictionary will be used to compute: \[ P(t_i |t_{i-1}) \]

This is the probability of a tag at position i given the tag at position i-1.

In order to compute this probability, you will create a transition_counts dictionary where

  • The keys are (prev_tag, tag)
  • The values are the number of times those two tags appeared in that order.
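
Just to connect the counts to the probability above (ignoring the alpha smoothing that gets added when the matrices are built later), the maximum-likelihood estimate is the pair count divided by the total count for the previous tag:

\[ P(t_i|t_{i-1}) = \frac{C(t_{i-1}, t_i)}{\sum_{t} C(t_{i-1}, t)} \]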

Emission counts

The second dictionary you will compute is the emission_counts dictionary. This dictionary will be used to compute:

\[ P(w_i|t_i) \]

In other words, you will use it to compute the probability of a word given its tag.

In order to compute this probability, you will create an emission_counts dictionary where

  • The keys are (tag, word)
  • The values are the number of times that pair showed up in your training set.
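
Similarly, dividing an emission count by the count of its tag (from the tag_counts dictionary described next) gives the unsmoothed emission probability:

\[ P(w_i|t_i) = \frac{C(t_i, w_i)}{C(t_i)} \]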

Tag counts

The last dictionary you will compute is the tag_counts dictionary.

  • The key is the tag
  • The value is the number of times each tag appeared.

def create_dictionaries(training_corpus: list, vocab: dict):
    """Create the three training dictionaries

    Args:
       ``training_corpus``: a corpus where each line has a word followed by its tag.
       ``vocab``: a dictionary where keys are words in vocabulary and value is an index
    Returns:
       ``emission_counts``: a dictionary where the keys are (tag, word) and the values are the counts
       ``transition_counts``: a dictionary where the keys are (prev_tag, tag) and the values are the counts
       ``tag_counts``: a dictionary where the keys are the tags and the values are the counts
    """

    # initialize the dictionaries using defaultdict
    emission_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    tag_counts = defaultdict(int)

    # Initialize "prev_tag" (previous tag) with the start state, denoted by '--s--'
    prev_tag = '--s--'

    # use 'i' to track the line number in the corpus
    i = 0

    # Each item in the training corpus contains a word and its POS tag
    # Go through each word and its tag in the training corpus
    for word_tag in training_corpus:

        # Increment the word_tag count
        i += 1

        # Every 50,000 words, print the word count
        if i % 50000 == 0:
            print(f"word count = {i}")

        ### START CODE HERE (Replace instances of 'None' with your code) ###
        # get the word and tag using the get_word_tag helper function defined above
        word, tag = get_word_tag(word_tag, vocab)

        # Increment the transition count for the previous word and tag
        transition_counts[(prev_tag, tag)] += 1

        # Increment the emission count for the tag and word
        emission_counts[(tag, word)] += 1

        # Increment the tag count
        tag_counts[tag] += 1

        # Set the previous tag to this tag (for the next iteration of the loop)
        prev_tag = tag

        ### END CODE HERE ###

    return emission_counts, transition_counts, tag_counts
emission_counts, transition_counts, tag_counts = create_dictionaries(loader.training_corpus, loader.vocabulary)

Get all the POS states.

states = sorted(tag_counts.keys())
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

expected_states = ['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']

print(set(expected_states) - set(states))
for expected, actual in zip(expected_states, states):
    assert expected == actual, (expected, actual)
assert len(states) == 46, len(states)    


Number of POS tags (number of 'states'): 45
View these POS tags (states)
['#', '$', "''", '(', ')', ',', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']
{'--s--'}
print("transition examples: ")
expected = ((('--s--', 'IN'), 5050),
            (('IN', 'DT'), 32364),
            (('DT', 'NNP'), 9044))

for index, example in enumerate(list(transition_counts.items())[:3]):
    print(example)
    assert example == expected[index]
transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)
expected = ((('DT', 'any'), 721),
            (('NN', 'decrease'), 7),
            (('NN', 'insider-trading'), 5))

print("emission examples: ")
for actual, expected in zip(list(emission_counts.items())[200:203], expected):
    print (actual)
    assert actual == expected
emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)
expected = ((('RB', 'back'), 304),
            (('VB', 'back'), 20),
            (('RP', 'back'), 84),
            (('JJ', 'back'), 25),
            (('NN', 'back'), 29),
            (('VBP', 'back'), 4))

print("ambiguous word example: ")
counter = 0
for tup, cnt in emission_counts.items():
    if tup[1] == 'back':
        print(tup, cnt)
        assert expected[counter] == (tup, cnt)
        counter += 1
ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4

Bundle It Up

Imports

# python
from collections import defaultdict, Counter
# pypi
import attr

The Trainer

@attr.s(auto_attribs=True)
class TheTrainer:
    """Trains the POS model

    Args:
     corpus: iterable of word, tag tuples
    """
    corpus: list
    _transition_counts: dict=None
    _emission_counts: dict=None
    _tag_counts: dict=None

Transition Counts

This dictionary will be used to compute: \[ P(t_i |t_{i-1}) \]

This is the probability of a tag at position i given the tag at position i-1.

@property
def transition_counts(self) -> dict:
    """maps previous, next tags to counts"""
    if self._transition_counts is None:
        self._transition_counts = defaultdict(int)
        previous_tag = "--s--"
        for word, tag in self.corpus:
            self._transition_counts[(previous_tag, tag)] += 1
            previous_tag = tag
    return self._transition_counts

Emission Counts

The second dictionary you will compute is the emission_counts dictionary. This dictionary will be used to compute:

\[ P(w_i|t_i) \]

In other words, you will use it to compute the probability of a word given its tag.

@property
def emission_counts(self) -> dict:
    """Maps tag, word pairs to counts"""
    if self._emission_counts is None:
        self._emission_counts = Counter(
            ((tag, word) for word, tag in self.corpus)
        )
    return self._emission_counts

Tag Counts

@property
def tag_counts(self) -> dict:
    """Count of tags"""
    if self._tag_counts is None:
        self._tag_counts = Counter((tag for word, tag in self.corpus))
    return self._tag_counts

Test It Out

from neurotic.nlp.parts_of_speech.training import TheTrainer

trainer = TheTrainer(loader.processed_training)

Tag Counts

states = sorted(trainer.tag_counts.keys())
print(f"Number of POS tags (number of 'states'): {len(states)}")
print("View these POS tags (states)")
print(states)

assert len(states) == 46, len(states)
expected_states = ['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']
for expected, actual in zip(expected_states, states):
    assert expected == actual
Number of POS tags (number of 'states'): 46
View these POS tags (states)
['#', '$', "''", '(', ')', ',', '--s--', '.', ':', 'CC', 'CD', 'DT', 'EX', 'FW', 'IN', 'JJ', 'JJR', 'JJS', 'LS', 'MD', 'NN', 'NNP', 'NNPS', 'NNS', 'PDT', 'POS', 'PRP', 'PRP$', 'RB', 'RBR', 'RBS', 'RP', 'SYM', 'TO', 'UH', 'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ', 'WDT', 'WP', 'WP$', 'WRB', '``']

Transition Counts

print("transition examples: ")
expected = ((('--s--', 'IN'), 5050),
            (('IN', 'DT'), 32364),
            (('DT', 'NNP'), 9044))

for index, example in enumerate(list(trainer.transition_counts.items())[:3]):
    print(example)
    assert example == expected[index]
transition examples: 
(('--s--', 'IN'), 5050)
(('IN', 'DT'), 32364)
(('DT', 'NNP'), 9044)

Emission Counts

expected = ((('DT', 'any'), 721),
            (('NN', 'decrease'), 7),
            (('NN', 'insider-trading'), 5))

print("emission examples: ")
for actual, expected in zip(list(trainer.emission_counts.items())[200:203], expected):
    print (actual)
    assert actual == expected
emission examples: 
(('DT', 'any'), 721)
(('NN', 'decrease'), 7)
(('NN', 'insider-trading'), 5)

Ambiguous Word Emission Counts

expected = ((('RB', 'back'), 304),
            (('VB', 'back'), 20),
            (('RP', 'back'), 84),
            (('JJ', 'back'), 25),
            (('NN', 'back'), 29),
            (('VBP', 'back'), 4))

print("ambiguous word example: ")
counter = 0
for tag_word, count in trainer.emission_counts.items():
    if tag_word[1] == 'back':
        print(tag_word, count)
        assert expected[counter] == (tag_word, count)
        counter += 1
ambiguous word example: 
('RB', 'back') 304
('VB', 'back') 20
('RP', 'back') 84
('JJ', 'back') 25
('NN', 'back') 29
('VBP', 'back') 4

Parts-of-Speech Tagging: The Data

Beginning

Imports

# python
from argparse import Namespace
from collections import Counter

import os
import random
import re
import string

# from pypi
from dotenv import load_dotenv

Set Up

The Environment

load_dotenv("posts/nlp/.env")
Environment = Namespace(
    training_corpus="WALL_STREET_JOURNAL_POS",
    vocabulary="WALL_STREET_JOURNAL_VOCABULARY",
    test_corpus= "WALL_STREET_JOURNAL_TEST_POS",
    test_words="WALL_STREET_JOURNAL_TEST_WORDS",
)

Middle

The Training Corpus

The training corpus is made up of words and parts-of-speech tags from the Wall Street Journal (previously looked at in this post).

with open(os.environ[Environment.training_corpus]) as reader:
    training_corpus = reader.read().split("\n")
for row in training_corpus[:5]:
    print(f" - {row}")
- In    IN
- an    DT
- Oct.  NNP
- 19    CD
- review        NN

The Vocabulary

There is also a pre-processed version of that same file that has only the words without the parts-of-speech tags that we can load. It has been altered slightly as well:

  • Words that appear only once have been removed
  • Words that don't exist in the original have been added so that there are some unknown words to handle.
with open(os.environ[Environment.vocabulary]) as reader:
    vocabulary_words = reader.read().split("\n")

for row in vocabulary_words[:5]:
    print(f" - {row}")
- !
- #
- $
- %
- &

Odd.

for row in random.sample(vocabulary_words, 5):
    print(f" - {row}")
- cabinet
- Byrd
- done
- Fueling
- investments

Our actual vocabulary is going to be a dictionary mapping each word to its index in the list we just loaded.

vocabulary_words = sorted(vocabulary_words)
vocabulary = {key: index for index, key in enumerate(vocabulary_words)}
assert len(vocabulary) == len(vocabulary_words)
training_counter = Counter([row.split("\t")[0] for row in training_corpus])
filtered = set(key for key, value in training_counter.items() if value > 1)
extra = set(vocabulary) - filtered
print(f"{len(extra):,}")
for item in random.sample(extra, 5):
    print(f" - {item}")
9
 - --unk_adj--
 - --n--
 - --unk--
 - --unk_noun--
 - --unk_adv--

So, it looks like the extra entries are the "unknown" tags that were added during pre-processing.

The Test Corpus

This is another list of words taken from the Wall Street Journal with parts-of-speech tags added that we'll use as the test-set.

with open(os.environ[Environment.test_corpus]) as reader:
    test_corpus = reader.read().split("\n")

The Test Vocabulary

There is also a set of words that we're going to try and tag. These need to be pre-processed.

Handle Empty

We are going to replace empty lines with a special token.

def handle_empty(words: list, empty_token="--n--"):
    """replace empty strings with empty_token

    Args:
     words: list to process
     empty_token: what to replace empty strings with

    Yields:
     processed words
    """
    for word in words:
        if not word.strip():
            yield empty_token
        else:
            yield word
    return
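
A quick sanity check of the generator on some toy input (empty and whitespace-only entries get replaced, everything else passes through untouched):

print(list(handle_empty(["The", "", "  ", "dog"])))
# ['The', '--n--', '--n--', 'dog']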

Labeling Unknowns

  • Suffixes
    Suffixes = Namespace(
        noun = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood",
                "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry",
                "scape", "ship", "ty"],
        verb = ["ate", "ify", "ise", "ize"],
        adjective = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive",
                     "less", "ly", "ous"],
        adverb = ["ward", "wards", "wise"]
    )
    
  • Labels for the Unknowns
    UNKNOWN = "--unknown-{}--"
    Label = Namespace(
        digit=UNKNOWN.format("digit"),
        punctuation=UNKNOWN.format("punctuation"),
        uppercase=UNKNOWN.format("uppercase"),
        noun=UNKNOWN.format("noun"),    
        verb=UNKNOWN.format("verb"),
        adjective=UNKNOWN.format("adjective"),
        adverb=UNKNOWN.format("adverb"),
        unknown="--unknown--"
    )
    
  • Bundle Them Up
    Unknown = Namespace(
        punctuation = set(string.punctuation),
        suffix = Suffixes,
        label=Label,
        has_digit=re.compile(r"\d"),
        has_uppercase=re.compile("[A-Z]")
    )
    
    def label_unknowns(words: list, vocabulary: set):
        """
        Assign tokens to unknown words
    
        Args:
         words: iterable of words to check
         vocabulary: something to check if it is a known word
    
        Yields:
         word or label for the word if unknown
        """
        for word in words:
            if word in vocabulary:
                yield word
    
            elif Unknown.has_digit.search(word):
                yield Unknown.label.digit
    
            elif not Unknown.punctuation.isdisjoint(set(word)):
                yield Unknown.label.punctuation
    
            elif Unknown.has_uppercase.search(word):
                yield Unknown.label.uppercase
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.noun):
                yield Unknown.label.noun
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.verb):
                yield Unknown.label.verb
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adjective):
                yield Unknown.label.adjective
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adverb):
                yield Unknown.label.adverb
            else:
                yield Unknown.label.unknown
        return
    

The Pre-Processor

def preprocess(words: list, vocabulary: set) -> list:
    """Preprocess words

    Args:
     words: words to pre-process
     vocabulary: the known words to check against

    Returns:
     words with empty lines and unknown words labeled
    """
    processed = (word.strip() for word in words)
    processed = handle_empty(processed)
    processed = [word for word in label_unknowns(processed, vocabulary)]
    return processed
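
And a toy run of the whole pipeline (a hypothetical two-entry vocabulary; the --n-- token is included because the real vocabulary file contains it, which is what lets empty lines survive the unknown-labeling step):

print(preprocess(["the\n", "", "Zorp"], vocabulary={"the", "--n--"}))
# ['the', '--n--', '--unknown-uppercase--']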

Load the Test Words

with open(os.environ[Environment.test_words]) as reader:
    test_words = reader.read().strip()
before = len(test_words)
test_words = preprocess(test_words, vocabulary)
assert len(test_words) == before

print(f"{len(test_words):,}")
for word in random.sample(test_words, 5):
    print(f" - {word}")
180,264
 - --unknown--
 - --n--
 - e
 - r
 - --unknown--

Weird that the letters "e" and "r" are known words… this happens because the file was read in as one long string rather than split into lines, so the pre-processing ran over individual characters (which is why the count above is 180,264), and single letters like "e" and "r" happen to be in the vocabulary. The DataLoader below splits on newlines, so it doesn't have this problem.

End

So, that's our data. To make it replicable for the next section I'm going to tangle it out.

The Code

Imports

# from python
from argparse import Namespace

import os
import re
import string

# pypi
import attr

The Environment

I don't really know that I should save these keys, but I don't really want to cut and paste all the time…

Environment = Namespace(
    training_corpus="WALL_STREET_JOURNAL_POS",
    vocabulary="WALL_STREET_JOURNAL_VOCABULARY",
    test_corpus= "WALL_STREET_JOURNAL_TEST_POS",
    test_words="WALL_STREET_JOURNAL_TEST_WORDS",
)

The Suffixes

Suffixes = Namespace(
    noun = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood",
            "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry",
            "scape", "ship", "ty"],
    verb = ["ate", "ify", "ise", "ize"],
    adjective = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive",
                 "less", "ly", "ous"],
    adverb = ["ward", "wards", "wise"]
)

The Label

UNKNOWN = "--unknown-{}--"
Label = Namespace(
    digit=UNKNOWN.format("digit"),
    punctuation=UNKNOWN.format("punctuation"),
    uppercase=UNKNOWN.format("uppercase"),
    noun=UNKNOWN.format("noun"),    
    verb=UNKNOWN.format("verb"),
    adjective=UNKNOWN.format("adjective"),
    adverb=UNKNOWN.format("adverb"),
    unknown="--unknown--",
 )

Theirs To Ours

The vocabulary file already has their unknown tags inserted, which don't match mine, so rather than redoing everything I'm going to translate theirs to match mine.

THEIR_UNKNOWN = "--unk_{}--"

THEIRS_TO_MINE = {
    "--unk--": "--unknown--",
    THEIR_UNKNOWN.format("digit"): Label.digit,
    THEIR_UNKNOWN.format("punct"): Label.punctuation,
    THEIR_UNKNOWN.format("upper"): Label.uppercase,
    THEIR_UNKNOWN.format("noun"): Label.noun,
    THEIR_UNKNOWN.format("verb"): Label.verb,
    THEIR_UNKNOWN.format("adj"): Label.adjective,
    THEIR_UNKNOWN.format("adv"): Label.adverb,
}
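
A quick illustration of the translation (anything not in the mapping falls through unchanged, which is what the vocabulary_words property below relies on):

print(THEIRS_TO_MINE.get("--unk_adj--", "--unk_adj--"))
# --unknown-adjective--
print(THEIRS_TO_MINE.get("cat", "cat"))
# cat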

The Unknown

Unknown = Namespace(
    punctuation = set(string.punctuation),
    suffix = Suffixes,
    label=Label,
    has_digit=re.compile(r"\d"),
    has_uppercase=re.compile("[A-Z]")
)

The Empty

Empty = Namespace(
    word="--n--",
    tag="--s--",
)

The Corpus Pre-Processor

@attr.s(auto_attribs=True)
class CorpusProcessor:
    """Pre-processes the corpus

    Args:
     vocabulary: holder of our known words
    """
    vocabulary: dict
  • Split Tuples
    def split_tuples(self, lines: list):
        """Generates tuples
    
        Args:
         lines: iterable of lines from the corpus
    
        Yields:
         whitespace split of line
        """
        for line in lines:
            yield line.split()
        return
    
  • Handle Empty Lines
    def handle_empty(self, tuples: list):
        """checks for empty strings
    
        Args:
         tuples: tuples of corpus lines
    
        Yields:
         line with empty string marked
        """
        for line in tuples:
            if not line:
                yield Empty.word, Empty.tag
            else:
                yield line
        return
    
  • Handle Unknowns
    def label_unknowns(self, tuples: list) -> str:
        """
        Assign tokens to unknown words
    
        Args:
         tuples: word, tag tuples
    
        Yields:
         (word (or label for the word if unknown), tag) tuple
        """
        for word, tag in tuples:
            if word in self.vocabulary:
                yield word, tag
    
            elif Unknown.has_digit.search(word):
                yield Unknown.label.digit, tag
    
            elif not Unknown.punctuation.isdisjoint(set(word)):
                yield Unknown.label.punctuation, tag
    
            elif Unknown.has_uppercase.search(word):
                yield Unknown.label.uppercase, tag
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.noun):
                yield Unknown.label.noun, tag
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.verb):
                yield Unknown.label.verb, tag
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adjective):
                yield Unknown.label.adjective, tag
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adverb):
                yield Unknown.label.adverb, tag
            else:
                yield Unknown.label.unknown, tag
        return
    
  • The Call
    def __call__(self, tuples: list) -> list:
        """preprocesses the words and tags
    
        Args:
         tuples: list of words and tags to process
    
        Returns:
         preprocessed version of words, tags
        """
        processed = self.split_tuples(tuples)
        processed = self.handle_empty(processed)
        processed = [(word, tag) for (word, tag) in self.label_unknowns(processed)]
        return processed
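
Once these pieces are tangled together into the class, a toy run of the whole CorpusProcessor pipeline looks something like this (a hypothetical two-entry vocabulary; the --n-- token is included because the real vocabulary contains it, which is what lets empty lines pass through the unknown-labeling step):

processor = CorpusProcessor(vocabulary={"In": 0, "--n--": 1})
# a known word, an empty line, and an unknown capitalized word
print(processor(["In\tIN", "", "Xanadu\tNNP"]))
# [('In', 'IN'), ('--n--', '--s--'), ('--unknown-uppercase--', 'NNP')]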
    

The Data Pre-Processor

@attr.s(auto_attribs=True)
class DataPreprocessor:
    """A pre-processor for the data

    Args:
     vocabulary: holder of our known words
    """
    vocabulary: dict
  • Handle Empty Lines
    def handle_empty(self, words: list):
        """replace empty strings with the Empty.word token
    
        Args:
         words: list to process
    
        Yields:
         processed words
        """
        for word in words:
            if not word.strip():
                yield Empty.word
            else:
                yield word
        return
    
  • Label Unknowns
    def label_unknowns(self, words: list) -> str:
        """
        Assign tokens to unknown words
    
        Args:
         words: iterable of words to check
    
        Yields:
         word or label for the word if unknown
        """
        for word in words:
            if word in self.vocabulary:
                yield word
    
            elif Unknown.has_digit.search(word):
                yield Unknown.label.digit
    
            elif not Unknown.punctuation.isdisjoint(set(word)):
                yield Unknown.label.punctuation
    
            elif Unknown.has_uppercase.search(word):
                yield Unknown.label.uppercase
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.noun):
                yield Unknown.label.noun
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.verb):
                yield Unknown.label.verb
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adjective):
                yield Unknown.label.adjective
    
            elif any(word.endswith(suffix) for suffix in Unknown.suffix.adverb):
                yield Unknown.label.adverb
            else:
                yield Unknown.label.unknown
        return
    
  • The Call
    def __call__(self, words: list) -> list:
        """preprocesses the words
    
        Args:
         words: list of words to process
    
        Returns:
         preprocessed version of words
        """
        processed = (word.strip() for word in words)
        processed = self.handle_empty(processed)
        processed = [word for word in self.label_unknowns(processed)]
        return processed
    

The Data Loader

@attr.s(auto_attribs=True)
class DataLoader:
    """Loads the training and test data

    Args:
     environment: namespace with keys for the environment to load paths
    """
    environment: Namespace=Environment
    _preprocess: DataPreprocessor=None
    _vocabulary_words: list=None
    _vocabulary: dict=None
    _training_corpus: list=None
    _training_data: dict=None
    _processed_training: list=None
    _test_data_raw: list=None
    _test_data_tuples: list=None
    _test_data: dict=None
    _test_words: list=None

The Preprocessor

@property
def preprocess(self) -> DataPreprocessor:
    """The Preprocessor for the data"""
    if self._preprocess is None:
        self._preprocess = DataPreprocessor(self.vocabulary)
    return self._preprocess

The Vocabulary Words

@property
def vocabulary_words(self) -> list:
    """The list of vocabulary words for training

    Note:
     This is ``hmm_vocab.txt``
    """
    if self._vocabulary_words is None:
        words = self.load(os.environ[self.environment.vocabulary])
        self._vocabulary_words = [THEIRS_TO_MINE.get(word, word) for word in words]
    return self._vocabulary_words

The Vocabulary

@property
def vocabulary(self) -> dict:
    """Converts the vocabulary list of words to a dict

    Returns:
     word-to-index dictionary (each word mapped to its position in the sorted vocabulary words)
    """
    if self._vocabulary is None:
        self._vocabulary = {
            word: index
            for index, word in enumerate(sorted(self.vocabulary_words))}
    return self._vocabulary

The Training Corpus

@property
def training_corpus(self) -> list:
    """The corpus for training

    Note:
     ``WSJ_02_20.pos`` lines (<word><tab><pos> all as one string)
    """
    if self._training_corpus is None:
        self._training_corpus = self.load(os.environ[self.environment.training_corpus])
    return self._training_corpus

The Training Data

@property
def training_data(self) -> dict:
    """The word-tag training data

    Note:
     this is the ``training_corpus`` converted to a <word>:<pos> dictionary
    """
    if self._training_data is None:
        words_tags = (line.split() for line in self.training_corpus)
        words_tags = (tokens for tokens in words_tags if len(tokens) == 2)        
        self._training_data = {word: tag for word, tag in words_tags}
    return self._training_data

Processed Training

@property
def processed_training(self) -> list:
    """Pre-processes the training corpus

    Note:
     ``training_corpus`` converted to (word, tag) tuples with unknown tags added
    """
    if self._processed_training is None:
        processor = CorpusProcessor(self.vocabulary)
        self._processed_training = processor(self.training_corpus)
    return self._processed_training

The Raw Test Corpus

@property
def test_data_raw(self) -> list:
    """The lines for testing

    Note:
     The assignment expects the lines to be un-processed so this is just the
    raw lines from ``WSJ_24.pos``
    """
    if self._test_data_raw is None:
        self._test_data_raw = self.load(os.environ[self.environment.test_corpus])
    return self._test_data_raw

The Raw Test Tuples

@property
def test_data_tuples(self) -> list:
    """The test data lines split into tuples

    Note:
     this is the test_data_raw with the lines split
    """
    if self._test_data_tuples is None:
        self._test_data_tuples = [line.split() for line in self.test_data_raw]
    return self._test_data_tuples

Test Words

@property
def test_words(self) -> list:
    """The pre-processed test words

    Note:
     ``test.words`` with some pre-processing done
    """
    if self._test_words is None:
        self._test_words = self.load(os.environ[self.environment.test_words])
        self._test_words = self.preprocess(self._test_words)
    return self._test_words

Load Method

def load(self, path: str, keep_empty: bool=True) -> list:
    """Loads the strings from the file

    Args:
     path: path to the text file
     keep_empty: keep empty lines if true

    Returns:
     list of lines from the file
    """
    with open(path) as reader:
        lines = [line for line in reader.read().split("\n") if keep_empty or line]
    return lines

Test it Out

from neurotic.nlp.parts_of_speech.preprocessing import Environment, DataLoader

loader = DataLoader()
for theirs, mine in zip(vocabulary_words, loader.vocabulary_words):
    assert theirs == mine
for theirs, mine in zip(vocabulary, loader.vocabulary):
    assert vocabulary[theirs] == loader.vocabulary[mine]
loader = DataLoader(Environment)

print(f"{len(loader.vocabulary_words):,}")
print(random.sample(loader.vocabulary_words, 2))
assert len(loader.vocabulary_words) == len(vocabulary_words)

print(f"\n{len(loader.training_corpus):,}")
assert (len(loader.training_corpus)) == len(training_corpus)
print(random.sample(loader.training_corpus, 2))

print(f"\n{len(loader.vocabulary):,}")
assert len(loader.vocabulary) == len(vocabulary)

print(f"\n{len(loader.test_corpus):,}")
assert len(loader.test_corpus) == len(test_corpus)
print(random.sample(loader.test_corpus, 2))


print(f"\n{len(loader.test_words):,}")
print(random.sample(loader.test_words, 2))
23,777
['Island', 'Gibbons']

989,861
['nine\tCD', 'in\tIN']

23,777

34,200
['at\tIN', 'to\tTO']

34,200
['the', ';']

Re-Test

I'm trying to troubleshoot differences between my output and the original notebook.

punct = set(string.punctuation)

noun_suffix = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood", "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry", "scape", "ship", "ty"]
verb_suffix = ["ate", "ify", "ise", "ize"]
adj_suffix = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive", "less", "ly", "ous"]
adv_suffix = ["ward", "wards", "wise"]
def preprocess(vocab, data_fp):
    """
    Preprocess data
    """
    orig = []
    prep = []

    # Read data
    with open(data_fp, "r") as data_file:

        for cnt, word in enumerate(data_file):

            # End of sentence
            if not word.split():
                orig.append(word.strip())
                word = "--n--"
                prep.append(word)
                continue

            # Handle unknown words
            elif word.strip() not in vocab:
                orig.append(word.strip())
                # this seems like a bug
                word = assign_unk(word)

                # this makes it work better, but isn't in their code
                # word = assign_unk(word.strip())
                prep.append(word)
                continue

            else:
                orig.append(word.strip())
                prep.append(word.strip())

    assert(len(orig) == len(open(data_fp, "r").readlines()))
    assert(len(prep) == len(open(data_fp, "r").readlines()))

    return orig, prep
def assign_unk(tok):
    """
    Assign unknown word tokens
    """
    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    return "--unk--"
loader = DataLoader(Environment)

The Vocabulary

UNKNOWN = "--unknown-{}--"
THEIR_UNKNOWN = "--unk_{}--"

THEIRS_TO_MINE = {
    "--unk--": "--unknown--",
    THEIR_UNKNOWN.format("digit"):UNKNOWN.format("digit"),
    THEIR_UNKNOWN.format("punct"):UNKNOWN.format("punctuation"),
    THEIR_UNKNOWN.format("upper"):UNKNOWN.format("uppercase"),
    THEIR_UNKNOWN.format("noun"):UNKNOWN.format("noun"),    
    THEIR_UNKNOWN.format("verb"):UNKNOWN.format("verb"),
    THEIR_UNKNOWN.format("adj"): UNKNOWN.format("adjective"),
    THEIR_UNKNOWN.format("adv"):UNKNOWN.format("adverb"),    
}

with open(os.environ[Environment.vocabulary]) as reader:
    voc_l = reader.read().split("\n")

index = 0
for theirs, mine in zip(voc_l, loader.vocabulary_words):
    assert THEIRS_TO_MINE.get(theirs, theirs)==mine, (index, theirs, mine)
    index += 1

Vocabulary Word

They include empty strings so the dictionaries won't match exactly.

vocab = {}
for index, word in enumerate(sorted(voc_l)):
    vocab[word] = index

for word in vocab:
    assert THEIRS_TO_MINE.get(word, word) in loader.vocabulary, (word)

The Preprocessed Test Words

original, prep = preprocess(vocab, os.environ[Environment.test_words])

index = 0
for theirs, mine in zip(prep, loader.test_words):
    if THEIRS_TO_MINE.get(theirs, theirs) != mine:
        token = original[index]
        print(index, theirs, mine, token, assign_unk(token))
    index += 1
9 --unk-- --unknown-noun-- vantage --unk_noun--
228 --unk-- --unknown-verb-- exacerbate --unk_verb--
845 --unk-- --unknown-adjective-- relentless --unk_adj--
896 --unk-- --unknown-adjective-- slickly --unk_adj--
906 --unk-- --unknown-adjective-- cognoscenti --unk_adj--
919 --unk-- --unknown-noun-- depiction --unk_noun--
1076 --unk-- --unknown-adjective-- perfidious --unk_adj--
1160 --unk-- --unknown-adjective-- innocently --unk_adj--
1196 --unk-- --unknown-adjective-- loutish --unk_adj--
1244 --unk-- --unknown-verb-- premise --unk_verb--
1268 --unk-- --unknown-adjective-- conspicuously --unk_adj--
1331 --unk-- --unknown-noun-- ignition --unk_noun--
1347 --unk-- --unknown-noun-- obligatory --unk_noun--
1448 --unk-- --unknown-adjective-- lavishly --unk_adj--
1468 --unk-- --unknown-adjective-- palatable --unk_adj--
1564 --unk-- --unknown-adjective-- inconsiderable --unk_adj--
1678 --unk-- --unknown-adjective-- discreetly --unk_adj--
1774 --unk-- --unknown-adjective-- marvelously --unk_adj--
1855 --unk-- --unknown-adjective-- passable --unk_adj--
1970 --unk-- --unknown-noun-- diaper --unk_noun--
3248 --unk-- --unknown-adjective-- comparably --unk_adj--
3672 --unk-- --unknown-noun-- imperialism --unk_noun--
3791 --unk-- --unknown-noun-- livelier --unk_noun--
3793 --unk-- --unknown-noun-- bolder --unk_noun--
4132 --unk-- --unknown-noun-- intimidation --unk_noun--
4648 --unk-- --unknown-adjective-- politic --unk_adj--
4661 --unk-- --unknown-verb-- mutate --unk_verb--
4682 --unk-- --unknown-noun-- aspersion --unk_noun--
4777 --unk-- --unknown-adjective-- utopian --unk_adj--
4802 --unk-- --unknown-adjective-- psychic --unk_adj--
4892 --unk-- --unknown-noun-- coercion --unk_noun--
5043 --unk-- --unknown-noun-- centrist --unk_noun--
5090 --unk-- --unknown-noun-- schooling --unk_noun--
5131 --unk-- --unknown-noun-- voucher --unk_noun--
5270 --unk-- --unknown-noun-- recruitment --unk_noun--
5316 --unk-- --unknown-noun-- expansionism --unk_noun--
5429 --unk-- --unknown-verb-- palate --unk_verb--
5440 --unk-- --unknown-noun-- engorgement --unk_noun--
5557 --unk-- --unknown-noun-- patronage --unk_noun--
5912 --unk-- --unknown-noun-- volunteerism --unk_noun--
5990 --unk-- --unknown-adjective-- utopian --unk_adj--
6709 --unk-- --unknown-noun-- citizenship --unk_noun--
6738 --unk-- --unknown-noun-- abomination --unk_noun--
6793 --unk-- --unknown-adjective-- reflexively --unk_adj--
6809 --unk-- --unknown-noun-- regimentation --unk_noun--
6876 --unk-- --unknown-noun-- compulsory --unk_noun--
7150 --unk-- --unknown-adjective-- arguably --unk_adj--
7192 --unk-- --unknown-noun-- compulsion --unk_noun--
7208 --unk-- --unknown-adjective-- unenforceable --unk_adj--
7374 --unk-- --unknown-noun-- recruitment --unk_noun--
7468 --unk-- --unknown-adjective-- challengeable --unk_adj--
7591 --unk-- --unknown-adjective-- unprecedentedly --unk_adj--
7657 --unk-- --unknown-adjective-- cooperatively --unk_adj--
8182 --unk-- --unknown-noun-- reticence --unk_noun--
8772 --unk-- --unknown-noun-- render --unk_noun--
11071 --unk-- --unknown-noun-- breather --unk_noun--
11942 --unk-- --unknown-verb-- unify --unk_verb--
12859 --unk-- --unknown-verb-- frigate --unk_verb--
12907 --unk-- --unknown-noun-- disarmament --unk_noun--
12936 --unk-- --unknown-noun-- affinity --unk_noun--
13240 --unk-- --unknown-verb-- patriarchate --unk_verb--
13438 --unk-- --unknown-noun-- underselling --unk_noun--
13650 --unk-- --unknown-adjective-- wrathful --unk_adj--
13725 --unk-- --unknown-noun-- gist --unk_noun--
14081 --unk-- --unknown-noun-- segmentation --unk_noun--
14110 --unk-- --unknown-adjective-- brightly --unk_adj--
15331 --unk-- --unknown-noun-- connector --unk_noun--
17846 --unk-- --unknown-noun-- readjustment --unk_noun--
18397 --unk-- --unknown-noun-- observation --unk_noun--
18410 --unk-- --unknown-noun-- ticker --unk_noun--
18566 --unk-- --unknown-noun-- trickery --unk_noun--
18750 --unk-- --unknown-adjective-- supersonic --unk_adj--
18854 --unk-- --unknown-noun-- existance --unk_noun--
18885 --unk-- --unknown-adjective-- thankfully --unk_adj--
19011 --unk-- --unknown-noun-- coherence --unk_noun--
19304 --unk-- --unknown-adjective-- autocratic --unk_adj--
19305 --unk-- --unknown-noun-- pseudosocialism --unk_noun--
19326 --unk-- --unknown-noun-- encircling --unk_noun--
19389 --unk-- --unknown-noun-- miscegenation --unk_noun--
19470 --unk-- --unknown-adjective-- ostensible --unk_adj--
19479 --unk-- --unknown-adjective-- purportedly --unk_adj--
19554 --unk-- --unknown-noun-- accuser --unk_noun--
19568 --unk-- --unknown-noun-- embezzler --unk_noun--
19961 --unk-- --unknown-adjective-- unwittingly --unk_adj--
19970 --unk-- --unknown-noun-- platter --unk_noun--
20129 --unk-- --unknown-noun-- centrist --unk_noun--
20384 --unk-- --unknown-adjective-- improbable --unk_adj--
20472 --unk-- --unknown-adjective-- glaringly --unk_adj--
20493 --unk-- --unknown-noun-- clarity --unk_noun--
20496 --unk-- --unknown-noun-- rectification --unk_noun--
20539 --unk-- --unknown-noun-- wiser --unk_noun--
21215 --unk-- --unknown-adjective-- concurrently --unk_adj--
21310 --unk-- --unknown-verb-- revolutionize --unk_verb--
22257 --unk-- --unknown-noun-- compensatory --unk_noun--
22270 --unk-- --unknown-noun-- respiratory --unk_noun--
23603 --unk-- --unknown-noun-- milion --unk_noun--
23928 --unk-- --unknown-noun-- poultry --unk_noun--
23947 --unk-- --unknown-noun-- cessation --unk_noun--
24005 --unk-- --unknown-noun-- mothballing --unk_noun--
24168 --unk-- --unknown-noun-- synthesizer --unk_noun--
24195 --unk-- --unknown-adjective-- distinctly --unk_adj--
24280 --unk-- --unknown-noun-- synthesizer --unk_noun--
24293 --unk-- --unknown-adjective-- robotic --unk_adj--
24294 --unk-- --unknown-noun-- flatness --unk_noun--
24861 --unk-- --unknown-noun-- carnage --unk_noun--
25248 --unk-- --unknown-adjective-- frantic --unk_adj--
25252 --unk-- --unknown-noun-- hammer --unk_noun--
26001 --unk-- --unknown-noun-- whisper --unk_noun--
27468 --unk-- --unknown-noun-- prophecy --unk_noun--
27538 --unk-- --unknown-noun-- encircling --unk_noun--
27716 --unk-- --unknown-verb-- realestate --unk_verb--
27994 --unk-- --unknown-noun-- insolvency --unk_noun--
28110 --unk-- --unknown-adjective-- forceful --unk_adj--
28195 --unk-- --unknown-adjective-- zealous --unk_adj--
28485 --unk-- --unknown-adjective-- belatedly --unk_adj--
29370 --unk-- --unknown-verb-- duplicate --unk_verb--
29902 --unk-- --unknown-adjective-- reptilian --unk_adj--
30078 --unk-- --unknown-noun-- industrialization --unk_noun--
31292 --unk-- --unknown-adjective-- vociferous --unk_adj--
31303 --unk-- --unknown-adjective-- powerless --unk_adj--
31549 --unk-- --unknown-noun-- provocation --unk_noun--
31648 --unk-- --unknown-adjective-- archaic --unk_adj--
31700 --unk-- --unknown-noun-- lament --unk_noun--
32149 --unk-- --unknown-adjective-- tantalizingly --unk_adj--
32393 --unk-- --unknown-noun-- hammer --unk_noun--
33208 --unk-- --unknown-adjective-- defiantly --unk_adj--
33238 --unk-- --unknown-noun-- rickety --unk_noun--
33280 --unk-- --unknown-noun-- unionist --unk_noun--
33336 --unk-- --unknown-noun-- dapper --unk_noun--
33721 --unk-- --unknown-adjective-- repressive --unk_adj--
33832 --unk-- --unknown-noun-- disillusionment --unk_noun--
33861 --unk-- --unknown-noun-- agitation --unk_noun--
for word in (line for line in loader.vocabulary_words if line.startswith("--u")):
    print(word)
--unknown--
--unknown-adjective--
--unknown-adverb--
--unknown-digit--
--unknown-noun--
--unknown-punctuation--
--unknown-uppercase--
--unknown-verb--

Parts-of-Speech Tagging: Numpy

Beginning

Imports

# python
from functools import partial
from itertools import product

import math
# pypi
from tabulate import tabulate

import numpy
import pandas

Set Up

The Parts-of-Speech Decoder

URL = "https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html"
data = pandas.read_html(URL, header=0)[0]

TRANSLATOR = {row.Tag:row.Description for row in data.itertuples()}

Tabulate

TABLE = partial(tabulate, tablefmt="orgtbl", headers="keys")

Middle

The Tags

We're only going to use three tags.

tags = ['RB', 'NN', 'TO']
for tag in tags:
    print(f" - {tag} ({TRANSLATOR[tag]})")
  • RB (Adverb)
  • NN (Noun, singular or mass)
  • TO (to)

Start with a Dictionary

  • transition_counts is a dictionary with (previous tag, this tag) tuples as keys and the number of times these tags appeared together as the values.
transition_counts = {
    ('NN', 'NN'): 16241,
    ('RB', 'RB'): 2263,
    ('TO', 'TO'): 2,
    ('NN', 'TO'): 5256,
    ('RB', 'TO'): 855,
    ('TO', 'NN'): 734,
    ('NN', 'RB'): 2431,
    ('RB', 'NN'): 358,
    ('TO', 'RB'): 200
}

We're going to need the individual tags later on.

tags = list(zip(*transition_counts))
tags = sorted(set(tags[0] + tags[1]))
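
Since that zip trick is a little terse: unpacking the dictionary iterates over its keys, so zip(*transition_counts) transposes the (previous tag, tag) pairs into one tuple of all the first elements and one of all the seconds, and the union of the two gives every tag that appears.

# a tiny illustration of the same transpose on made-up keys
pairs = {("NN", "TO"): 1, ("TO", "NN"): 2}
firsts, seconds = zip(*pairs)
print(firsts, seconds)
# ('NN', 'TO') ('TO', 'NN')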

I don't know what the source is, presumably the Wall Street Journal file that we used in the previous exercise.

A Transition Matrix

We're going to make a transition matrix for the transition_counts keys.

tag_count = len(tags)

transition_matrix = numpy.zeros((tag_count, tag_count), dtype=int)

for row, column in product(range(tag_count), range(tag_count)):
        transition_matrix[row, column] = transition_counts[
            (tags[row],
             tags[column])
        ]

transitions = pandas.DataFrame(transition_matrix, index=tags, columns=tags)
print(TABLE(transitions))
|    |    NN |   RB |   TO |
|----+-------+------+------|
| NN | 16241 | 2431 | 5256 |
| RB |   358 | 2263 |  855 |
| TO |   734 |  200 |    2 |

Normalization

We want to normalize the matrix so that each value becomes \(\frac{value}{\textit{sum of row}}\). One caveat: in pandas, sum(axis="rows") sums down the rows, which gives the column totals, so the division below actually makes each column (rather than each row) add up to one (you can verify this in the output). A per-row version is sketched after the output.

row_sums = transitions.sum(axis="rows")
normalized = transitions/row_sums
print(TABLE(normalized))
|    |        NN |        RB |          TO |
|----+-----------+-----------+-------------|
| NN | 0.936999  | 0.496731  | 0.859807    |
| RB | 0.0206542 | 0.462403  | 0.139866    |
| TO | 0.042347  | 0.0408664 | 0.000327172 |
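
If you actually want each row to sum to one (the usual convention for transition probabilities), one way to do it in pandas (shown only as a sketch; it wasn't run for the output above) is to divide by the row sums explicitly:

# per-row normalization: sum across the columns of each row, then divide
# each row by its own sum (aligning on the index)
row_normalized = transitions.div(transitions.sum(axis="columns"), axis="index")
print(TABLE(row_normalized))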


Log Sum

Now we'll add the natural log of each diagonal value to that value, so every entry on the diagonal becomes \(v + \ln v\) while the off-diagonal entries stay the same.

diagonal = numpy.diagonal(transitions)
diagonal = diagonal + numpy.log(diagonal)
values = transitions.values.astype("float64")
row, column = numpy.diag_indices_from(values)
values[row, column] = diagonal

diagonalized = pandas.DataFrame(values, index=tags, columns=tags)
print(TABLE(diagonalized))
|    |      NN |      RB |      TO |
|----+---------+---------+---------|
| NN | 16277.7 |    2431 |    5256 |
| RB |     358 | 2291.73 |     855 |
| TO |     734 |     200 | 2.69315 |

Brute Force Check

rows, columns = numpy.diag_indices_from(transitions.values)
indices = set(zip(rows, columns))
for row, column in product(range(len(tags)),
                           range(len(tags))):
    expected = transitions.iloc[row, column]
    if (row, column) in indices:
        expected += numpy.log(transitions.iloc[row, column])
    actual = diagonalized.iloc[row, column]
    assert math.isclose(expected, actual), f"({row, column}) expected: {expected}, actual: {actual} {expected - actual}"

Parts-of-Speech Tagging: Creating a Vocabulary

Beginning

Imports

# python
from argparse import Namespace
from collections import Counter
from functools import partial

import os
import random
import re
import string

# from pypi
from dotenv import load_dotenv
import pandas

Set Up

load_dotenv("posts/nlp/.env")

Middle

Reading Text Data

We're going to start with a pre-tagged dataset taken from the Wall Street Journal.

path = os.environ["WALL_STREET_JOURNAL_POS"]
with open(path) as reader:
    lines = reader.read().split("\n")

Here's what the head of the file looks like.

for line in lines[:5]:
    print(line)
In      IN
an      DT
Oct.    NNP
19      CD
review  NN

It's a two-column (tab-separated) file with no header, but we're told that the first column is the word being tagged for its part-of-speech and the second column is the tag itself.

Word Counts

Here we'll count the number of times a word appears in our data set and filter out words that only appear once.

words = [line.split("\t")[0] for line in lines]
print(f"Pre-Filtered word count: {len(words):,}")
counts = Counter(words)
words = [key for key, value in counts.items() if value > 1]
print(f"Filtered Word Count: {len(words):,}")
Pre-Filtered word count: 989,861
Filtered Word Count: 23,768

Just a quick check to make sure the counts are right.

grab_count = lambda pair: pair[1]
COUNT = 1

remaining = len(words)
kept = counts.most_common(remaining)
assert min(kept, key=grab_count)[COUNT] > 1

rejected = counts.most_common()[remaining:]
assert max(rejected, key=grab_count)[COUNT] < 2

Now, a sorted version.

words = sorted(words)
assert type(words) is list

And a peek at some of the values.

for word in random.sample(words, 5):
    print(word)
shifts
solvency
downbeat
reassurance
UFOs

Known Unknowns

We have a labeled vocabulary, but any new documents we encounter might have words that aren't in it. In that case we will label them as "unknown", but some of those unknowns can be classified a little more precisely based on certain features (like their suffix).

Known Stuff

  • Suffixes
    Suffixes = Namespace(
        noun = ["action", "age", "ance", "cy", "dom", "ee", "ence", "er", "hood",
                "ion", "ism", "ist", "ity", "ling", "ment", "ness", "or", "ry",
                "scape", "ship", "ty"],
        verb = ["ate", "ify", "ise", "ize"],
        adjective = ["able", "ese", "ful", "i", "ian", "ible", "ic", "ish", "ive",
                     "less", "ly", "ous"],
        adverb = ["ward", "wards", "wise"]
    )
    
  • Labels for the Unknowns
    UNKNOWN = "--unknown-{}--"
    Label = Namespace(
        digit=UNKNOWN.format("digit"),
        punctuation=UNKNOWN.format("punctuation"),
        uppercase=UNKNOWN.format("uppercase"),
        noun=UNKNOWN.format("noun"),    
        verb=UNKNOWN.format("verb"),
        adjective=UNKNOWN.format("adjective"),
        adverb=UNKNOWN.format("adverb"),
        unknown="--unknown--"
    )
    
  • Bundle Them Up
    Unknown = Namespace(
        punctuation = set(string.punctuation),
        suffix = Suffixes,
        label=Label,
        has_digit=re.compile(r"\d"),
        has_uppercase=re.compile("[A-Z]")
    )
    

Label the Unknowns

def label_unknown(word: str) -> str:
    """
    Assign tokens to unknown words

    Args:
     word: word not in our vocabulary

    Returns:
     label for the word
    """
    if Unknown.has_digit.search(word):
        return Unknown.label.digit

    if not Unknown.punctuation.isdisjoint(set(word)):
        return Unknown.label.punctuation

    if Unknown.has_uppercase.search(word):
        return Unknown.label.uppercase

    if any(word.endswith(suffix) for suffix in Unknown.suffix.noun):
        return Unknown.label.noun

    if any(word.endswith(suffix) for suffix in Unknown.suffix.verb):
        return Unknown.label.verb

    if any(word.endswith(suffix) for suffix in Unknown.suffix.adjective):
        return Unknown.label.adjective

    if any(word.endswith(suffix) for suffix in Unknown.suffix.adverb):
        return Unknown.label.adverb

    return Unknown.label.unknown
print(f"{label_unknown('cow2pig')}")
print(label_unknown("cow,pig"))
print(label_unknown("cowPig"))
print(label_unknown(f"cowpig{random.choice(Unknown.suffix.noun)}"))
print(label_unknown(f"cowpig{random.choice(Unknown.suffix.verb)}"))
print(label_unknown(f"cowpig{random.choice(Unknown.suffix.adjective)}"))
print(label_unknown(f"cowpig{random.choice(Unknown.suffix.adverb)}"))
print(label_unknown("cowdog"))
--unknown-digit--
--unknown-punctuation--
--unknown-uppercase--
--unknown-noun--
--unknown-verb--
--unknown-adjective--
--unknown-adverb--
--unknown--

Getting Tags

I don't know what the Coursera example is for: they check whether an already-tagged word is in our vocabulary, clobber the word with an unknown token if it isn't, and return the original tag. There must be a reason for this, but it isn't explained in the notebook, so I'm going to do something different. I'm going to assume that the word isn't tagged and that we want to tag it.
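
Just so the description above is concrete, here's a minimal sketch of what I understand that behavior to be; the function name and the details are my guesses, not the notebook's actual code, and it relies on the label_unknown function defined earlier.

def get_word_tag(line: str, vocabulary: set) -> tuple:
    """A guess at the Coursera-style behavior (not the notebook's code)

    Args:
     line: a 'word<TAB>tag' line from the corpus
     vocabulary: a collection of the words we know about

    Returns:
     (word, tag) with the word clobbered by an unknown token if it isn't in the vocabulary
    """
    # assumes a well-formed 'word<TAB>tag' line
    word, tag = line.split()
    if word not in vocabulary:
        # keep the original tag, replace the word with an unknown token
        word = label_unknown(word)
    return word, tag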

POS Tag Interpreter

The notebook doesn't say whose tagging system is being used so I'm going to assume that it's the Penn Treebank P.O.S. system. I'll make an interpreter for the tags, since I have no idea what some of them mean.

URL = "https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html"
data = pandas.read_html(URL, header=0)[0]

TRANSLATOR = {row.Tag:row.Description for row in data.itertuples()}
cleaned = (line for line in lines if line)
pairs = (line.split("\t") for line in cleaned)
VOCABULARY = {key:value for key, value in pairs} 
EMPTY_LINE = "--n--"
TAG_FOR_EMPTY_LINE = "--s--"
DESCRIPTION_FOR_EMPTY_LINE = "--d--"
def tag_word(word: str, vocabulary: dict, translator: dict) -> tuple:
    """gets the part-of-speech tag for the word

    Args:
     word: the word to tag
     vocabulary: word to tag dictionary
     translator: part of speech tag description

    Returns:
     word, part-of-speech tag, description
    """
    if not word:
        return EMPTY_LINE, TAG_FOR_EMPTY_LINE, DESCRIPTION_FOR_EMPTY_LINE
    if word not in vocabulary:
        return word, label_unknown(word), Unknown.label.unknown
    return word, vocabulary[word], translator.get(vocabulary[word], Unknown.label.unknown)

tagger = partial(tag_word, vocabulary=VOCABULARY, translator=TRANSLATOR)

Special Character

print(tagger("\n"))
('\n', '--unknown--', '--unknown--')

The newline falls through to the generic unknown label because it isn't in string.punctuation and it has no digits, uppercase letters, or recognized suffixes.

Empty String

print(tagger(""))
('--n--', '--s--', '--d--')

Known Preposition

print(tagger("In"))
('In', 'IN', 'Preposition or subordinating conjunction')

Nouns

Noun

print(tagger("bicycle"))
('bicycle', 'NN', 'Noun, singular or mass')
print(tagger("flatulence"))
('flatulence', '--unknown-noun--', '--unknown--')

Unknown Unknown

print(tagger("tardigrade"))
('tardigrade', '--unknown--', '--unknown--')

Verbs

print(tagger("scrutinize"))
('scrutinize', 'VB', 'Verb, base form')
print(tagger("euthanize"))
('euthanize', '--unknown-verb--', '--unknown--')

Adjectives

print(tagger("venerable"))
('venerable', 'JJ', 'Adjective')
print(tagger("malodorous"))
('malodorous', '--unknown-adjective--', '--unknown--')

Adverbs

print(tagger("backwards"))
('backwards', 'RB', 'Adverb')
print(tagger("bitwise"))
('bitwise', '--unknown-verb--', '--unknown--')

Note that "bitwise" comes back as an unknown verb rather than an unknown adverb: it ends in "ise", and the verb suffixes are checked before the adverb suffixes, so the "wise" ending never gets a chance to match.

End

So, there you have it, a rudimentary way to handle tagging parts of speech for words outside of our vocabulary.

Autocorrect: Minimum Edit Distance Backtrace

Beginning

This is the last post in a series about Autocorrect that started with this post. In the previous post we implemented some code to find the minimum edit distance between two strings using Dynamic Programming. Now we'll implement the Backtrace algorithm to find the shortest path through the table that shows how to transform one string into the other.

Imports

# python
from argparse import Namespace
from functools import partial
from string import ascii_uppercase

import random

# pypi
from nltk.metrics.distance import edit_distance_align
from tabulate import tabulate

import holoviews
import hvplot.pandas
import matplotlib.pyplot as pyplot
import numpy
import pandas
import seaborn

# this repository
from neurotic.nlp.autocorrect.distance import MinimumEdits

# my other stuff
from graeae import EmbedHoloviews

Set Up

Plotting

This sets up some convenience code for plotting.

SLUG = "autocorrect-minimum-edit-distance-backtrace"
FOLDER = f"files/posts/nlp/{SLUG}/" 
Embed = partial(EmbedHoloviews, folder_path=FOLDER)

Plot = Namespace(
    width=990,
    height=780,
    fontscale=2,
    tan="#ddb377",
    blue="#4687b7",
    red="#ce7b6d",
    color_map="Plasma",
    path_color_map="blues",
)
get_ipython().run_line_magic('matplotlib', 'inline')
get_ipython().run_line_magic('config', "InlineBackend.figure_format = 'retina'")
seaborn.set(style="whitegrid",
            rc={"axes.grid": False,
                "font.family": ["sans-serif"],
                "font.sans-serif": ["Open Sans", "Latin Modern Sans", "Lato"],
                "figure.figsize": (8, 6)},
            font_scale=1)
FIGURE_SIZE = (12, 10)

Tabulate

This sets up a pretty-printer for path-tables.

PATH = partial(tabulate, tablefmt="orgtbl", headers=["row", "column"])

Middle

Backtrace

The backtrace algorithm traces a path through a minimum-edit-distance table to help us optimally align the two strings. How do we do it? With a greedy search that minimizes the cost: starting at the last cell in the table (the one holding the minimum edit distance) we look at the cell directly above, the cell directly to the left, and the cell diagonally up and to the left, and move to whichever of those three has the lowest cost. We keep doing this until we reach the origin (0, 0) cell and then reverse the order of the cells we visited.

def backtrace(distances: numpy.ndarray) -> list:
    """finds the path for string alignment using backtrace

    Args:
     distances: array of mimimum edit distances
    """
    # start at the bottom right cell
    current_row, current_column = len(distances) - 1, len(distances[0]) - 1
    path = [(current_row, current_column)]
    while (current_row, current_column) != (0, 0):
        one_row_back = current_row - 1
        one_column_up = current_column - 1
        edits = (
            # moving up a row: delete a character from the source
            (distances[one_row_back, current_column], (one_row_back, current_column)),
            # moving back a column: insert a character from the target
            (distances[current_row, one_column_up], (current_row, one_column_up)),
            # moving diagonally: substitute (or match)
            (distances[one_row_back, one_column_up], (one_row_back, one_column_up))
        )
        minimum_edit_distance, cell_coordinates = min(edits)
        path.append(cell_coordinates)
        current_row, current_column = cell_coordinates
    return list(reversed(path))

Simple Examples

  • One Letter

    We'll start with the simplest case, two strings with the same letter. Here's the distance table.

    editor = MinimumEdits("a", "a")
    print(editor)
    
      # a
    # 0 1
    a 1 0

    And here's the path through the table.

    print(PATH(backtrace(editor.distance_table)))
    
    row column
    0 0
    1 1

    So, we start at the top-left and move to the bottom-right. Not too exciting…

    Now let's try adding a second letter to the target word.

    editor = MinimumEdits("a", "at")
    print(editor)
    
      # a t
    # 0 1 2
    a 1 0 1
    path = backtrace(editor.distance_table)
    print(PATH(path))
    
    row column
    0 0
    1 1
    1 2

    So we move from the top left, then diagonally down, and then laterally to the right. This gives us our first example of how the path tells us to align the strings: whenever the path moves horizontally (the row doesn't change), that means you skip the character in the source.

Alignment

The rules for skipping characters as we move through the cells in the path are:

  1. If the current row is the same as the previous one, skip the character in the source, but add the character from the target.
  2. If the current column is the same as the previous one, skip the character in the target but add the character from the source.

def alignment(path: list, source: str, target: str,
              empty_token: str="*") -> None:
    """Prints the alignment for the path

    Args:
     path: list of (row, column) tuples
     source: the source string
     target: the target string
     empty_token: token to insert for skipped characters
    """
    previous_row = previous_column = None
    source_tokens = []
    target_tokens = []
    source = empty_token + source
    target = empty_token + target
    for current_row, current_column in path[1:]:
        source_token = source[current_row] if current_row != previous_row else empty_token
        target_token = target[current_column] if current_column != previous_column else empty_token

        source_tokens.append(source_token)
        target_tokens.append(target_token)

        previous_row, previous_column = current_row, current_column

    for tokens in (source_tokens, target_tokens):
        print(f"|{'|'.join(tokens)}|")
    return

Our alignment for the previous path looks like this.

alignment(path, "a", "at")
a *
a t

Where the * means skip that character. Okay, that might be obvious, but what if we have to skip the first letter?

editor = MinimumEdits("t", "at")
print(editor)
  # a t
# 0 1 2
t 1 2 1
path = backtrace(editor.distance_table)
print(PATH(path))
row column
0 0
0 1
1 2

So in the first two cells the row doesn't change meaning that we skip the first letter in the source.

a t
* t

A Little More Interesting Example

Let's try a slightly more interesting example, aligning "drats" and "maths". First, the distance table.

SOURCE, TARGET = "drats", "maths"
editor = MinimumEdits(SOURCE, TARGET)
print(editor)
  # m a t h s
# 0 1 2 3 4 5
d 1 2 3 4 5 6
r 2 3 4 5 6 7
a 3 4 3 4 5 6
t 4 5 4 3 4 5
s 5 6 5 4 5 4

Let's plot a heat map for it. If we plot the table as-is it ends up with the rows reversed, so we'll have to reverse the rows before plotting it.

reversed_table = editor.distance_frame.iloc[::-1]

plot = reversed_table.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Minimum Edit Distances",
    width=Plot.width, height=Plot.height, fontscale=Plot.fontscale
)
plot *= holoviews.Labels(plot)
outcome = Embed(plot=plot, file_name="drats_maths_distance_table")()
print(outcome)

Figure Missing

Now we can take a look at the path.

path = backtrace(editor.distance_table)
print(PATH(path))
row column
0 0
1 0
2 1
3 2
4 3
4 4
5 5

Now it's getting a little harder to see what's going on so let's plot the path along with the heatmap.

table = numpy.zeros(editor.distance_table.shape)
for row, column in path:
    table[row, column] = 10
table = pandas.DataFrame(table, index=list("#drats"), columns=list("#maths"))
table = table.iloc[::-1]
path_plot = table.hvplot.heatmap(colorbar=False, cmap=Plot.path_color_map).opts(
    title="Path For Alignment", width=1000, height=300)

distance_plot = reversed_table.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Minimum Edit Distances", width=1000, height=300
)
plot = (path_plot + distance_plot).cols(1).opts(
    width=800,
    height=600,
    fontscale=2,
)

outcome = Embed(plot=plot, file_name="drats_maths_alignment")()
print(outcome)

Figure Missing

In the top plot the dark-blue rectangles are the ones chosen by the backtrace and the lower plot is a heatmap of the distances for each cell in the distance-table. You can sort of see that the path matches the cooler (smaller distance) cells in the distance heat map as you work from the top-left cell to the bottom-right cell (the minimum edit distance).

To interpret the path: where the column repeats you skip a character in the target and where the row repeats you skip a character in the source so our alignment looks like this.

alignment(path, SOURCE, TARGET)
d r a t * s
* m a t h s

Examples From Dan Jurafsky

These are examples from Dan Jurafsky's CS 124 lecture slides.

Nucleotides

Using the bokeh backend for heatmaps doesn't let you use column and index names that repeat, and I haven't figured out how to set x-ticks and y-ticks explicitly so I'll do it in matplotlib instead.

HEIGHT, WIDTH = 300, 1000

SOURCE, TARGET = "AGGCTATCACCTGACCTCCAGGCCGATGCCC", "TAGCTATCACGACCGCGGTCGATTTGCCCGAC"
editor = MinimumEdits(SOURCE, TARGET)

path = backtrace(editor.distance_table)

The plotting didn't work for this set so I'm not showing it (it scrambled the order and reduced the number of characters).
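
Since I said I'd fall back to matplotlib, here's a rough sketch of how that might look with seaborn for this nucleotide table. This is just my guess (I haven't verified it against this data) and the figure file name is made up; the point is that seaborn doesn't mind repeated tick labels, so the characters can be passed in explicitly.

figure, axis = pyplot.subplots(figsize=(16, 12))
# label the axes with the padding cell ('#') plus the characters of each string
seaborn.heatmap(editor.distance_frame,
                xticklabels=list("#" + TARGET),
                yticklabels=list("#" + SOURCE),
                ax=axis)
axis.set_title("Nucleotide Minimum Edit Distances")
figure.savefig(FOLDER + "nucleotides.png")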

alignment(path, SOURCE, TARGET)
* A G G C T A T C A C C T G A C C T C C A G G * C C G A T * * G C C C * * *
T A G * C T A T C A C * * G A C C G C * * G G T C * G A T T T G C C C G A C

Intention and Execution

SOURCE, TARGET = "INTENTION", "EXECUTION"
editor = MinimumEdits(SOURCE, TARGET)
path = backtrace(editor.distance_table)
alignment(path, SOURCE, TARGET)
I N T E * * * N T I O N
* * * E X E C U T I O N

The output given by the presentation is

I N T E * N T I O N
* E X E C U T I O N

But I can't find anywhere that he documents how he derives these alignments, so I don't know how to reproduce this form. Most likely there's more than one minimum-cost alignment for these strings and his backtrace just breaks the ties differently than our greedy search does.

figure, axis = pyplot.subplots()
grid = seaborn.heatmap(editor.distance_frame, ax=axis)
figure.savefig(FOLDER + "intention.png")

intention.png

What About Sentences?

SOURCE = "he was big and bold and tall but old"
TARGET = "he is big i'm told but old"
editor = MinimumEdits(SOURCE, TARGET)
path = backtrace(editor.distance_table)
alignment(path, SOURCE, TARGET)
h e   w a s   b i g   a n d   b o l d   a n d   t a l l   b u t   o l d
h e   * i s   b i g   i ' m   t o l d   * * * * * * * * * b u t   o l d

Sort of, but I'm sure that's not the right way to do it.
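
My guess is that the more natural thing for sentences is to align word tokens rather than characters. The MinimumEdits class (as I'm using it here) works on strings of characters, so here's a rough, standalone sketch of the same dynamic-programming table and greedy backtrace applied to whitespace-split words; it's just an illustration of the idea, not part of the repository's code.

def align_words(source: str, target: str, empty_token: str="*",
                substitution_cost: int=2) -> None:
    """Aligns two sentences word-by-word and prints the alignment"""
    source_words, target_words = source.split(), target.split()
    rows, columns = len(source_words) + 1, len(target_words) + 1

    # build the minimum edit distance table over word tokens
    distances = [[0] * columns for _ in range(rows)]
    for row in range(1, rows):
        distances[row][0] = row
    for column in range(1, columns):
        distances[0][column] = column
    for row in range(1, rows):
        for column in range(1, columns):
            substitution = (0 if source_words[row - 1] == target_words[column - 1]
                            else substitution_cost)
            distances[row][column] = min(
                distances[row - 1][column] + 1,
                distances[row][column - 1] + 1,
                distances[row - 1][column - 1] + substitution)

    # greedy backtrace from the bottom-right cell to the origin
    row, column = rows - 1, columns - 1
    path = [(row, column)]
    while (row, column) != (0, 0):
        candidates = []
        if row > 0:
            candidates.append((distances[row - 1][column], (row - 1, column)))
        if column > 0:
            candidates.append((distances[row][column - 1], (row, column - 1)))
        if row > 0 and column > 0:
            candidates.append((distances[row - 1][column - 1],
                               (row - 1, column - 1)))
        _, (row, column) = min(candidates)
        path.append((row, column))
    path.reverse()

    # convert the path to aligned tokens using the same skipping rules as before
    padded_source = [empty_token] + source_words
    padded_target = [empty_token] + target_words
    previous_row = previous_column = None
    source_tokens, target_tokens = [], []
    for current_row, current_column in path[1:]:
        source_tokens.append(padded_source[current_row]
                             if current_row != previous_row else empty_token)
        target_tokens.append(padded_target[current_column]
                             if current_column != previous_column else empty_token)
        previous_row, previous_column = current_row, current_column

    for tokens in (source_tokens, target_tokens):
        print(f"|{'|'.join(tokens)}|")

align_words(SOURCE, TARGET)

With words as the tokens, each gap should line up with a whole missing word instead of asterisks scattered across letters.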

End

NLTK

NLTK has a function that will get the path for us. It sort of hides the table from us (there's a private-ish function that you can call to make the path if you already have the table; otherwise it builds the table and returns the path). I couldn't find a direct link to it, but it's in the metrics.distance module and is called edit_distance_align.

Let's see what it does with our last example.

nltk_path = edit_distance_align("drats", "maths")
print(f"|Ours| NLTK's|")
print(f"|-+-|")
for ours, theirs in zip(path, nltk_path):
    print(f"|{ours}|{theirs}|")
Ours NLTK's
(0, 0) (0, 0)
(1, 0) (1, 1)
(2, 1) (2, 2)
(3, 2) (3, 2)
(4, 3) (4, 3)
(4, 4) (5, 4)
(5, 5) (5, 5)

So, you can see that ours doesn't really agree with theirs - which one of us is wrong?

help(edit_distance_align)
Help on function edit_distance_align in module nltk.metrics.distance:

edit_distance_align(s1, s2, substitution_cost=1)
    Calculate the minimum Levenshtein edit-distance based alignment
    mapping between two strings. The alignment finds the mapping
    from string s1 to s2 that minimizes the edit distance cost.
    For example, mapping "rain" to "shine" would involve 2
    substitutions, 2 matches and an insertion resulting in
    the following mapping:
    [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (4, 5)]
    NB: (0, 0) is the start state without any letters associated
    See more: https://web.stanford.edu/class/cs124/lec/med.pdf
    
    In case of multiple valid minimum-distance alignments, the
    backtrace has the following operation precedence:
    1. Skip s1 character
    2. Skip s2 character
    3. Substitute s1 and s2 characters
    The backtrace is carried out in reverse string order.
    
    This function does not support transposition.
    
    :param s1, s2: The strings to be aligned
    :type s1: str
    :type s2: str
    :type substitution_cost: int
    :rtype List[Tuple(int, int)]
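
Before adjusting anything, a quick sanity check that the docstring's own "rain" to "shine" example behaves as documented (this check is my addition and just assumes the docstring is accurate).

assert edit_distance_align("rain", "shine") == [
    (0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (4, 5)]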

Well, it looks like their substitution cost is 1 by default, not 2 like we're using. Take two.

nltk_align = partial(edit_distance_align, substitution_cost=2)
nltk_path = nltk_align("drats", "maths")
print(f"|Ours| NLTK's|")
print(f"|-+-|")
for ours, theirs in zip(path, nltk_path):
    print(f"|{ours}|{theirs}|")
    assert ours == theirs
Ours NLTK's
(0, 0) (0, 0)
(1, 0) (1, 0)
(2, 1) (2, 1)
(3, 2) (3, 2)
(4, 3) (4, 3)
(4, 4) (4, 4)
(5, 5) (5, 5)

So, if we're wrong, we're at least as wrong as NLTK is. Maybe.

Bundling It Up

Imports

# from pypi
import attr

# this repo
from neurotic.nlp.autocorrect.distance import MinimumEdits

The Aligner

@attr.s(auto_attribs=True)
class Aligner:
    """Create the alignment path

    Args: 
     source: the source string to align
     target: the target string to align
     empty_token: character to use to fill in alignments
    """
    source: str
    target: str
    empty_token: str="*"
    _source_alignment: list=None
    _target_alignment: list=None
    _table: str=None
    _editor: MinimumEdits=None
    _path: list=None

The Editor

@property
def editor(self) -> MinimumEdits:
    """object to figure out the minimum edit distance"""
    if self._editor is None:
        self._editor = MinimumEdits(self.source, self.target)
    return self._editor

The Alignment Path

@property
def path(self) -> list:
    """An optimal path through the distance table"""
    if self._path is None:
        distances = self.editor.distance_table
        # start at the bottom right cell
        current_row, current_column = (len(distances) - 1,
                                       len(distances[0]) - 1)
        path = [(current_row, current_column)]
        while (current_row, current_column) != (0, 0):
            one_row_back = current_row - 1
            one_column_up = current_column - 1
            edits = (
                # moving up a row: delete a character from the source
                (distances[one_row_back, current_column], (one_row_back, current_column)),
                # moving back a column: insert a character from the target
                (distances[current_row, one_column_up], (current_row, one_column_up)),
                # moving diagonally: substitute (or match)
                (distances[one_row_back, one_column_up], (one_row_back, one_column_up))
            )
            minimum_edit_distance, cell_coordinates = min(edits)
            path.append(cell_coordinates)
            current_row, current_column = cell_coordinates
        self._path = list(reversed(path))
    return self._path

Source Alignment

@property
def source_alignment(self) -> list:
    """the aligned source tokens

    Warning:
     this doesn't create them, call the object to do that
    """
    return self._source_alignment

Target Alignment

@property
def target_alignment(self) -> list:
    """The aligned target tokens

    Warning:
     this doesn't create them, the __call__ does
    """
    return self._target_alignment

The Table

@property
def table(self) -> str:
    """The alignments as an orgtable"""
    if self._table is None:
        if self.source_alignment is None or self.target_alignment is None:
            self()
        self._table = (f"|{'|'.join(self.source_alignment)}|\n"
                       f"|{'|'.join(self.target_alignment)}|")
    return self._table

The Call

def __call__(self) -> tuple:
    """Sets the source and target token alignments

    Note:
     as a side-effect also sets source_alignment and target_alignment

    Returns:
     tuple of source and target tokens after alignment
    """
    previous_row = previous_column = None
    source_tokens = []
    target_tokens = []
    source = self.empty_token + self.source
    target = self.empty_token + self.target
    for current_row, current_column in self.path[1:]:
        source_token = (
            source[current_row] if current_row != previous_row
            else self.empty_token)
        target_token = (
            target[current_column] if current_column != previous_column
            else self.empty_token)

        source_tokens.append(source_token)
        target_tokens.append(target_token)

        previous_row, previous_column = current_row, current_column

    self._source_alignment = source_tokens
    self._target_alignment = target_tokens
    return source_tokens, target_tokens

The String Representation

def __str__(self) -> str:
    """pass-through for the table"""
    return self.table

Test It

from neurotic.nlp.autocorrect.alignment import Aligner

align = Aligner("source", "target")
print(align.editor)
  # t a r g e t
# 0 1 2 3 4 5 6
s 1 2 3 4 5 6 7
o 2 3 4 5 6 7 8
u 3 4 5 6 7 8 9
r 4 5 6 5 6 7 8
c 5 6 7 6 7 8 9
e 6 7 8 7 8 7 8
print(PATH(align.path))
row column
0 0
1 0
2 1
3 2
4 3
5 4
6 5
6 6
print(align())
(['s', 'o', 'u', 'r', 'c', 'e', '*'], ['*', 't', 'a', 'r', 'g', 'e', 't'])
print(align.table)
s o u r c e *
* t a r g e t
print(align)
s o u r c e *
* t a r g e t
align = Aligner("drafts", "maths")
nltk_path = nltk_align("drafts", "maths")
for ours, theirs in zip(align.path, nltk_path):
    assert ours == theirs, f"{theirs}\t{ours}"

print(align)
d r a f t * s
* m a * t h s