Parts-of-Speech Tagging: Hidden Markov Model

Beginning

Now you will build something more context specific. Concretely, you will be implementing a Hidden Markov Model (HMM) with a Viterbi decoder

  • The HMM is one of the most commonly used algorithms in Natural Language Processing, and is a foundation to many deep learning techniques you will see in this specialization.
  • In addition to parts-of-speech tagging, HMM is used in speech recognition, speech synthesis, etc.
  • By completing this part of the assignment you will get a 95% accuracy on the same dataset you used in Part 1.

The Markov Model contains a number of states and the probability of transition between those states.

  • In this case, the states are the parts-of-speech.
  • A Markov Model utilizes a transition matrix, A.
  • A Hidden Markov Model adds an observation or emission matrix B which describes the probability of a visible observation when we are in a particular state.
  • In this case, the emissions are the words in the corpus
  • The state, which is hidden, is the POS tag of that word.

Imports

# python
from argparse import Namespace
from functools import partial

import math

# pypi
from dotenv import load_dotenv

import holoviews
import hvplot.pandas
import numpy
import pandas

# this repository
from neurotic.nlp.parts_of_speech import DataLoader, TheTrainer

# my other stuff
from graeae import EmbedHoloviews

Set Up

The Data

load_dotenv("posts/nlp/.env")
loader = DataLoader()
trainer = TheTrainer(loader.processed_training)

Plotting

SLUG = "parts-of-speech-tagging-hidden-markov-model"
FOLDER = f"files/posts/nlp/{SLUG}/" 
Embed = partial(EmbedHoloviews, folder_path=FOLDER)

Plot = Namespace(
    width=990,
    height=780,
    fontscale=2,
    tan="#ddb377",
    blue="#4687b7",
    red="#ce7b6d",
    color_map="OrRd",
    path_color_map="blues",
)

Middle

Generating Matrices

Creating the 'A' transition probabilities matrix

Now that you have your `emission_counts`, `transition_counts`, and `tag_counts`, you will start implementing the Hidden Markov Model.

This will allow you to quickly construct the

  • A transition probabilities matrix.
  • and the B emission probabilities matrix.

You will also use some smoothing when computing these matrices.

Here is an example of what the A transition matrix would look like (it is simplified to 5 tags for viewing. It is a 46x46 matrix in this assignment.

A RBS RP SYM TO UH
RBS 2.217069e-06 2.217069e-06 2.217069e-06 0.008870 2.217069e-06
RP 3.756509e-07 7.516775e-04 3.756509e-07 0.051089 3.756509e-07
SYM 1.722772e-05 1.722772e-05 1.722772e-05 0.000017 1.722772e-05
TO 4.477336e-05 4.472863e-08 4.472863e-08 0.000090 4.477336e-05
UH 1.030439e-05 1.030439e-05 1.030439e-05 0.061837 3.092348e-02

Note that the matrix above was computed with smoothing.

Each cell gives you the probability to go from one part of speech to another.

  • In other words, there is a 4.47e-8 chance of going from parts-of-speech TO to RP.
  • The sum of each row has to equal 1, because we assume that the next POS tag must be one of the available columns in the table.

The smoothing was done as follows:

\[ P(t_i | t_{i-1}) = \frac{C(t_{i-1}, t_{i}) + \alpha }{C(t_{i-1}) +\alpha * N}\tag{3} \]

  • \(N\) is the total number of tags
  • \(C(t_{i-1}, t_{i})\) is the count of the tuple (previous POS, current POS) in `transition_counts` dictionary.
  • \(C(t_{i-1})\) is the count of the previous POS in the `tag_counts` dictionary.
  • \(\alpha\) is a smoothing parameter.
def create_transition_matrix(alpha: float, tag_counts: dict,
                             transition_counts: dict) -> numpy.ndarray:
    """Transition Matrix for the Hidden Markov Model

    Args: 
      ``alpha``: number used for smoothing
      ``tag_counts``: a dictionary mapping each tag to its respective count
      ``transition_counts``: transition count for the previous word and tag

    Returns:
      ``A``: matrix of dimension (``num_tags``,``num_tags``)
    """
    # Get a sorted list of unique POS tags
    all_tags = sorted(tag_counts.keys())

    # Count the number of unique POS tags
    num_tags = len(all_tags)

    # Initialize the transition matrix 'A'
    A = numpy.zeros((num_tags,num_tags))

    # Get the unique transition tuples (previous POS, current POS)
    trans_keys = set(transition_counts.keys())

    # Go through each row of the transition matrix A
    for i in range(num_tags):

        # Go through each column of the transition matrix A
        for j in range(num_tags):

            # Initialize the count of the (prev POS, current POS) to zero
            count = 0

            # Define the tuple (prev POS, current POS)
            # Get the tag at position i and tag at position j (from the all_tags list)
            key = (all_tags[i], all_tags[j])

            # Check if the (prev POS, current POS) tuple 
            # exists in the transition counts dictionary
            if key in transition_counts: #complete this line

                # Get count from the transition_counts dictionary 
                # for the (prev POS, current POS) tuple
                count = transition_counts[key]

            # Get the count of the previous tag (index position i) from tag_counts
            count_prev_tag = tag_counts[all_tags[i]]

            # Apply smoothing using count of the tuple, alpha, 
            # count of previous tag, alpha, and total number of tags
            A[i,j] = (count + alpha)/(count_prev_tag + alpha * num_tags)
    return A
# setup some values
alpha = 0.001
states = sorted(trainer.tag_counts.keys())

A = create_transition_matrix(alpha,
                             trainer.tag_counts,
                             trainer.transition_counts)
# Testing your function
expected = 0.000007040
actual = A[0,0]

print(f"A at row 0, col 0: {actual:.9f}")
assert math.isclose(expected, actual, abs_tol=1e-6), (expected, actual)

expected = 0.1691
actual = A[3,1]
print(f"A at row 3, col 1: {actual:.4f}")
assert math.isclose(expected, actual, abs_tol=1e-4)

print("View a subset of transition matrix A")
actual = A[30:35,30:35]
A_sub = pandas.DataFrame(actual, index=states[30:35], columns = states[30:35] )
print(A_sub)

expected = numpy.array([
 [2.217069e-06, 2.217069e-06, 2.217069e-06, 0.008870, 2.217069e-06],
 [3.756509e-07, 7.516775e-04, 3.756509e-07, 0.051089, 3.756509e-07],
 [1.722772e-05, 1.722772e-05, 1.722772e-05, 0.000017, 1.722772e-05],
 [4.477336e-05, 4.472863e-08, 4.472863e-08, 0.000090, 4.477336e-05],
 [1.030439e-05, 1.030439e-05, 1.030439e-05, 0.061837, 3.092348e-02],
])

assert numpy.allclose(expected, actual, atol=1e-5)
A at row 0, col 0: 0.000007040
A at row 3, col 1: 0.1691
View a subset of transition matrix A
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02
plotter = pandas.DataFrame(A, index=states, columns=states)[::-1]
plot = plotter.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Emission Matrix A",
    width=Plot.width, height=Plot.height, fontscale=Plot.fontscale,
    xrotation=90,
)
#plot *= holoviews.Labels(plot)
outcome = Embed(plot=plot, file_name="emission_matrix_a")()
print(outcome)

Figure Missing

Looking at the plot you can see that there are a few transitions that are very likely. Maybe we can look at them to see if they're helpful.

URL = "https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html"
data = pandas.read_html(URL, header=0)[0]

TRANSLATOR = {row.Tag:row.Description for row in data.itertuples()}
print("|Tag| Description|")
print("|-+-|")
for tag in states:
    print(f"|{tag}|{TRANSLATOR.get(tag, 'unknown')}|")
Tag Description
# unknown
$ unknown
'' unknown
( unknown
) unknown
, unknown
. unknown
: unknown
CC Coordinating conjunction
CD Cardinal number
DT Determiner
EX Existential there
FW Foreign word
IN Preposition or subordinating conjunction
JJ Adjective
JJR Adjective, comparative
JJS Adjective, superlative
LS List item marker
MD Modal
NN Noun, singular or mass
NNP Proper noun, singular
NNPS Proper noun, plural
NNS Noun, plural
PDT Predeterminer
POS Possessive ending
PRP Personal pronoun
PRP$ Possessive pronoun
RB Adverb
RBR Adverb, comparative
RBS Adverb, superlative
RP Particle
SYM Symbol
TO to
UH Interjection
VB Verb, base form
VBD Verb, past tense
VBG Verb, gerund or present participle
VBN Verb, past participle
VBP Verb, non-3rd person singular present
VBZ Verb, 3rd person singular present
WDT Wh-determiner
WP Wh-pronoun
WP$ Possessive wh-pronoun
WRB Wh-adverb
`` unknown

The highest probabilities are at the bottom of the table (the red blocks) where it show that there's a 99% probability that a $ or a # will be followed by a Cardinal Number, which seems to make sense, especially since the original source was the Wall Street Journal, and you might expect there to be references to dollars. The next highest probability is that a Predeterminer will be followed by a Determiner, which makes sense based on the names, although I have no idea what those things are. And then the notion that a . will be followed by a --s--- (a period will be followed by the start of a new statement). So, at least for the most common cases it looks fairly intuitive.

Create the 'B' emission probabilities matrix

Now you will create the B transition matrix which computes the emission probability.

You will use smoothing as defined below:

\[ P(w_i | t_i) = \frac{C(t_i, word_i)+ \alpha}{C(t_{i}) +\alpha * N} \]

  • \(C(t_i, word_i)\) is the number of times\$word_i\) was associated with \(tag_i\) in the training data (stored in emission_counts dictionary).
  • \(C(t_i)\) is the number of times \(tag_i\) was in the training data (stored in tag_counts dictionary).
  • \(N\) is the number of words in the vocabulary
  • \(\alpha\) is a smoothing parameter.

The matrix B is of dimension (num_tags, N), where num_tags is the number of possible parts-of-speech tags.

Here is an example of the matrix, only a subset of tags and words are shown:

B 725 adroitly engineers promoted synergy
  8.201296e-05 2.732854e-08 2.732854e-08 2.732854e-08 2.732854e-08
NN 7.521128e-09 7.521128e-09 7.521128e-09 7.521128e-09 2.257091e-05
NNS 1.670013e-08 1.670013e-08 4.676203e-04 1.670013e-08 1.670013e-08
VB 3.779036e-08 3.779036e-08 3.779036e-08 3.779036e-08 3.779036e-08
RB 3.226454e-08 6.456135e-05 3.226454e-08 3.226454e-08 3.226454e-08
RP 3.723317e-07 3.723317e-07 3.723317e-07 3.723317e-07 3.723317e-07

We're now going to implement the create_emission_matrix that computes the B emission probabilities matrix. Your function takes in \(\alpha\), the smoothing parameter, tag_counts, which is a dictionary mapping each tag to its respective count, the emission_counts dictionary where the keys are (tag, word) and the values are the counts. Your task is to output a matrix that computes equation 4 for each cell in matrix B.

Create Emission Matrix

def create_emission_matrix(alpha: float,
                           tag_counts: dict,
                           emission_counts: dict,
                           vocab: dict) -> numpy.ndarray:
    """Create Matrix B

    Args: 
       ``alpha``: tuning parameter used in smoothing 
       ``tag_counts``: a dictionary mapping each tag to its respective count
       ``emission_counts``: a dictionary where the keys are (tag, word) and the values are the counts
       ``vocab``: a dictionary where keys are words in vocabulary and value is an index.
              within the function it'll be treated as a list
    Returns:
       ``B``: a matrix of dimension ``(num_tags, len(vocab))``
    """

    # get the number of POS tag
    num_tags = len(tag_counts)

    # Get a list of all POS tags
    all_tags = sorted(tag_counts.keys())

    # Get the total number of unique words in the vocabulary
    num_words = len(vocab)

    # Initialize the emission matrix B with places for
    # tags in the rows and words in the columns
    B = numpy.zeros((num_tags, num_words))

    # Get a set of all (POS, word) tuples 
    # from the keys of the emission_counts dictionary
    emis_keys = set(list(emission_counts.keys()))

    ### START CODE HERE (Replace instances of 'None' with your code) ###

    # Go through each row (POS tags)
    for i in range(num_tags): # complete this line

        # Go through each column (words)
        for j in range(num_words): # complete this line

            # Initialize the emission count for the (POS tag, word) to zero
            count = 0

            # Define the (POS tag, word) tuple for this row and column
            key =  (all_tags[i], vocab[j])

            # check if the (POS tag, word) tuple exists as a key in emission counts
            if key in emission_counts: # complete this line

                # Get the count of (POS tag, word) from the emission_counts d
                count = emission_counts[key]

            # Get the count of the POS tag
            count_tag = tag_counts[all_tags[i]]

            # Apply smoothing and store the smoothed value 
            # into the emission matrix B for this row and column
            B[i,j] = (count + alpha)/(count_tag + alpha * num_words)

    ### END CODE HERE ###
    return B
# creating your emission probability matrix. this takes a few minutes to run.
vocab = loader.vocabulary
B = create_emission_matrix(alpha,
                           trainer.tag_counts,
                           trainer.emission_counts,
                           list(vocab))

actual = B[0,0]
expected = 0.000006032
print(f"View Matrix position at row 0, column 0: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-6)

actual = B[3,1]
expected = 0.000000720
print(f"View Matrix position at row 3, column 1: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-7)

# Try viewing emissions for a few words in a sample dataframe
cidx  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
cols = [vocab[a] for a in cidx]

# Choose POS tags to show in a sample dataframe
rvals =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
rows = [states.index(a) for a in rvals]

# Get the emissions for the sample of words, and the sample of POS tags
actual = B[numpy.ix_(rows,cols)]
B_sub = pandas.DataFrame(actual, index=rvals, columns = cidx )
print(B_sub)
expected = ([
 [8.201296e-05, 2.732854e-08, 2.732854e-08, 2.732854e-08, 2.732854e-08],
 [7.521128e-09, 7.521128e-09, 7.521128e-09, 7.521128e-09, 2.257091e-05],
 [1.670013e-08, 1.670013e-08, 4.676203e-04, 1.670013e-08, 1.670013e-08],
 [3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08],
 [3.226454e-08, 6.456135e-05, 3.226454e-08, 3.226454e-08, 3.226454e-08],
 [3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07],
])

assert numpy.allclose(expected, actual, atol=1e-5)
View Matrix position at row 0, column 0: 0.000006032
View Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07
plotter = B_sub[::-1]
plot = plotter.hvplot.heatmap(cmap=Plot.color_map).opts(
    title="Emission Matrix B",
    width=Plot.width, height=Plot.height, fontscale=Plot.fontscale,
    xrotation=90,
)
outcome = Embed(plot=plot, file_name="emission_matrix_b")()
print(outcome)

Figure Missing

There's 23,777 words in the vocabulary so I'm not plotting the whole thing.

End

Bundle It Up

The Imports

# pypi
import attr
import numpy

The Matrices

@attr.s(auto_attribs=True)
class Matrices:
    """The matrices for the hidden markov model

    Args:
     ``transition_counts``: dictionary of counts of adjacent POS tags
     ``emission_counts``: dictionary of (word, POS) counts
     ``tag_counts``: dictionary of POS tag-counts
     ``words``: list of words in the vocabulary
     ``alpha``: The smoothing value
    """
    transition_counts: dict
    emission_counts: dict
    tag_counts: dict
    # all the lists have to be sorted for the matrices to match
    words: list=attr.ib(converter=sorted)
    alpha: float=0.001
    _tags: list=None
    _tag_count: int=None
    _word_count: int=None
    _transition: numpy.ndarray=None
    _emission: numpy.ndarray=None

The Tags

@property
def tags(self) -> list:
    """Sorted list of the POS tags"""
    if self._tags is None:
        self._tags = sorted(self.tag_counts)
    return self._tags

The Tag Count

@property
def tag_count(self) -> int:
    """Number of tags"""
    if self._tag_count is None:
        self._tag_count = len(self.tags)
    return self._tag_count

The Word Count

@property
def word_count(self) -> int:
    """Number of words in the vocabulary"""
    if self._word_count is None:
        self._word_count = len(self.words)
    return self._word_count

The Transition Matrix

@property
def transition(self) -> numpy.ndarray:
    """The Transition Matrix"""
    if self._transition is None:
        self._transition = numpy.zeros((self.tag_count, self.tag_count))
        for row in range(self.tag_count):
            for column in range(self.tag_count):
                key = (self.tags[row], self.tags[column])
                count = self.transition_counts[key] if key in self.transition_counts else 0
                previous_tag_count = self.tag_counts[self.tags[row]]
                self._transition[row, column] = (
                    (count + self.alpha)
                    /(previous_tag_count + self.alpha * self.tag_count))
    return self._transition

The Emission Matrix

@property
def emission(self) -> numpy.ndarray:
    """The Emission Matrix"""
    if self._emission is None:
        self._emission = numpy.zeros((self.tag_count, self.word_count))
        for row in range(self.tag_count):
            for column in range(self.word_count):
                key = (self.tags[row], self.words[column])
                emission_count = self.emission_counts[key] if key in self.emission_counts else 0
                tag_count = self.tag_counts[self.tags[row]]
                self._emission[row, column] = (
                    (emission_count + self.alpha)
                    /(tag_count + self.alpha * self.word_count)                
                )
    return self._emission

Test It Out

from neurotic.nlp.parts_of_speech.matrices import Matrices
load_dotenv("posts/nlp/.env")
loader = DataLoader()
trainer = TheTrainer(loader.processed_training)

matrices = Matrices(transition_counts=trainer.transition_counts,
                    emission_counts=trainer.emission_counts,
                    tag_counts=trainer.tag_counts,
                    words=loader.vocabulary_words,
                    alpha=0.001)

The Transition Matrix

transition = matrices.transition
expected = 0.000007040
actual = transition_matrix[0,0]

print(f"Transition Matrix at row 0, col 0: {actual:.9f}")
assert math.isclose(expected, actual, abs_tol=1e-6), (expected, actual)

expected = 0.1691
actual = transition_matrix[3,1]
print(f"Transition Matrix at row 3, col 1: {actual:.4f}")
assert math.isclose(expected, actual, abs_tol=1e-4)

print("View a subset of the transition matrix")
actual = transition_matrix[30:35,30:35]
A_sub = pandas.DataFrame(actual, index=states[30:35], columns = states[30:35] )
print(A_sub)

expected = numpy.array([
 [2.217069e-06, 2.217069e-06, 2.217069e-06, 0.008870, 2.217069e-06],
 [3.756509e-07, 7.516775e-04, 3.756509e-07, 0.051089, 3.756509e-07],
 [1.722772e-05, 1.722772e-05, 1.722772e-05, 0.000017, 1.722772e-05],
 [4.477336e-05, 4.472863e-08, 4.472863e-08, 0.000090, 4.477336e-05],
 [1.030439e-05, 1.030439e-05, 1.030439e-05, 0.061837, 3.092348e-02],
])

assert numpy.allclose(expected, actual, atol=1e-5)
Transition Matrix at row 0, col 0: 0.000007040
Transition Matrix at row 3, col 1: 0.1691
View a subset of the transition matrix
              RBS            RP           SYM        TO            UH
RBS  2.217069e-06  2.217069e-06  2.217069e-06  0.008870  2.217069e-06
RP   3.756509e-07  7.516775e-04  3.756509e-07  0.051089  3.756509e-07
SYM  1.722772e-05  1.722772e-05  1.722772e-05  0.000017  1.722772e-05
TO   4.477336e-05  4.472863e-08  4.472863e-08  0.000090  4.477336e-05
UH   1.030439e-05  1.030439e-05  1.030439e-05  0.061837  3.092348e-02

The Emission Matrix

matrices = Matrices(transition_counts=trainer.transition_counts,
                    emission_counts=trainer.emission_counts,
                    words=loader.vocabulary_words,
                    tag_counts=trainer.tag_counts,
                    alpha=0.001)
emission = matrices.emission

actual = emission[0,0]
expected = 0.000006032
print(f"Emission Matrix position at row 0, column 0: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-6)

actual = emission[3,1]
expected = 0.000000720
print(f"Emission Matrix position at row 3, column 1: {actual:.9f}")
assert math.isclose(actual, expected, abs_tol=1e-7)

# Try viewing emissions for a few words in a sample dataframe
columns  = ['725','adroitly','engineers', 'promoted', 'synergy']

# Get the integer ID for each word
column_ids = [loader.vocabulary[column] for column in columns]

# Choose POS tags to show in a sample dataframe
rows =['CD','NN','NNS', 'VB','RB','RP']

# For each POS tag, get the row number from the 'states' list
row_numbers = [matrices.tags.index(a) for a in rows]

# Get the emissions for the sample of words, and the sample of POS tags
actual = emission[numpy.ix_(row_numbers,column_ids)]
B_sub = pandas.DataFrame(actual, index=rows, columns = columns)
print(B_sub)
expected = ([
 [8.201296e-05, 2.732854e-08, 2.732854e-08, 2.732854e-08, 2.732854e-08],
 [7.521128e-09, 7.521128e-09, 7.521128e-09, 7.521128e-09, 2.257091e-05],
 [1.670013e-08, 1.670013e-08, 4.676203e-04, 1.670013e-08, 1.670013e-08],
 [3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08, 3.779036e-08],
 [3.226454e-08, 6.456135e-05, 3.226454e-08, 3.226454e-08, 3.226454e-08],
 [3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07, 3.723317e-07],
])

assert numpy.allclose(expected, actual, atol=1e-5)
Emission Matrix position at row 0, column 0: 0.000006032
Emission Matrix position at row 3, column 1: 0.000000720
              725      adroitly     engineers      promoted       synergy
CD   8.201296e-05  2.732854e-08  2.732854e-08  2.732854e-08  2.732854e-08
NN   7.521128e-09  7.521128e-09  7.521128e-09  7.521128e-09  2.257091e-05
NNS  1.670013e-08  1.670013e-08  4.676203e-04  1.670013e-08  1.670013e-08
VB   3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08  3.779036e-08
RB   3.226454e-08  6.456135e-05  3.226454e-08  3.226454e-08  3.226454e-08
RP   3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07  3.723317e-07

Note: I was getting the wrong values because I switched to the DataLoader.vocabulary_words list, which isn't sorted (because I was trying to follow the example). The vocabulary is sorted, but it's a dict so then you have to convert it to a list… in the future just sort everything.