The Maximum-Subarray Problem

Imports and Setup

Imports

# python
from collections import namedtuple
from datetime import datetime
from functools import partial
from math import inf as infinity
from math import log2
from typing import Sequence, Tuple

import random

# pypi
import altair
import pandas

# my stuff
from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart

Set Up

TIMER = Timer()

SLUG = "the-maximum-subarray-problem"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)

MAXIMUM_SUBARRAY = Tuple[int, int, float, int]
Case = namedtuple("Case", ["inputs", "left", "right", "max"])

Plot = namedtuple("Plot", ["height", "width"], defaults=[700, 1000])
PLOT = Plot()

Output = namedtuple("Output", ["start", "end", "gain", "runtime"])

Implementations

The motivation given by the text is that we have some data that varies over time and we want to find the pair of points that maximizes the gain from the first point to the second.

Test Case

CASE_ONE = Case([100, 113, 110, 85, 105, 102, 86, 63, 81,
                 101, 94, 106, 101, 79, 94, 90, 97],
                left=7,
                right=11,
                max=43)
x = CASE_ONE.inputs
changes = [0] + [x[index + 1] - x[index] for index in range(len(x) - 1)]

frame = pandas.DataFrame({"Numbers": x,
                          "Change": changes})
melted = frame.reset_index()
melted = melted.melt(id_vars=["index"], value_vars=["Numbers", "Change"] , var_name="Data Source", value_name="Data")
chart = altair.Chart(melted).mark_line().encode(
    x=altair.X("index", type="quantitative"),
    y=altair.Y("Data", type="quantitative"),
    color="Data Source").properties(
        title="Sample Input One", width=PLOT.width, height=525)

save_it(chart, "sample-input-one")

Figure Missing

Brute Force

The first way we're going to find the maximum gain is using Brute Force - we're going to calculate the gain for each pair of points and picking the pair with the best gain. Since this involves checking every pair we're essentially looking at a Handshake Problem so the number of comparisons should be \(\frac{n(n - 1)}{2}\) or \(\Theta(n^2)\).

def brute_force_maximum_subarray(source: Sequence) -> Output:
    """Finds the subarray with maximum gain

    Args:
     source: sequence with sub-array to maximize

    Returns:
     (left-max-index, right-max-index, gain, count)
    """
    best_left = best_right = None
    best_gain = -infinity
    count = 0
    for left in range(len(source) - 1):
        for right in range(left + 1, len(source)):
            count += 1
            gain = source[right] - source[left]
            if gain > best_gain:
                best_left, best_right, best_gain = left, right, gain
    return Output(best_left, best_right, best_gain, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = brute_force_maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left
assert output.end == CASE_ONE.right
assert output.gain == CASE_ONE.max
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n^2 = {n**2}")
n = 17
count = 136
n(n-1)/2 = 136
n^2 = 289

Recursive

Looking at the brute-force solution you might think - "That was a check of point-pairs, why is this called 'max-subarray'?". Well, we're going to improve upon the brute-force implementation by doing a data-transformation. If you look at the data that we set up you might notice that there's the point values and then there's the change in value from one point to another. The changes are there because we can re-think our problem not as the maximum of increases between pair points, but instead as the maximum of point-change-sums.

In our CASE_ONE inputs, the biggest gain came from point 7 (with a value of 63) and point 11 (with a value of 106). If we look at the difference between consecutive points from points 7 to 11 we get -23, 18, 20, -7, 12 which sums to 20, which is the maximum sub-array total for this data set. To solve this problem we're going to need to find this sub-array. Since we stated that we're going to use a divide-and-conquer approach we know that we're going to repeatedly break the inputs up into (in this case) two separate sub-sets to solve. Every time we break the inputs into two groups we end up with three possibilities as to where the maximum sub-array lies:

  • in the left half
  • in the right half
  • crossing from the left half over the split to the right half

Max-Crossing-Subarray

This is a function that will find the best subarray in the left and in the right halves of the sub-array and combines them to find the gain across the mid-point of the subarray.

def find_max_crossing_subarray(source: Sequence,
                               low: int, mid: int,
                               high: int) -> Output:
    """Find the max subarray that crosses the mid-point

    Args:
     source: the array to search for the sub-sequence in
     low: the lower-bound for the indices to search within
     mid: the mid-index between low and high
     high: the upper-bound for the indices to search within

    Returns:
     left-index, right-index, gain, count
    """
    count, best_left, best_right= 0, None, None
    left_gain = right_gain = -infinity
    gain = 0
    for left in range(mid, low, -1):
        count += 1
        gain += source[left]
        if gain > left_gain:
            left_gain = gain
            best_left = left
    gain = 0
    for right in range(mid + 1, high):
        count += 1
        gain += source[right]
        if gain > right_gain:
            right_gain = gain
            best_right = right
    return Output(best_left, best_right, left_gain + right_gain, count)

Max-Subarray

The find_maximum_subarray function finds the start and end indices for the best gain.

def find_maximum_subarray(source: Sequence,
                          low: int, high: int) -> Output:
    """Find the sub-array that maximizes gain

    Args:
     source: sequence to maximize
     low: lower-bound for indices in source
     high: upper-bound for indices in source

    Returns:
     left-index, right-index, gain, count
    """
    if high == low:
        start, end, gain, count = low, high, source[low], 1

    else:
        mid = (low + high)//2

        left = find_maximum_subarray(source, low, mid)
        right = find_maximum_subarray(source, mid + 1, high)
        cross_mid = find_max_crossing_subarray(source, low, mid, high)

        count = left.runtime + right.runtime + cross_mid.runtime

        best_gain = max(left.gain, right.gain, cross_mid.gain)

        if left.gain == best_gain:
            start, end, gain, count = left.start, left.end, left.gain, count
        elif right.gain == best_gain:
            start, end, gain, count = right.start, right.end, right.gain, count
        else:
            start, end, gain, count = (cross_mid.start, cross_mid.end,
                                       cross_mid.gain, count)
    return Output(start, end, gain, count)

The maximum_subarray converts our list of values to a list of changes between consecutive values so that we can use our divide-and-conquer function.

def maximum_subarray(source: Sequence) -> Output:
    """Finds the sub-array with maximum gain

    Args:
     source: array to maximize

    Returns:
     left-index, right-index, gain, count
    """
    start, end = 0, len(source) - 1
    changes = [source[index + 1] - source[index] for index in range(end)]
    output = find_maximum_subarray(changes, start, end - 1)

    # our 'changes' has one fewer entry than the original list so up the right index by 1
    end = output.end + 1
    return Output(output.start, end, output.gain, output.runtime)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {output.start}"
assert output.end == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {output.end}"
assert output.gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {output.gain}"
print(f"left: {output.start}, right: {output.end}, gain: {output.gain}")
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n log n : {n * log2(n): 0.2f}")
print(f"n^2 = {n**2}")
n = 17
left: 7, right: 11, gain: 43
count = 50
n(n-1)/2 = 136
n log n :  69.49
n^2 = 289

By transforming the problem to one that lets us use divide-and-conquer we reduced the number of comparisons from 136 to 50. More generally, if we look at find_maximum_subarray we see that we're splitting the input in half before each of the recursive calls so we're going to make \(log_2 n\) splits and at each level of the recursions tree we're going to have \(2n\) inputs (left and right are each \(\frac{1}{2} n\) and cross_mid uses \(n\)) so we're going from \(\Theta(n^2)\) for the brute-force verios to \(\Theta(n \log n)\) for the divide-and-conquer version.

Alternate Version

Let's try another version that uses the idea of summing the changes between consecutive points but doesn't use recursion (see Wikipedia: Kardane's algorithm).

def max_subarray_2(source: Sequence) -> Output:
    """Gets the maximal subarray

    This is an alternate version that doesn't use recursion or brute-force

    Args:
     source: sequence to maximize

    Returns:
     left-index, right-index, gain, count
    """
    count = 1
    best_total = -infinity
    best_start = best_end = 0
    current_total = 0

    changes = [source[index + 1] - source[index] for index in range(len(source) - 1)]
    for here, value_here in enumerate(changes):
        count += 1
        if current_total <= 0:
            current_start = here
            current_total = value_here
        else:
            current_total += value_here

        if current_total > best_total:
            best_total = current_total
            best_start = current_start
            best_end = here + 1
    return Output(best_start, best_end, best_total, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
left, right, gain, count = max_subarray_2(CASE_ONE.inputs)
assert left == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {left}"
assert right == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {right}"
assert gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {gain}"
print(f"left: {left}, right: {right}, gain: {gain}")

print(f"Count: {count}")
n = 17
left: 7, right: 11, gain: 43
Count: 17

So, without using divide-and-conquer we get an even better runtime - it was the data transformation that was the most valuable part in improving the performance.

Comparing the Methods

def run_thing(thing, inputs, name):
    print(f"*** {name} ***")
    start = datetime.now()
    runtime = thing(inputs).runtime
    stop = datetime.now()
    print(f"\tElapsed Time: {stop - start}")
    return runtime

brutes = []
divided = []
linear = []
counts = []

UPPER = 6
for exponent in range(1, UPPER):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)
    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)
    brutes.append(run_thing(brute_force_maximum_subarray, inputs, "Brute Force"))
    divided.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    linear.append(run_thing(max_subarray_2, inputs, "Linear"))
    counts.append(count)

n = 10
======
*** Brute Force ***
        Elapsed Time: 0:00:00.001054
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.000249
*** Linear ***
        Elapsed Time: 0:00:00.000127

n = 100
=======
*** Brute Force ***
        Elapsed Time: 0:00:00.002403
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.002198
*** Linear ***
        Elapsed Time: 0:00:00.000175

n = 1,000
=========
*** Brute Force ***
        Elapsed Time: 0:00:00.002527
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.017359
*** Linear ***
        Elapsed Time: 0:00:00.002204

n = 10,000
==========
*** Brute Force ***
        Elapsed Time: 0:00:00.071900
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.044104
*** Linear ***
        Elapsed Time: 0:00:00.000224

n = 100,000
===========
*** Brute Force ***
        Elapsed Time: 0:00:07.506323
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.023113
*** Linear ***
        Elapsed Time: 0:00:00.001233

Plot It

runtimes = pandas.DataFrame({"Brute Force": brutes,
                             "Divide and Conquer": divided,
                             "Linear": linear,
                             "Input Size": counts})

melted = runtimes.melt(id_vars=["Input Size"],
                       value_vars=["Brute Force", "Divide and Conquer", "Linear"],
                       var_name="Algorithm", value_name="Comparisons")

chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x=altair.X("Input Size", type="quantitative"),
    y=altair.Y("Comparisons", type="quantitative"),
    color="Algorithm"
).properties(
    title="Comparison Counts", width=PLOT.width, height=525)

save_it(chart, "algorithm_comparisons")

Figure Missing

Stepping up to a million with Brute Force takes too long (I've never let it run to the end to see how long). Let's see if the linear and divide and conquer can handle it, though.

linear_more = linear[:]
divided_more = divided[:]
counts_more = counts[:]

for exponent in range(6, 10):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)

    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)

    linear_more.append(run_thing(max_subarray_2, inputs, "Linear"))
    divided_more.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    counts_more.append(count)

n = 1,000,000
=============
*** Linear ***
        Elapsed Time: 0:00:00.008010
*** Divide and Conquer ***
        Elapsed Time: 0:00:00.294771

n = 10,000,000
==============
*** Linear ***
        Elapsed Time: 0:00:00.169138
*** Divide and Conquer ***
        Elapsed Time: 0:00:02.018406

n = 100,000,000
===============
*** Linear ***
        Elapsed Time: 0:00:00.850640
*** Divide and Conquer ***
        Elapsed Time: 0:00:21.290975

n = 1,000,000,000
=================
*** Linear ***
        Elapsed Time: 0:00:07.666758
*** Divide and Conquer ***
        Elapsed Time: 0:03:32.097027

They do pretty well, it seems to be the brute force that dies out.

longtimes = pandas.DataFrame({"Linear": linear_more,
                              "Divide & Conquer": divided_more,
                              "Input Size": counts_more})

melted = longtimes.melt(id_vars=["Input Size"],
                       value_vars=["Divide & Conquer", "Linear"],
                       var_name="Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x=altair.X("Input Size", type="quantitative"),
    y=altair.Y("Comparisons", type="quantitative"),
    color="Algorithm").properties(
        title="Comparison Counts", width=PLOT.width, height=525)

save_it(chart, "longer_algorithm_comparisons")

Figure Missing