The Selection Problem
Table of Contents
What is the Selection Problem?
The Selection Problem involves finding the kth smallest item in a collection of n items. This item is called an Order Statistic.
Selection Vs Sorting
If the collection is already sorted then you can pull the order statistic out directly (assuming it's accessible by its index), and if it isn't you could sort it first and then retrieve the order statistic. Since Mergesort and Heapsort have a runtime of \(\Theta(n \log n)\), that would be the runtime of this method. But since we are only choosing one item we don't need to sort the whole collection, so we can find the item with less computation, and thus selection is its own problem.
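As a point of reference, here's a minimal sketch of the sort-first approach (the function name is mine, and \(k\) is 1-based as in the definitions below):

def select_by_sorting(items: list, k: int):
    """Find the kth smallest item (k is 1-based) by sorting first

    This runs in O(n log n) because of the sort.
    """
    return sorted(items)[k - 1]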
Familiar Cases
While you might search for any ranked item (the third item, say), there are three special cases that are common enough to have been given names.
- the Minimum
- the Maximum
- the Median
Minimum and Maximum
The Minimum and Maximum are probably self-explanatory, but to be complete: the Minimum is the order statistic when \(k = 1\) and the Maximum is the order statistic when \(k=n\). As an example of the Selection Problem requiring fewer comparisons than the Sorting Problem we can look at finding the Minimum by traversing the array.
\begin{algorithm} \caption{Minimum} \begin{algorithmic} \INPUT An array of comparable items \OUTPUT The smallest item \PROCEDURE{Minimum}{$A$} \STATE minimum $\gets$ A[0] \FOR{$i \in \{ 1 \ldots A.length - 1\}$} \IF {minimum > A[i]} \STATE minimum $\gets$ A[i] \ENDIF \ENDFOR \RETURN minimum \ENDPROCEDURE \end{algorithmic} \end{algorithm}
Since we're traversing the collection once it will always have a runtime of \(\Theta(n)\). The Maximum can be found the same way, but with the comparison in the if-statement flipped to less-than.
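As a sanity check, the pseudocode translates pretty directly to Python (a sketch, assuming a non-empty sequence):

def find_minimum(items):
    """Return the smallest item using a single traversal (Theta(n))"""
    minimum = items[0]
    for item in items[1:]:
        if minimum > item:
            minimum = item
    return minimum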
The Median
Most people know what the median is (although I once worked with an engineer who thought it was the middle item of the unsorted items, not the item whose value is in the middle, so maybe not everyone), but since we're talking about Order Statistics and not Statistics like in "How to Lie With Statistics" we'll make it a little more confusing by using a similar but not quite identical definition.
\[ k = \left \lceil \frac{n}{2} \right \rceil \]
This definition comes from Levitin. In his description he defines the median as the value with half the remaining values less than it and half the remaining values above it, which suggests that he's defining the case where there are an odd number of items only.
CLRS gives a two-part definition with the odd case:
\[ k = \frac{n + 1}{2} \]
and the case where there are an even number of items broken up into two:
\[ \textbf{Lower Median} = \left \lfloor \frac{n + 1}{2} \right \rfloor \\ \textbf{Upper Median} = \left \lceil \frac{n + 1}{2} \right \rceil \\ \]
Their median is the Lower Median. Despite the differences, if you run through some numbers you can see that they're basically the same: adding 1 and then taking the floor after dividing by 2 gives the same result as taking the ceiling after dividing by 2 without adding 1, that is \(\left\lfloor \frac{n+1}{2} \right\rfloor = \left\lceil \frac{n}{2} \right\rceil\).
def ceiling(a: int, b: int) -> int:
    return -(a//-b)

for n in range(1, 10):
    print(f"n={n}: \tLevitin: {ceiling(n, 2)}\tLower Median: {(n + 1)//2}")
n=1: 	Levitin: 1	Lower Median: 1
n=2: 	Levitin: 1	Lower Median: 1
n=3: 	Levitin: 2	Lower Median: 2
n=4: 	Levitin: 2	Lower Median: 2
n=5: 	Levitin: 3	Lower Median: 3
n=6: 	Levitin: 3	Lower Median: 3
n=7: 	Levitin: 4	Lower Median: 4
n=8: 	Levitin: 4	Lower Median: 4
n=9: 	Levitin: 5	Lower Median: 5
So we've defined the median, but actually finding it is a little more complicated than finding the minimum or maximum, so I'll save that for later.
Heap!
Table of Contents
Beginning
What's a Heap?
This is a post that looks at Heaps. According to Introduction To Algorithms (CLRS), a heap is an array that can be thought of as a nearly complete binary tree which satisfies the Heap Property. There are actually two heap properties, one for a Max Heap and one for a Min Heap.
- Max-Heap Property: \(A[Parent(i)] \ge A[i]\)
- Min-Heap Property: \(A[Parent(i)] \le A[i]\)
Which means that for any node in a MaxHeap, its value is greater than or equal to that of any of its children, and for any node in a MinHeap, the node's value is less than or equal to its children.
The heap has (at least) two attributes.
- A.length: the number of elements in the array
- A.heap-size: the number of elements in the heap (not all elements in the array need to be in the heap)
The first element in the heap is the root (for the Max-Heap it is the largest element, for the Min-Heap it is the smallest).
Some Functions
For any Node in the Heap located at i:
\begin{algorithm} \caption{Parent} \begin{algorithmic} \INPUT The index of a Child Node \OUTPUT The index of the Child's Parent \PROCEDURE{Parent}{$i$} \RETURN {\(\left \lfloor \frac{i}{2} \right \rfloor\)} \ENDPROCEDURE \end{algorithmic} \end{algorithm}
\begin{algorithm} \caption{Left} \begin{algorithmic} \INPUT The index of a Parent Node \OUTPUT The index of the Parent's left child node \PROCEDURE{Left}{$i$} \RETURN 2i \ENDPROCEDURE \end{algorithmic} \end{algorithm}
\begin{algorithm} \caption{Right} \begin{algorithmic} \INPUT The index of a Parent Node \OUTPUT The index of the Parent's right child node \PROCEDURE{Right}{$i$} \RETURN 2i + 1 \ENDPROCEDURE \end{algorithmic} \end{algorithm}
These Functions show the relationship between the location of a node and its children in the array that makes up the heap.
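A quick sketch of those relationships using a small 1-based array (index 0 is padding, matching the implementation later in this post):

heap = [None, 16, 14, 10, 8, 7]

node = 2                          # the node holding 14
assert heap[node//2] == 16        # Parent(2) = 1
assert heap[2 * node] == 8        # Left(2) = 4
assert heap[2 * node + 1] == 7    # Right(2) = 5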
Some Requirements
There are also two requirements:
- Shape Requirement: All levels but the last have to be complete (only the rightmost leaves can be incomplete)
- Dominance Requirement: Every node is greater than or equal to its children (in the Max-Heap, less than or equal for the Min-Heap)
That Dominance Requirement is also covered by the Heap Property.
Heap Properties
These are from Introduction to the Design & Analysis of Algorithms (Levitin).
- There exists exactly one essentially complete binary tree with \(n\) nodes; its height is \(\lfloor \log_2 n\rfloor\).
- The root of a (Max) Heap always has the largest element.
- A node of a heap with all its descendants is also a heap.
- A heap can be implemented as an array with the heap's elements stored at positions \(1 \ldots n\), in which case:
- the parental node keys will be the first \(\lfloor \frac{n}{2} \rfloor\) elements while the leaf nodes will be in the last \(\lceil \frac{n}{2} \rceil\) positions
- The children of a parent node at position i will be in positions 2i and 2i + 1.
- The parent of a child node at position i will be in position \(\lfloor \frac{i}{2}\rfloor\).
Property four is what was defined in the functions up above. Levitin also gives a slightly different variation on stating the heap property.
\[ H[i] \ge \max\left(H[2i], H[2i + 1]\right) \textrm{ for } i= 1, \ldots, \left\lfloor \frac{n}{2} \right\rfloor. \]
This is sort of an inversion of the Max-Heap Property stated above. While the Max-Heap Property was given as every node is less than or equal to its parent, this says that every parent is greater than or equal to its children.
The Max Heap
The Imports
# python
from collections.abc import MutableSequence
from functools import partial
from typing import Generator
import random
# pypi
from expects import contain_exactly, equal, expect, raise_error
# software under test
from bowling.data_structures.heap import MaxHeap
from graeae import EmbedHoloviews
SLUG = "max-heap"
path = f"files/posts/{SLUG}/"
Embed = partial(EmbedHoloviews, folder_path=path)
The Class
Imports
# pypi
# https://www.attrs.org/en/stable/index.html
from attrs import define
The Definition
Besides declaring the class definition, the MaxHeap will hold some constants to hopefully make the code easier to read.
@define
class MaxHeap:
    """Build and maintain a max-heap

    If you pass in the heap as a list pre-pend it with Infinity
    Otherwise use ~heap = MaxHeap.from_list(elements)~ to build it
    """
    INFINITY = float("inf")
    NEGATIVE_INFINITY = -INFINITY
    ROOT_NODE = 1
    heap: list = [INFINITY]
    _size: int = None
From List
This is a class method to create the list for the heap. The calculations for the locations of parent and child nodes in the array are easier if we have a 1-based list, so I'll pad the list being passed in. Additionally, I'll set the first value to \(\infty\) so that the Heap Property will pass for the root node without needing any special consideration for what its parent is.
@classmethod
def from_list(cls, heap: list):
    """Builds a max-heap instance from the starter list

    Args:
     heap: list of elements to dump on the heap

    Returns:
     MaxHeap instance with the heap list added
    """
    return cls(heap=[cls.INFINITY] + heap)
The Heap Size
Since we padded the list holding the heap, the length of the list will never be the same as the number of nodes in the heap. Additionally, we'll sometimes manipulate the list so that items that were in the heap are excluded from it but still in the list, so the size property will help us keep track of how big we think our heap is.
@property
def size(self) -> int:
    """The size of the max heap"""
    if self._size is None:
        self._size = len(self.heap) - 1
    return self._size

@size.setter
def size(self, new_size) -> None:
    """Set the size of the max heap

    Args:
     new_size: how much of the list is in the heap

    Raises:
     AssertionError if the size is out of bounds for the list
    """
    assert 0 <= new_size <= self.length
    self._size = new_size
    return
Length
This is the length of the array, not necessarily of the heap. This one's a little tricky: since the padding throws it off by one it would seem that it should be lowered by one, but if you use it to figure out the last index that will mess things up a little. Since we already have size for the number of nodes I'll just pass the length of the list on and see what happens. I was debating whether to use __len__ but decided that this is really an internal measure and size is meant to be the attribute to use. I'm mostly keeping this around so that it matches the CLRS attributes.
@property
def length(self) -> int:
    """The size of the array for the heap

    Warning:
     This includes the padding at the beginning of the list
    """
    return len(self.heap)
Maximum
Since this is a max-heap the largest element is in the root; this property just makes getting it more explicit.
@property
def maximum(self):
    """The value in the root node"""
    return self.heap[self.ROOT_NODE]
Finding the Parent, Left-Child, and Right-Child of a Node
These are the implementations of the functions at the start of the post.
def parent(self, node: int) -> int:
    """Find the parent of a node

    Args:
     node: the index of the node to check

    Returns:
     the index of the parent of the node
    """
    return node//2

def left_child(self, parent: int) -> int:
    """Find the left child of a parent

    Args:
     parent: the index of the parent node

    Returns:
     index of the left child of the parent
    """
    return 2 * parent

def right_child(self, parent: int) -> int:
    """Find the right child of a parent

    Args:
     parent: the index of the parent node

    Returns:
     index of the right child of the parent
    """
    return 2 * parent + 1
Heapify A Sub Tree
CLRS just calls this MaxHeapify, but the bottom-up heapification of the tree seemed more like it should be called heapify, so I called this one heapify_subtree to note that it starts at a specific node which might not be the root.
def heapify_subtree(self, node: int):
    """Heapify the tree rooted at the node

    Args:
     node: index of the node to compare to its descendants
    """
    left, right = self.left_child(node), self.right_child(node)
    largest = node

    if left <= self.size and self.heap[left] > self.heap[largest]:
        # the left child was larger than the current parent node
        largest = left

    if right <= self.size and self.heap[right] > self.heap[largest]:
        # the right child is larger than the left and the current parent
        largest = right

    if largest != node:
        # the current parent is out of place, swap it with the larger child
        self.heap[node], self.heap[largest] = (self.heap[largest],
                                               self.heap[node])
        # after the swap the item at "largest" is the value from the
        # "node" we started with so try it again with this new location
        self.heapify_subtree(largest)
    return
The Call
This makes the MaxHeap callable and heapifies the entire heap using bottom-up construction.
def __call__(self):
    """Heapifies the heap

    Raises:
     AssertionError: something bad happened and the Heap Property failed
    """
    for parent in reversed(range(1, self.size//2 + 1)):
        self.heapify_subtree(parent)
    self.check_rep()
    return
Increase a Key
When we change the value of a node, if the new value is higher than the previous one it might be in the wrong place in the heap (e.g. it might be bigger than its parent), so we need to traverse upward, swapping it with parents smaller than it, until we find where it should go. CLRS made it a requirement that the new value is larger than the old one, which makes sense in light of the name IncreaseKey, but it seems to me that you could just call it ChangeKey and use a conditional instead of raising an exception. I'll stick with the error for now.
def increase_key(self, node, key):
    """Increase the node's value

    Args:
     node: index of node in heap to change
     key: new value for the node

    Raises:
     AssertionError if new value isn't larger than the previous value
    """
    assert key > self.heap[node], (
        f"{key} not greater than previous value {self.heap[node]}")
    self.heap[node] = key

    while (node > self.ROOT_NODE and
           self.heap[self.parent(node)] < self.heap[node]):
        self.heap[node], self.heap[self.parent(node)] = (
            self.heap[self.parent(node)], self.heap[node])
        node = self.parent(node)
    return
Insert a Value
CLRS describes insert and increase_key as part of updating a priority queue, but Levitin's description of top-down heap construction seems to use them as an alternative way to create the heap. He describes this method of construction (top-down) as starting with an empty heap and repeatedly inserting elements from the original array until you have a heap.
def insert(self, key):
    """Insert the key into the heap

    Args:
     key: orderable item to insert into the heap
    """
    self.size += 1
    # the new node goes at index "size" (the heap is 1-based)
    if self.size < self.length:
        self.heap[self.size] = self.NEGATIVE_INFINITY
    else:
        # the list is full so grow it
        self.heap.append(self.NEGATIVE_INFINITY)
    self.increase_key(self.size, key)
    return
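With insert in place, Levitin's top-down construction is just repeated insertion. A small sketch using the class from this post (the values are arbitrary):

heap = MaxHeap.from_list([5])
for key in (3, 17, 10, 84):
    heap.insert(key)
expect(heap.maximum).to(equal(84))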
Check the Heap Property
This checks that the Heap Property holds for all the nodes.
def check_rep(self) -> None:
    """Checks the heap property

    Raises:
     AssertionError: the heap property has been violated
    """
    # start at node 2 since the root's parent is the infinity padding,
    # and stop at size so the last node gets checked too
    for node in range(2, self.size + 1):
        assert self.heap[self.parent(node)] >= self.heap[node], (
            f"Parent node {self.parent(node)} = {self.heap[self.parent(node)]} "
            f"not >= {node}={self.heap[node]}")
    return
Get and Set Item
I threw these in because I kept forgetting that the heap is an attribute of the MaxHeap, but it's only for convenience.
def __getitem__(self, node: int):
    """Gets an item from the heap

    Args:
     node: index of the heap to get the value
    """
    return self.heap[node]

def __setitem__(self, node, value):
    """Sets the value at the node in the heap

    Args:
     node: index of the heap to set the value
     value: what to set the location in the heap to
    """
    self.heap[node] = value
    return
The Tests
start = [10, 20, 5]
max_heap = MaxHeap.from_list(heap=start)
expect(max_heap.heap).to(equal([max_heap.INFINITY] + start))
expect(max_heap.size).to(equal(3))
expect(max_heap.length).to(equal(4))
expect(max_heap.parent(1)).to(equal(0))
expect(max_heap.parent(2)).to(equal(1))
expect(max_heap.parent(3)).to(equal(1))
expect(max_heap.left_child(1)).to(equal(2))
expect(max_heap.right_child(1)).to(equal(3))
def failure(): max_heap.check_rep()
expect(failure).to(raise_error(AssertionError))
expect(max_heap.maximum).to(equal(10))
start = [16, 4, 10, 14, 7, 9, 3, 2, 8, 1]
heap = MaxHeap.from_list(start)
expect(heap.maximum).to(equal(16))
heap.heapify_subtree(2)
expect(heap[2]).to(equal(14))
heap.heapify_subtree(1)
expect(heap.maximum).to(equal(16))
expect(heap[2]).to(equal(14))
expect(heap[4]).to(equal(8))
expect(heap[9]).to(equal(4))
start = [10, 20, 30, 40]
heap = MaxHeap.from_list(start)
heap()
expect(heap.maximum).to(equal(40))
start = [1, 2, 3, 4, 7, 8, 9, 10, 14, 16]
heap = MaxHeap.from_list(start)
expect(heap.maximum).to(equal(1))
heap()
expect(heap.maximum).to(equal(16))
Heap Sort
Imports
# pypi
# https://www.attrs.org/en/stable/index.html
from attrs import define
# this project
from bowling.data_structures.heap import MaxHeap
The Heap Sort Class
The HeapSort uses the fact that a Max Heap always has the largest element at the root and repeatedly puts the root at the end of the list then shrinks the heap so it doesn't include the value that was moved over.
@define
class HeapSort:
    """Sort using a heap

    Args:
     items: collection of items for the sort
    """
    items: list
    _heap: MaxHeap = None

    @property
    def heap(self) -> MaxHeap:
        """The heap of items"""
        if self._heap is None:
            self._heap = MaxHeap.from_list(self.items)
            self._heap()
        return self._heap

    @property
    def without_root(self) -> list:
        """The items without the root"""
        return self.heap.heap[self.heap.ROOT_NODE:]

    def __call__(self):
        """sorts the items"""
        self.heap()
        for node in range(self.heap.size, 1, -1):
            self.heap.heap[self.heap.ROOT_NODE], self.heap.heap[node] = (
                self.heap.heap[node],
                self.heap.maximum)
            self.heap.size -= 1
            self.heap()
        return
The Tests
from bowling.sort.heap import HeapSort
k = 100
items = random.choices(range(k), k=k)
sorter = HeapSort(items.copy())
sorter()
items.sort()
expect(sorter.without_root).to(contain_exactly(*items))
A Priority Queue
Although some books mention that MinHeaps are used for priority queues, CLRS shows a MaxHeap version. This involves adding a couple of methods to the MaxHeap so there's no special class.
The Tests
items = [1, 2, 3]
heap = MaxHeap.from_list(items)
heap()
def failure(): heap.increase_key(2, 0)
expect(failure).to(raise_error(AssertionError))
heap.increase_key(2, 5)
expect(heap.maximum).to(equal(5))
items = [1, 2, 3, 4, 7, 8, 9, 10, 14, 16]
heap = MaxHeap.from_list(items)
heap()
heap.increase_key(9, 15)
expect(heap[heap.left_child(1)]).to(equal(15))
heap.insert(20)
expect(heap.size).to(equal(len(items) + 1))
expect(heap.maximum).to(equal(20))
Plotting
from networkx import Graph
import holoviews
import networkx

graph = Graph()
for node in range(1, len(heap.heap)//2 + 1):
    if heap.left_child(node) < heap.length:
        graph.add_edge(heap.heap[node], heap.heap[heap.left_child(node)])
    if heap.right_child(node) < heap.length:
        graph.add_edge(heap.heap[node], heap.heap[heap.right_child(node)])
positions = networkx.drawing.nx_pydot.graphviz_layout(graph, prog="dot")
plot = holoviews.Graph.from_networkx(graph, positions)
output = Embed(plot=plot, file_name="heap-plot")()
End
Quicksort
Table of Contents
The Algorithm
I covered the Partition function in The Road To Partition and now I'll use it to implement a quicksort. This is the CLRS version.
\begin{algorithm} \caption{QuickSort} \begin{algorithmic} \INPUT An array and left and right locations defining a subarray \OUTPUT The subarray is sorted \PROCEDURE{QuickSort}{A, left, right} \IF {left < right} \STATE pivot $\gets$ \textsc{Partition}(A, left, right) \STATE \textsc{QuickSort}(A, left, pivot - 1) \STATE \textsc{QuickSort}(A, pivot + 1, right) \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
Implementing It
Some Imports
# python
from collections.abc import MutableSequence
from dataclasses import dataclass
from functools import partial
from math import log2
import random
# pypi
from expects import contain_exactly, equal, expect
from joblib import Parallel, delayed
import altair
import pandas
# this project
from bowling.sort.insertion import insertion_sort
from bowling.sort.merge import mergesort
# other monkey stuff
from graeae.visualization.altair_helpers import output_path, save_chart
from graeae import Timer
Some Setup
@dataclass
class PartitionOutput:
    """Keeps the output for the partition function"""
    pivot: int
    count: int
TIMER = Timer()
SLUG = "quicksort"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)
The Partition
Even though I implemented the partition in that other post, I didn't make it return counts that we can use to estimate the runtime, so I'll do that here.
def partition(collection: MutableSequence,
              left: int, right: int) -> PartitionOutput:
    """Partitions the collection around the last element

    Args:
     collection: the list to partition
     left: index of the first element in the sub-list to partition
     right: index of the last element in the sub-list to partition

    Returns:
     PartitionOutput(pivot, count)
    """
    count = 0
    pivot_element = collection[right]
    lower_bound = left - 1

    for upper_bound in range(left, right):
        count += 1
        if collection[upper_bound] <= pivot_element:
            lower_bound += 1
            (collection[lower_bound],
             collection[upper_bound]) = (collection[upper_bound],
                                         collection[lower_bound])
    pivot = lower_bound + 1
    (collection[pivot],
     collection[right]) = (collection[right],
                           collection[pivot])
    count += 1
    return PartitionOutput(pivot=pivot, count=count)
Some Checks
The First Example
start = [5, 7, 9, 4, 6]
test = start.copy()
expected = [5, 4, 6, 7, 9]
first_expected_pivot = 2
output = partition(test, 0, 4)
expect(output.pivot).to(equal(first_expected_pivot))
expect(test).to(contain_exactly(*expected))
expect(output.count).to(equal(len(test)))
And to make sure it works on a sub-list.
left, right = [100, 20], [999, 888, 777]
test = left + start.copy() + right
output = partition(test, 2, 6)
# all we did was shift the sub-list two spots to the right
expect(output.pivot).to(equal(first_expected_pivot + 2))
# only the sub-list should be partitioned
expect(test).to(contain_exactly(*(left + expected + right)))
# the count should match our sub-array
expect(output.count).to(equal(len(start)))
The Pivot Is the Biggest Element
start = [9, 6, 25, 4, 100]
test = start.copy()
output = partition(test, 0, 4)
# the pivot should be the last element
expect(output.pivot).to(equal(4))
# nothing changes in the list
expect(test).to(contain_exactly(*start))
# once again, count should match the size of the input
expect(output.count).to(equal(len(test)))
The QuickSort
def quicksort(collection: MutableSequence, left: int, right: int) -> int:
    """Recursive quicksort

    Args:
     collection: list to sort
     left: index of start of sub-list in collection to sort
     right: index of end of sub-list in collection to sort

    Returns:
     count of comparisons
    """
    count = 0
    if left < right:
        output = partition(collection, left, right)
        count += output.count
        count += quicksort(collection, left, output.pivot - 1)
        count += quicksort(collection, output.pivot + 1, right)
    return count
Check It Out
start = list(range(10))
items = start.copy()
random.shuffle(items)
length = len(items)
count = quicksort(items, 0, length-1)
print(f"count: {count}")
print(f"Theoretical Average: {length * log2(length):.2f}")
print(f"Theoretical Worst: {length**2}")
expect(items).to(contain_exactly(*start))
count: 37
Theoretical Average: 33.22
Theoretical Worst: 100
Plotting The Quicksort Runtimes
@dataclass
class QuicksortOutput:
    """Holds the output of the quicksort counts"""
    comparisons: int
    size: int

def quicksorter(collection: MutableSequence) -> QuicksortOutput:
    """runs the quicksort and outputs count and size of collection

    Args:
     collection: thing to sort

    Returns:
     QuicksortOutput(count, size)
    """
    size = len(collection)
    count = quicksort(collection, 0, size - 1)
    return QuicksortOutput(comparisons=count, size=size)
With Random Input
things_to_sort = [list(range(count)) for count in range(1, 10**5, 1000)]
for things in things_to_sort:
random.shuffle(things)
with TIMER:
quick_output = Parallel(n_jobs=-1)(
delayed(quicksorter)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-02-05 00:57:12.120397
Ended: 2022-02-05 00:57:14.397235
Elapsed: 0:00:02.276838
with TIMER:
merge_output = Parallel(n_jobs=-1)(
delayed(mergesort)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-02-05 00:58:40.087042
Ended: 2022-02-05 00:58:42.326204
Elapsed: 0:00:02.239162
Note to future self: Pypy is much faster with the python inputs and much slower with numpy inputs.
counts = [output.comparisons for output in quick_output]
sizes = [output.size for output in quick_output]
frame = pandas.DataFrame({"Size": sizes, "QuickSort": counts})
frame["Merge Sort"] = [output for output in merge_output]
melted = frame.melt(id_vars=["Size"],
var_name="Sort Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_point().encode(
x="Size",
y="Comparisons",
color="Sort Algorithm",
tooltip=[altair.Tooltip("Size", format=","),
altair.Tooltip("Comparisons", format=","),
"Sort Algorithm"]
).properties(
title="QuickSort vs Merge Sort",
width=800,
height=525,
)
save_it(chart, "quicksort-runtime")
This is kind of an interesting plot. I originally had Insertion Sort in it too, but it does so poorly that it just squashes both the Merge Sort and Quick Sort runtimes to a flat line. Quick Sort does much, much better than Insertion Sort, but it still doesn't quite keep up with Merge Sort. The trade-off is that Quick Sort does its sorting in place while Merge Sort creates all those temporary copies.
Worst-Case Input
Remember that case in the partition post where the last item (the pivot) was the largest item, and how it resulted in nothing being moved around? What if, no matter what sub-array you picked, the last item was always the largest? In other words, what if the input is already sorted?
For one thing, with really big inputs the interpreter raises an error because you've made too many recursive calls, which tells you that something bad is happening.
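We can see the degenerate split directly with the partition function from above: on sorted input the pivot always lands at the far right, so every call produces one empty sub-problem and one that only shrinks by the pivot.

already_sorted = list(range(8))
output = partition(already_sorted, 0, len(already_sorted) - 1)

# the pivot stayed at the right edge: sub-problems of size 7 and 0
expect(output.pivot).to(equal(7))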
things_to_sort = [list(range(count)) for count in range(1, 10**3, 100)]
with TIMER:
quick_output = Parallel(n_jobs=-1)(
delayed(quicksorter)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-02-05 01:06:19.614254
Ended: 2022-02-05 01:06:20.159874
Elapsed: 0:00:00.545620
with TIMER:
merge_output = Parallel(n_jobs=-1)(
delayed(mergesort)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-02-05 01:06:23.592928
Ended: 2022-02-05 01:06:23.661150
Elapsed: 0:00:00.068222
Note to future self: Pypy is much faster with the python inputs and much slower with numpy inputs.
counts = [output.comparisons for output in quick_output]
sizes = [output.size for output in quick_output]
frame = pandas.DataFrame({"Size": sizes, "QuickSort": counts})
frame["Merge Sort"] = [output for output in merge_output]
melted = frame.melt(id_vars=["Size"],
var_name="Sort Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_point().encode(
x="Size",
y="Comparisons",
color="Sort Algorithm",
tooltip=[altair.Tooltip("Size", format=","),
altair.Tooltip("Comparisons", format=","),
"Sort Algorithm"]
).properties(
title="QuickSort vs Merge Sort (Worst-Case)",
width=800,
height=525,
)
save_it(chart, "quicksort-runtime-worst")
Looking At the Sort In Progress
def quicksort_tracer(collection: MutableSequence,
                     left: int, right: int, tracer: dict=None) -> dict:
    """Recursive quicksort

    Args:
     collection: list to sort
     left: index of start of sub-list in collection to sort
     right: index of end of sub-list in collection to sort
     tracer: dict of element: list of locations

    Returns:
     tracer dict
    """
    if tracer is None:
        tracer = {element: [index] for index, element in enumerate(collection)}

    if left < right:
        output = partition(collection, left, right)
        quicksort_tracer(collection, left, output.pivot - 1, tracer)
        quicksort_tracer(collection, output.pivot + 1, right, tracer)
        for index, element in enumerate(collection):
            tracer[element].append(index)
    return tracer
size = 20
start = list(range(size))
inputs = start.copy()
inputs.reverse()
tracer = quicksort_tracer(inputs, 0, size - 1)
frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Quicksort Call"})
melted = frame.melt(id_vars=["Quicksort Call"], var_name="Element", value_name="Location")
chart = altair.Chart(melted).mark_line().encode(
x="Quicksort Call",
y="Location",
color="Element",
tooltip=["Quicksort Call", "Location", "Element"]
).properties(
title="Quicksort Trace (Reversed Input)",
width=800,
height=525,
)
save_it(chart, "quicksort-trace-backwards")
size = 20
start = list(range(size))
inputs = start.copy()
random.shuffle(inputs)
tracer = quicksort_tracer(inputs, 0, size - 1)
frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Quicksort Call"})
melted = frame.melt(id_vars=["Quicksort Call"], var_name="Element", value_name="Location")
chart = altair.Chart(melted).mark_line().encode(
x="Quicksort Call",
y="Location",
color="Element",
tooltip=["Quicksort Call", "Location", "Element"]
).properties(
title="Quicksort Trace (Shuffled Input)",
width=800,
height=525,
)
save_it(chart, "quicksort-trace-shuffled")
The Master Theorem
The General Divide and Conquer Recurrence
When we have a Divide and Conquer algorithm that we solve via recursion we can estimate the running time using the General Divide and Conquer Recurrence.
- \(T(n)\) is the total runtime.
- \(n\) is the size of the problem (the number of instances in the input).
- \(a\) is the number of sub-problems we create that need to be solved.
- \(b\) is the factor by which the input size shrinks in each sub-problem (we'll mostly see cases where \(a=b=2\)).
- \(f(n)\) is the time spent dividing the problem and later recombining the sub-problems.
Which we combine into this equation.
\[ T(n) = aT \left(\frac{n}{b} \right) + f(n) \]
The Master Theorem
In the case where \(f(n) \in \Theta \left (n^d \right)\) with \(d \ge 0\) (so \(f\) is constant or a power of \(n\)), we can use the following conditional cases to estimate the runtime.
\[ T(n) \in \begin{cases} \Theta\left (n^d \right ) & \textrm{if }a < b^d \\ \Theta\left (n^d \log n \right ) & \textrm{if }a = b^d \\ \Theta\left (n^{\log_b a} \right) & \textrm{if }a > b^d \end{cases} \]
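As a quick check, we can plug Mergesort into the theorem: it creates \(a = 2\) sub-problems of half the size (\(b = 2\)), and the divide and combine steps together take \(f(n) \in \Theta(n)\), so \(d = 1\). Since \(a = b^d\) (that is, \(2 = 2^1\)), the middle case applies and

\[ T(n) \in \Theta\left(n \log n\right) \]

which matches the Mergesort runtime in the next section.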
The Mergesort
Table of Contents
Beginning
This is a look at the Mergesort, an example of the Divide and Conquer strategy for sorting collections.
Middle
The CLRS Algorithm
Our Mergesort repeatedly divides the given list into two parts, recursively calls itself on the sub-problems, and then takes advantage of our previously defined Merge function to combine the solutions together once we are down to one item to sort (\(p = r\)).
\begin{algorithm} \caption{Mergesort} \begin{algorithmic} \INPUT An array and left and right locations of the subarray in the array \OUTPUT The array in sorted order \PROCEDURE{Mergesort}{$A, p, r$} \IF {p < r} \STATE \textbf{Divide} \STATE $q \gets \left \lfloor \frac{p + r}{2} \right \rfloor$ \STATE \\ \textbf{Conquer} \STATE \textsc{MergeSort}(A, p, q) \STATE \textsc{MergeSort}(A, q + 1, r) \STATE \\ \textbf{Combine} \STATE \textsc{Merge}(A, p, q, r) \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
Runtime
- Divide: \(D(n) = \Theta(1)\)
- Conquer: \(2T\left (\frac{n}{2}\right)\)
- Combine: \(C(n) = \Theta(n)\)
The Levitin
\begin{algorithm} \caption{Levitin's Mergesort} \begin{algorithmic} \INPUT Array $A[0 \ldots n - 1]$ of orderable elements. \OUTPUT Array $A[0 \ldots n - 1]$ sorted in non-decreasing order. \PROCEDURE{MergeSort}{A} \IF {n > 1} \STATE \textbf{Divide} \STATE Copy $A\left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$ to $B \left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$ \STATE Copy $A \left[\left\lfloor \frac{n}{2}\right\rfloor \ldots n - 1 \right]$ to $C \left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$ \STATE \\ \textbf{Conquer} \STATE \textsc{MergeSort}(B) \STATE \textsc{MergeSort}(C) \STATE \\ \textbf{Combine} \STATE \textsc{Merge}(B, C, A) \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
Implement It
# python
from collections.abc import MutableSequence, Sequence
from math import log2
import random
# pypi
from expects import contain_exactly, equal, expect
def merge(left_stack: Sequence,
          right_stack: Sequence,
          target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    left_size, right_size = len(left_stack), len(right_stack)
    next_left = next_right = put_item_here = count = 0

    while next_left < left_size and next_right < right_size:
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1
        put_item_here += 1

    if next_left == left_size and next_right < right_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = right_stack[next_right + stack_offset]
    elif next_left < left_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = left_stack[next_left + stack_offset]
    return count
def mergesort(collection: MutableSequence) -> int:
    """Sorts the collection using a recursive mergesort

    Args:
     collection: a mutable sequence

    Returns:
     runtime count
    """
    items = len(collection)
    count = 0

    if items > 1:
        middle = items//2
        left_stack = collection[:middle]
        right_stack = collection[middle:]
        assert len(left_stack) + len(right_stack) == items
        count += mergesort(left_stack)
        count += mergesort(right_stack)
        count += merge(left_stack, right_stack, collection)
    return count
size = 2**10
items = random.choices(list(range(size)), k=size)
starting = items.copy()
runtime = mergesort(items)
expect(runtime).to(equal(size * log2(size)))
expect(items).to(contain_exactly(*list(sorted(starting))))
print(f"{size * log2(size):,}")
print(f"{runtime:,}")
10,240.0
10,240
Comparing Sorts
# python
from functools import partial
# pypi
from numba import njit
from joblib import Parallel, delayed
from numpy.random import default_rng
import altair
import numpy
import pandas
# this project
from bowling.sort.insertion import insertion_sort
from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart
numba_random = default_rng(2022)
TIMER = Timer()
SLUG = "the-mergesort"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)
def merger(thing_to_sort) -> tuple:
    """A thing to add the size of the input to the output

    Args:
     thing_to_sort: collection of items to sort

    Returns:
     number of things to sort, count of merges
    """
    return len(thing_to_sort), mergesort(thing_to_sort)
ninsertion = njit(insertion_sort)
things_to_sort = [numba_random.integers(low=0, high=count, size=count)
for count in range(1, 10**5 + 1, 1000)]
with TIMER:
insertion_output = Parallel(n_jobs=-1)(
delayed(ninsertion)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-01-28 01:12:10.628102
Ended: 2022-01-28 01:12:22.681161
Elapsed: 0:00:12.053059
with TIMER:
merge_output = Parallel(n_jobs=-1)(
delayed(merger)(thing_to_sort)
for thing_to_sort in things_to_sort)
Started: 2022-01-28 01:12:33.278574
Ended: 2022-01-28 01:12:38.850064
Elapsed: 0:00:05.571490
SIZE, COMPARISONS = 0, 1
unzipped = list(zip(*merge_output))
frame = pandas.DataFrame({"Size": unzipped[SIZE],
"Mergesort": unzipped[COMPARISONS]})
unzipped = list(zip(*insertion_output))
frame["Insertion Sort"] = unzipped[COMPARISONS]
melted = frame.melt(id_vars=["Size"],
value_vars=["Mergesort", "Insertion Sort"],
var_name="Sort Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_point().encode(
x="Size",
y="Comparisons",
color="Sort Algorithm",
tooltip=[altair.Tooltip("Size", format=","),
altair.Tooltip("Comparisons", format=","),
"Sort Algorithm"]
).properties(
title="Insertion vs Merge Sort",
width=800,
height=525,
)
save_it(chart, "insertion-vs-merge")
frame["nlog2(n)"] = frame["Size"] * numpy.log2(frame["Size"])
del(frame["Insertion Sort"])
melted = frame.melt(id_vars=["Size"],
value_vars=["Mergesort", "nlog2(n)"],
var_name="Source", value_name="Comparisons")
chart = altair.Chart(melted).mark_point().encode(
x="Size",
y="Comparisons",
color="Source",
tooltip=[altair.Tooltip("Size", format=","),
altair.Tooltip("Comparisons", format=","),
"Source"]
).properties(
title="Merge Sort vs Theoretical",
width=800,
height=525,
)
save_it(chart, "merge-by-logn")
Looking at the Mergesort
The version of mergesort I implemented above is based on Levitin's algorithm, which I thought was clearer, since the copying was happening outside of the merge. This makes it harder to plot the progress of the sort, though, so I'm going to use the CLRS algorithm here to get some kind of visualization (hopefully).
from bowling.sort.merge import merge_clrs
def mergesort_tracer(collection: MutableSequence, left: int, right: int,
                     tracer: dict=None) -> dict:
    """Sorts the collection using a recursive mergesort

    This uses the CLRS merge-sort to build a dict of the locations of the
    elements as the merging is done

    Args:
     collection: a mutable sequence
     left: the index of the beginning element of the sub-array
     right: the index of the ending element of the sub-array
     tracer: the dictionary to track the indices of the items

    Returns:
     tracer updated with post-merge locations of the items in tracer
    """
    if tracer is None:
        tracer = {element: [index] for index, element in enumerate(collection)}

    if left < right:
        middle = (left + right)//2
        mergesort_tracer(collection, left, middle, tracer)
        mergesort_tracer(collection, middle + 1, right, tracer)
        merge_clrs(collection, left, middle, right)
        for index, element in enumerate(collection):
            tracer[element].append(index)
    return tracer
A Shuffled Input
elements = 20
start = list(range(elements))
inputs = start.copy()
random.shuffle(inputs)
tracer = mergesort_tracer(inputs, left=0, right=len(inputs) - 1)
expect(inputs).to(contain_exactly(*start))
frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Merge"})
melted = frame.melt(id_vars=["Merge"], var_name="Element", value_name="Location")
chart = altair.Chart(melted).mark_line().encode(
x=altair.X("Merge", axis=altair.Axis(tickMinStep=1)),
y=altair.Y("Location", axis=altair.Axis(tickMinStep=1)),
color=altair.Color("Element", legend=None),
tooltip=["Merge", "Location", "Element"]
).properties(
title="Mergesort Tracing",
width=800,
height=525
)
save_it(chart, "mergesort-trace")
You can kind of see what's going on: every time you see a bunch of crossed lines it's because the merge moved items around… but maybe an artificial case will be clearer.
A Reversed Input
Let's use an input that's sorted but backwards to see the swaps more clearly.
elements = 20
start = list(range(elements))
inputs = start.copy()
inputs.reverse()
tracer = mergesort_tracer(inputs, left=0, right=len(inputs) - 1)
expect(inputs).to(contain_exactly(*start))
frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Merge"})
melted = frame.melt(id_vars=["Merge"], var_name="Element", value_name="Location")
chart = altair.Chart(melted).mark_line().encode(
x=altair.X("Merge", axis=altair.Axis(tickMinStep=1)),
y=altair.Y("Location", axis=altair.Axis(tickMinStep=1)),
color=altair.Color("Element", legend=None),
tooltip=["Merge", "Location", "Element"]
).properties(
title="Mergesort Tracing (From Backwards Source)",
width=800,
height=525
)
save_it(chart, "mergesort-trace-backwards")
Divide and Conquer
Description
The Divide-and-Conquer approach involves three parts.
- Divide: Break the problem up into sub-problems that are smaller instances of the same problem.
- Conquer: If a sub-problem is small enough solve it directly, otherwise solve it recursively.
- Combine: Combine the solutions to the sub-problems into a solution for the original problem.
The Merge
Table of Contents
The Merge
This is an implementation of the merge portion of the merge-sort. It takes two sorted sections of a collection and merges them together in place.
CLRS
The precondition for this to work is that there are two sections within the array passed to the algorithm that are already sorted, and that they are located by the three index-values given to the algorithm. The first sorted section starts at p and ends at q in the array, and the second sorted section starts at q + 1 and ends at r.
This is the CLRS version with the indexing changed to start at 0.
\begin{algorithm} \caption{Merge} \begin{algorithmic} \INPUT An array and left, middle, and right locations in the array \REQUIRE Sub-arrays from $p$ to $q$ and from $q + 1$ to $r$ are sorted \OUTPUT The array with the two sections collated in order \PROCEDURE{Merge}{$A, p, q, r$} \STATE \textbf{The sizes of the sub-sections} \STATE $n_1 \gets q - p + 1$ \STATE $n_2 \gets r - q$ \STATE \\ \textbf{Copy the subsections to new arrays.} \STATE \textit{New arrays have one extra cell to hold a sentinel.} \STATE $L \gets Array[0\ldots n_1]$ \STATE $R \gets Array[0 \ldots n_2]$ \FOR {$i \in {0 \ldots n_1 - 1}$} \STATE $L[i] \gets A[p + i]$ \ENDFOR \FOR{$j \in {0 \ldots n_2 - 1}$} \STATE $R[j] \gets A[q + 1 + j]$ \ENDFOR \STATE \\ \textbf{Add sentinel to indicate end} \STATE $L[n_1] \gets \infty $ \STATE $R[n_2] \gets \infty $ \STATE \\ \textbf{Collate} \STATE $i \gets 0$ \STATE $j \gets 0$ \FOR {$k \in {p \ldots r}$} \IF {$L[i] \leq R[j]$} \STATE $A[k] \gets L[i]$ \STATE $i \gets i + 1$ \ELSE \STATE $A[k] \gets R[j]$ \STATE $j \gets j + 1$ \ENDIF \ENDFOR \ENDPROCEDURE \end{algorithmic} \end{algorithm}
One way to think about the algorithm is that you have two stacks of cards, each of which is sorted, and you want to merge them together in sorted order. Since they are already sorted, you only have to compare the top cards of the two stacks to each other, choose the lower card and put it on your output stack, and keep repeating this until you've moved all the cards from the two stacks to the output. If one stack runs out of cards you just move the other stack's remaining cards onto the output.
Our algorithm starts by copying out the values from the source array into two new arrays to create our stacks. We append an \(\infty\) onto the end of each to simulate the emptying of the stack. Then we keep looking at the top item in each stack and copying the smaller item back into the original array. Because of the copying we are going over the input twice, but that is a relatively small (linear) increase.
Implement It
# python
from collections.abc import MutableSequence

# pypi
from expects import contain_exactly, equal, expect

INFINITY = float("inf")

def merge_clrs(collection: MutableSequence,
               left_start: int,
               left_end: int,
               right_end: int) -> int:
    """Merge the sub-sections from the collection

    Args:
     collection: list or array with sorted sub-sections
     left_start: index of start of first sub-section
     left_end: index of last item of first sub-section
     right_end: index of the last item of second sub-section

    Returns:
     count of basic operations
    """
    count = 0
    left_size = left_end - left_start + 1
    right_size = right_end - left_end
    right_start = left_end + 1

    left_stack = [None] * left_size
    right_stack = [None] * right_size

    for stack_location in range(left_size):
        left_stack[stack_location] = collection[left_start + stack_location]
        count += 1

    for stack_location in range(right_size):
        right_stack[stack_location] = collection[right_start + stack_location]
        count += 1

    left_stack.append(INFINITY)
    right_stack.append(INFINITY)

    next_left = next_right = 0
    for put_next_item_here in range(left_start, right_end + 1):
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            collection[put_next_item_here] = left_stack[next_left]
            next_left += 1
        else:
            collection[put_next_item_here] = right_stack[next_right]
            next_right += 1
    return count
def merge_check(merger):
    first = list(range(5))
    second = first[:]
    collection = first + second
    count = merger(collection, 0, 4, 9)
    expect(count).to(equal(20))
    expect(collection).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))

    collection = [10] + first + second
    count = merger(collection, 1, 5, 10)
    expect(count).to(equal(20))
    expect(collection[1:11]).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))

    collection = [10] + first + second + [-1, 5]
    count = merger(collection, 1, 5, 10)
    expect(count).to(equal(20))
    expect(collection[1:11]).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))
    return

merge_check(merge_clrs)
Runtime
Without doing anything fancy we can see that there are three for loops. The first two copy all the sub-list items from the original list to the new lists, so together they execute once for every item (\(n\) times), and the loop that does the actual merge also runs once for each item (another \(n\) times), so altogether it has a run time of \(2n\), which we'll say is \(\Theta(n)\). This is actually going to be part of the merge-sort but I thought I'd put it in here since the post is separate.
Levitin
This is the version given in Introduction to the Design & Analysis of Algorithms (Levitin) which I find a little clearer than the CLRS version. I generally prefer Levitin's versions, but, you know, CLRS is the one you have to have, so it's there too.
\begin{algorithm} \caption{Merge} \begin{algorithmic} \INPUT $B[0 \ldots p-1]$, $C[0 \ldots q - 1]$, $A[0 \ldots p + q - 1]$ \REQUIRE Sub-arrays $B$ and $C$ are sorted \OUTPUT Sorted array $A$ with the elements of $B$ and $C$. \PROCEDURE{Merge}{$B, C, A$} \STATE $i \gets 0$ \STATE $j \gets 0$ \STATE $k \gets 0$ \WHILE {$i < p$ and $j < q$} \IF {$B[i] \le C[j]$} \STATE $A[k] \gets B[i]$ \STATE $i \gets i + 1$ \ELSE \STATE $A[k] \gets C[j]$ \STATE $j \gets j + 1$ \ENDIF \STATE $k \gets k + 1$ \ENDWHILE \IF {$i=p$} \STATE Copy $C[j \ldots q-1]$ to $A[k \ldots p + q - 1]$ \ELSE \STATE Copy $B[i \ldots p - 1]$ to $A[k \ldots p + q - 1]$ \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
There are a couple of noticeable differences between Levitin's version and the CLRS version. The first is that the lists to merge are passed into the function rather than being separated inside the function, which makes the "Divide" step separate from the "Combine" step. Additionally, instead of adding a sentinel to the end of the stacks the conditional checks to see if one of them is empty and copies over the stack that isn't empty outside of the merge-loop. This adds an additional conditional check to the main loop but then takes away the conditional check when copying over the leftovers. I might give the CLRS version a point for being more concise in handling the leftovers for the case where left and right are different sizes.
Implement It
# python
from collections.abc import Sequence

def merge_levitin(left_stack: Sequence, right_stack: Sequence,
                  target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    left_size, right_size = len(left_stack), len(right_stack)
    next_left = next_right = put_item_here = count = 0

    while next_left < left_size and next_right < right_size:
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1
        put_item_here += 1

    if next_left == left_size and next_right < right_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = right_stack[next_right + stack_offset]
    elif next_left < left_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = left_stack[next_left + stack_offset]
    return count
first = list(range(5))
second = [item + index for index, item in enumerate(first)]
collection = [None] * (len(first) + len(second))
count = merge_levitin(first, second, collection)
expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))
second = list(range(5))
first = [item + index for index, item in enumerate(second)]
collection = [None] * (len(first) + len(second))
count = merge_levitin(first, second, collection)
expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))
Runtime
Since the dividing of the array is moved out of the merge the runtime for the merge is \(n\) so it's also \(\Theta(n)\).
A Hybrid
This is a mix of the two: it takes the stacks as arguments the way Levitin's version does, but uses the CLRS sentinels so there's no leftover-copying at the end.
def merge(left_stack: Sequence, right_stack: Sequence,
          target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    target_size = len(left_stack) + len(right_stack)

    # since we aren't copying the lists this can be kind of dangerous
    # passing in the same list more than once will append INFINITY each time
    left_stack.append(INFINITY)
    right_stack.append(INFINITY)

    next_left = next_right = count = 0
    for put_item_here in range(target_size):
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1
    return count
first = list(range(5))
second = [item + index for index, item in enumerate(first)]
collection = [None] * (len(first) + len(second))
count = merge(first, second, collection)
expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))
second = list(range(5))
first = [item + index for index, item in enumerate(second)]
collection = [None] * (len(first) + len(second))
count = merge(first, second, collection)
expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))
Binary Search
Table of Contents
Beginning
The Algorithm Parts
- Precondition: The input array (A) is sorted in non-decreasing order and the key \(k\) is in \(A\).
- Postcondition: The index of the matching element is output
- Loop Invariant: The key is in our current sublist: \(k \in A[left \ldots right]\).
- Basic Step: Find the midpoint of the sub-array, adjust the sub-array to use the midpoint such that the key is in the new sub-array.
- Exit Condition: \(left \ge right\) (the loop continues while \(left < right\)).
- Make Progress: After each loop the sublist is half the size of the previous sublist.
- Maintain the Loop Invariant: Pick the half of the sublist whose boundary value would allow the key.
- Establish the Loop Invariant: The initial sublist is the original list (\(left = 0, right=A.length -1\)).
- Worst Case Runtime: Since the input is repeatedly halved, the worst-case is \(\Theta(\log_{2} n)\).
Rather than saying "if k is in A" over and over, the search precondition requires that \(k\) is in \(A\); if it isn't, it's considered a failed search - which doesn't mean the algorithm is incorrect, just that the precondition wasn't met. We still have to handle the other case, but for the purpose of defining the algorithm we'll set the stricter precondition.
Implementation
Imports
# python
from collections import namedtuple
from collections.abc import Sequence
from functools import partial
import random
# pypi
from expects import be_false, be_none, equal, expect, raise_error
from joblib import Parallel, delayed
from numba import njit
import altair
import numpy
import pandas
from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart
Set Up
TIMER = Timer()
OUTPUT_PATH = output_path("binary-search")
save_it = partial(save_chart, output_path=OUTPUT_PATH)
SearchOutput = namedtuple("SearchOutput", ["index", "count"])
Binary Search One
The Pseudocode
\begin{algorithm} \caption{BinarySearch} \begin{algorithmic} \INPUT Array A of items in non-decreasing order, search key \OUTPUT If key is in array, the index of the item that matches \PROCEDURE{BinarySearch}{A, key} \STATE left $\gets$ 0 \STATE right $\gets$ A.length - 1 \WHILE {right > left} \STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$ \IF {key $\le$ A[middle]} \STATE right $\gets$ middle \ELSE \STATE left $\gets$ middle + 1 \ENDIF \ENDWHILE \IF {key = A[left]} \RETURN left \ELSE \RETURN NotFound \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
The Function
def binary_search(element: int, elements: Sequence) -> SearchOutput:
    """Does an iterative binary search for the key

    Args:
     element: item in the source to search for
     elements: sorted (non-decreasing) collection with the key

    Returns:
     SearchOutput(index of the key in the source or None if not found,
                  count of comparisons)
    """
    left, right = 0, len(elements) - 1
    count = 0

    if not len(elements) or not (elements[left] <= element <= elements[right]):
        return (None, 1)

    while right > left:
        assert elements[left] <= element <= elements[right]
        count += 1
        middle = (left + right)//2
        left, right = ((left, middle) if element <= elements[middle]
                       else (middle + 1, right))
    location = left if (elements[left] == element) else None
    return (location, count)
def test_it(searcher):
    test_case = list(range(11))
    expect(searcher(element=5, elements=test_case)[0]).to(equal(5))

    items = list(sorted(random.sample(range(100), k=50)))
    for expected, item in enumerate(items):
        expect(searcher(element=item, elements=items)[0]).to(equal(expected))
    expect(searcher(element=-5, elements=items)[0]).to(be_none)

    last = items[-1]
    items[-1] = last + 100
    expect(searcher(101, items)[0]).to(be_none)
    expect(searcher(5, [])[0]).to(be_none)
    expect(searcher(5, [5])[0]).to(equal(0))
    return

test_it(binary_search)
Levitin's Version
The version I entered above is one that I found on the web (PDF Lecture Notes), and is roughly what CLRS has. The one in Levitin's Book is clearer to me but has one more comparison.
The Pseudocode
\begin{algorithm} \caption{BinarySearchTwo} \begin{algorithmic} \INPUT Array A of items in non-decreasing order, search key in A \OUTPUT The index of the item that matches key \PROCEDURE{BinarySearchTwo}{A, key} \STATE left $\gets$ 0 \STATE right $\gets$ A.length - 1 \WHILE {left $\le$ right} \STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$ \IF {key = A[middle]} \RETURN middle \ELIF {key < A[middle]} \STATE right $\gets$ middle - 1 \ELSE \STATE left $\gets$ middle + 1 \ENDIF \ENDWHILE \IF {key = A[left]} \RETURN left \ELSE \RETURN NotFound \ENDIF \ENDPROCEDURE \end{algorithmic} \end{algorithm}
The Function
def binary_search_two(element, elements) -> SearchOutput:
    """Iterative Binary Search

    Args:
     element: item in the source
     elements: sorted collection to search

    Returns:
     SearchOutput(index of the element or None if not found,
                  count of comparisons)
    """
    left, right = 0, len(elements) - 1
    count = 0
    location = None

    while left <= right and location is None:
        count += 1
        middle = (left + right)//2
        if element == elements[middle]:
            location = middle
        left, right = ((left, middle - 1) if element <= elements[middle] else
                       (middle + 1, right))
    return (location, count)

test_it(binary_search_two)
test_it(binary_search_two)
Recursive Version
Although it is pretty straightforward as an iterative function, divide and conquer lends itself to recursion so, just for completeness, here's a recursive version of the binary search.
The Pseudocode
\begin{algorithm} \caption{Recursive Binary Search} \begin{algorithmic} \REQUIRE Input Array is in non-decreasing order \INPUT Array A , search key, left and right indices to limit the search \OUTPUT The index of the item that matches the key \PROCEDURE{BinarySearchRecursive}{A, key, left, right} \IF {left > right} \RETURN NotFound \ENDIF \STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$ \IF {key = A[middle]} \RETURN middle \ELIF {key < A[middle]} \STATE right $\gets$ middle - 1 \ELSE \STATE left $\gets$ middle + 1 \ENDIF \RETURN \textsc{BinarySearchRecursive}(elements, key, left, right) \ENDPROCEDURE \end{algorithmic} \end{algorithm}
The Implementation
def recursive_binary_search(elements: Sequence, key: int,
                            left: int, right: int,
                            count: int=0):
    """Recursive binary search

    Args:
     elements: sorted sequence with element in it
     key: item in elements to search for
     left: left index of the current sub-list to search
     right: right index of the current sub-list to search
     count: number of times we run the comparison (for plotting)

    Returns:
     (index, count) - index of the element in the elements and the comparison count
    """
    count += 1
    if left > right:
        # we missed it, the element isn't in the elements
        return None, count

    middle = (left + right)//2
    if elements[middle] == key:
        # we found it
        return middle, count

    # move one of the boundaries to the middle
    if key < elements[middle]:
        right = middle - 1
    else:
        left = middle + 1
    return recursive_binary_search(elements, key, left, right, count)
This is a helper to get the recursive call started and to handle empty lists or search terms that are outside of the range of the list.
def search(element: int, elements: Sequence) -> tuple:
    """calls the recursive binary search

    Args:
     element: an element in source to search for
     elements: sorted sequence of items
    """
    left, right = 0, len(elements) - 1
    if not len(elements) or not elements[left] <= element <= elements[right]:
        return (None, 1)
    return recursive_binary_search(elements, element, left, right)
test_it(search)
Some Plotting
Left and Right
Let's look at how the search updates the left and right boundaries. First we'll need a function that records the locations.
def binary_search_points(key, source) -> tuple:
    """Iterative Binary Search

    Args:
     key: item in the source
     source: sorted collection to search

    Returns:
     tuple of left-right locations

    Raises:
     AssertionError: key is out of bounds for the source
    """
    left, right = 0, len(source) - 1
    lefts = [left]
    rights = [right]

    while right > left:
        assert source[left] <= key <= source[right]
        middle = (left + right)//2
        (left, right) = ((left, middle) if key <= source[middle]
                         else (middle + 1, right))
        lefts.append(left)
        rights.append(right)
    return lefts, rights
Now we'll plot it.
items = list(range(101))
key = items[24]
lefts, rights = binary_search_points(key, items)
data = pandas.DataFrame(dict(Left=lefts, Right=rights, Split=list(range(len(lefts)))))
melted = data.melt(id_vars=["Split"], value_vars=["Left", "Right"],
var_name="left_or_right",
value_name="Index")
base = altair.Chart(melted)
lines = base.mark_line().encode(
x="Split:O",
y="Index",
color="left_or_right"
)
points = base.mark_point().encode(
x="Split:O",
y="Index",
color="left_or_right"
)
chart = (lines + points).properties(
title="Binary Search Left-Right Boundaries",
width=800,
height=550
).interactive()
save_it(chart=chart, name="binary-search")
So, it isn't really as pretty as the sorting plots. As the plot confirms, the left and right boundaries steadily narrow in on the item in the list.
Runtime
Let's see how the number of loops goes up with the size of the search space.
EXPONENT = 5
numba_search = njit(binary_search)
numpy_random = numpy.random.default_rng(2022)

sizes = tuple(range(1, 10**EXPONENT + 1, 1000))
random_source = [numpy.sort(numpy_random.integers(low=0, high=count, size=count))
                 for count in sizes]
random_things = [(random.choice(elements), elements)
                 for elements in random_source]
worst_things = [(elements[0], elements) for elements in random_source]
with TIMER:
random_output = Parallel(n_jobs=-1)(
delayed(numba_search)(element, elements)
for (element, elements) in random_things)
worst_output = Parallel(n_jobs=-1)(
delayed(numba_search)(element, elements)
for (element, elements) in worst_things)
Started: 2022-01-16 21:33:07.668315
Ended: 2022-01-16 21:33:08.905390
Elapsed: 0:00:01.237075
data = pandas.DataFrame(dict(
Count=sizes,
Random=[output[1] for output in random_output],
First=[output[1] for output in worst_output]
))
melted = data.melt(id_vars=["Count"], value_vars=["Random", "First"],
var_name="Location", value_name="Bisections")
theoretical = pandas.DataFrame(dict(Count=sizes, Theoretical=numpy.log2(sizes)))
Now, to plot.
points = altair.Chart(melted).mark_point().encode(
x="Count",
y="Bisections",
color="Location")
line = altair.Chart(theoretical).mark_line().encode(
x="Count",
y="Theoretical",
tooltip=[altair.Tooltip("Count", format=","),
altair.Tooltip("Theoretical", format=".2f")],
)
chart = (line + points).properties(
title="Binary Search Bisections",
width=800,
height=525,
).interactive()
save_chart(chart=chart, name="binary-search-comparisons",
output_path=OUTPUT_PATH)
See Also
- Loop Invariants [Internet]. [cited 2022 Jan 15]. Available from: https://www.cs.cornell.edu/courses/cs2112/2018fa/lectures/lec_loopinv/
- Introduction to the Design & Analysis of Algorithms (Levitin)
Decrease And Conquer
With Decrease-and-Conquer algorithms you exploit the relationship between the solution to a problem and the solution to a sub-problem. There are three types of Decrease-and-Conquer algorithms:
- Decrease by a constant (typically one), as in Insertion Sort
- Decrease by a constant factor (typically two), as in Binary Search
- Decrease by a variable amount (see the sketch below)
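As a sketch of the third type (an example that isn't covered in these posts, but is the classic one): Euclid's algorithm for the greatest common divisor shrinks the problem by a data-dependent amount on every pass.

def gcd(a: int, b: int) -> int:
    """Euclid's algorithm: decrease by a variable amount"""
    while b:
        # the remainder shrinks the problem by a different amount each pass
        a, b = b, a % b
    return a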