Karatsuba Multiplication

Imports and Setup

Imports

# from python
from __future__ import annotations
from functools import partial
from collections import namedtuple

import math
import random
import sys


# from pypi
from joblib import Parallel, delayed
from expects import (
    be_a,
    contain_exactly,
    equal,
    expect,
    raise_error
)

import altair
import pandas

from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart

Set Up

TIMER = Timer()
SLUG = "karatsuba-multiplication"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)

MultiplicationOutput = namedtuple("MultiplicationOutput", ["product", "count"])
PlotMultiplicationOutput = namedtuple(
    "PlotOutput",
    ["product", "count", "digits", "factor_1", "factor_2"])

The Algorithms

Grade School Multiplication

Grade-School multiplication is how most of us are taught to multiply numbers with more than one digit each. Each digit in one number is multiplied by each digit in the other to create a partial product. Once we've gone through all the digits in the first number we sum up all the partial products we calculated to get our final answer.

\begin{algorithm}
\caption{Grade-School}
\begin{algorithmic}
\REQUIRE The input arrays are of the same length ($n$)
\INPUT Two arrays representing digits in integers ($a, b$)
\OUTPUT The product of the inputs

\PROCEDURE{GradeSchool}{$number_1, number_2$}

\FOR {$j \in \{0 \ldots n - 1\}$}
  \STATE $carry \gets 0$
  \FOR {$i \in \{0 \ldots n - 1\}$}
    \STATE $product \gets a[i] \times b[j] + carry$
    \STATE $partial[j][i + j] \gets product \bmod 10$
    \STATE $carry \gets product/10$
  \ENDFOR
  \STATE $partial[j][n + j] \gets carry$
\ENDFOR

\STATE $carry \gets 0$

\FOR {$i \in \{0 \ldots 2n - 1\}$}
  \STATE $sum \gets carry$
  \FOR {$j \in \{0 \ldots n - 1\}$}
    \STATE $sum \gets sum + partial[j][i]$
  \ENDFOR
  \STATE $result[i] \gets sum \bmod 10$
  \STATE $carry \gets sum/10$
\ENDFOR

\STATE $result[2n] \gets carry$
\RETURN result
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

Karatsuba Multiplication

Karatsuba Multiplication improves on the Grade-School algorithm using a trick from Frederick Gauss, which is a little too much of a diversion. With all these improvements in multiplication it seems like you need a background in number theory that not all Computer Scientist's have (or at least I don't). Maybe take it on faith for now.

\begin{algorithm}
\caption{Karatsuba}
\begin{algorithmic}
\REQUIRE The input arrays are of the same length
\INPUT Two arrays representing digits in integers
\OUTPUT The product of the inputs

\PROCEDURE{Karatsuba}{$number_1, number_2$}

\STATE $\textit{digits} \gets $ \textsc{Length}($number_1$)

\IF {$\textit{digits} = 1$}
  \RETURN $number_1 \times number_2$
\ENDIF

\STATE $middle \gets \left\lfloor \frac{\textit{digits}}{2} \right\rfloor$

\STATE \\
\STATE $MostSignificant_1, LeastSignificant_1 \gets $ \textsc{Split}($number_1, middle$)
\STATE $MostSignificant_2, LeastSignificant_2 \gets $ \textsc{Split}($number_2, middle$)
\STATE \\
\STATE $MostPlusLeast_1 \gets MostSignificant_1 + LeastSignificant_1$
\STATE $MostPlusLeast_2 \gets MostSignificant_2 + LeastSignificant_2$
\STATE \\

\STATE \textit{left} $\gets $ \textsc{Karatsuba}($MostSignificant_1, MostSignificant_2$)
\STATE \textit{summed} $\gets $ \textsc{Karatsuba}($MostPlusLeast_1, MostPlusLeast_2$)
\STATE \textit{right} $\gets $ \textsc{Karatsuba}($LeastSignificant_1, LeastSignificant_2$)
\STATE \\
\STATE \textit{center} $\gets$ (\textit{summed} - \textit{left} - \textit{right})
\STATE \\
\RETURN \textit{left} $\times 10^{\textit{digits}} + \textit{center} \times 10^{\textit{middle}} + \textit{right}$
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

This is a mashup of the Wikipedia version and the Algorithms Illuminated version. It's a little tricky in that we're dealing with integers, in theory, but we have to know the number of digits and how to split it up so in code we're going to have to work with a collection instead, but hopefully this will be clearer in code.

An IntList

class IntegerDigits:
    """A hybrid integer and digits list

    Args:
     integer: the number to store
     padding: number of 0's to add to the left of the digits
    """
    def __init__(self, integer: int) -> None:
        self.integer = integer
        self._digits = None
        return

    @property
    def digits(self) -> list:
        """The digits for the given integer

        Raises:
         ValueError if the given integer isn't really an integer

        Returns:
         zero-padded list of digits
        """
        if self._digits is None:
            digits = [int(digit) for digit in str(self.integer)]
            length = len(digits)
            power_of_two = 2**math.ceil(math.log2(length))
            padding = power_of_two - length
            self._digits = [0] * padding + digits
        return self._digits

    def add_padding(self, padding: int) -> None:
        """Add more zeros to the left of the digits

        Args:
         padding: number of zeros to add to the left of the digits
        """
        self._digits = [0] * padding + self.digits
        return

    def set_length(self, target: int) -> None:
        """Set the total length of the digit list

        Args:
         target: total number of digits to have

        Raises:
         RuntimeError: target is less than the current number of digits
        """
        if target < len(self):
            raise RuntimeError(f"target {target} is less than current {len(self.digits)} digits")

        padding = target - len(self)
        self.add_padding(padding)
        return

    def set_equal_length(self, other: IntegerDigits) -> None:
        """Set both self and other to have the same number of digits"""
        target = max(len(self), len(other))
        self.set_length(target)
        other.set_length(target)
        return

    def reset(self) -> None:
        """Clean out any generated attributes"""
        self._digits = None
        return

    # collection methods

    def __len__(self) -> int:
        """The number of digits"""
        return len(self.digits)

    def __getitem__(self, key) -> IntegerDigits:
        """Slice the digits"""
        sliced = self.digits[key]
        if type(sliced) is int:
            sliced = [sliced]
        gotten = IntegerDigits(sum((value * 10**(len(sliced) - 1 - index)
                                    for index, value in enumerate(sliced))))
        # preserve any padding
        gotten._digits = sliced
        return gotten
    # integer operations

    def __add__(self, value) -> IntegerDigits:
        """Add an integer or IntegerDigits to this integer"""
        return IntegerDigits(self.integer + value if type(value) is int
                             else self.integer + value.integer)

    def __sub__(self, value) -> IntegerDigits:
        """Subtract an integer or IntegerDigits from this integer"""
        return IntegerDigits(self.integer - value if type(value) is int
                             else self.integer - value.integer)

    def __mul__(self, value) -> IntegerDigits:
        """multiply integer by integer or IntegerDigits"""
        return IntegerDigits(self.integer * value if type(value) is int
                             else self.integer * value.integer)

    # comparisons
    def __eq__(self, other) -> bool:
        """Compare to integer or IntegerDigits"""
        return other == self.integer

    def __lt__(self, other) -> bool:
        return self.integer < other

    def __gt__(self, other) -> bool:
        return self.integer > other

    def __ge__(self, other) -> bool:
        return self.integer >= other

    def __repr__(self) -> str:
        return f"<IntegerDigits: {self.integer}>"

Test it

test = IntegerDigits(567)
# build the digits padded to power of 2
expect(len(test.digits)).to(equal(4))

# implement the length dunder method
expect(len(test)).to(equal(4))

# add slicing
expect(test[0]).to(equal(0))
expect(test[-1]).to(equal(7))
expect(test[:2].digits).to(contain_exactly(0, 5))

# multiplication
product = test * 2
expect(product.integer).to(equal(567 * 2))
test_2 = IntegerDigits(2)
expect(len(test_2)).to(equal(1))
product = test * test_2
expect(product.integer).to(equal(2 * 567))

# addition
sum_ = test + 10
expect(sum_.integer).to(equal(577))

sum_ = test + test_2
expect(sum_.integer).to(equal(569))

# subtraction
difference = test - 20
expect(difference.integer).to(equal(547))

difference = test_2 - test
expect(difference.integer).to(equal(-565))

An Implementation

Karatsuba Multiplication

def karatsuba(integer_1: IntegerDigits,
              integer_2: IntegerDigits) -> MultiplicationOutput:
    """Multiply integer_1 and integer_2

    Args:
     integer_1, integer_2: arrays with equal number of digits

    Returns:
     product of the integers, count
    """
    digits = len(integer_1)
    if digits == 1:
        return MultiplicationOutput(integer_1 * integer_2, 1)
    middle = digits//2

    most_significant_1, least_significant_1 = integer_1[:middle], integer_1[middle:]
    most_significant_2, least_significant_2 = integer_2[:middle], integer_2[middle:]

    most_plus_least_1 = most_significant_1 + least_significant_1
    most_plus_least_2 = most_significant_2 + least_significant_2

    # a hack to keep them the same number of digits after the addition
    most_plus_least_1.set_equal_length(most_plus_least_2)

    left, count_left = karatsuba(most_significant_1, most_significant_2)
    summed, count_summed = karatsuba(most_plus_least_1, most_plus_least_2)
    right, count_right  = karatsuba(least_significant_1, least_significant_2)

    center = summed - left - right

    output = left * 10**digits + center * 10**middle + right

    if output < 0:
        raise RuntimeError(f"left: {left} center: {center} right: {right}")

    return MultiplicationOutput(output, count_left + count_summed + count_right)
def karatsuba_multiplication(integer_1: int,
                             integer_2: int,
                             count_padding: bool=True) -> PlotMultiplicationOutput:
    """Sets up and runs the Karatsuba Multiplication

    Args:
     integer_1, integer_2: the two values to multiply
     count_padding: whether the digit count should include the padding

    Returns:
     product, count, digits
    """
    assert integer_1 >=0
    assert integer_2 >= 0

    integer_1 = IntegerDigits(integer_1)
    integer_2 = IntegerDigits(integer_2)
    if not count_padding:
        for index, digit in enumerate(integer_1.digits):
            if digit > 0:
                original_1 = len(integer_1.digits[index:])
                break
        for index, digit in enumerate(integer_2.digits):
            if digit > 0:
                original_2 = len(integer_2.digits[index:])
                break
        original_digits = max(original_1, original_2)

    # make them have the same number of digits
    integer_1.set_equal_length(integer_2)

    if count_padding:
        original_digits = len(integer_1)
    output = karatsuba(integer_1, integer_2)
    return PlotMultiplicationOutput(product=output.product,
                                    count=output.count,
                                    digits=original_digits,
                                    factor_1=integer_1.integer,
                                    factor_2=integer_2.integer)

Test

a, b = 2, 3
output = karatsuba_multiplication(a, b)
expect(output.product).to(equal(a * b))
expect(output.digits).to(equal(1))

a = 222
output = karatsuba_multiplication(a, b, True)
expect(output.product).to(equal(666))
expect(output.digits).to(equal(4))

Test

def test_karatsuba(first: int, second: int):
    expected = first * second
    output = karatsuba_multiplication(first, second)
    expect(output.product).to(equal(expected))
    return
limit = int(sys.maxsize**0.5)
for digits in range(limit - 100, limit):
    a = random.randrange(digits - 1000, digits + 1000)
    b = random.randrange(digits - 1000, digits + 1000)
    try:
        test_karatsuba(a, b)
    except AssertionError as error:
        print(f"maxsize: {sys.maxsize}")
        print(f"a: {a}")
        print(f"b: {b}")
        print(f"a x b: {a * b}")
        print(f"maxsize - a * b: {sys.maxsize - a * b}")
        raise

Example values from the Algorithms Illuminated website.

a = 3141592653589793238462643383279502884197169399375105820974944592
b = 2718281828459045235360287471352662497757247093699959574966967627
test_karatsuba(a, b)

Run Time

Using the Master Method

Let's use the Master Method to find the theoretical upper bound for Karatsuba Multiplication.

The basic form of the Master Method is this:

\[ T(n) = a T(\frac{n}{b}) + O(n^d) \]

Variable Description Value
\(a\) Recursive calls within the function 3
\(b\) Amount the input is split up 2
\(d\) Exponent for the work done outside of the recursion 1

We make three recursive calls within the Karatsuba function and split the data in half within each call. The amount of work done outside the recursion is constant so \(O\left(n^d\right) = O\left(n^1\right)\). \(a > b^d\) so we have the case where the sub-problems grow faster than the input is reduced, giving us:

\begin{align} T(n) &= O\left(n^{\log_b a}\right) \\ &= O\left(n^{\log_2 3}\right) \end{align}

With Padding

Let's plot the base-case counts alongside the theoretical bounds we found using the Master Method.

First we'll create the numbers to multiply.

digit_supply = range(1, 101)
things_to_multiply = [(random.randrange(10**(digits - 1), 10**digits),
                        random.randrange(10**(digits - 1), 10**digits))
                        for digits in digit_supply]

Now we'll do the math, running the cases in parallel using Joblib.

with TIMER:
    karatsuba_outputs = Parallel(n_jobs=-1)(
        delayed(karatsuba_multiplication)(*thing_to_multiply)
        for thing_to_multiply in things_to_multiply)
Started: 2022-05-13 23:52:06.399789
Ended: 2022-05-13 23:52:09.347825
Elapsed: 0:00:02.948036

Now a little plotting.

frame = pandas.DataFrame.from_dict(
    {"Karatsuba Count": [output.count for output in karatsuba_outputs],
     "Digits": [output.digits for output in karatsuba_outputs],
     "digits^log2(3)": [output.digits**(math.log2(3)) for output in karatsuba_outputs],
     "6 x digits^log2(3)": [6 * output.digits**(math.log2(3)) for output in karatsuba_outputs]     
})

melted = frame.melt(id_vars=["Digits"],  value_vars=["Karatsuba Count",
                                                     "digits^log2(3)",
                                                     "6 x digits^log2(3)"],
                    var_name="Source", value_name="Multiplications")

chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x="Digits", y="Multiplications",
    color="Source",
    tooltip=["Digits",
             altair.Tooltip("Multiplications", format=",")]).properties(
                 title="Basic Multiplications vs Digits (with Padding)",
                 width=800,
                 height=525)

save_it(chart, "karatsuba-multiplications")

Figure Missing

Since when I added the padding I made sure that the number of digits was a power of two, the numbers are bunched up around those powers of two (so there's a lot of wasted computation, maybe) but the multiplication counts still fall within a constant multiple of our theoretical runtime.

Without Padding

Since I didn't make the karatsuba work without padding this will just show the points spaced out, but the counts will still be based on there being padding.

unpadded = lambda a, b: karatsuba_multiplication(a, b, count_padding=False)

with TIMER:
    unpadded_outputs = Parallel(n_jobs=-1)(
        delayed(unpadded)(*thing_to_multiply)
        for thing_to_multiply in things_to_multiply)
Started: 2022-05-13 23:52:20.020179
Ended: 2022-05-13 23:52:22.052011
Elapsed: 0:00:02.031832
frame = pandas.DataFrame.from_dict(
    {"Karatsuba Count": [output.count for output in unpadded_outputs],
     "Digits (pre-padding)": [output.digits for output in unpadded_outputs],
     "digits^log2(3)": [output.digits**(math.log2(3)) for output in karatsuba_outputs],
     "6 x digits^log2(3)": [6 * output.digits**(math.log2(3)) for output in karatsuba_outputs],
     "6 x digits^log2(3) (no padding)": [6 * output.digits**(math.log2(3))
                                         for output in unpadded_outputs],
     "n^2 (no padding)": [output.digits**2
                          for output in unpadded_outputs],
})

melted = frame.melt(id_vars=["Digits (pre-padding)"],  value_vars=["Karatsuba Count",
                                                                   "digits^log2(3)",
                                                                   "6 x digits^log2(3)",
                                                                   "6 x digits^log2(3) (no padding)",
                                                                   "n^2 (no padding)"],
                    var_name="Source", value_name="Multiplications")

chart = altair.Chart(melted).mark_line().encode(
    x="Digits (pre-padding)", y="Multiplications",
    color="Source",
    tooltip=[altair.Tooltip("Digits (pre-padding)", type="quantitative"),
             altair.Tooltip("Multiplications", format=",")]).properties(
                 title="Basic Multiplications vs Digits (without Padding)",
                 width=800,
                 height=525)

save_it(chart, "karatsuba-multiplications-unpadded")

Figure Missing

Since I don't have an easy way to turn off using padding the Multiplication counts are still based on using padding, but this view spreads the digit-counts out so it's a little easier to see. The Multiplication counts are broken up into bands because the padding is based on keeping the number of digits a power of two.

Just for reference, here's the last product we multiplied.

output = karatsuba_outputs[-1]
print(f"{output.product.integer:,}")

expect(output.product).to(equal(output.factor_1 * output.factor_2))
56,913,917,723,202,495,576,238,408,244,650,506,926,406,731,625,206,370,840,517,493,281,396,538,892,710,818,017,869,257,379,987,881,688,195,601,612,438,838,803,669,047,089,313,679,236,814,971,999,554,405,895,121,583,263,228,500,933,878,783,310,375,258,385,063,631,332

Sources

Grade-School Multiplication

I took the grade-school algorithm from the Lecture 2 Notes on this course-site.

The Maximum-Subarray Problem

Imports and Setup

Imports

# python
from collections import namedtuple
from datetime import datetime
from functools import partial
from math import inf as infinity
from math import log2
from typing import Sequence, Tuple

import random

# pypi
import altair
import pandas

# my stuff
from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart

Set Up

TIMER = Timer()

SLUG = "the-maximum-subarray-problem"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)

MAXIMUM_SUBARRAY = Tuple[int, int, float, int]
Case = namedtuple("Case", ["inputs", "left", "right", "max"])

Plot = namedtuple("Plot", ["height", "width"], defaults=[700, 1000])
PLOT = Plot()

Output = namedtuple("Output", ["start", "end", "gain", "runtime"])

Implementations

The motivation given by the text is that we have some data that varies over time and we want to find the pair of points that maximizes the gain from the first point to the second.

Test Case

CASE_ONE = Case([100, 113, 110, 85, 105, 102, 86, 63, 81,
                 101, 94, 106, 101, 79, 94, 90, 97],
                left=7,
                right=11,
                max=43)
x = CASE_ONE.inputs
changes = [0] + [x[index + 1] - x[index] for index in range(len(x) - 1)]

frame = pandas.DataFrame({"Numbers": x,
                          "Change": changes})
melted = frame.reset_index()
melted = melted.melt(id_vars=["index"], value_vars=["Numbers", "Change"] , var_name="Data Source", value_name="Data")
chart = altair.Chart(melted).mark_line().encode(
    x=altair.X("index", type="quantitative"),
    y=altair.Y("Data", type="quantitative"),
    color="Data Source").properties(
        title="Sample Input One", width=PLOT.width, height=525)

save_it(chart, "sample-input-one")

Figure Missing

Brute Force

The first way we're going to find the maximum gain is using Brute Force - we're going to calculate the gain for each pair of points and picking the pair with the best gain. Since this involves checking every pair we're essentially looking at a Handshake Problem so the number of comparisons should be \(\frac{n(n - 1)}{2}\) or \(\Theta(n^2)\).

def brute_force_maximum_subarray(source: Sequence) -> Output:
    """Finds the subarray with maximum gain

    Args:
     source: sequence with sub-array to maximize

    Returns:
     (left-max-index, right-max-index, gain, count)
    """
    best_left = best_right = None
    best_gain = -infinity
    count = 0
    for left in range(len(source) - 1):
        for right in range(left + 1, len(source)):
            count += 1
            gain = source[right] - source[left]
            if gain > best_gain:
                best_left, best_right, best_gain = left, right, gain
    return Output(best_left, best_right, best_gain, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = brute_force_maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left
assert output.end == CASE_ONE.right
assert output.gain == CASE_ONE.max
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n^2 = {n**2}")
n = 17
count = 136
n(n-1)/2 = 136
n^2 = 289

Recursive

Looking at the brute-force solution you might think - "That was a check of point-pairs, why is this called 'max-subarray'?". Well, we're going to improve upon the brute-force implementation by doing a data-transformation. If you look at the data that we set up you might notice that there's the point values and then there's the change in value from one point to another. The changes are there because we can re-think our problem not as the maximum of increases between pair points, but instead as the maximum of point-change-sums.

In our CASE_ONE inputs, the biggest gain came from point 7 (with a value of 63) and point 11 (with a value of 106). If we look at the difference between consecutive points from points 7 to 11 we get -23, 18, 20, -7, 12 which sums to 20, which is the maximum sub-array total for this data set. To solve this problem we're going to need to find this sub-array. Since we stated that we're going to use a divide-and-conquer approach we know that we're going to repeatedly break the inputs up into (in this case) two separate sub-sets to solve. Every time we break the inputs into two groups we end up with three possibilities as to where the maximum sub-array lies:

  • in the left half
  • in the right half
  • crossing from the left half over the split to the right half

Max-Crossing-Subarray

This is a function that will find the best subarray in the left and in the right halves of the sub-array and combines them to find the gain across the mid-point of the subarray.

def find_max_crossing_subarray(source: Sequence,
                               low: int, mid: int,
                               high: int) -> Output:
    """Find the max subarray that crosses the mid-point

    Args:
     source: the array to search for the sub-sequence in
     low: the lower-bound for the indices to search within
     mid: the mid-index between low and high
     high: the upper-bound for the indices to search within

    Returns:
     left-index, right-index, gain, count
    """
    count, best_left, best_right= 0, None, None
    left_gain = right_gain = -infinity
    gain = 0
    for left in range(mid, low, -1):
        count += 1
        gain += source[left]
        if gain > left_gain:
            left_gain = gain
            best_left = left
    gain = 0
    for right in range(mid + 1, high):
        count += 1
        gain += source[right]
        if gain > right_gain:
            right_gain = gain
            best_right = right
    return Output(best_left, best_right, left_gain + right_gain, count)

Max-Subarray

The find_maximum_subarray function finds the start and end indices for the best gain.

def find_maximum_subarray(source: Sequence,
                          low: int, high: int) -> Output:
    """Find the sub-array that maximizes gain

    Args:
     source: sequence to maximize
     low: lower-bound for indices in source
     high: upper-bound for indices in source

    Returns:
     left-index, right-index, gain, count
    """
    if high == low:
        start, end, gain, count = low, high, source[low], 1

    else:
        mid = (low + high)//2

        left = find_maximum_subarray(source, low, mid)
        right = find_maximum_subarray(source, mid + 1, high)
        cross_mid = find_max_crossing_subarray(source, low, mid, high)

        count = left.runtime + right.runtime + cross_mid.runtime

        best_gain = max(left.gain, right.gain, cross_mid.gain)

        if left.gain == best_gain:
            start, end, gain, count = left.start, left.end, left.gain, count
        elif right.gain == best_gain:
            start, end, gain, count = right.start, right.end, right.gain, count
        else:
            start, end, gain, count = (cross_mid.start, cross_mid.end,
                                       cross_mid.gain, count)
    return Output(start, end, gain, count)

The maximum_subarray converts our list of values to a list of changes between consecutive values so that we can use our divide-and-conquer function.

def maximum_subarray(source: Sequence) -> Output:
    """Finds the sub-array with maximum gain

    Args:
     source: array to maximize

    Returns:
     left-index, right-index, gain, count
    """
    start, end = 0, len(source) - 1
    changes = [source[index + 1] - source[index] for index in range(end)]
    output = find_maximum_subarray(changes, start, end - 1)

    # our 'changes' has one fewer entry than the original list so up the right index by 1
    end = output.end + 1
    return Output(output.start, end, output.gain, output.runtime)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {output.start}"
assert output.end == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {output.end}"
assert output.gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {output.gain}"
print(f"left: {output.start}, right: {output.end}, gain: {output.gain}")
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n log n : {n * log2(n): 0.2f}")
print(f"n^2 = {n**2}")
n = 17
left: 7, right: 11, gain: 43
count = 50
n(n-1)/2 = 136
n log n :  69.49
n^2 = 289

By transforming the problem to one that lets us use divide-and-conquer we reduced the number of comparisons from 136 to 50. More generally, if we look at find_maximum_subarray we see that we're splitting the input in half before each of the recursive calls so we're going to make \(log_2 n\) splits and at each level of the recursions tree we're going to have \(2n\) inputs (left and right are each \(\frac{1}{2} n\) and cross_mid uses \(n\)) so we're going from \(\Theta(n^2)\) for the brute-force verios to \(\Theta(n \log n)\) for the divide-and-conquer version.

Alternate Version

Let's try another version that uses the idea of summing the changes between consecutive points but doesn't use recursion (see Wikipedia: Kardane's algorithm).

def max_subarray_2(source: Sequence) -> Output:
    """Gets the maximal subarray

    This is an alternate version that doesn't use recursion or brute-force

    Args:
     source: sequence to maximize

    Returns:
     left-index, right-index, gain, count
    """
    count = 1
    best_total = -infinity
    best_start = best_end = 0
    current_total = 0

    changes = [source[index + 1] - source[index] for index in range(len(source) - 1)]
    for here, value_here in enumerate(changes):
        count += 1
        if current_total <= 0:
            current_start = here
            current_total = value_here
        else:
            current_total += value_here

        if current_total > best_total:
            best_total = current_total
            best_start = current_start
            best_end = here + 1
    return Output(best_start, best_end, best_total, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
left, right, gain, count = max_subarray_2(CASE_ONE.inputs)
assert left == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {left}"
assert right == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {right}"
assert gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {gain}"
print(f"left: {left}, right: {right}, gain: {gain}")

print(f"Count: {count}")
n = 17
left: 7, right: 11, gain: 43
Count: 17

So, without using divide-and-conquer we get an even better runtime - it was the data transformation that was the most valuable part in improving the performance.

Comparing the Methods

def run_thing(thing, inputs, name):
    print(f"*** {name} ***")
    start = datetime.now()
    runtime = thing(inputs).runtime
    stop = datetime.now()
    print(f"\tElapsed Time: {stop - start}")
    return runtime

brutes = []
divided = []
linear = []
counts = []

UPPER = 6
for exponent in range(1, UPPER):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)
    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)
    brutes.append(run_thing(brute_force_maximum_subarray, inputs, "Brute Force"))
    divided.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    linear.append(run_thing(max_subarray_2, inputs, "Linear"))
    counts.append(count)

n = 10
======
*** Brute Force ***
        Elapsed Time: 0:00:00.001054
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.000249
*** Linear ***
        Elapsed Time: 0:00:00.000127

n = 100
=======
*** Brute Force ***
        Elapsed Time: 0:00:00.002403
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.002198
*** Linear ***
        Elapsed Time: 0:00:00.000175

n = 1,000
=========
*** Brute Force ***
        Elapsed Time: 0:00:00.002527
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.017359
*** Linear ***
        Elapsed Time: 0:00:00.002204

n = 10,000
==========
*** Brute Force ***
        Elapsed Time: 0:00:00.071900
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.044104
*** Linear ***
        Elapsed Time: 0:00:00.000224

n = 100,000
===========
*** Brute Force ***
        Elapsed Time: 0:00:07.506323
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.023113
*** Linear ***
        Elapsed Time: 0:00:00.001233

Plot It

runtimes = pandas.DataFrame({"Brute Force": brutes,
                             "Divide and Conquer": divided,
                             "Linear": linear,
                             "Input Size": counts})

melted = runtimes.melt(id_vars=["Input Size"],
                       value_vars=["Brute Force", "Divide and Conquer", "Linear"],
                       var_name="Algorithm", value_name="Comparisons")

chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x=altair.X("Input Size", type="quantitative"),
    y=altair.Y("Comparisons", type="quantitative"),
    color="Algorithm"
).properties(
    title="Comparison Counts", width=PLOT.width, height=525)

save_it(chart, "algorithm_comparisons")

Figure Missing

Stepping up to a million with Brute Force takes too long (I've never let it run to the end to see how long). Let's see if the linear and divide and conquer can handle it, though.

linear_more = linear[:]
divided_more = divided[:]
counts_more = counts[:]

for exponent in range(6, 10):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)

    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)

    linear_more.append(run_thing(max_subarray_2, inputs, "Linear"))
    divided_more.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    counts_more.append(count)

n = 1,000,000
=============
*** Linear ***
        Elapsed Time: 0:00:00.008010
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.294771

n = 10,000,000
==============
*** Linear ***
        Elapsed Time: 0:00:00.169138
*** Divide and Conquer ***
        Elapsed Time: 0:00:02.018406

n = 100,000,000
===============
*** Linear ***
        Elapsed Time: 0:00:00.850640
*** Divide and Conquer ***
        Elapsed Time: 0:00:21.290975

n = 1,000,000,000
=================
*** Linear ***
        Elapsed Time: 0:00:07.666758
*** Divide and Conquer ***
        Elapsed Time: 0:03:32.097027

They do pretty well, it seems to be the brute force that dies out.

longtimes = pandas.DataFrame({"Linear": linear_more,
                              "Divide & Conquer": divided_more,
                              "Input Size": counts_more})

melted = longtimes.melt(id_vars=["Input Size"],
                       value_vars=["Divide & Conquer", "Linear"],
                       var_name="Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x=altair.X("Input Size", type="quantitative"),
    y=altair.Y("Comparisons", type="quantitative"),
    color="Algorithm").properties(
        title="Comparison Counts", width=PLOT.width, height=525)

save_it(chart, "longer_algorithm_comparisons")

Figure Missing

Shortest Paths: Dijkstra's Algorithm

Table of Contents

Setup

# python
from pprint import pprint
from queue import PriorityQueue

# pypi
from expects import be, equal, expect 
# this project
from bowling.data_structures.graphs.shortest_paths import (
    Edge,
    Graph,
    Vertex,
    initialize_single_source,
    relax,
    )

Dijkstra's Algorithm

def dijkstras_shortest_paths(graph: Graph, source: Vertex) -> None:
    """Find the shortest paths beginning at the source

    Args:
     graph: the vertices and edges to use
     source: the starting vertex for the paths
    """
    initialize_single_source(graph, source)
    shortest = set()
    queue = PriorityQueue()
    for vertex in graph.vertices:
        queue.put(vertex)
    while not queue.empty():
        vertex = queue.get()
        shortest.add(vertex)
        for edge in graph.adjacent[vertex]:
            relax(edge)
    return

Test It

nodes = dict()
for node in "stxyz":
    nodes[node] = Vertex(identifier=node)

graph = Graph()

graph.add_edge(Edge(nodes["s"], nodes["t"], 10))
graph.add_edge(Edge(nodes["s"], nodes["y"], 5))
graph.add_edge(Edge(nodes["t"], nodes["x"], 1))
graph.add_edge(Edge(nodes["t"], nodes["y"], 2))

graph.add_edge(Edge(nodes["x"], nodes["z"], 4))

graph.add_edge(Edge(nodes["y"], nodes["t"], 3))
graph.add_edge(Edge(nodes["y"], nodes["x"], 9))
graph.add_edge(Edge(nodes["y"], nodes["z"], 2))

graph.add_edge(Edge(nodes["z"], nodes["s"], 7))
graph.add_edge(Edge(nodes["z"], nodes["x"], 6))

dijkstras_shortest_paths(graph, nodes["s"])

pprint(graph.vertices)

expected = (("s", 0, None),
            ("y", 5, "s"),
            ("z", 7, "y"),
            ("t", 8, "y"),
            ("x", 9, "t"))

for node, path_estimate, predecessor in expected:
    parent = nodes[predecessor] if predecessor is not None else predecessor
    expect(nodes[node].path_estimate).to(equal(path_estimate))
    expect(nodes[node].predecessor).to(be(parent))
{s (path-estimate=0),
 y (path-estimate=5),
 z (path-estimate=7),
 t (path-estimate=8),
 x (path-estimate=9)}

Shortest Paths: Bellman-Ford

<<constants>>


<<the-vertex>>

    <<vertex-representation>>

    <<vertex-hash>>


<<the-edge>>

    <<edge-representation>>


<<the-graph>>

    <<add-edge>>


<<initialize-single-source>>


<<relax>>

Set Up

# python
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass, field

INFINITE = INFINITY = float("inf")

The Vertex

@dataclass(order=True)
class Vertex:
    """A vertex for the single-source shortest paths problem

    Args:
     identifier: something to distinguish the vertex
     path_estimate: the estimate of the path weight
     predecessor: the vertex that precedes this one in the path
    """
    identifier: str=field(compare=False)
    path_estimate: float=INFINITE
    predecessor: Vertex=field(default=None, compare=False)

Object Representation

def __repr__(self) -> str:
    return f"{self.identifier} (path-estimate={self.path_estimate})"

Vertex Hash

def __hash__(self) -> int:
    return hash(self.identifier)

The Edge

class Edge:
    """A directed edge

    Args:
     source: tail vertex of the edge
     target: head vertex of the edge
     weight: the weight of the edge
    """
    def __init__(self, source: Vertex, target: Vertex, weight: float) -> None:
        self.source = source
        self.target = target
        self.weight = weight
        return

The Edge Representation

def __repr__(self) -> str:
    return f"{self.source} -- {self.weight} --> {self.target}"

The Graph

class Graph:
    """Directed Graph for shortest path problem"""
    def __init__(self) -> None:
        self.vertices = set()
        self.edges = set()
        self.adjacent = defaultdict(set)
        return

Add Edge

def add_edge(self, edge: Edge) -> None:
    """Add the edge to the graph

    Args:
     edge: the directed edge to add
    """
    self.edges.add(edge)
    self.vertices.add(edge.source)
    self.vertices.add(edge.target)
    self.adjacent[edge.source].add(edge)
    return

Initialize Single Source

def initialize_single_source(graph: Graph, source: Vertex) -> None:
    """Setup the vertices of the graphs for single-source shortest path

    Args:
     graph: graph with vertices to set up
     source: the vertex to use as the start of the shortest paths tree
    """
    for vertex in graph.vertices:
        vertex.path_estimate = INFINITY
        vertex.predecessor = None
    source.path_estimate = 0
    return

Relax

def relax(edge: Edge) -> None:
    """Check if target Vertex is improved using the source vertex

    Args:
     edge: directed edge with source, target, and weight
    """
    if edge.target.path_estimate > edge.source.path_estimate + edge.weight:
        edge.target.path_estimate = edge.source.path_estimate + edge.weight
        edge.target.predecessor = edge.source
    return

Bellman-Ford

Set Up

# python
from pprint import pprint

# pypi
from expects import be, be_true, equal, expect

# this project
from bowling.data_structures.graphs.shortest_paths import (
    Edge,
    Graph,
    Vertex,
    initialize_single_source,
    relax,
    )

SUCCEEDED, NEGATIVE_WEIGHT_CYCLE = True, False

The Function

def bellman_ford(graph: Graph, source: Vertex) -> bool:
    """Find the shortest paths using the Bellman-Ford algorithm

    Args:
     graph: the graph to process
     source: the vertex to start the paths from

    Returns:
     True if finished, False if there was a negtive-weight cycle in the grahp
    """
    initialize_single_source(graph, source)
    for _ in range(1, len(graph.vertices)):
        for edge in graph.edges:
            relax(edge)
    for edge in graph.edges:
        if edge.target.path_estimate > edge.source.path_estimate + edge.weight:
            return NEGATIVE_WEIGHT_CYCLE
    return SUCCEEDED

Test It

nodes = dict()
for label in "stxyz":
    nodes[label] = Vertex(label)

graph = Graph()
graph.add_edge(Edge(nodes["s"], nodes["t"], 6))
graph.add_edge(Edge(nodes["s"], nodes["y"], 7))
graph.add_edge(Edge(nodes["t"], nodes["x"], 5))
graph.add_edge(Edge(nodes["t"], nodes["y"], 8))
graph.add_edge(Edge(nodes["t"], nodes["z"], -4))
graph.add_edge(Edge(nodes["x"], nodes["t"], -2))
graph.add_edge(Edge(nodes["x"], nodes["t"], -2))
graph.add_edge(Edge(nodes["y"], nodes["x"], -3))
graph.add_edge(Edge(nodes["y"], nodes["z"], 9))
graph.add_edge(Edge(nodes["z"], nodes["x"], 7))
graph.add_edge(Edge(nodes["z"], nodes["s"], 2))

expect(bellman_ford(graph, nodes["s"])).to(be_true)
pprint(nodes)

expected = (("s", 0, None),
            ("t", 2, "x"),
            ("x", 4, "y"),
            ("y", 7, "s"),
            ("z", -2, "t")
)

for node, path_weight, predecessor in expected:
    expect(nodes[node].path_estimate).to(equal(path_weight))
    parent = nodes[predecessor] if predecessor is not None else predecessor
    expect(nodes[node].predecessor).to(be(parent))
{'s': s (path-estimate=0),
 't': t (path-estimate=2),
 'x': x (path-estimate=4),
 'y': y (path-estimate=7),
 'z': z (path-estimate=-2)}

Minimum Spanning Trees: Kruskal's Algorithm

Set Up

# python
from __future__ import annotations
from pprint import pprint

# pypi
from expects import be, be_true, contain_only, equal, expect

A Vertex

class Vertex:
    """A node in the graph

    Args:
     identifier: something to distinguish the node
     parent: The node pointing to this node
     rank: The height of the node
    """
    def __init__(self, identifier, parent: Vertex=None, rank: int=0) -> None:
        self.identifier = identifier
        self.parent = parent
        self.rank = rank
        return

    @property
    def is_root(self) -> bool:
        """Checks if this vertex is the root of a tree

        Returns:
         true if this is a root
        """
        return self.parent is self

    def __repr__(self) -> str:
        return f"{self.identifier}"

Edges

class Edge:
    """A weighted edge between two nodes

    Args:
     node_1: vertex at one end
     node_2: vertex at the other end
     weight: weight of the edge
    """
    def __init__(self, node_1: Vertex, node_2: Vertex, weight: float) -> None:
        self.node_1 = node_1
        self.node_2 = node_2
        self.weight = weight
        return

    def __repr__(self) -> str:
        return f"{self.node_1} --{self.weight}-- {self.node_2}"

A Graph

class Graph:
    """An Undirected Graph
    """
    def __init__(self) -> None:
        self.vertices = set()
        self.edges = set()
        return

    def add_edge(self, node_1: Vertex, node_2: Vertex, weight: float) -> None:
        """Add the nodes and an edge between them

        Note:
         although the graph is undirected, the (node_2, node_1) edge 
         isn't added, it's assumed that the order of the arguments 
         doesn't matter
        """
        self.vertices.add(node_1)
        self.vertices.add(node_2)
        self.edges.add(Edge(node_1, node_2, weight))
        return

Disjoint Sets

class Disjoint:
    """Methods to treat the tree as a set"""
    @classmethod
    def make_sets(cls, vertices: set) -> None:
        """Initializes the vertices as trees in a forest

        Args:
         vertices: collection of nodes to set up
        """
        for vertex in vertices:
            cls.make_set(vertex)
        return

    @classmethod
    def make_set(cls, vertex: Vertex) -> None:
        """Initialize the values

        Args:
         vertex: node to set up
        """
        vertex.parent = vertex
        vertex.rank = 0
        return

    @classmethod
    def find_set(cls, vertex: Vertex) -> Vertex:
        """Find the root of the set that vertex belongs to

        Args:
         vertex: member of set to find

        Returns:
         root of set that vertex belongs to
        """
        if not vertex.is_root:
            vertex.parent = cls.find_set(vertex.parent)            
        return vertex.parent

    @classmethod
    def union(cls, vertex_1: Vertex, vertex_2: Vertex) -> None:
        """merge the trees that the vertices belong to

        Args:
         vertex_1: member of first tree
         vertex_2: member of second tree
        """
        cls.link(cls.find_set(vertex_1), cls.find_set(vertex_2))
        return

    @classmethod
    def link(cls, root_1: Vertex, root_2: Vertex) -> None:
        """make lower-ranked tree root a child of higher-ranked

        Args:
         root_1: root of a tree
         root_2: root of a different tree
        """
        if root_1.rank > root_2.rank:
            root_2.parent = root_1
        else:
            root_1.parent = root_2
            if root_1.rank == root_2.rank:
                root_2.rank += 1
        return

Kruskal's Algorithm

def kruskal(graph: Graph) -> set:
    """Create a Minimum Spanning Tree out of Vertices

    Args:
     graph: the graph from which we create the MST

    Returns:
     set of edges making up the minimum spanning tree
    """
    spanning_tree = set()
    Disjoint.make_sets(graph.vertices)
    edges = sorted(graph.edges, key=lambda edge: edge.weight)
    for edge in edges:
        tree_1 = Disjoint.find_set(edge.node_1)
        tree_2 = Disjoint.find_set(edge.node_2)
        if (tree_1 is not tree_2):
            spanning_tree.add(edge)
            Disjoint.union(edge.node_1, edge.node_2)
    return spanning_tree

Try It Out

nodes = dict()

for identifier in "abcdefghi":
    nodes[identifier] = Vertex(identifier)
graph = Graph()
graph.add_edge(nodes["a"], nodes["b"], 4)
graph.add_edge(nodes["a"], nodes["h"], 8)
graph.add_edge(nodes["b"], nodes["h"], 11)
graph.add_edge(nodes["b"], nodes["c"], 8)
graph.add_edge(nodes["c"], nodes["d"], 7)
graph.add_edge(nodes["c"], nodes["i"], 2)
graph.add_edge(nodes["c"], nodes["f"], 4)
graph.add_edge(nodes["d"], nodes["e"], 9)
graph.add_edge(nodes["d"], nodes["f"], 14)
graph.add_edge(nodes["e"], nodes["f"], 10)
graph.add_edge(nodes["f"], nodes["g"], 2)
graph.add_edge(nodes["g"], nodes["h"], 1)
graph.add_edge(nodes["g"], nodes["i"], 6)
graph.add_edge(nodes["h"], nodes["i"], 7)

Disjoint.make_sets(graph.vertices)

expect(graph.vertices).to(contain_only(*nodes.values()))

for node in nodes.values():
    expect(node.parent).to(be(node))
    expect(node.rank).to(equal(0))

tree = kruskal(graph)
pprint(tree)
{a --4-- b,
 a --8-- h,
 c --7-- d,
 c --2-- i,
 c --4-- f,
 d --9-- e,
 f --2-- g,
 g --1-- h}

Note: I originally thought I could check the tree structure, but the disjoint-set methods use Path Compression, so all the nodes end up having the same parent (in this case node "h").

Graphs: Strongly Connected Components

Beginning

Imports For Testing

# pypi
from expects import (
    be_a,
    expect,
    contain_exactly,
    contain_only,
)
# this project
from bowling.data_structures.graphs.depth_first_search import (
    DepthFirstSearch,
    DFSVertex,
    DirectedGraph,
)

from bowling.data_structures.graphs.transpose import Transpose

The Graph Transpose

A transpose of a directed graph (\(G^T\)) contains the same nodes and the same edges but the direction of each edge is reversed.

Imports for the Transpose

# python
from collections import defaultdict

# this project
from bowling.data_structures.graphs.depth_first_search import (
    DepthFirstSearch,
    DFSVertex,
    DirectedGraph)

The Transpose

class Transpose:
    """Creates the transpose of a graph

    Args:
     graph: the directed graph to transpose
    """
    def __init__(self, graph: DirectedGraph) -> None:
        self.graph = graph
        self._adjacent = None
        self._vertices = None
        return

The Adjacency Sets

@property
def adjacent(self) -> dict:
    """Vertex: set of adjacente vertices dictionary"""
    if self._adjacent is None:
        self._adjacent = defaultdict(set)

        for vertex, neighbors in self.graph.adjacent.items():
            for neighbor in neighbors:
                print(f"Adding neighbor {vertex} to {self.vertices}")
                self._adjacent[neighbor].add(vertex)
    return self._adjacent

The Vertices

@property
def vertices(self) -> set:
    """The vertices in the directed graph"""
    if self._vertices is None:
        self._vertices = self.graph.vertices
    return self._vertices

Testing the Transpose

graph = DirectedGraph()
vertex_a = DFSVertex("a")
vertex_b = DFSVertex("b")
vertex_e = DFSVertex("e")
vertex_f = DFSVertex("f")

graph.add(vertex_a, vertex_b)
graph.add(vertex_b, vertex_e)
graph.add(vertex_b, vertex_f)
graph.add(vertex_e, vertex_a)
graph.add(vertex_e, vertex_f)

transpose = Transpose(graph)

expect(transpose.vertices).to(contain_only(vertex_a, vertex_b,
                                           vertex_e, vertex_f))

expect(transpose.adjacent[vertex_a]).to(contain_exactly(vertex_e))
expect(transpose.adjacent[vertex_b]).to(contain_exactly(vertex_a))
expect(transpose.adjacent[vertex_e]).to(contain_exactly(vertex_b))
expect(transpose.adjacent[vertex_f]).to(contain_only(vertex_b, vertex_e))

Strongly Connected Components

A Strongly Connected Component of a Directed Graph is a sub-graph where each pair of vertices has an edge in both directions (if a has an edge to b then b has an edge to a).

The Steps to Build Them

  1. Call DFS(G) to compute the finishing times.
  2. Compute \(G^T\)
  3. Call DFS(\(G^T\) ), going through the vertices in decreasing finish time in the main loop.
  4. Output the Depth-First forest created in step 3 as the Strongly Connected Components.

The Builder

class StronglyConnectedComponents:
    """Build the strongly connected components sub-trees

    Args:
     graph: directed graph to process
    """
    def __init__(self, graph: DirectedGraph) -> None:
        self.graph = graph
        self._transpose = None
        self._forest = None
        return

The Transpose

@property
def transpose(self) -> Transpose:
    """The transpose of the original graph"""
    if self._transpose is None:
        self._transpose = Transpose(self.graph)
    return self._transpose

Find the Child

Unlike with a Binary Search Tree, our vertices don't keep track of their child nodes, just the predecessor node so this is a helper to get all the children of a node.

def find_child(self, vertices: set, predecessor: DFSVertex,
               path: list) -> list:
    """Find the child nodes of the given predecessor

    Note:
     this is a helper to find the children in a Strongly Connected Component
     For it to make sense the vertices should have the forest already built

    Args:
     vertices: source of nodes
     predecessor: node in the graph to find the child of
     path: list to append child nodes to
    """
    for vertex in vertices:
        if vertex.predecessor is predecessor:
            path.append(vertex)
            self.find_child(vertices, vertex, path)
    return path

The Call

@property
def forest(self) -> dict:
    """creates the forest with the strongly connected components

    Returns:
     adjacency dict for the strongly connected components
    """
    if self._forest is None:
        # do the search to get the finish times
        searcher = DepthFirstSearch(self.graph)
        searcher()

        # change the transpose vertices to be in reversed finish order
        vertices = sorted(self.graph.vertices,
                          key=lambda vertex: vertex.finished,
                          reverse=True)
        self.transpose._vertices = dict(zip(vertices, (None,) * len(vertices)))

        # do a depth first search on the graph transpose
        searcher.graph = self.transpose
        searcher()

        # at this point the strongly connected components are set up in 
        # self.transpose, but you have to do some figuring to get the trees

        # get the roots of the trees in the forest
        roots = (vertex for vertex in self.transpose.vertices
                 if vertex.predecessor is None)

        # build a forest for each root by finding its children
        forest = (self.find_child(self.transpose.vertices, root, [root])
                  for root in roots)

        # build a new adjacency dict
        self._forest = dict()

        # the neighbors are all the original adjacent nodes without the 
        # tree nodes
        for tree in forest:
            neighbors = set()
            for node in tree:
                neighbors = neighbors.union(self.graph.adjacent[node])
            neighbors = neighbors - set(tree)
            self._forest[tuple(tree)] = neighbors
    return self._forest

Testing it

from bowling.data_structures.graphs.transpose import StronglyConnectedComponents

graph = DirectedGraph()
vertex_a = DFSVertex("a")
vertex_b = DFSVertex("b")
vertex_c = DFSVertex("c")
vertex_d = DFSVertex("d")
vertex_e = DFSVertex("e")
vertex_f = DFSVertex("f")
vertex_g = DFSVertex("g")
vertex_h = DFSVertex("h")

graph.add(vertex_a, vertex_b)
graph.add(vertex_b, vertex_c)
graph.add(vertex_b, vertex_e)
graph.add(vertex_b, vertex_f)
graph.add(vertex_c, vertex_d)
graph.add(vertex_c, vertex_g)
graph.add(vertex_d, vertex_c)
graph.add(vertex_d, vertex_h)
graph.add(vertex_e, vertex_a)
graph.add(vertex_e, vertex_f)
graph.add(vertex_f, vertex_g)
graph.add(vertex_g, vertex_f)
graph.add(vertex_g, vertex_h)
graph.add(vertex_h, vertex_h)

components = StronglyConnectedComponents(graph)
forest = components.forest

Let's see what's in the forest.

Since the Depth-First Search created a forest of strongly connected components, we can see how many trees are in the forest by finding the roots (the nodes with no predecessor).

for root in forest:
    print(root)
abe
cd
fg
h
[autoreload of bowling.data_structures.graphs.transpose failed: Traceback (most recent call last):
  File "/home/bravo/.virtualenvs/Bowling-For-Data/site-packages/IPython/extensions/autoreload.py", line 245, in check
    superreload(m, reload, self.old_objects)
  File "/home/bravo/.virtualenvs/Bowling-For-Data/site-packages/IPython/extensions/autoreload.py", line 394, in superreload
    module = reload(module)
  File "/usr/lib/pypy3/lib-python/3/imp.py", line 314, in reload
    return importlib.reload(module)
  File "/usr/lib/pypy3/lib-python/3/importlib/__init__.py", line 169, in reload
    _bootstrap._exec(spec, module)
  File "<frozen importlib._bootstrap>", line 639, in _exec
  File "<builtin>/frozen importlib._bootstrap_external", line 737, in exec_module
  File "<builtin>/frozen importlib._bootstrap_external", line 873, in get_code
  File "<builtin>/frozen importlib._bootstrap_external", line 804, in source_to_code
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/home/bravo/Bowling-For-Data/bowling/data_structures/graphs/transpose.py", line 126
    self._forest[tuple(tree])] = neighbors
                           ^
SyntaxError: closing parenthesis ']' does not match opening parenthesis '('
]

So We have four Strongly Connected Components.


End

Graphs: Topological Ordering

Table of Contents

Beginning

# python
from functools import partial

# this project
from bowling.data_structures.graphs.depth_first_search import (
    DFSVertex,
    DepthFirstSearch,
    DirectedGraph)

Middle

underpants = DFSVertex("underpants")
pants = DFSVertex("pants")
belt = DFSVertex("belt")
socks = DFSVertex("socks")
watch = DFSVertex("watch")
shoes = DFSVertex("shoes")
shirt = DFSVertex("shirt")
tie = DFSVertex("tie")
jacket = DFSVertex("jacket")

graph = DirectedGraph()

graph.add(underpants, pants)
graph.add(underpants, shoes)
graph.add(socks, shoes)
graph.add(pants, shoes)
graph.add(pants, belt)
graph.add(shirt, belt)
graph.add(shirt, tie)
graph.add(tie, jacket)
graph.add(belt, jacket)
graph.vertices.add(watch)
search = DepthFirstSearch(graph)
search()

topologically_sorted = reversed(sorted(graph.vertices, key=lambda v: v.finished))
for node in topologically_sorted:
    predecessor = node.predecessor.identifier if node.predecessor is not None else "None"
    print(f"{node.identifier} \tfinished: {node.finished}\t predecessor: {predecessor}")
watch   finished: 18     predecessor: None
shirt   finished: 16     predecessor: None
tie     finished: 15     predecessor: shirt
socks   finished: 12     predecessor: None
underpants      finished: 10     predecessor: None
pants   finished: 9      predecessor: underpants
belt    finished: 8      predecessor: pants
jacket  finished: 7      predecessor: belt
shoes   finished: 4      predecessor: pants

This differs from the CLRS solution. The ordering depends on the order in which the edges are encountered (in this case it means the order in which the edges are added to the graph) so the sort won't always be the same. The important feature is that no item later in the list needs to come before any that precede it.

Graphs: Depth-First-Search

Imports and Setup

# python
from __future__ import annotations
from enum import Enum

# from pypi
from attrs import define

# this projects
from bowling.data_structures.graphs import graph

DFS Vertex

The Vertices used in the Depth-First Search also keep track of the "times" so it has two more attributes than our original vertex - discovered and finished. Although, as with color and all the other attributes, we could just let the program stick the attributes into the object, I thought it'd be better to create a new class to make it more obvious that there's a difference. I couldn't get the sets to add the vertices when I used inheritance so I'm going to start from scratch instead of inheriting from the original Vertex.

@define(eq=False)
class DFSVertex(graph.Vertex):
    """The Depth-First Search Vertex

    Args:
     identifier: something to identify the node
     color: the 'discovery' state of the node
     distance: The number of edges to the root
     predecessor: The 'parent' of the node in a tree
     discovered: when the node was discovered
     finished: when the node's adjacent nodes where checked
    """
    discovered: int = None
    finished: int = None

    def __str__(self) -> str:
        return (super().__str__() + 
        f", discovered: {self.discovered}, finished: {self.finished}")

Directed Graph

class DirectedGraph(graph.Graph):
    """A Directed Graph"""    

Add

def add(self, source: DFSVertex, target: DFSVertex) -> None:
    """Add an edge from source to target

    Args:
     source: the vertex where the edge starts
     target: the vertex where the edge ends
    """
    self.vertices.add(source)
    self.vertices.add(target)
    self.adjacent[source].add(target)
    return

Depth First Search

The Class

@define
class DepthFirstSearch:
    """depth-first-searcher

    Args:
     graph: the graph to process
    """
    graph: graph.Graph
    time: int=0

The Call

def __call__(self) -> None:
    """Performs the depth-first-search"""
    for vertex in self.graph.vertices:
        vertex.color = graph.Color.WHITE
        vertex.predecessor = None
    self.time = 0
    for vertex in self.graph.vertices:
        if vertex.color is graph.Color.WHITE:
            self.visit_adjacent(vertex)
    return

Visiting Adjacent Nodes

def visit_adjacent(self, vertex: DFSVertex) -> None:
    """Visit the nodes adjacent to the given node

    Args:
     vertex: vertex whose adjacency to visit
    """
    self.time += 1
    vertex.discovered = self.time
    vertex.color = graph.Color.GRAY
    for neighbor in self.graph.adjacent[vertex]:
        if neighbor.color is graph.Color.WHITE:
            neighbor.predecessor = vertex
            self.visit_adjacent(neighbor)
    vertex.color = graph.Color.BLACK
    self.time += 1
    vertex.finished = self.time
    return

Testing It

# pypi
from expects import be, equal, expect
# software under test
from bowling.data_structures.graphs import graph
from bowling.data_structures.graphs.depth_first_search import (
    DepthFirstSearch,
    DFSVertex,
    DirectedGraph
)

graph_1 = DirectedGraph()
node_u = DFSVertex(identifier="u")
node_v = DFSVertex(identifier="v")
node_w = DFSVertex(identifier="w")
node_x = DFSVertex(identifier="x")
node_y = DFSVertex(identifier="y")
node_z = DFSVertex(identifier="z")

graph_1.add(node_u, node_v)
graph_1.add(node_u, node_x)
graph_1.add(node_x, node_v)
graph_1.add(node_v, node_y)
graph_1.add(node_y, node_x)
graph_1.add(node_w, node_y)
graph_1.add(node_w, node_z)
graph_1.add(node_z, node_z)

searcher = DepthFirstSearch(graph=graph_1)
searcher()
for node in graph_1.vertices:
    expect(node.color).to(be(graph.Color.BLACK))

expect(node_u.discovered).to(equal(1))
expect(node_u.finished).to(equal(8))

expect(node_v.discovered).to(equal(2))
expect(node_v.finished).to(equal(7))

expect(node_w.discovered).to(equal(9))
expect(node_w.finished).to(equal(12))

expect(node_x.discovered).to(equal(4))
expect(node_x.finished).to(equal(5))

expect(node_y.discovered).to(equal(3))
expect(node_y.finished).to(equal(6))

expect(node_z.discovered).to(equal(10))
expect(node_z.finished).to(equal(11))

expect(node_u.predecessor).to(be(None))
expect(node_v.predecessor).to(be(node_u))
expect(node_y.predecessor).to(be(node_v))
expect(node_x.predecessor).to(be(node_y))
expect(node_z.predecessor).to(be(node_w))

expect(node_w.predecessor).to(be(None))

Graphs: Breadth-First-Search

Breadth First Search

Imports

# python
from __future__ import annotations
from queue import Queue

# pypi
from attrs import define

# this project
from bowling.data_structures.graphs.graph import Graph, Vertex
from bowling.data_structures.graphs import graph

Constants

INFINITY = float("inf")

The Class

@define
class BreadthFirstSearch:
    """Creates a shortest-path tree from a source node to all reachable nodes

    Note:
     The vertices should be BFSVertex instances

    Args:
     graph: the graph with the adjacency dict for all the vertices
    """
    graph: Graph

The Call

def __call__(self, source: BFSVertex) -> None:
    """Does the breadth first search to create the shortest paths tree"""
    # reset all the search attributes
    for vertex in set(self.graph.adjacent) - {source}:
        vertex.color, vertex.distance, vertex.predecessor = (
            graph.Color.WHITE, INFINITY, None)

    source.color, source.distance, source.predecessor = (
        graph.Color.GRAY, 0, None)
    queue = Queue()
    queue.put(source)

    while not queue.empty():
        predecessor = queue.get()
        for vertex in self.graph.adjacent[predecessor]:
            if vertex.color is graph.Color.WHITE:
                vertex.color, vertex.distance, vertex.predecessor = (
                    graph.Color.GRAY, predecessor.distance + 1, predecessor)
                queue.put(vertex)
        predecessor.color = graph.Color.BLACK
    return

Testing

# pypi
from collections import defaultdict
from expects import be, be_none, equal, expect
from pathlib import Path

import networkx

# software under test
from bowling.data_structures.graphs import graph
from bowling.data_structures.graphs.breadth_first_search import (
    BreadthFirstSearch)
import bowling.data_structures.graphs.breadth_first_search as bfs

test = graph.Graph()
source = BFSVertex(identifier="s")
v1 = BFSVertex(identifier=1)
v2 = BFSVertex(identifier=2)
v3 = BFSVertex(identifier=3)

test.add(v1, v2)
test.add(v2, source)
test.add(v3, source)

search = BreadthFirstSearch(test)
search(source)

for node in (source, v1, v2, v3):
    expect(node.color).to(be(graph.Color.BLACK))
expect(source.predecessor).to(be_none)
expect(source.distance).to(equal(0))

expect(v3.predecessor).to(be(source))
expect(v3.distance).to(be(1))

expect(v2.predecessor).to(be(source))
expect(v2.distance).to(be(1))

expect(v1.predecessor).to(be(v2))
expect(v1.distance).to(be(2))

CLRS Example

We'll start with the same Graph from the previous example and add more nodes.

v4 = BFSVertex(identifier=4)
v5 = BFSVertex(identifier=5)
v6 = BFSVertex(identifier=6)
v7 = BFSVertex(identifier=7)

test.add(v3, v4)
test.add(v3, v5)
test.add(v4, v5)
test.add(v4, v6)
test.add(v5, v6)
test.add(v5, v7)
test.add(v6, v7)
plot_adjacent = {key.identifier: item for key, item in test.adjacent.items()}
for key, adjacents in plot_adjacent.items():
    plot_adjacent[key] = [adjacent.identifier for adjacent in adjacents]

plot_graph = networkx.Graph(plot_adjacent)
pydot_graph = networkx.nx_pydot.to_pydot(plot_graph)

SLUG = "graphs-breadth-first-search"
PATH = Path(f"files/posts/{SLUG}/")
if not PATH.is_dir():
    PATH.mkdir()

pydot_graph.write_png(PATH/"clrs_example.png")

nil

search(source)
nodes = networkx.Graph()
for node in test.adjacent.keys():
    if node.predecessor is not None:        
        nodes.add_edge(node.identifier, node.predecessor.identifier)
pydot_graph = networkx.nx_pydot.to_pydot(nodes)
pydot_graph.write_png(PATH/"clrs_example_searched.png")

nil