The Maximum-Subarray Problem
Table of Contents
Imports and Setup
Imports
# python
from collections import namedtuple
from datetime import datetime
from functools import partial
from math import inf as infinity
from math import log2
from typing import Sequence, Tuple
import random
# pypi
import altair
import pandas
# my stuff
from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart
Set Up
TIMER = Timer()
SLUG = "the-maximum-subarray-problem"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)
MAXIMUM_SUBARRAY = Tuple[int, int, float, int]
Case = namedtuple("Case", ["inputs", "left", "right", "max"])
Plot = namedtuple("Plot", ["height", "width"], defaults=[700, 1000])
PLOT = Plot()
Output = namedtuple("Output", ["start", "end", "gain", "runtime"])
Implementations
The motivation given by the text is that we have some data that varies over time and we want to find the pair of points that maximizes the gain from the first point to the second.
Test Case
CASE_ONE = Case([100, 113, 110, 85, 105, 102, 86, 63, 81,
101, 94, 106, 101, 79, 94, 90, 97],
left=7,
right=11,
max=43)
x = CASE_ONE.inputs
changes = [0] + [x[index + 1] - x[index] for index in range(len(x) - 1)]
frame = pandas.DataFrame({"Numbers": x,
"Change": changes})
melted = frame.reset_index()
melted = melted.melt(id_vars=["index"], value_vars=["Numbers", "Change"] , var_name="Data Source", value_name="Data")
chart = altair.Chart(melted).mark_line().encode(
x=altair.X("index", type="quantitative"),
y=altair.Y("Data", type="quantitative"),
color="Data Source").properties(
title="Sample Input One", width=PLOT.width, height=525)
save_it(chart, "sample-input-one")
Brute Force
The first way we're going to find the maximum gain is using Brute Force - we're going to calculate the gain for each pair of points and picking the pair with the best gain. Since this involves checking every pair we're essentially looking at a Handshake Problem so the number of comparisons should be \(\frac{n(n - 1)}{2}\) or \(\Theta(n^2)\).
def brute_force_maximum_subarray(source: Sequence) -> Output:
"""Finds the subarray with maximum gain
Args:
source: sequence with sub-array to maximize
Returns:
(left-max-index, right-max-index, gain, count)
"""
best_left = best_right = None
best_gain = -infinity
count = 0
for left in range(len(source) - 1):
for right in range(left + 1, len(source)):
count += 1
gain = source[right] - source[left]
if gain > best_gain:
best_left, best_right, best_gain = left, right, gain
return Output(best_left, best_right, best_gain, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = brute_force_maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left
assert output.end == CASE_ONE.right
assert output.gain == CASE_ONE.max
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n^2 = {n**2}")
n = 17 count = 136 n(n-1)/2 = 136 n^2 = 289
Recursive
Looking at the brute-force solution you might think - "That was a check of point-pairs, why is this called 'max-subarray'?". Well, we're going to improve upon the brute-force implementation by doing a data-transformation. If you look at the data that we set up you might notice that there's the point values and then there's the change in value from one point to another. The changes are there because we can re-think our problem not as the maximum of increases between pair points, but instead as the maximum of point-change-sums.
In our CASE_ONE
inputs, the biggest gain came from point 7 (with a value of 63) and point 11 (with a value of 106). If we look at the difference between consecutive points from points 7 to 11 we get -23, 18, 20, -7, 12 which sums to 20, which is the maximum sub-array total for this data set. To solve this problem we're going to need to find this sub-array. Since we stated that we're going to use a divide-and-conquer approach we know that we're going to repeatedly break the inputs up into (in this case) two separate sub-sets to solve. Every time we break the inputs into two groups we end up with three possibilities as to where the maximum sub-array lies:
- in the left half
- in the right half
- crossing from the left half over the split to the right half
Max-Crossing-Subarray
This is a function that will find the best subarray in the left and in the right halves of the sub-array and combines them to find the gain across the mid-point of the subarray.
def find_max_crossing_subarray(source: Sequence,
low: int, mid: int,
high: int) -> Output:
"""Find the max subarray that crosses the mid-point
Args:
source: the array to search for the sub-sequence in
low: the lower-bound for the indices to search within
mid: the mid-index between low and high
high: the upper-bound for the indices to search within
Returns:
left-index, right-index, gain, count
"""
count, best_left, best_right= 0, None, None
left_gain = right_gain = -infinity
gain = 0
for left in range(mid, low, -1):
count += 1
gain += source[left]
if gain > left_gain:
left_gain = gain
best_left = left
gain = 0
for right in range(mid + 1, high):
count += 1
gain += source[right]
if gain > right_gain:
right_gain = gain
best_right = right
return Output(best_left, best_right, left_gain + right_gain, count)
Max-Subarray
The find_maximum_subarray
function finds the start and end indices for the best gain.
def find_maximum_subarray(source: Sequence,
low: int, high: int) -> Output:
"""Find the sub-array that maximizes gain
Args:
source: sequence to maximize
low: lower-bound for indices in source
high: upper-bound for indices in source
Returns:
left-index, right-index, gain, count
"""
if high == low:
start, end, gain, count = low, high, source[low], 1
else:
mid = (low + high)//2
left = find_maximum_subarray(source, low, mid)
right = find_maximum_subarray(source, mid + 1, high)
cross_mid = find_max_crossing_subarray(source, low, mid, high)
count = left.runtime + right.runtime + cross_mid.runtime
best_gain = max(left.gain, right.gain, cross_mid.gain)
if left.gain == best_gain:
start, end, gain, count = left.start, left.end, left.gain, count
elif right.gain == best_gain:
start, end, gain, count = right.start, right.end, right.gain, count
else:
start, end, gain, count = (cross_mid.start, cross_mid.end,
cross_mid.gain, count)
return Output(start, end, gain, count)
The maximum_subarray
converts our list of values to a list of changes between consecutive values so that we can use our divide-and-conquer function.
def maximum_subarray(source: Sequence) -> Output:
"""Finds the sub-array with maximum gain
Args:
source: array to maximize
Returns:
left-index, right-index, gain, count
"""
start, end = 0, len(source) - 1
changes = [source[index + 1] - source[index] for index in range(end)]
output = find_maximum_subarray(changes, start, end - 1)
# our 'changes' has one fewer entry than the original list so up the right index by 1
end = output.end + 1
return Output(output.start, end, output.gain, output.runtime)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
output = maximum_subarray(CASE_ONE.inputs)
assert output.start == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {output.start}"
assert output.end == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {output.end}"
assert output.gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {output.gain}"
print(f"left: {output.start}, right: {output.end}, gain: {output.gain}")
print(f"count = {output.runtime}")
print(f"n(n-1)/2 = {int((n * (n - 1))/2)}")
print(f"n log n : {n * log2(n): 0.2f}")
print(f"n^2 = {n**2}")
n = 17 left: 7, right: 11, gain: 43 count = 50 n(n-1)/2 = 136 n log n : 69.49 n^2 = 289
By transforming the problem to one that lets us use divide-and-conquer we reduced the number of comparisons from 136 to 50. More generally, if we look at find_maximum_subarray
we see that we're splitting the input in half before each of the recursive calls so we're going to make \(log_2 n\) splits and at each level of the recursions tree we're going to have \(2n\) inputs (left
and right
are each \(\frac{1}{2} n\) and cross_mid
uses \(n\)) so we're going from \(\Theta(n^2)\) for the brute-force verios to \(\Theta(n \log n)\) for the divide-and-conquer version.
Alternate Version
Let's try another version that uses the idea of summing the changes between consecutive points but doesn't use recursion (see Wikipedia: Kardane's algorithm).
def max_subarray_2(source: Sequence) -> Output:
"""Gets the maximal subarray
This is an alternate version that doesn't use recursion or brute-force
Args:
source: sequence to maximize
Returns:
left-index, right-index, gain, count
"""
count = 1
best_total = -infinity
best_start = best_end = 0
current_total = 0
changes = [source[index + 1] - source[index] for index in range(len(source) - 1)]
for here, value_here in enumerate(changes):
count += 1
if current_total <= 0:
current_start = here
current_total = value_here
else:
current_total += value_here
if current_total > best_total:
best_total = current_total
best_start = current_start
best_end = here + 1
return Output(best_start, best_end, best_total, count)
n = len(CASE_ONE.inputs)
print(f"n = {n}")
left, right, gain, count = max_subarray_2(CASE_ONE.inputs)
assert left == CASE_ONE.left, f"Expected: {CASE_ONE.left}, Actual: {left}"
assert right == CASE_ONE.right, f"Expected: {CASE_ONE.right}, Actual: {right}"
assert gain == CASE_ONE.max, f"Expected: {CASE_ONE.max}, Actual: {gain}"
print(f"left: {left}, right: {right}, gain: {gain}")
print(f"Count: {count}")
n = 17 left: 7, right: 11, gain: 43 Count: 17
So, without using divide-and-conquer we get an even better runtime - it was the data transformation that was the most valuable part in improving the performance.
Comparing the Methods
def run_thing(thing, inputs, name):
print(f"*** {name} ***")
start = datetime.now()
runtime = thing(inputs).runtime
stop = datetime.now()
print(f"\tElapsed Time: {stop - start}")
return runtime
brutes = []
divided = []
linear = []
counts = []
UPPER = 6
for exponent in range(1, UPPER):
count = 10**exponent
title = f"n = {count:,}"
underline = "=" * len(title)
print(f"\n{title}")
print(underline)
inputs = list(range(count))
inputs = random.choices(inputs, k=count)
brutes.append(run_thing(brute_force_maximum_subarray, inputs, "Brute Force"))
divided.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
linear.append(run_thing(max_subarray_2, inputs, "Linear"))
counts.append(count)
n = 10 ====== *** Brute Force *** Elapsed Time: 0:00:00.001054 *** Divide and Conquer *** Elapsed Time: 0:00:00.000249 *** Linear *** Elapsed Time: 0:00:00.000127 n = 100 ======= *** Brute Force *** Elapsed Time: 0:00:00.002403 *** Divide and Conquer *** Elapsed Time: 0:00:00.002198 *** Linear *** Elapsed Time: 0:00:00.000175 n = 1,000 ========= *** Brute Force *** Elapsed Time: 0:00:00.002527 *** Divide and Conquer *** Elapsed Time: 0:00:00.017359 *** Linear *** Elapsed Time: 0:00:00.002204 n = 10,000 ========== *** Brute Force *** Elapsed Time: 0:00:00.071900 *** Divide and Conquer *** Elapsed Time: 0:00:00.044104 *** Linear *** Elapsed Time: 0:00:00.000224 n = 100,000 =========== *** Brute Force *** Elapsed Time: 0:00:07.506323 *** Divide and Conquer *** Elapsed Time: 0:00:00.023113 *** Linear *** Elapsed Time: 0:00:00.001233
Plot It
runtimes = pandas.DataFrame({"Brute Force": brutes,
"Divide and Conquer": divided,
"Linear": linear,
"Input Size": counts})
melted = runtimes.melt(id_vars=["Input Size"],
value_vars=["Brute Force", "Divide and Conquer", "Linear"],
var_name="Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
x=altair.X("Input Size", type="quantitative"),
y=altair.Y("Comparisons", type="quantitative"),
color="Algorithm"
).properties(
title="Comparison Counts", width=PLOT.width, height=525)
save_it(chart, "algorithm_comparisons")
Stepping up to a million with Brute Force takes too long (I've never let it run to the end to see how long). Let's see if the linear and divide and conquer can handle it, though.
linear_more = linear[:]
divided_more = divided[:]
counts_more = counts[:]
for exponent in range(6, 10):
count = 10**exponent
title = f"n = {count:,}"
underline = "=" * len(title)
print(f"\n{title}")
print(underline)
inputs = list(range(count))
inputs = random.choices(inputs, k=count)
linear_more.append(run_thing(max_subarray_2, inputs, "Linear"))
divided_more.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
counts_more.append(count)
n = 1,000,000 ============= *** Linear *** Elapsed Time: 0:00:00.008010 *** Divide and Conquer *** Elapsed Time: 0:00:00.294771 n = 10,000,000 ============== *** Linear *** Elapsed Time: 0:00:00.169138 *** Divide and Conquer *** Elapsed Time: 0:00:02.018406 n = 100,000,000 =============== *** Linear *** Elapsed Time: 0:00:00.850640 *** Divide and Conquer *** Elapsed Time: 0:00:21.290975 n = 1,000,000,000 ================= *** Linear *** Elapsed Time: 0:00:07.666758 *** Divide and Conquer *** Elapsed Time: 0:03:32.097027
They do pretty well, it seems to be the brute force that dies out.
longtimes = pandas.DataFrame({"Linear": linear_more,
"Divide & Conquer": divided_more,
"Input Size": counts_more})
melted = longtimes.melt(id_vars=["Input Size"],
value_vars=["Divide & Conquer", "Linear"],
var_name="Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
x=altair.X("Input Size", type="quantitative"),
y=altair.Y("Comparisons", type="quantitative"),
color="Algorithm").properties(
title="Comparison Counts", width=PLOT.width, height=525)
save_it(chart, "longer_algorithm_comparisons")