The Selection Problem

What is the Selection Problem?

The Selection Problem involves finding the kth smallest item in a collection of n items. This item is called an Order Statistic.

Selection Vs Sorting

If the collection is already sorted then you can just pull the order statistic out directly (assuming it's accessible by its index), and if it isn't you could sort it first and then retrieve the order statistic. Since Mergesort and Heapsort have a runtime of \(\Theta(n \log n)\), we know that this would be the runtime of the sort-first method. But since we are only choosing one item we don't need to sort the whole collection, so we can find the item with less computation, and thus selection is its own problem.
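
As a baseline, the sort-first approach is a one-liner. Here's a minimal sketch (my code, using a 1-based \(k\) to match the definition above):

def select_by_sorting(items: list, k: int):
    """Return the kth smallest item (1-based k) by sorting first

    This is the Theta(n log n) baseline that selection algorithms try to beat.
    """
    return sorted(items)[k - 1]

assert select_by_sorting([9, 1, 7, 3], k=2) == 3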

Familiar Cases

While you might search for any ranked item (the third item, say), there are three special cases that are common enough to have been given names.

  • the Minimum
  • the Maximum
  • the Median

Minimum and Maximum

The Minimum and Maximum are probably self-explanatory, but to be complete: the Minimum is the order statistic when \(k = 1\) and the Maximum is the order statistic when \(k=n\). As an example of the Selection Problem requiring fewer comparisons than the Sorting Problem we can look at finding the Minimum by traversing the array.

\begin{algorithm}
\caption{Minimum}
\begin{algorithmic}
\INPUT An array of comparable items
\OUTPUT The smallest item
\PROCEDURE{Minimum}{$A$}
  \STATE minimum $\gets$ A[0]
  \FOR{$i \in \{ 1 \ldots A.length - 1\}$}
   \IF {minimum > A[i]}
    \STATE minimum $\gets$ A[i]
   \ENDIF
  \ENDFOR
  \RETURN minimum
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

Since we're traversing the collection once it will always have a runtime of \(\Theta(n)\). The Maximum can be found the same way but with the comparison in the if-statement flipped (maximum < A[i]).
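
Translated into Python (a direct sketch of the pseudocode, not code from either book):

def minimum(collection: list):
    """Find the smallest item with a single traversal (Theta(n))"""
    smallest = collection[0]
    for item in collection[1:]:
        if smallest > item:
            smallest = item
    return smallest

assert minimum([5, 3, 8, 1, 9]) == 1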

The Median

Most people know what the median is (although I once worked with an engineer who thought it was the middle item of the unsorted items, not the item whose value was in the middle, so it might not be true that all people know what it is). But since we're talking about Order Statistics and not Statistics like in "How to Lie With Statistics," we'll make it a little more confusing by using a similar but not exactly the same definition.

\[ k = \left \lceil \frac{n}{2} \right \rceil \]

This definition comes from Levitin. In his description he defines the median as the value with half the remaining values less than it and half the remaining values above it, which suggests that he's defining the case where there are an odd number of items only.

CLRS gives a two-part definition with the odd case:

\[ k = \frac{n + 1}{2} \]

and the case where there are an even number of items broken up into two:

\[ \textbf{Lower Median} = \left \lfloor \frac{n + 1}{2} \right \rfloor \qquad \textbf{Upper Median} = \left \lceil \frac{n + 1}{2} \right \rceil \]

With their median being the Lower Median. Despite the differences, if you run through some numbers you can see that they're basically the same: \(\left\lfloor \frac{n+1}{2} \right\rfloor = \left\lceil \frac{n}{2} \right\rceil\) (adding 1 and then taking the floor after dividing by 2 is the same as the ceiling of dividing by 2 without adding 1).

def ceiling(a: int, b: int) -> int:
    return -(a//-b)

for n in range(1, 10):
    print(f"n={n}: \tLevitin: {ceiling(n, 2)}\tLower Median: {(n + 1)//2}")
n=1:    Levitin: 1      Lower Median: 1
n=2:    Levitin: 1      Lower Median: 1
n=3:    Levitin: 2      Lower Median: 2
n=4:    Levitin: 2      Lower Median: 2
n=5:    Levitin: 3      Lower Median: 3
n=6:    Levitin: 3      Lower Median: 3
n=7:    Levitin: 4      Lower Median: 4
n=8:    Levitin: 4      Lower Median: 4
n=9:    Levitin: 5      Lower Median: 5

So we've defined the median but actually finding the median is a little more complicated than finding the minimum or maximum so I'll save that for later.

Heap!

Beginning

What's a Heap?

This is a post that looks at Heaps. According to Introduction To Algorithms (CLRS), a heap is an array that can be thought of as a nearly complete binary tree which satisfies the Heap Property. There are actually two heap properties, one for a Max Heap and one for a Min Heap.

  • Max-Heap Property: \(A[Parent(i)] \ge A[i]\)
  • Min-Heap Property: \(A[Parent(i)] \le A[i]\)

This means that for any node in a Max-Heap, its value is greater than or equal to that of any of its children, and for any node in a Min-Heap, the node's value is less than or equal to its children's values.

The heap has (at least) two attributes.

  • A.length: the number of elements in the array
  • A.heap-size: the number of elements in the heap (not all elements in the array need to be in the heap)

The first element in the heap is the root (for the Max-Heap it is the largest element, for the Min-Heap it is the smallest).
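
As a concrete (made-up) example, here's an array that satisfies the Max-Heap Property, with the tree it represents sketched in the comments:

# a made-up max-heap, stored as a plain array
A = [16, 14, 10, 8, 7, 9, 3]
# as a tree:        16
#                 /    \
#               14      10
#              /  \    /  \
#             8    7  9    3
# A.length is 7 and, since every element is on the heap, A.heap-size is also 7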

Some Functions

For any Node in the Heap located at i:

\begin{algorithm}
\caption{Parent}
\begin{algorithmic}
\INPUT The index of a Child Node
\OUTPUT The index of the Child's Parent
\PROCEDURE{Parent}{$i$}
 \RETURN {\(\left \lfloor \frac{i}{2} \right \rfloor\)}
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{Left}
\begin{algorithmic}
\INPUT The index of a Parent Node
\OUTPUT The index of the Parent's left child node
\PROCEDURE{Left}{$i$}
 \RETURN 2i
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}
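
The matching Right function wasn't written out above, so for completeness here's a sketch that mirrors Left (it agrees with the right_child implementation further down):

\begin{algorithm}
\caption{Right}
\begin{algorithmic}
\INPUT The index of a Parent Node
\OUTPUT The index of the Parent's right child node
\PROCEDURE{Right}{$i$}
 \RETURN 2i + 1
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}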

These functions show the relationship between the location of a node and the locations of its parent and children in the array that makes up the heap.

Some Requirements

There are also two requirements:

  • Shape Requirement: All levels except possibly the last have to be complete, and the last level's leaves fill in from the left (only the rightmost leaves can be missing)
  • Dominance Requirement: Every node is greater than or equal to its children (in the Max-Heap, less than or equal for the Min-Heap)

That Dominance Requirement is also covered by the Heap Property.

Heap Properties

These are from Introduction to the Design & Analysis of Algorithms (Levitin).

  1. There exists exactly one essentially complete binary tree with \(n\) nodes. Its height is \(\lfloor \log_2 n\rfloor\).
  2. The root of a (Max) Heap always has the largest element.
  3. A node of a heap with all its descendants is also a heap.
  4. A heap implemented as an array with the heap's elements stored at \(1 \ldots n\) can be constructed using:
    • the parental node keys will be the first \(\lfloor \frac{n}{2} \rfloor\) elements while the leaf nodes will be in the last \(\lceil \frac{n}{2} \rceil\) positions
    • The children of a parent node at position i will be in positions 2i and 2i + 1.
    • The parent of a child node at position i will be in position \(\lfloor \frac{i}{2}\rfloor\).

Property four is what was defined in the functions up above. Levitin also gives a slightly different variation on stating the heap property.

\[ H[i] \ge \max \{H[2i], H[2i + 1]\} \textrm{ for } i = 1, \ldots, \left\lfloor \frac{n}{2} \right\rfloor. \]

This is sort of an inversion of the Max-Heap Property stated above. While the Max-Heap Property was given as every node is less than or equal to its parent, this says that every parent is greater than or equal to its children.
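
As a quick sanity check of Levitin's version of the property (my snippet, using the example array that shows up in the tests later, with \(\infty\) padding index 0 so the 1-based indexing works):

# index 0 is padding so the math's 1-based indexing works
heap = [float("inf"), 16, 14, 10, 8, 7, 9, 3, 2, 4, 1]
n = len(heap) - 1

# every parental node dominates its children
for i in range(1, n//2 + 1):
    children = heap[2*i: min(2*i + 1, n) + 1]
    assert heap[i] >= max(children)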

The Max Heap

The Imports

# python
from collections.abc import MutableSequence
from functools import partial
from typing import Generator

import random

# pypi
from expects import contain_exactly, equal, expect, raise_error

# software under test
from bowling.data_structures.heap import MaxHeap

from graeae import EmbedHoloviews
SLUG = "max-heap"
path = f"files/posts/{SLUG}/"
Embed = partial(EmbedHoloviews, folder_path=path)

The Class

Imports

# pypi
# https://www.attrs.org/en/stable/index.html
from attrs import define

The Definition

Besides declaring the class definition, the MaxHeap will hold some constants to hopefully make the code easier to read.

@define
class MaxHeap:
    """Build and maintain a max-heap

    If you pass in the heap as a list pre-pend it with Infinity

    Otherwise use ~heap = MaxHeap.from_list(elements)~ to build it
    """
    INFINITY = float("inf")
    NEGATIVE_INFINITY = -INFINITY
    ROOT_NODE = 1

    heap: list = [INFINITY]
    _size: int = None

From List

This is a class method to create the list for the heap. The calculations for the locations of parent and child nodes in the array are easier if we have a 1-based list, so I'll pad the list being passed in. Additionally, I'll set the first value to \(\infty\) so that the Heap Property will pass for the root node without needing any special consideration for what its parent is.

@classmethod
def from_list(cls, heap: list):
    """Builds a max-heap instance from the starter list

    Args:
     heap: list of elements to dump on the heap

    Returns:
     MaxHeap instance with the heap list added
    """
    return cls(heap = [cls.INFINITY] + heap)

The Heap Size

Since we padded the list holding the heap, the length of the list will never be the same as the number of nodes in the heap. Additionally, we'll sometimes manipulate things so that items that were in the heap are later excluded from it but still in the list, so the size property will help us keep track of how big we think our heap is.

@property
def size(self) -> int:
    """The size of the max heap"""
    if self._size is None:
        self._size = len(self.heap) - 1
    return self._size

@size.setter
def size(self, new_size) -> None:
    """Set the size of the max heap

    Args:
     new_size: how much of the list is in the heap

    Raises:
     AssertionError if the size is out of bounds for the list
    """
    # the heap occupies indices 1 through size, so it can hold
    # at most length - 1 nodes
    assert 0 <= new_size < self.length
    self._size = new_size
    return

Length

This is the length of the array, not necessarily of the heap. It's a little tricky: since the padding throws it off by one it would seem that the length should be lowered by one, but if you then use it to figure out the last index that will mess things up a little. Since we already have the size for the number of nodes I'll just pass the length of the list on and see what happens.

I was debating whether to use __len__ but I decided that this is really an internal measure and size is meant to be the attribute to use. I'm mostly keeping this around so that it matches the CLRS attributes.

@property
def length(self) -> int:
    """The size of the array for the heap

    Warning:
     This includes the padding at the beginning of the list
    """
    return len(self.heap)

Maximum

Since this is a max-heap the largest element is in the root; this property just makes getting it more explicit.

@property
def maximum(self):
    """The value in the root node"""
    return self.heap[self.ROOT_NODE]

Finding the Parent, Left-Child, and Right-Child of a Node

These are the implementations of the functions at the start of the post.

def parent(self, node: int) -> int:
    """Find the parent of a node

    Args:
     node: the index of the node to check

    Returns:
     the index of the parent of the node
    """
    return node//2
def left_child(self, parent: int) -> int:
    """Find the left child of a parent

    Args:
     parent: the index of the parent node

    Returns:
     index of the left child of the parent
    """
    return 2 * parent
def right_child(self, parent: int) -> int:
    """Find the right child of a parent

    Args:
     parent: the index of the parent node

    Returns:
     index of the right child of the parent
    """
    return 2 * parent + 1

Heapify A Sub Tree

CLRS just calls this MaxHeapify, but the bottom-up heapification of the tree seemed more like it should be called heapify, so I called this one heapify_subtree to note that it starts at a specific node which might not be the root.

def heapify_subtree(self, node: int):
    """Heapify the tree rooted at the node

    Args:
     node: index of the node to compare to its descendants
    """
    left, right = self.left_child(node), self.right_child(node)
    largest = node

    if left <= self.size and self.heap[left] > self.heap[largest]:
        # the left child was larger than the current parent node
        largest = left

    if right <= self.size and self.heap[right] > self.heap[largest]:
        # the right child is larger than the left and the current parent
        largest = right

    if largest != node:
        # the current parent is out of place, swap it with the larger child
        self.heap[node], self.heap[largest] = (self.heap[largest],
                                               self.heap[node])

        # after the swap the item at "largest" is the value from the 
        # "node" we started with so try it again with this new location
        self.heapify_subtree(largest)
    return

The Call

This makes the MaxHeap callable and heapifies the entire heap using bottom-up construction.

def __call__(self):
    """Heapifies the heap

    Raises:
     AssertionError: something bad happened and the Heap Property failed
    """
    for parent in reversed(range(1, self.size//2 + 1)):
        self.heapify_subtree(parent)

    self.check_rep()
    return

Increase a Key

When we change the value of a node, if the new value is higher than the previous one it might be in the wrong place in the heap (e.g. it might be bigger than its parent), so we need to traverse upward, swapping it with parents smaller than it, until we find where it should go. CLRS makes it a requirement that the new value is larger than the old one, which makes sense in light of the name IncreaseKey, but it seems to me that you could just call it ChangeKey and use a conditional instead of raising an exception. I'll stick with the error for now.

def increase_key(self, node, key):
    """Increase the node's value

    Args:
     node: index of node in heap to change
     key: new value for the node

    Raises:
     AssertionError if new value isn't larger than the previous value
    """
    assert key > self.heap[node], (
        f"{key} not greater than previous value {self.heap[node]}")
    self.heap[node] = key

    while (node > self.ROOT_NODE and
           self.heap[self.parent(node)] < self.heap[node]):
        self.heap[node], self.heap[self.parent(node)] = (
            self.heap[self.parent(node)], self.heap[node])
        node = self.parent(node)
    return

Insert a Value

CLRS describes insert and increase_key as part of updating a priority queue, but Levitin's description of top-down heap construction seems to use them as an alternative way to create the heap. He describes this method of construction (top-down) as starting with an empty heap and repeatedly inserting elements from the original array until you have a heap.

def insert(self, key):
    """Insert the key into the heap

    Args:
     key: orderable item to insert into the heap
    """
    # grab the size before the list grows, add a cell for the new node,
    # then let increase_key sift the new value up to where it belongs
    new_size = self.size + 1
    self.heap.append(self.NEGATIVE_INFINITY)
    self.size = new_size
    self.increase_key(new_size, key)
    return
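
And a quick sketch of that top-down construction (my usage example, relying on the from_list and insert methods above):

# build a heap by repeatedly inserting into an (initially) empty heap
heap = MaxHeap.from_list([])
for element in [4, 1, 3, 2, 16, 9, 10]:
    heap.insert(element)
assert heap.maximum == 16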

Check the Heap Property

This checks that the Heap Property holds for all the nodes.

def check_rep(self) -> None:
    """Checks the heap property

    Raises:
     AssertionError: the heap property has been violated
    """
    for node in range(2, self.size + 1):
        assert self.heap[self.parent(node)] >= self.heap[node], (
            f"Parent node {self.parent(node)} = {self.heap[self.parent(node)]} "
            f"not >= {node}={self.heap[node]}")
    return

Get and Set Item

I threw these in because I kept forgetting that the heap is an attribute of the MaxHeap, but they're only for convenience.

def __getitem__(self, node: int):
    """Gets an item from the heap

    Args: 
     node: index of the heap to get the value
    """
    return self.heap[node]

def __setitem__(self, node, value):
    """Sets the value at the node in the heap

    Args:
     node: index of the heap to set the value
     value: what to set the location in the heap to
    """
    self.heap[node] = value
    return

The Tests

start = [10, 20, 5]
max_heap = MaxHeap.from_list(heap=start)

expect(max_heap.heap).to(equal([max_heap.INFINITY] + start))
expect(max_heap.size).to(equal(3))
expect(max_heap.length).to(equal(4))

expect(max_heap.parent(1)).to(equal(0))
expect(max_heap.parent(2)).to(equal(1))
expect(max_heap.parent(3)).to(equal(1))

expect(max_heap.left_child(1)).to(equal(2))
expect(max_heap.right_child(1)).to(equal(3))

def failure(): max_heap.check_rep()

expect(failure).to(raise_error(AssertionError))

expect(max_heap.maximum).to(equal(10))
start = [16, 4, 10, 14, 7, 9, 3, 2, 8, 1]
heap = MaxHeap.from_list(start)
expect(heap.maximum).to(equal(16))

heap.heapify_subtree(2)
expect(heap[2]).to(equal(14))

heap.heapify_subtree(1)
expect(heap.maximum).to(equal(16))
expect(heap[2]).to(equal(14))
expect(heap[4]).to(equal(8))
expect(heap[9]).to(equal(4))
start = [10, 20, 30, 40]
heap = MaxHeap.from_list(start)
heap()
expect(heap.maximum).to(equal(40))
start = [1, 2, 3, 4, 7, 8, 9, 10, 14, 16]
heap = MaxHeap.from_list(start)
expect(heap.maximum).to(equal(1))

heap()
expect(heap.maximum).to(equal(16))

Heap Sort


Imports

# pypi
# https://www.attrs.org/en/stable/index.html
from attrs import define

# this project
from bowling.data_structures.heap import MaxHeap

The Heap Sort Class

The HeapSort uses the fact that a Max Heap always has the largest element at the root and repeatedly puts the root at the end of the list then shrinks the heap so it doesn't include the value that was moved over.

@define
class HeapSort:
    """Sort using a heap

    Args:
     items: collection of items for the sort
    """
    items: list
    _heap: MaxHeap=None

    @property
    def heap(self) -> MaxHeap:
        """The heap of items"""
        if self._heap is None:
            self._heap = MaxHeap.from_list(self.items)
            self._heap()
        return self._heap

    @property
    def without_root(self) -> list:
        """The items without the root """
        return self.heap.heap[self.heap.ROOT_NODE:]

    def __call__(self):
        """sorts the items"""
        self.heap()
        for node in range(self.heap.size, 1, -1):
            # move the root (the largest item still on the heap) to the end
            self.heap.heap[self.heap.ROOT_NODE], self.heap.heap[node] = (
                self.heap.heap[node],
                self.heap.maximum)
            self.heap.size -= 1
            # only the new root is out of place, so sift it down rather
            # than re-building the entire heap
            self.heap.heapify_subtree(self.heap.ROOT_NODE)
        return

The Tests

from bowling.sort.heap import HeapSort
k = 100
items = random.choices(range(k), k=k)
sorter = HeapSort(items.copy())

sorter()

items.sort()
expect(sorter.without_root).to(contain_exactly(*items))

A Priority Queue

Although some books mention that MinHeaps are used for priority queues, CLRS shows a MaxHeap version. This involves adding a couple of methods to the MaxHeap so there's no special class.
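
CLRS's priority queue also needs a way to pop the largest item (they call it Heap-Extract-Max). There's no such method on the MaxHeap above, so here's a hedged sketch of how it could be built on top of it (extract_max is my name, not something from the class):

def extract_max(heap: MaxHeap):
    """Remove and return the largest item (sketch, not part of MaxHeap)"""
    largest = heap.maximum
    # move the last node into the root, shrink the heap by one,
    # then sift the new root down to restore the Heap Property
    heap[heap.ROOT_NODE] = heap[heap.size]
    heap.size -= 1
    heap.heapify_subtree(heap.ROOT_NODE)
    return largest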

The Tests

items = [1, 2, 3]
heap = MaxHeap.from_list(items)
heap()

def failure(): heap.increase_key(2, 0)
expect(failure).to(raise_error(AssertionError))

heap.increase_key(2, 5)
expect(heap.maximum).to(equal(5))

items = [1, 2, 3, 4, 7, 8, 9, 10, 14, 16]
heap = MaxHeap.from_list(items)
heap()
heap.increase_key(9, 15)
expect(heap[heap.left_child(1)]).to(equal(15))

heap.insert(20)
expect(heap.size).to(equal(len(items) + 1))
expect(heap.maximum).to(equal(20))

Plotting

import holoviews
import networkx

graph = networkx.Graph()
for node in range(1, heap.size//2 + 1):
    if heap.left_child(node) <= heap.size:
        graph.add_edge(heap.heap[node], heap.heap[heap.left_child(node)])
    if heap.right_child(node) <= heap.size:
        graph.add_edge(heap.heap[node], heap.heap[heap.right_child(node)])
positions = networkx.drawing.nx_pydot.graphviz_layout(graph, prog="dot")

plot = holoviews.Graph.from_networkx(graph, positions)

output = Embed(plot=plot, file_name="heap-plot")()

Figure Missing

End

Quicksort

The Algorithm

I covered the Partition function in The Road To Partition and now I'll use it to implement a quicksort. This is the CLRS version.

\begin{algorithm}
\caption{QuickSort}
\begin{algorithmic}
\INPUT An array and left and right locations defining a subarray
\OUTPUT The subarray is sorted

\PROCEDURE{QuickSort}{A, left, right}

\IF {left < right} 

\STATE pivot $\gets$ \textsc{Partition}(A, left, right)
\STATE \textsc{QuickSort}(A, left, pivot - 1)
\STATE \textsc{QuickSort}(A, pivot + 1, right)

\ENDIF
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

Implementing It

Some Imports

# python
from collections.abc import MutableSequence
from dataclasses import dataclass
from functools import partial
from math import log2

import random

# pypi
from expects import contain_exactly, equal, expect
from joblib import Parallel, delayed

import altair
import pandas

# this project
from bowling.sort.insertion import insertion_sort
from bowling.sort.merge import mergesort

# other monkey stuff
from graeae.visualization.altair_helpers import output_path, save_chart
from graeae import Timer

Some Setup

@dataclass
class PartitionOutput:
    """Keeps the output for the partition funcion"""
    pivot: int
    count: int

TIMER = Timer()

SLUG = "quicksort"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)

The Partition

Even though I implemented the partition in that other post, I didn't make it return counts that we can use to estimate the runtime, so I'll do that here.

def partition(collection: MutableSequence,
              left: int, right: int) -> PartitionOutput:
    """Partitions the collection around the last element

    Args:
     collection: the list to partition
     left: index of the first element in the sub-list to partition
     right: index of the last element in the sub-list to partition

    Returns:
     PartitionOutput(pivot, count)
    """
    count = 0
    pivot_element = collection[right]
    lower_bound = left - 1
    for upper_bound in range(left, right):
        count += 1
        if collection[upper_bound] <= pivot_element:
            lower_bound += 1
            (collection[lower_bound],
             collection[upper_bound]) = (collection[upper_bound],
                                         collection[lower_bound])
    pivot = lower_bound + 1
    (collection[pivot],
     collection[right]) = (collection[right],
                           collection[pivot])
    count += 1
    return PartitionOutput(pivot=pivot, count=count)

Some Checks

The First Example

start = [5, 7, 9, 4, 6]
test = start.copy()
expected = [5, 4, 6, 7, 9]
first_expected_pivot = 2

output = partition(test, 0, 4)

expect(output.pivot).to(equal(first_expected_pivot))
expect(test).to(contain_exactly(*expected))
expect(output.count).to(equal(len(test)))

And to make sure the sub-list works.

left, right = [100, 20], [999, 888, 777]
test = left + start.copy() + right

output = partition(test, 2, 6)

# all we did was shift the sub-list two spots to the right
expect(output.pivot).to(equal(first_expected_pivot + 2))

# only the sub-list should be partitioned
expect(test).to(contain_exactly(*(left + expected + right)))

# the count should match our sub-array
expect(output.count).to(equal(len(start)))

The Pivot Is the Biggest Element

start = [9, 6, 25, 4, 100]
test = start.copy()

output = partition(test, 0, 4)

# the pivot should be the last element
expect(output.pivot).to(equal(4))

# nothing changes in the list
expect(test).to(contain_exactly(*start))

# once again, count should match the size of the input
expect(output.count).to(equal(len(test)))

The QuickSort

def quicksort(collection: MutableSequence, left: int, right: int) -> int:
    """Recursive quicksort

    Args:
     collection: list to sort
     left: index of start of sub-list in collection to sort
     right: index of end of sub-list in collection to sort

    Returns:
     count of comparisons
    """
    count = 0
    if left < right:
        output = partition(collection, left, right)

        count += output.count
        count += quicksort(collection, left, output.pivot - 1)
        count += quicksort(collection, output.pivot + 1, right)
    return count

Check It Out

start = list(range(10))
items = start.copy()
random.shuffle(items)
length = len(items)

count = quicksort(items, 0, length-1)
print(f"count: {count}")
print(f"Theoretical Average: {length * log2(length):.2f}")
print(f"Theoretical Worst: {length**2}")
expect(items).to(contain_exactly(*start))
count: 37
Theoretical Average: 33.22
Theoretical Worst: 100

Plotting The Quicksort Runtimes

@dataclass
class QuicksortOutput:
    """Holds the output of the quicksort counts"""
    comparisons: int
    size: int


def quicksorter(collection: MutableSequence) -> QuicksortOutput:
    """runs the quicksort and outputs count and size of collection

    Args:
     collection: thing to sort

    Returns:
     QuicksortOutput(count, size)
    """
    size = len(collection)
    count = quicksort(collection, 0, size - 1)
    return QuicksortOutput(comparisons=count, size=size)

With Random Input

things_to_sort = [list(range(count)) for count in range(1, 10**5, 1000)]
for things in things_to_sort:
    random.shuffle(things)
with TIMER:
    quick_output = Parallel(n_jobs=-1)(
    delayed(quicksorter)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-02-05 00:57:12.120397
Ended: 2022-02-05 00:57:14.397235
Elapsed: 0:00:02.276838
with TIMER:
    merge_output = Parallel(n_jobs=-1)(
    delayed(mergesort)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-02-05 00:58:40.087042
Ended: 2022-02-05 00:58:42.326204
Elapsed: 0:00:02.239162

Note to future self: Pypy is much faster with the python inputs and much slower with numpy inputs.

counts = [output.comparisons for output in quick_output]
sizes = [output.size for output in quick_output]
frame = pandas.DataFrame({"Size": sizes, "QuickSort": counts})
frame["Merge Sort"] = [output for output in merge_output]

melted = frame.melt(id_vars=["Size"],
                    var_name="Sort Algorithm", value_name="Comparisons")

chart = altair.Chart(melted).mark_point().encode(
    x="Size",
    y="Comparisons",
    color="Sort Algorithm",
    tooltip=[altair.Tooltip("Size", format=","),
             altair.Tooltip("Comparisons", format=","),
             "Sort Algorithm"]
).properties(
    title="QuickSort vs Merge Sort",
    width=800,
    height=525,
)

save_it(chart, "quicksort-runtime")

Figure Missing

I originally had Insertion Sort in the plot too, but it does so poorly that it just squashes both the Merge Sort and Quick Sort runtimes to a flat line. This is kind of an interesting plot. Quick Sort does much, much better than Insertion Sort, but it still doesn't quite keep up with Merge Sort. The trade-off being that Quick Sort does its sorting in place while Merge Sort creates all these temporary copies.

Worst-Case Input

Remember that case in the partition post where the last item (the pivot) was the largest item, and how it resulted in nothing being moved around? What if no matter what sub-array you picked, the last item was always the largest? In other words, what if it's already sorted?

For one thing, with really big inputs the interpreter throws an error because you've exceeded the maximum recursion depth, so that tells you that something bad is happening.

things_to_sort = [list(range(count)) for count in range(1, 10**3, 100)]
with TIMER:
    quick_output = Parallel(n_jobs=-1)(
    delayed(quicksorter)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-02-05 01:06:19.614254
Ended: 2022-02-05 01:06:20.159874
Elapsed: 0:00:00.545620
with TIMER:
    merge_output = Parallel(n_jobs=-1)(
    delayed(mergesort)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-02-05 01:06:23.592928
Ended: 2022-02-05 01:06:23.661150
Elapsed: 0:00:00.068222


counts = [output.comparisons for output in quick_output]
sizes = [output.size for output in quick_output]
frame = pandas.DataFrame({"Size": sizes, "QuickSort": counts})
frame["Merge Sort"] = [output for output in merge_output]

melted = frame.melt(id_vars=["Size"],
                    var_name="Sort Algorithm", value_name="Comparisons")

chart = altair.Chart(melted).mark_point().encode(
    x="Size",
    y="Comparisons",
    color="Sort Algorithm",
    tooltip=[altair.Tooltip("Size", format=","),
             altair.Tooltip("Comparisons", format=","),
             "Sort Algorithm"]
).properties(
    title="QuickSort vs Merge Sort (Worst-Case)",
    width=800,
    height=525,
)

save_it(chart, "quicksort-runtime-worst")

Figure Missing

Looking At the Sort In Progress

def quicksort_tracer(collection: MutableSequence,
                     left: int, right: int, tracer: dict=None) -> int:
    """Recursive quicksort

    Args:
     collection: list to sort
     left: index of start of sub-list in collection to sort
     right: index of end of sub-list in collection to sort
     tracer: dict of element: list of locations

    Returns:
     tracer dict
    """
    if tracer is None:
        tracer = {element: [index] for index, element in enumerate(collection)}

    if left < right:
        output = partition(collection, left, right)

        quicksort_tracer(collection, left, output.pivot - 1, tracer)
        quicksort_tracer(collection, output.pivot + 1, right, tracer)

        for index, element in enumerate(collection):
            tracer[element].append(index)
    return tracer
size = 20
start = list(range(size))
inputs = start.copy()
inputs.reverse()

tracer = quicksort_tracer(inputs, 0, size - 1)

frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Quicksort Call"})
melted = frame.melt(id_vars=["Quicksort Call"], var_name="Element", value_name="Location")

chart = altair.Chart(melted).mark_line().encode(
    x="Quicksort Call",
    y="Location",
    color="Element",
    tooltip=["Quicksort Call", "Location", "Element"]
).properties(
    title="Quicksort Trace (Reversed Input)",
    width=800,
    height=525,
)

save_it(chart, "quicksort-trace-backwards")

Figure Missing

size = 20
start = list(range(size))
inputs = start.copy()
random.shuffle(inputs)

tracer = quicksort_tracer(inputs, 0, size - 1)

frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Quicksort Call"})
melted = frame.melt(id_vars=["Quicksort Call"], var_name="Element", value_name="Location")

chart = altair.Chart(melted).mark_line().encode(
    x="Quicksort Call",
    y="Location",
    color="Element",
    tooltip=["Quicksort Call", "Location", "Element"]
).properties(
    title="Quicksort Trace (Shuffled Input)",
    width=800,
    height=525,
)

save_it(chart, "quicksort-trace-shuffled")

Figure Missing

End

The Master Theorem

The General Divide and Conquer Recurrence

When we have a Divide and Conquer algorithm that we solve via recursion we can estimate the running time using the General Divide and Conquer Recurrence.

  • \(T(n)\) is the total runtime.
  • \(n\) is the size of the problem (the number of instances in the input).
  • \(a\) is the number of sub-problems that we create that need to be solved.
  • \(b\) is the factor by which the input shrinks, so each sub-problem works on \(\frac{n}{b}\) of the instances (we'll mostly see cases where \(a=b=2\)).
  • \(f(n)\) is the time spent dividing the problem and later recombining the sub-problems.

Which we combine into this equation.

\[ T(n) = aT \left(\frac{n}{b} \right) + f(n) \]

The Master Theorem

In the case where \(f(n) \in \Theta\left(n^d\right)\) with \(d \ge 0\) (so \(f(n)\) is either constant or a power of \(n\)), we can use the following conditional cases to estimate the runtime.

\[ T(n) \in \begin{cases} \Theta\left (n^d \right ) & \textrm{if }a < b^d \\ \Theta\left (n^d \log n \right ) & \textrm{if }a = b^d \\ \Theta\left (n^{\log_b a} \right) & \textrm{if }a > b^d \end{cases} \]
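
As a worked example, the Mergesort recurrence that shows up later in these posts is \(T(n) = 2T\left(\frac{n}{2}\right) + \Theta(n)\), so \(a = 2\), \(b = 2\), and \(d = 1\), which is the middle case:

\[ a = b^d \implies T(n) \in \Theta\left(n^d \log n\right) = \Theta(n \log n) \]

Binary Search fits the same case with \(a = 1\), \(b = 2\), and \(d = 0\), giving \(\Theta(\log n)\).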

The Mergesort

Beginning

This is a look at the Mergesort, an example of the Divide and Conquer strategy for sorting collections.

Middle

The CLRS Algorithm

Our Mergesort repeatedly divides the given list into two parts, recursively calling itself on the sub-problems until it's down to one item to sort (\(p = r\)), and then takes advantage of our previously defined Merge function to combine the sub-solutions back together.

\begin{algorithm}
\caption{Mergesort}
\begin{algorithmic}
\INPUT An array and left and right locations of the subarray in the array
\OUTPUT The array in sorted order

\PROCEDURE{Mergesort}{$A, p, r$}
\IF {p < r}
         \STATE \textbf{Divide}
         \STATE $q \gets \left \lfloor \frac{p + r}{2} \right \rfloor$
         
         \STATE \\ \textbf{Conquer}
         \STATE \textsc{MergeSort}(A, p, q)
         \STATE \textsc{MergeSort}(A, q + 1, r)

         \STATE \\ \textbf{Combine}
         \STATE \textsc{Merge}(A, p, q, r)
\ENDIF
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

Runtime

  • Divide: \(D(n) = \Theta(1)\)
  • Conquer: \(2T\left (\frac{n}{2}\right)\)
  • Combine: \(C(n) = \Theta(n)\)
\begin{align} T(n) &= 2T \left( \frac{n}{2} \right) + \Theta(1) + \Theta(n) \\ &= 2T \left( \frac{n}{2} \right) + \Theta(n) \\ &= \Theta(n \log_2 n) \end{align}

The Levitin

\begin{algorithm}
\caption{Levitin's Mergesort}
\begin{algorithmic}
\INPUT Array $A[0 \ldots n - 1]$ of orderable elements.
\OUTPUT Array $A[0 \ldots n - 1]$ sorted in non-decreasing order.

\PROCEDURE{MergeSort}{A}
\IF {n > 1}
         \STATE \textbf{Divide}
         \STATE Copy $A\left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$ to $B \left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$
         \STATE Copy $A \left[\left\lfloor \frac{n}{2}\right\rfloor \ldots n - 1 \right]$ to $C \left[0 \ldots \left\lfloor \frac{n}{2} \right\rfloor - 1 \right]$
         
         \STATE \\ \textbf{Conquer}
         \STATE \textsc{MergeSort}(B)
         \STATE \textsc{MergeSort}(C)

         \STATE \\ \textbf{Combine}
         \STATE \textsc{Merge}(B, C, A)
\ENDIF
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

Implement It

# python
from collections.abc import MutableSequence, Sequence
from math import log2

import random

# pypi
from expects import contain_exactly, equal, expect
def merge(left_stack: Sequence,
          right_stack: Sequence,
          target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    left_size, right_size = len(left_stack), len(right_stack)
    next_left = next_right = put_item_here = count = 0

    while next_left < left_size and next_right < right_size:
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1

        put_item_here += 1

    if next_left == left_size and next_right < right_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = right_stack[next_right + stack_offset]
    elif next_left < left_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = left_stack[next_left + stack_offset]
    return count
def mergesort(collection: MutableSequence) -> int:
    """Sorts the collection using a recursive mergesort

    Args:
     collection: a mutable sequence

    Returns:
     runtime count
    """
    items = len(collection)
    count = 0

    if items > 1:
        middle = items//2
        left_stack = collection[:middle]
        right_stack = collection[middle:]
        assert len(left_stack) + len(right_stack) == items
        count += mergesort(left_stack)
        count += mergesort(right_stack)
        count += merge(left_stack, right_stack, collection)
    return count
size = 2**10
items = random.choices(list(range(size)), k=size)
starting = items.copy()
runtime = mergesort(items)
expect(runtime).to(equal(size * log2(size)))
expect(items).to(contain_exactly(*list(sorted(starting))))
print(f"{size * log2(size):,}")
print(f"{runtime:,}")
10,240.0
10,240

Comparing Sorts

# python
from functools import partial

# pypi
from numba import njit
from joblib import Parallel, delayed

from numpy.random import default_rng

import altair
import numpy
import pandas

# this project
from bowling.sort.insertion import insertion_sort

from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart
numba_random = default_rng(2022)
TIMER = Timer()

SLUG = "the-mergesort"
OUTPUT_PATH = output_path(SLUG)
save_it = partial(save_chart, output_path=OUTPUT_PATH)
def merger(thing_to_sort) -> tuple:
    """A thing to add the size of the input to the output

    Args:
     thing_to_sort: collection of items to sort

    Returns:
     number of things to sort, count of merges
    """
    return len(thing_to_sort), mergesort(thing_to_sort)
ninsertion = njit(insertion_sort)

things_to_sort = [numba_random.integers(low=0, high=count, size=count)
                  for count in range(1, 10**5 + 1, 1000)]
with TIMER:
    insertion_output = Parallel(n_jobs=-1)(
        delayed(ninsertion)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-01-28 01:12:10.628102
Ended: 2022-01-28 01:12:22.681161
Elapsed: 0:00:12.053059
with TIMER:
    merge_output = Parallel(n_jobs=-1)(
        delayed(merger)(thing_to_sort)
        for thing_to_sort in things_to_sort)
Started: 2022-01-28 01:12:33.278574
Ended: 2022-01-28 01:12:38.850064
Elapsed: 0:00:05.571490
SIZE, COMPARISONS = 0, 1
unzipped = list(zip(*merge_output))
frame = pandas.DataFrame({"Size": unzipped[SIZE],
"Mergesort": unzipped[COMPARISONS]})

unzipped = list(zip(*insertion_output))
frame["Insertion Sort"] = unzipped[COMPARISONS]

melted = frame.melt(id_vars=["Size"],
                    value_vars=["Mergesort", "Insertion Sort"],
                    var_name="Sort Algorithm", value_name="Comparisons")
chart = altair.Chart(melted).mark_point().encode(
    x="Size",
    y="Comparisons",
    color="Sort Algorithm",
    tooltip=[altair.Tooltip("Size", format=","),
             altair.Tooltip("Comparisons", format=","),
             "Sort Algorithm"]
).properties(
    title="Insertion vs Merge Sort",
    width=800,
    height=525,
)

save_it(chart, "insertion-vs-merge")

Figure Missing

frame["nlog2(n)"] = frame["Size"] * numpy.log2(frame["Size"])
del(frame["Insertion Sort"])
melted = frame.melt(id_vars=["Size"],
                    value_vars=["Mergesort", "nlog2(n)"],
                    var_name="Source", value_name="Comparisons")

chart = altair.Chart(melted).mark_point().encode(
    x="Size",
    y="Comparisons",
    color="Source",
    tooltip=[altair.Tooltip("Size", format=","),
             altair.Tooltip("Comparisons", format=","),
             "Source"]
).properties(
    title="Merge Sort vs Theoretical",
    width=800,
    height=525,
)

save_it(chart, "merge-by-logn")

Figure Missing

Looking at the Mergesort

The version of mergesort I implemented above is based on Levitin's algorithm, which I thought was clearer, since the copying was happening outside of the merge. This makes it harder to plot the progress of the sort, though, so I'm going to use the CLRS algorithm here to get some kind of visualization (hopefully).

from bowling.sort.merge import merge_clrs
def mergesort_tracer(collection: MutableSequence, left: int, right: int,
                     tracer: dict=None) -> dict:
    """Sorts the collection using a recursive mergesort

    This uses the CLRS merge-sort to build dict of the locations of the
    elements as the merging is done

    Args:
     collection: a mutable sequence
     left: the index of the beginning element of the sub-array
     right: the index of the ending element of the sub-array
     tracer: the dictionary to track the indices of the items

    Returns:
     tracer updated with post-merge locations of the items in tracer
    """
    if tracer is None:
        tracer = {element: [index] for index, element in enumerate(collection)}

    if left < right:
        middle = (left + right)//2
        mergesort_tracer(collection, left, middle, tracer)
        mergesort_tracer(collection, middle + 1, right, tracer)
        merge_clrs(collection, left, middle, right)

        for index, element in enumerate(collection):
            tracer[element].append(index)
    return tracer

A Shuffled Input

elements = 20
start = list(range(elements))
inputs = start.copy()
random.shuffle(inputs)
tracer = mergesort_tracer(inputs, left=0, right=len(inputs) - 1)
expect(inputs).to(contain_exactly(*start))

frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Merge"})
melted = frame.melt(id_vars=["Merge"], var_name="Element", value_name="Location")

chart = altair.Chart(melted).mark_line().encode(
    x=altair.X("Merge", axis=altair.Axis(tickMinStep=1)),
    y=altair.Y("Location", axis=altair.Axis(tickMinStep=1)),
    color=altair.Color("Element", legend=None),
    tooltip=["Merge", "Location", "Element"]
).properties(
    title="Mergesort Tracing",
    width=800,
    height=525
)

save_it(chart, "mergesort-trace")

Figure Missing

You can kind of see what's going on: every time you see a bunch of crossed lines it's because the merge moved items around… but maybe an artificial case will be clearer.

A Reversed Input

Let's use an input that's sorted but backwards to see the swaps more clearly.

elements = 20
start = list(range(elements))
inputs = start.copy()
inputs.reverse()
tracer = mergesort_tracer(inputs, left=0, right=len(inputs) - 1)
expect(inputs).to(contain_exactly(*start))

frame = pandas.DataFrame(tracer)
frame = frame.reset_index().rename(columns={"index": "Merge"})
melted = frame.melt(id_vars=["Merge"], var_name="Element", value_name="Location")

chart = altair.Chart(melted).mark_line().encode(
    x=altair.X("Merge", axis=altair.Axis(tickMinStep=1)),
    y=altair.Y("Location", axis=altair.Axis(tickMinStep=1)),
    color=altair.Color("Element", legend=None),
    tooltip=["Merge", "Location", "Element"]
).properties(
    title="Mergesort Tracing (From Backwards Source)",
    width=800,
    height=525
)

save_it(chart, "mergesort-trace-backwards")

Figure Missing

End

Divide and Conquer

Description

The Divide-and-Conquer approach involves three parts (sketched in the toy example after the list).

  • Divide: Break the problem up into sub-problems that are smaller instances of the same problem.
  • Conquer: If the problem is small enough solve it, otherwise recursively solve the sub-problem.
  • Combine: Combine the solutions to the sub-problems into a solution for the original problem.
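
To make the three parts concrete, here's the toy sketch mentioned above (my example, not from the books): summing a list by splitting it in half.

def divide_and_conquer_sum(items: list) -> int:
    """Sum a list by recursively splitting it in half (toy example)"""
    # conquer: a list this small is solved directly
    if len(items) <= 1:
        return items[0] if items else 0
    # divide: break the problem into two smaller instances
    middle = len(items)//2
    # combine: addition merges the two sub-solutions
    return (divide_and_conquer_sum(items[:middle]) +
            divide_and_conquer_sum(items[middle:]))

assert divide_and_conquer_sum([1, 2, 3, 4, 5]) == 15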

The Merge

This is an implementation of the merge portion of the merge-sort. It takes two sorted sections of a collection and merges them together in place.

CLRS

The precondition for this to work is that there are two sections within the array passed in to the algorithm that are already sorted and that they are located by the three index-values given to the algorithm. The first sorted section starts at p and ends at q in the array and the second sorted section starts at q + 1 and ends at r within the array.

This is the CLRS version with the indexing changed to start at 0.

\begin{algorithm}
\caption{Merge}
\begin{algorithmic}
\INPUT An array and left, middle, and right locations in the array
\REQUIRE Sub-arrays from $p$ to $q$ and from $q + 1$ to $r$ are sorted
\OUTPUT The array with the two sections collated in order

\PROCEDURE{Merge}{$A, p, q, r$}

\STATE \textbf{The sizes of the sub-sections}
\STATE $n_1 \gets q - p + 1$
\STATE $n_2 \gets r - q$

\STATE \\ \textbf{Copy the subsections to new arrays.}
\STATE \textit{New arrays have one extra cell to hold a sentinel.}
\STATE $L \gets Array[0\ldots n_1]$
\STATE $R \gets Array[0 \ldots n_2]$

\FOR {$i \in {0 \ldots n_1 - 1}$}
  \STATE $L[i] \gets A[p + i]$
\ENDFOR

\FOR{$j \in {0 \ldots n_2 - 1}$}
  \STATE $R[j] \gets A[q + j + 1]$
\ENDFOR

\STATE \\ \textbf{Add sentinel to indicate end}
\STATE $L[n_1] \gets \infty $
\STATE $R[n_2] \gets \infty $

\STATE \\ \textbf{Collate}
\STATE $i \gets 0$
\STATE $j \gets 0$

\FOR {$k \in {p \ldots r}$}
 \IF {$L[i] \leq R[j]$}
   \STATE $A[k] \gets L[i]$
   \STATE $i \gets i + 1$
 \ELSE
  \STATE $A[k] \gets R[j]$
  \STATE $j \gets j + 1$
\ENDIF
\ENDFOR
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

One way to think about the algorithm: it's like you have two stacks of cards, each of which is sorted, and you want to merge them together in sorted order. Since they're already sorted, you only have to compare the top cards of the two stacks to each other, choose the lower card and put it on your output stack, and keep repeating this until you've moved all the cards from the two stacks to the output. If one stack runs out of cards you just move the other stack's remaining cards onto the output.

Our algorithm starts by copying out the values from the source array into two new arrays to create our stacks. We append an \(\infty\) onto the end of each to simulate the emptying of the stack. Then we keep looking at the top item in each stack and copying the smaller item back into the original array. Because of the copying we are going over the input twice, but that is a relatively small (linear) increase.

Implement It

# python
from collections.abc import MutableSequence

# pypi
from expects import contain_exactly, equal, expect
INFINITY = float("inf")
def merge_clrs(collection: MutableSequence,
          left_start: int,
          left_end: int,
          right_end: int) -> int:
    """Merge the sub-sections from the collection

    Args:
     collection: list or array with sorted sub-sections
     left_start: index of start of first sub-section
     left_end: index of last item of first sub-section
     right_end: index of the last item of second sub-section

    Returns:
     count of basic operations
    """
    count = 0
    left_size = left_end - left_start + 1
    right_size = right_end - left_end
    right_start = left_end + 1

    left_stack = ([None] * left_size)
    right_stack = ([None] * right_size)

    for stack_location in range(left_size):
        left_stack[stack_location] = collection[left_start + stack_location]
        count += 1

    for stack_location in range(right_size):
        right_stack[stack_location] = collection[right_start + stack_location]
        count += 1

    left_stack.append(INFINITY)
    right_stack.append(INFINITY)

    next_left = next_right = 0

    for put_next_item_here in range(left_start, right_end + 1):
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            collection[put_next_item_here] = left_stack[next_left]
            next_left += 1
        else:
            collection[put_next_item_here] = right_stack[next_right]
            next_right += 1
    return count
def merge_check(merger):
    first = list(range(5))
    second = first[:]
    collection = first + second

    count = merger(collection, 0, 4, 9)

    expect(count).to(equal(20))
    expect(collection).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))

    collection = [10] + first + second
    count = merger(collection, 1, 5, 10)
    expect(count).to(equal(20))
    expect(collection[1:11]).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))

    collection = [10] + first + second + [-1, 5]
    count = merger(collection, 1, 5, 10)

    expect(count).to(equal(20))
    expect(collection[1:11]).to(contain_exactly(0,0,1,1,2,2,3,3,4,4))
    return
merge_check(merge_clrs)

Runtime

Without doing anything fancy we can see that there are three for loops. The first two cover copying the sub-list items from the original list to the new lists, so together they execute once for every item (n times), and the loop that does the actual merge also runs once for each item, so altogether the run time is 2n, which we'll say is \(\Theta(n)\). This is actually going to be part of the merge-sort but I thought I'd put it in here since the post is separate.

Levitin

This is the version given in Introduction to the Design & Analysis of Algorithms (Levitin) which I find a little clearer than the CLRS version. I generally prefer Levitin's versions, but, you know, CLRS is the one you have to have, so it's there too.

\begin{algorithm}
\caption{Merge}
\begin{algorithmic}
\INPUT $B[0 \ldots p-1]$, $C[0 \ldots q - 1]$, $A[0 \ldots p + q - 1]$
\REQUIRE Sub-arrays $B$ and $C$ are sorted
\OUTPUT Sorted array $A$ with the elements of $B$ and $C$.

\PROCEDURE{Merge}{$B, C, A$}

\STATE $i \gets 0$
\STATE $j \gets 0$
\STATE $k \gets 0$

\WHILE {$i < p$ and $j < q$}
 \IF {$B[i] \le C[j]$}
  \STATE $A[k] \gets B[i]$
  \STATE $i \gets i + 1$
 \ELSE
  \STATE $A[k] \gets C[j]$
  \STATE $j \gets j + 1$
 \ENDIF
 \STATE $k \gets k + 1$
\ENDWHILE

\IF {$i=p$}
 \STATE Copy $C[j \ldots q-1]$ to $A[k \ldots p + q - 1]$
\ELSE
 \STATE Copy $B[i \ldots p - 1]$ to $A[k \ldots p + q - 1]$
\ENDIF
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

There are a couple of noticeable differences between Levitin's version and the CLRS version. The first is that the lists to merge are passed into the function rather than being separated inside the function, which makes the "Divide" step separate from the "Combine" step. Additionally, instead of adding a sentinel to the end of the stacks the conditional checks to see if one of them is empty and copies over the stack that isn't empty outside of the merge-loop. This adds an additional conditional check to the main loop but then takes away the conditional check when copying over the leftovers. I might give the CLRS version a point for being more concise in handling the leftovers for the case where left and right are different sizes.

Implement It

# python
from collections.abc import Sequence
def merge_levitin(left_stack: Sequence, right_stack: Sequence,
                  target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    left_size, right_size = len(left_stack), len(right_stack)
    next_left = next_right = put_item_here = count = 0

    while next_left < left_size and next_right < right_size:
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1

        put_item_here += 1

    if next_left == left_size and next_right < right_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = right_stack[next_right + stack_offset]
    elif next_left < left_size:
        for stack_offset in range(left_size + right_size - put_item_here):
            count += 1
            target[put_item_here + stack_offset] = left_stack[next_left + stack_offset]
    return count
first = list(range(5))
second = [item + index for index, item in enumerate(first)]
collection = [None] * (len(first) + len(second))

count = merge_levitin(first, second, collection)

expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))

second = list(range(5))
first = [item + index for index, item in enumerate(second)]
collection = [None] * (len(first) + len(second))

count = merge_levitin(first, second, collection)

expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))

Runtime

Since the dividing of the array is moved out of the merge the runtime for the merge is \(n\) so it's also \(\Theta(n)\).

A Hybrid

This version takes Levitin's signature (the two sorted stacks and the target are passed in) but borrows the CLRS sentinels, so the leftover-copying branches disappear.

def merge(left_stack: Sequence, right_stack: Sequence,
          target: MutableSequence) -> int:
    """Merges values from left and right stacks into target collection

    Args:
     left_stack: sorted collection of items to merge
     right_stack: sorted collection of items to merge
     target: collection into which to merge the items

    Returns:
     count of basic operations
    """
    target_size = len(left_stack) + len(right_stack)

    # since we aren't copying the lists this can be kind of dangerous
    # passing in the same list more than once will append INFINITY each time
    left_stack.append(INFINITY)
    right_stack.append(INFINITY)
    next_left = next_right = count = 0

    for put_item_here in range(target_size):
        count += 1
        if left_stack[next_left] <= right_stack[next_right]:
            target[put_item_here] = left_stack[next_left]
            next_left += 1
        else:
            target[put_item_here] = right_stack[next_right]
            next_right += 1
    return count
first = list(range(5))
second = [item + index for index, item in enumerate(first)]
collection = [None] * (len(first) + len(second))

count = merge(first, second, collection)

expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))

second = list(range(5))
first = [item + index for index, item in enumerate(second)]
collection = [None] * (len(first) + len(second))

count = merge(first, second, collection)

expect(count).to(equal(10))
expect(collection).to(contain_exactly(0,0,1,2,2,3,4,4,6,8))

End

Binary Search

Beginning

The Algorithm Parts

  • Precondition: The input array (A) is sorted in non-decreasing order and the key \(k\) is in \(A\).
  • Postcondition: The index of the matching element is output
  • Loop Invariant: The key is in our current sublist: \(k \in A[left \ldots right]\).
  • Basic Step: Find the midpoint of the sub-array, adjust the sub-array to use the midpoint such that the key is in the new sub-array.
  • Exit Condition: \(left = right\) (the loop runs while \(right > left\)).
  • Make Progress: After each loop the sublist is half the size of the previous sublist.
  • Maintain the Loop Invariant: Pick the half of the sublist whose boundary value would allow the key.
  • Establish the Loop Invariant: The initial sublist is the original list (\(left = 0, right=A.length -1\)).
  • Worst Case Runtime: Since the input is repeatedly halved, the worst-case is \(\Theta(\log_{2} n)\).

Rather than saying "if k is in A" over and over, the search precondition requires that \(k\) is in \(A\) and if it isn't it's considered a failed search - which doesn't mean the algorithm is incorrect, just that the pre-condition wasn't met. We have to handle the other case, but for the purpose of defining the algorithm we'll set a stricter pre-condition.

Implementation

Imports

# python
from collections import namedtuple
from collections.abc import Sequence
from functools import partial

import random

# pypi
from expects import be_false, be_none, equal, expect, raise_error
from joblib import Parallel, delayed

import altair
import numpy
import pandas

from graeae import Timer
from graeae.visualization.altair_helpers import output_path, save_chart

Set Up

TIMER = Timer()
OUTPUT_PATH = output_path("binary-search")
save_it = partial(save_chart, output_path=OUTPUT_PATH)

SearchOutput = namedtuple("SearchOutput", ["index", "count"])

Binary Search One

The Pseudocode

\begin{algorithm}
\caption{BinarySearch}
\begin{algorithmic}
\INPUT Array A of items in non-decreasing order, search key
\OUTPUT If key is in array, the index of the item that matches
\PROCEDURE{BinarySearch}{A, key}
\STATE left $\gets$ 0
\STATE right $\gets$ A.length - 1

\WHILE {right > left}
\STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$

\IF {key $\le$ A[middle]}
\STATE right $\gets$ middle
\ELSE
\STATE left $\gets$ middle + 1
\ENDIF
\ENDWHILE

\IF {key = A[left]}
\RETURN left
\ELSE
\RETURN NotFound
\ENDIF
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

The Function

def binary_search(element: int, elements: Sequence) -> SearchOutput:
    """Does an iterative binary search for the key

    Args:
     element: item in the source to search for
     elements: sorted (non-decreasing) collection with the key

    Returns:
     SearchOutput(index of the key in the source or None if not found, count of comparisons)
    """
    left, right = 0, len(elements) - 1
    count = 0

    if not len(elements) or not (elements[left] <= element <= elements[right]):
        return SearchOutput(None, 1)

    while right > left:
        assert elements[left] <= element <= elements[right]
        count += 1
        middle = (left + right)//2

        left, right = ((left, middle) if element <= elements[middle]
                       else (middle + 1, right))

    location = left if (elements[left] == element) else None
    return SearchOutput(location, count)
def test_it(searcher):
    test_case = list(range(11))
    expect(searcher(element=5, elements=test_case)[0]).to(equal(5))

    items = list(sorted(random.sample(range(100), k=50)))

    for expected, item in enumerate(items):
        expect(searcher(element=item, elements=items)[0]).to(equal(expected))

    expect(searcher(element=-5, elements=items)[0]).to(be_none)

    last = items[-1]
    items[-1] = last + 100
    expect(searcher(101, items)[0]).to(be_none)

    expect(searcher(5, [])[0]).to(be_none)

    expect(searcher(5, [5])[0]).to(equal(0))
    return

test_it(binary_search)
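
A quick usage example, just to show the output format: a tuple of the index (None for a failed search) and the comparison count.

print(binary_search(element=13, elements=[2, 3, 5, 7, 11, 13, 17]))  # (5, 3)
print(binary_search(element=4, elements=[2, 3, 5, 7, 11, 13, 17]))   # (None, 3)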

Levitin's Version

The version I entered above is one I found on the web (PDF Lecture Notes) and is roughly the version CLRS gives. The one in Levitin's book is clearer to me, but it makes one more comparison per iteration (there's a quick side-by-side of the two after the implementation below).

The Pseudocode

\begin{algorithm}
\caption{BinarySearchTwo}
\begin{algorithmic}
\INPUT Array A of items in non-decreasing order, search key in A
\OUTPUT The index of the item that matches key
\PROCEDURE{BinarySearchTwo}{A, key}
\STATE left $\gets$ 0
\STATE right $\gets$ A.length - 1

\WHILE {left $\le$ right}
\STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$

\IF {key = A[middle]}
  \RETURN middle
\ELIF {key < A[middle]}
\STATE right $\gets$ middle - 1
\ELSE
\STATE left $\gets$ middle + 1
\ENDIF
\ENDWHILE

\RETURN NotFound
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

The Function

def binary_search_two(element, elements) -> SearchOutput:
    """Iterative Binary Search (Levitin's three-way version)

    Args:
     element: item to search for
     elements: sorted (non-decreasing) collection to search

    Returns:
     SearchOutput(index of the element or None if not found, count of loops)
    """
    left, right = 0, len(elements) - 1
    count = 0
    location = None
    while left <= right and location is None:
        count += 1
        middle = (left + right)//2

        if element == elements[middle]:
            location = middle
        elif element < elements[middle]:
            right = middle - 1
        else:
            left = middle + 1
    return (location, count)
test_it(binary_search_two)
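
To see the trade-off mentioned above, here's a rough, illustrative comparison (not a benchmark) using the two functions we just defined: version one always bisects until the sublist closes to a single element, while Levitin's version can stop as soon as a middle element matches the key, at the cost of the extra comparison per pass.

items = list(range(1, 32))

for key in (16, 4, 31):
    _, loops_one = binary_search(element=key, elements=items)
    _, loops_two = binary_search_two(element=key, elements=items)
    print(f"key={key}\tversion one: {loops_one} loops\tLevitin: {loops_two} loops")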

Recursive Version

Although binary search is straightforward to write iteratively, divide-and-conquer lends itself to recursion, so, just for completeness, here's a recursive version of the binary search.

The Pseudocode

\begin{algorithm}
\caption{Recursive Binary Search}
\begin{algorithmic}
\REQUIRE Input Array is in non-decreasing order
\INPUT Array A , search key, left and right indices to limit the search
\OUTPUT The index of the item that matches the key
\PROCEDURE{BinarySearchRecursive}{A, key, left, right}

\IF {left > right}
  \RETURN NotFound
\ENDIF

\STATE middle $\gets \left \lfloor \frac{\textrm{left} + \textrm{right}}{2} \right \rfloor$

\IF {key = A[middle]}
 \RETURN middle
\ELIF {key < A[middle]}
 \STATE right $\gets$ middle - 1
\ELSE
 \STATE left $\gets$ middle + 1
\ENDIF

\RETURN \textsc{BinarySearchRecursive}(elements, key, left, right)

\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}

The Implementation

def recursive_binary_search(elements: Sequence, key: int,
                            left: int, right: int,
                            count: int=0) -> tuple:
    """Recursive binary search

    Args:
     elements: sorted (non-decreasing) sequence to search
     key: item to search for
     left: left index of the current sub-list to search
     right: right index of the current sub-list to search
     count: number of calls made so far (for plotting)

    Returns:
     (index of the key in elements or None if not found, call count)
    """
    count += 1
    if left > right:
        # we missed it, the element isn't in the elements
        return None, count

    middle = (left + right)//2

    if elements[middle] == key:
        # we found it
        return middle, count

    # move one of the boundaries to the middle
    if key < elements[middle]:
        right = middle - 1
    else:
        left = middle + 1
    return recursive_binary_search(elements, key, left, right, count)

This is a helper to start the recursion and to handle empty lists or keys that fall outside the range of values in the list.

def search(element: int, elements: Sequence) -> tuple:
    """Calls the recursive binary search

    Args:
     element: an element of elements to search for
     elements: sorted sequence of items

    Returns:
     (index of the element or None if not found, call count)
    """
    left, right = 0, len(elements) - 1

    if not len(elements) or not elements[left] <= element <= elements[right]:
        return (None, 1)
    return recursive_binary_search(elements, element, left, right)
test_it(search)
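
Since each call halves the sub-list, the recursion only goes about \(\log_2 n\) frames deep, so Python's default recursion limit of 1,000 isn't a concern even for large inputs. A quick check (the million-item list is just an arbitrary example):

import sys

million = list(range(10**6))
print(search(element=123_456, elements=million),  # ~20 calls
      sys.getrecursionlimit())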

Some Plotting

Left and Right

Let's look at how the search updates the left and right boundaries. First we'll need a function that records the locations.

def binary_search_points(key, source) -> tuple:
    """Iterative Binary Search

    Args:
     key: item in the source
     source: sorted collection to search

    Returns:
     tuple of left-right locations

    Raises:
     AssertionError: key is out of bounds for the source
    """
    left, right = 0, len(source) - 1
    lefts = [left]
    rights = [right]

    while right > left:
        assert source[left] <= key <= source[right]
        middle = (left + right)//2

        (left, right) = ((left, middle) if key <= source[middle]
                         else (middle + 1, right))
        lefts.append(left)
        rights.append(right)
    return lefts, rights

Now we'll plot it.

items = list(range(101))
key = items[24]
lefts, rights = binary_search_points(key, items)

data = pandas.DataFrame(dict(Left=lefts, Right=rights, Split=list(range(len(lefts)))))

melted = data.melt(id_vars=["Split"], value_vars=["Left", "Right"],
                   var_name="left_or_right",
                   value_name="Index")
base = altair.Chart(melted)

lines = base.mark_line().encode(
    x="Split:O",
    y="Index",
    color="left_or_right"
)

points = base.mark_point().encode(
    x="Split:O",
    y="Index",
    color="left_or_right"
)

chart = (lines + points).properties(
    title="Binary Search Left-Right Boundaries",
    width=800,
    height=550
).interactive()

save_it(chart=chart, name="binary-search")

Figure Missing: Binary Search Left-Right Boundaries

So, it isn't as pretty as the sorting plots, but it confirms that the left and right boundaries steadily narrow in on the item's location.
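
As a quick check on the "Make Progress" claim, the width of the sub-list should drop by about half at every split, which we can confirm with the boundaries we just recorded:

widths = [right - left for left, right in zip(lefts, rights)]
print(widths)

Starting at 100, the widths roughly halve until the sub-list closes on the key's index.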

Runtime

Let's see how the number of loops goes up with the size of the search space.

EXPONENT = 5
numba_search = njit(binary_search)

# the stdlib random module has no "integers" method, so use numpy's Generator
generator = numpy.random.default_rng()

sizes = tuple(range(1, 10**EXPONENT + 1, 1000))
random_source = [numpy.sort(generator.integers(low=0, high=count, size=count))
                 for count in sizes]
random_things = [(generator.choice(elements), elements)
                 for elements in random_source]

worst_things = [(elements[0], elements) for elements in random_source]

with TIMER:
    random_output = Parallel(n_jobs=-1)(
        delayed(numba_search)(element, elements)
        for (element, elements) in random_things)

    worst_output = Parallel(n_jobs=-1)(
        delayed(numba_search)(element, elements)
        for (element, elements) in worst_things)
Started: 2022-01-16 21:33:07.668315
Ended: 2022-01-16 21:33:08.905390
Elapsed: 0:00:01.237075
data = pandas.DataFrame(dict(
    Count=sizes,
    Random=[output[1] for output in random_output],
    First=[output[1] for output in worst_output]
))

melted = data.melt(id_vars=["Count"], value_vars=["Random", "First"],
                   var_name="Location", value_name="Bisections")

theoretical = pandas.DataFrame(dict(Count=sizes, Theoretical=numpy.log2(sizes)))
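
For reference, the theoretical curve is just the number of halvings needed to reduce \(n\) items to one, \(\log_2 n\), which grows very slowly:

for n in (10, 1_000, 100_000):
    print(f"n={n:>7,}\tbisections: {numpy.log2(n):.1f}")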

Now, to plot.

points = altair.Chart(melted).mark_point().encode(
    x="Count",
    y="Bisections",
    color="Location")

line = altair.Chart(theoretical).mark_line().encode(
    x="Count",
    y="Theoretical",
    tooltip=[altair.Tooltip("Count", format=","),
             altair.Tooltip("Theoretical", format=".2f")],
)
chart = (line + points).properties(
    title="Binary Search Bisections",
    width=800,
    height=525,
).interactive()

save_it(chart=chart, name="binary-search-comparisons")

Figure Missing: Binary Search Bisections

See Also