# from pythonfrom__future__importannotationsfromfunctoolsimportpartialfromcollectionsimportnamedtupleimportmathimportrandomimportsys# from pypifromjoblibimportParallel,delayedfromexpectsimport(be_a,contain_exactly,equal,expect,raise_error)importaltairimportpandasfromgraeaeimportTimerfromgraeae.visualization.altair_helpersimportoutput_path,save_chart
Grade-School multiplication is how most of us are taught to multiply numbers with more than one digit each. Each digit in one number is multiplied by each digit in the other to create a partial product. Once we've gone through all the digits in the first number we sum up all the partial products we calculated to get our final answer.
\begin{algorithm}
\caption{Grade-School}
\begin{algorithmic}
\REQUIRE The input arrays are of the same length ($n$)
\INPUT Two arrays representing digits in integers ($a, b$)
\OUTPUT The product of the inputs
\PROCEDURE{GradeSchool}{$number_1, number_2$}
\FOR {$j \in \{0 \ldots n - 1\}$}
\STATE $carry \gets 0$
\FOR {$i \in \{0 \ldots n - 1\}$}
\STATE $product \gets a[i] \times b[j] + carry$
\STATE $partial[j][i + j] \gets product \bmod 10$
\STATE $carry \gets product/10$
\ENDFOR
\STATE $partial[j][n + j] \gets carry$
\ENDFOR
\STATE $carry \gets 0$
\FOR {$i \in \{0 \ldots 2n - 1\}$}
\STATE $sum \gets carry$
\FOR {$j \in \{0 \ldots n - 1\}$}
\STATE $sum \gets sum + partial[j][i]$
\ENDFOR
\STATE $result[i] \gets sum \bmod 10$
\STATE $carry \gets sum/10$
\ENDFOR
\STATE $result[2n] \gets carry$
\RETURN result
\ENDPROCEDURE
\end{algorithmic}
\end{algorithm}
Karatsuba Multiplication
Karatsuba Multiplication improves on the Grade-School algorithm using a trick from Frederick Gauss, which is a little too much of a diversion. With all these improvements in multiplication it seems like you need a background in number theory that not all Computer Scientists have (or at least I don't). Maybe take it on faith for now.
This is a mashup of the Wikipedia version and the Algorithms Illuminated version. It's a little tricky in that we're dealing with integers, in theory, but we have to know the number of digits and how to split it up so in code we're going to have to work with a collection instead, but hopefully this will be clearer in code.
An IntList
class IntegerDigits:
    """A hybrid of an integer and a list of its digits

    The digit list is zero-padded on the left so that its length is a
    power of two (which Karatsuba's splitting relies on).

    Args:
     integer: the number to store
    """
    def __init__(self, integer: int) -> None:
        self.integer = integer
        self._digits = None
        return

    @property
    def digits(self) -> list:
        """The digits for the stored integer

        Returns:
         list of digits, left-padded with zeros to a power-of-two length
        """
        if self._digits is None:
            digits = [int(digit) for digit in str(self.integer)]
            length = len(digits)
            power_of_two = 2 ** math.ceil(math.log2(length))
            self._digits = [0] * (power_of_two - length) + digits
        return self._digits

    def add_padding(self, padding: int) -> None:
        """Add more zeros to the left of the digits

        Args:
         padding: number of zeros to add to the left of the digits
        """
        self._digits = [0] * padding + self.digits
        return

    def set_length(self, target: int) -> None:
        """Set the total length of the digit list

        Args:
         target: total number of digits to have

        Raises:
         RuntimeError: target is less than the current number of digits
        """
        if target < len(self):
            raise RuntimeError(
                f"target {target} is less than current {len(self.digits)} digits")
        self.add_padding(target - len(self))
        return

    def set_equal_length(self, other: IntegerDigits) -> None:
        """Pad self and other so both have the same number of digits"""
        biggest = max(len(self), len(other))
        self.set_length(biggest)
        other.set_length(biggest)
        return

    def reset(self) -> None:
        """Clear any generated attributes"""
        self._digits = None
        return

    # collection methods
    def __len__(self) -> int:
        """The number of digits"""
        return len(self.digits)

    def __getitem__(self, key) -> IntegerDigits:
        """Slice the digits (preserving any leading-zero padding)"""
        sliced = self.digits[key]
        if type(sliced) is int:
            sliced = [sliced]
        as_integer = sum(
            digit * 10 ** (len(sliced) - 1 - position)
            for position, digit in enumerate(sliced))
        gotten = IntegerDigits(as_integer)
        # keep the sliced digits directly so the padding isn't re-derived
        gotten._digits = sliced
        return gotten

    # integer operations
    def __add__(self, value) -> IntegerDigits:
        """Add an integer or IntegerDigits to this integer"""
        other = value if type(value) is int else value.integer
        return IntegerDigits(self.integer + other)

    def __sub__(self, value) -> IntegerDigits:
        """Subtract an integer or IntegerDigits from this integer"""
        other = value if type(value) is int else value.integer
        return IntegerDigits(self.integer - other)

    def __mul__(self, value) -> IntegerDigits:
        """Multiply this integer by an integer or IntegerDigits"""
        other = value if type(value) is int else value.integer
        return IntegerDigits(self.integer * other)

    # comparisons
    def __eq__(self, other) -> bool:
        """Compare to an integer or IntegerDigits"""
        return other == self.integer

    def __lt__(self, other) -> bool:
        return self.integer < other

    def __gt__(self, other) -> bool:
        return self.integer > other

    def __ge__(self, other) -> bool:
        return self.integer >= other

    def __repr__(self) -> str:
        return f"<IntegerDigits: {self.integer}>"
Test it
test = IntegerDigits(567)

# the digits get padded out to a power of two
expect(len(test.digits)).to(equal(4))

# the length dunder method is implemented
expect(len(test)).to(equal(4))

# slicing works
expect(test[0]).to(equal(0))
expect(test[-1]).to(equal(7))
expect(test[:2].digits).to(contain_exactly(0, 5))

# multiplication
product = test * 2
expect(product.integer).to(equal(567 * 2))

test_2 = IntegerDigits(2)
expect(len(test_2)).to(equal(1))
product = test * test_2
expect(product.integer).to(equal(2 * 567))

# addition
sum_ = test + 10
expect(sum_.integer).to(equal(577))
sum_ = test + test_2
expect(sum_.integer).to(equal(569))

# subtraction
difference = test - 20
expect(difference.integer).to(equal(547))
difference = test_2 - test
expect(difference.integer).to(equal(-565))
An Implementation
Karatsuba Multiplication
def karatsuba(integer_1: IntegerDigits,
              integer_2: IntegerDigits) -> MultiplicationOutput:
    """Multiply integer_1 and integer_2 recursively

    Args:
     integer_1, integer_2: numbers with equal (power-of-two) digit counts

    Returns:
     MultiplicationOutput: product of the inputs, basic-multiplication count
    """
    digits = len(integer_1)
    # base case: a single-digit multiplication
    if digits == 1:
        return MultiplicationOutput(integer_1 * integer_2, 1)

    middle = digits // 2
    most_significant_1, least_significant_1 = integer_1[:middle], integer_1[middle:]
    most_significant_2, least_significant_2 = integer_2[:middle], integer_2[middle:]

    most_plus_least_1 = most_significant_1 + least_significant_1
    most_plus_least_2 = most_significant_2 + least_significant_2
    # a hack to keep them the same number of digits after the addition
    most_plus_least_1.set_equal_length(most_plus_least_2)

    # Gauss's trick: three recursive multiplications instead of four
    left, count_left = karatsuba(most_significant_1, most_significant_2)
    summed, count_summed = karatsuba(most_plus_least_1, most_plus_least_2)
    right, count_right = karatsuba(least_significant_1, least_significant_2)

    center = summed - left - right
    output = left * 10**digits + center * 10**middle + right
    if output < 0:
        raise RuntimeError(f"left: {left} center: {center} right: {right}")
    return MultiplicationOutput(output, count_left + count_summed + count_right)
def karatsuba_multiplication(integer_1: int, integer_2: int,
                             count_padding: bool = True) -> PlotMultiplicationOutput:
    """Sets up and runs the Karatsuba Multiplication

    Args:
     integer_1, integer_2: the two (non-negative) values to multiply
     count_padding: whether the digit count should include the padding

    Raises:
     AssertionError: one of the inputs is negative

    Returns:
     PlotMultiplicationOutput: product, count, digits, factors
    """
    assert integer_1 >= 0
    assert integer_2 >= 0
    integer_1 = IntegerDigits(integer_1)
    integer_2 = IntegerDigits(integer_2)

    if not count_padding:
        # fix: the originals defaulted to unbound names, so an input of 0
        # (all-zero digits, no break) raised a NameError — 0 has one digit
        original_1 = original_2 = 1
        for index, digit in enumerate(integer_1.digits):
            if digit > 0:
                original_1 = len(integer_1.digits[index:])
                break
        for index, digit in enumerate(integer_2.digits):
            if digit > 0:
                original_2 = len(integer_2.digits[index:])
                break
        original_digits = max(original_1, original_2)

    # make them have the same number of digits
    integer_1.set_equal_length(integer_2)
    if count_padding:
        original_digits = len(integer_1)

    output = karatsuba(integer_1, integer_2)
    return PlotMultiplicationOutput(product=output.product,
                                    count=output.count,
                                    digits=original_digits,
                                    factor_1=integer_1.integer,
                                    factor_2=integer_2.integer)
Let's use the Master Method to find the theoretical upper bound for Karatsuba Multiplication.
The basic form of the Master Method is this:
\[ T(n) = a T(\frac{n}{b}) + O(n^d) \]
Variable
Description
Value
\(a\)
Recursive calls within the function
3
\(b\)
Amount the input is split up
2
\(d\)
Exponent for the work done outside of the recursion
1
We make three recursive calls within the Karatsuba function and split the data in half within each call. The amount of work done outside the recursion is constant so \(O\left(n^d\right) = O\left(n^1\right)\). \(a > b^d\) so we have the case where the sub-problems grow faster than the input is reduced, giving us:
frame = pandas.DataFrame.from_dict({
    "Karatsuba Count": [output.count for output in karatsuba_outputs],
    "Digits": [output.digits for output in karatsuba_outputs],
    "digits^log2(3)": [output.digits**(math.log2(3)) for output in karatsuba_outputs],
    "6 x digits^log2(3)": [6 * output.digits**(math.log2(3)) for output in karatsuba_outputs],
})

# long-form for altair: one (Digits, Source, Multiplications) row per point
melted = frame.melt(id_vars=["Digits"],
                    value_vars=["Karatsuba Count",
                                "digits^log2(3)",
                                "6 x digits^log2(3)"],
                    var_name="Source",
                    value_name="Multiplications")

chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x="Digits",
    y="Multiplications",
    color="Source",
    tooltip=["Digits", altair.Tooltip("Multiplications", format=",")],
).properties(
    title="Basic Multiplications vs Digits (with Padding)",
    width=800,
    height=525,
)

save_it(chart, "karatsuba-multiplications")
Since when I added the padding I made sure that the number of digits was a power of two, the numbers are bunched up around those powers of two (so there's a lot of wasted computation, maybe) but the multiplication counts still fall within a constant multiple of our theoretical runtime.
Without Padding
Since I didn't make the karatsuba work without padding this will just show the points spaced out, but the counts will still be based on there being padding.
frame = pandas.DataFrame.from_dict({
    "Karatsuba Count": [output.count for output in unpadded_outputs],
    "Digits (pre-padding)": [output.digits for output in unpadded_outputs],
    "digits^log2(3)": [output.digits**(math.log2(3)) for output in karatsuba_outputs],
    "6 x digits^log2(3)": [6 * output.digits**(math.log2(3)) for output in karatsuba_outputs],
    "6 x digits^log2(3) (no padding)": [6 * output.digits**(math.log2(3)) for output in unpadded_outputs],
    "n^2 (no padding)": [output.digits**2 for output in unpadded_outputs],
})

melted = frame.melt(id_vars=["Digits (pre-padding)"],
                    value_vars=["Karatsuba Count",
                                "digits^log2(3)",
                                "6 x digits^log2(3)",
                                "6 x digits^log2(3) (no padding)",
                                "n^2 (no padding)"],
                    var_name="Source",
                    value_name="Multiplications")

chart = altair.Chart(melted).mark_line().encode(
    x="Digits (pre-padding)",
    y="Multiplications",
    color="Source",
    tooltip=[altair.Tooltip("Digits (pre-padding)", type="quantitative"),
             altair.Tooltip("Multiplications", format=",")],
).properties(
    title="Basic Multiplications vs Digits (without Padding)",
    width=800,
    height=525,
)

save_it(chart, "karatsuba-multiplications-unpadded")
Since I don't have an easy way to turn off using padding the Multiplication counts are still based on using padding, but this view spreads the digit-counts out so it's a little easier to see. The Multiplication counts are broken up into bands because the padding is based on keeping the number of digits a power of two.
Just for reference, here's the last product we multiplied.
# pythonfromcollectionsimportnamedtuplefromdatetimeimportdatetimefromfunctoolsimportpartialfrommathimportinfasinfinityfrommathimportlog2fromtypingimportSequence,Tupleimportrandom# pypiimportaltairimportpandas# my stufffromgraeaeimportTimerfromgraeae.visualization.altair_helpersimportoutput_path,save_chart
The motivation given by the text is that we have some data that varies over time and we want to find the pair of points that maximizes the gain from the first point to the second.
The first way we're going to find the maximum gain is using Brute Force - we're going to calculate the gain for each pair of points and picking the pair with the best gain. Since this involves checking every pair we're essentially looking at a Handshake Problem so the number of comparisons should be \(\frac{n(n - 1)}{2}\) or \(\Theta(n^2)\).
def brute_force_maximum_subarray(source: Sequence) -> Output:
    """Finds the pair of points with maximum gain by checking every pair

    Args:
     source: sequence with sub-array to maximize

    Returns:
     Output: (left-max-index, right-max-index, gain, count)
    """
    best_left = best_right = None
    best_gain = -infinity
    count = 0
    # every (left, right) pair with left before right — the handshake problem
    for left in range(len(source) - 1):
        for right in range(left + 1, len(source)):
            count += 1
            gain = source[right] - source[left]
            if gain > best_gain:
                best_left, best_right, best_gain = left, right, gain
    return Output(best_left, best_right, best_gain, count)
Looking at the brute-force solution you might think - "That was a check of point-pairs, why is this called 'max-subarray'?". Well, we're going to improve upon the brute-force implementation by doing a data-transformation. If you look at the data that we set up you might notice that there's the point values and then there's the change in value from one point to another. The changes are there because we can re-think our problem not as the maximum of increases between pair points, but instead as the maximum of point-change-sums.
In our CASE_ONE inputs, the biggest gain came from point 7 (with a value of 63) and point 11 (with a value of 106). If we look at the difference between consecutive points from points 7 to 11 we get -23, 18, 20, -7, 12 which sums to 20, which is the maximum sub-array total for this data set. To solve this problem we're going to need to find this sub-array. Since we stated that we're going to use a divide-and-conquer approach we know that we're going to repeatedly break the inputs up into (in this case) two separate sub-sets to solve. Every time we break the inputs into two groups we end up with three possibilities as to where the maximum sub-array lies:
in the left half
in the right half
crossing from the left half over the split to the right half
Max-Crossing-Subarray
This is a function that will find the best subarray in the left and in the right halves of the sub-array and combines them to find the gain across the mid-point of the subarray.
def find_max_crossing_subarray(source: Sequence, low: int,
                               mid: int, high: int) -> Output:
    """Find the max subarray that crosses the mid-point

    Args:
     source: the array to search for the sub-sequence in
     low: the lower-bound for the indices to search within (inclusive)
     mid: the mid-index between low and high
     high: the upper-bound for the indices to search within (inclusive)

    Returns:
     Output: left-index, right-index, gain, count
    """
    count, best_left, best_right = 0, None, None
    left_gain = right_gain = -infinity

    # scan from the middle down to (and including) low
    # fix: the original used range(mid, low, -1) which excludes low,
    # dropping the first element (and is empty when low == mid)
    gain = 0
    for left in range(mid, low - 1, -1):
        count += 1
        gain += source[left]
        if gain > left_gain:
            left_gain = gain
            best_left = left

    # scan from just past the middle up to (and including) high
    # fix: the original used range(mid + 1, high) which excludes high
    gain = 0
    for right in range(mid + 1, high + 1):
        count += 1
        gain += source[right]
        if gain > right_gain:
            right_gain = gain
            best_right = right

    return Output(best_left, best_right, left_gain + right_gain, count)
Max-Subarray
The find_maximum_subarray function finds the start and end indices for the best gain.
def find_maximum_subarray(source: Sequence, low: int, high: int) -> Output:
    """Find the sub-array that maximizes gain (divide-and-conquer)

    Args:
     source: sequence to maximize
     low: lower-bound for indices in source
     high: upper-bound for indices in source

    Returns:
     Output: left-index, right-index, gain, count
    """
    if high == low:
        # base case: a single element is its own best sub-array
        start, end, gain, count = low, high, source[low], 1
    else:
        mid = (low + high) // 2
        # best sub-array entirely in each half, plus the one crossing the middle
        left = find_maximum_subarray(source, low, mid)
        right = find_maximum_subarray(source, mid + 1, high)
        cross_mid = find_max_crossing_subarray(source, low, mid, high)

        count = left.runtime + right.runtime + cross_mid.runtime
        best_gain = max(left.gain, right.gain, cross_mid.gain)
        if best_gain == left.gain:
            start, end, gain = left.start, left.end, left.gain
        elif best_gain == right.gain:
            start, end, gain = right.start, right.end, right.gain
        else:
            start, end, gain = cross_mid.start, cross_mid.end, cross_mid.gain
    return Output(start, end, gain, count)
The maximum_subarray converts our list of values to a list of changes between consecutive values so that we can use our divide-and-conquer function.
def maximum_subarray(source: Sequence) -> Output:
    """Finds the sub-array with maximum gain

    Transforms the values into consecutive differences so the
    divide-and-conquer search applies, then maps the answer back.

    Args:
     source: array to maximize

    Returns:
     Output: left-index, right-index, gain, count
    """
    start, end = 0, len(source) - 1
    changes = [source[index + 1] - source[index] for index in range(end)]
    output = find_maximum_subarray(changes, start, end - 1)
    # 'changes' has one fewer entry than the original list
    # so bump the right index up by 1
    return Output(output.start, output.end + 1, output.gain, output.runtime)
n = 17
left: 7, right: 11, gain: 43
count = 50
n(n-1)/2 = 136
n log n : 69.49
n^2 = 289
By transforming the problem to one that lets us use divide-and-conquer we reduced the number of comparisons from 136 to 50. More generally, if we look at find_maximum_subarray we see that we're splitting the input in half before each of the recursive calls so we're going to make \(log_2 n\) splits and at each level of the recursions tree we're going to have \(2n\) inputs (left and right are each \(\frac{1}{2} n\) and cross_mid uses \(n\)) so we're going from \(\Theta(n^2)\) for the brute-force version to \(\Theta(n \log n)\) for the divide-and-conquer version.
Alternate Version
Let's try another version that uses the idea of summing the changes between consecutive points but doesn't use recursion (see Wikipedia: Kadane's algorithm).
def max_subarray_2(source: Sequence) -> Output:
    """Gets the maximal subarray (Kadane-style linear scan)

    An alternate version that uses neither recursion nor brute-force.

    Args:
     source: sequence to maximize

    Returns:
     Output: left-index, right-index, gain, count
    """
    count = 1
    best_total = -infinity
    best_start = best_end = 0
    current_total = 0
    changes = [source[index + 1] - source[index]
               for index in range(len(source) - 1)]
    for here, value_here in enumerate(changes):
        count += 1
        # a non-positive running total can't help — restart the window here
        if current_total <= 0:
            current_start = here
            current_total = value_here
        else:
            current_total += value_here
        if current_total > best_total:
            best_total = current_total
            best_start = current_start
            # +1 maps the change-index back to the original point-index
            best_end = here + 1
    return Output(best_start, best_end, best_total, count)
So, without using divide-and-conquer we get an even better runtime - it was the data transformation that was the most valuable part in improving the performance.
Comparing the Methods
def run_thing(thing, inputs, name):
    """Run one algorithm, print its elapsed wall-clock time, return its count"""
    print(f"*** {name} ***")
    start = datetime.now()
    runtime = thing(inputs).runtime
    stop = datetime.now()
    print(f"\tElapsed Time: {stop-start}")
    return runtime


brutes = []
divided = []
linear = []
counts = []
UPPER = 6

for exponent in range(1, UPPER):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)
    # a shuffled-with-replacement input of size `count`
    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)
    brutes.append(run_thing(brute_force_maximum_subarray, inputs, "Brute Force"))
    divided.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    linear.append(run_thing(max_subarray_2, inputs, "Linear"))
    counts.append(count)
n = 10
======
*** Brute Force ***
Elapsed Time: 0:00:00.001054
*** Divide and Conquer ***
Elapsed Time: 0:00:00.000249
*** Linear ***
Elapsed Time: 0:00:00.000127
n = 100
=======
*** Brute Force ***
Elapsed Time: 0:00:00.002403
*** Divide and Conquer ***
Elapsed Time: 0:00:00.002198
*** Linear ***
Elapsed Time: 0:00:00.000175
n = 1,000
=========
*** Brute Force ***
Elapsed Time: 0:00:00.002527
*** Divide and Conquer ***
Elapsed Time: 0:00:00.017359
*** Linear ***
Elapsed Time: 0:00:00.002204
n = 10,000
==========
*** Brute Force ***
Elapsed Time: 0:00:00.071900
*** Divide and Conquer ***
Elapsed Time: 0:00:00.044104
*** Linear ***
Elapsed Time: 0:00:00.000224
n = 100,000
===========
*** Brute Force ***
Elapsed Time: 0:00:07.506323
*** Divide and Conquer ***
Elapsed Time: 0:00:00.023113
*** Linear ***
Elapsed Time: 0:00:00.001233
Plot It
runtimes = pandas.DataFrame({
    "Brute Force": brutes,
    "Divide and Conquer": divided,
    "Linear": linear,
    "Input Size": counts,
})

melted = runtimes.melt(id_vars=["Input Size"],
                       value_vars=["Brute Force", "Divide and Conquer", "Linear"],
                       var_name="Algorithm",
                       value_name="Comparisons")

chart = altair.Chart(melted).mark_line(point=altair.OverlayMarkDef()).encode(
    x=altair.X("Input Size", type="quantitative"),
    y=altair.Y("Comparisons", type="quantitative"),
    color="Algorithm",
).properties(
    title="Comparison Counts",
    width=PLOT.width,
    height=525,
)

save_it(chart, "algorithm_comparisons")
Stepping up to a million with Brute Force takes too long (I've never let it run to the end to see how long). Let's see if the linear and divide and conquer can handle it, though.
linear_more = linear[:]
divided_more = divided[:]
counts_more = counts[:]

# push on past where brute-force gives out (10^6 through 10^9)
for exponent in range(6, 10):
    count = 10**exponent
    title = f"n = {count:,}"
    underline = "=" * len(title)
    print(f"\n{title}")
    print(underline)
    inputs = list(range(count))
    inputs = random.choices(inputs, k=count)
    linear_more.append(run_thing(max_subarray_2, inputs, "Linear"))
    divided_more.append(run_thing(maximum_subarray, inputs, "Divide and Conquer"))
    counts_more.append(count)
n = 1,000,000
=============
*** Linear ***
Elapsed Time: 0:00:00.008010
*** Divide and Conquer ***
Elapsed Time: 0:00:00.294771
n = 10,000,000
==============
*** Linear ***
Elapsed Time: 0:00:00.169138
*** Divide and Conquer ***
Elapsed Time: 0:00:02.018406
n = 100,000,000
===============
*** Linear ***
Elapsed Time: 0:00:00.850640
*** Divide and Conquer ***
Elapsed Time: 0:00:21.290975
n = 1,000,000,000
=================
*** Linear ***
Elapsed Time: 0:00:07.666758
*** Divide and Conquer ***
Elapsed Time: 0:03:32.097027
They do pretty well, it seems to be the brute force that dies out.
# pythonfrompprintimportpprintfromqueueimportPriorityQueue# pypifromexpectsimportbe,equal,expect# this projectfrombowling.data_structures.graphs.shortest_pathsimport(Edge,Graph,Vertex,initialize_single_source,relax,)
Dijkstra's Algorithm
def dijkstras_shortest_paths(graph: Graph, source: Vertex) -> None:
    """Find the shortest paths beginning at the source

    Args:
     graph: the vertices and edges to use
     source: the starting vertex for the paths
    """
    initialize_single_source(graph, source)
    shortest = set()
    queue = PriorityQueue()
    for vertex in graph.vertices:
        queue.put(vertex)
    # NOTE(review): entries are queued once up-front, so priorities aren't
    # refreshed when relax() lowers a path estimate — confirm this is intended
    while not queue.empty():
        vertex = queue.get()
        shortest.add(vertex)
        for edge in graph.adjacent[vertex]:
            relax(edge)
    return
@dataclass(order=True)
class Vertex:
    """A vertex for the single-source shortest paths problem

    Ordering compares only ``path_estimate`` (the other fields are
    excluded from comparison) so vertices sort by estimated distance.

    Args:
     identifier: something to distinguish the vertex
     path_estimate: the estimate of the path weight
     predecessor: the vertex that precedes this one in the path
    """
    identifier: str = field(compare=False)
    path_estimate: float = INFINITE
    predecessor: Vertex = field(default=None, compare=False)
class Edge:
    """A directed, weighted edge

    Args:
     source: tail vertex of the edge
     target: head vertex of the edge
     weight: the weight of the edge
    """
    def __init__(self, source: Vertex, target: Vertex, weight: float) -> None:
        self.source = source
        self.target = target
        self.weight = weight
        return
class Graph:
    """Directed Graph for the shortest path problem"""
    def __init__(self) -> None:
        self.vertices = set()
        self.edges = set()
        # vertex -> set of out-edges from that vertex
        self.adjacent = defaultdict(set)
        return
Add Edge
defadd_edge(self,edge:Edge)->None:"""Add the edge to the graph Args: edge: the directed edge to add """self.edges.add(edge)self.vertices.add(edge.source)self.vertices.add(edge.target)self.adjacent[edge.source].add(edge)return
Initialize Single Source
def initialize_single_source(graph: Graph, source: Vertex) -> None:
    """Setup the vertices of the graph for single-source shortest path

    Every vertex gets an infinite path estimate and no predecessor,
    then the source's estimate is set to zero.

    Args:
     graph: graph with vertices to set up
     source: the vertex to use as the start of the shortest paths tree
    """
    for vertex in graph.vertices:
        vertex.path_estimate = INFINITY
        vertex.predecessor = None
    source.path_estimate = 0
    return
Relax
def relax(edge: Edge) -> None:
    """Check if the target Vertex is improved by going through the source

    If the path through the edge's source is shorter than the target's
    current estimate, update the target's estimate and predecessor.

    Args:
     edge: directed edge with source, target, and weight
    """
    candidate = edge.source.path_estimate + edge.weight
    if edge.target.path_estimate > candidate:
        edge.target.path_estimate = candidate
        edge.target.predecessor = edge.source
    return
Bellman-Ford
Set Up
# pythonfrompprintimportpprint# pypifromexpectsimportbe,be_true,equal,expect# this projectfrombowling.data_structures.graphs.shortest_pathsimport(Edge,Graph,Vertex,initialize_single_source,relax,)SUCCEEDED,NEGATIVE_WEIGHT_CYCLE=True,False
The Function
def bellman_ford(graph: Graph, source: Vertex) -> bool:
    """Find the shortest paths using the Bellman-Ford algorithm

    Args:
     graph: the graph to process
     source: the vertex to start the paths from

    Returns:
     True if finished, False if there was a negative-weight cycle in the graph
    """
    initialize_single_source(graph, source)
    # |V| - 1 passes of relaxing every edge
    for _ in range(1, len(graph.vertices)):
        for edge in graph.edges:
            relax(edge)
    # any edge that can still be relaxed means a negative-weight cycle
    for edge in graph.edges:
        if edge.target.path_estimate > edge.source.path_estimate + edge.weight:
            return NEGATIVE_WEIGHT_CYCLE
    return SUCCEEDED
class Vertex:
    """A node in the graph (disjoint-set flavor)

    Args:
     identifier: something to distinguish the node
     parent: the node pointing to this node
     rank: the height of the node
    """
    def __init__(self, identifier, parent: Vertex=None, rank: int=0) -> None:
        self.identifier = identifier
        self.parent = parent
        self.rank = rank
        return

    @property
    def is_root(self) -> bool:
        """Whether this vertex is the root of its tree (its own parent)"""
        return self.parent is self

    def __repr__(self) -> str:
        return f"{self.identifier}"
Edges
class Edge:
    """A weighted edge between two nodes (undirected)

    Args:
     node_1: vertex at one end
     node_2: vertex at the other end
     weight: weight of the edge
    """
    def __init__(self, node_1: Vertex, node_2: Vertex, weight: float) -> None:
        self.node_1 = node_1
        self.node_2 = node_2
        self.weight = weight
        return

    def __repr__(self) -> str:
        return f"{self.node_1} --{self.weight}-- {self.node_2}"
A Graph
class Graph:
    """An Undirected Graph"""
    def __init__(self) -> None:
        self.vertices = set()
        self.edges = set()
        return

    def add_edge(self, node_1: Vertex, node_2: Vertex, weight: float) -> None:
        """Add the nodes and an edge between them

        Note:
         although the graph is undirected, the (node_2, node_1) edge isn't
         added — it's assumed that the order of the arguments doesn't matter
        """
        self.vertices.add(node_1)
        self.vertices.add(node_2)
        self.edges.add(Edge(node_1, node_2, weight))
        return
Disjoint Sets
class Disjoint:
    """Methods to treat vertex trees as disjoint sets"""

    @classmethod
    def make_sets(cls, vertices: set) -> None:
        """Initializes the vertices as singleton trees in a forest

        Args:
         vertices: collection of nodes to set up
        """
        for vertex in vertices:
            cls.make_set(vertex)
        return

    @classmethod
    def make_set(cls, vertex: Vertex) -> None:
        """Make the vertex its own root with rank zero

        Args:
         vertex: node to set up
        """
        vertex.parent = vertex
        vertex.rank = 0
        return

    @classmethod
    def find_set(cls, vertex: Vertex) -> Vertex:
        """Find the root of the set that vertex belongs to

        Uses path compression: re-points the vertex at the root it finds.

        Args:
         vertex: member of set to find

        Returns:
         root of set that vertex belongs to
        """
        if not vertex.is_root:
            vertex.parent = cls.find_set(vertex.parent)
        return vertex.parent

    @classmethod
    def union(cls, vertex_1: Vertex, vertex_2: Vertex) -> None:
        """Merge the trees that the vertices belong to

        Args:
         vertex_1: member of first tree
         vertex_2: member of second tree
        """
        cls.link(cls.find_set(vertex_1), cls.find_set(vertex_2))
        return

    @classmethod
    def link(cls, root_1: Vertex, root_2: Vertex) -> None:
        """Make the lower-ranked tree root a child of the higher-ranked

        Args:
         root_1: root of a tree
         root_2: root of a different tree
        """
        if root_1.rank > root_2.rank:
            root_2.parent = root_1
        else:
            root_1.parent = root_2
            # a tie means the surviving root just got taller
            if root_1.rank == root_2.rank:
                root_2.rank += 1
        return
Kruskal's Algorithm
def kruskal(graph: Graph) -> set:
    """Create a Minimum Spanning Tree out of Vertices

    Args:
     graph: the graph from which we create the MST

    Returns:
     set of edges making up the minimum spanning tree
    """
    spanning_tree = set()
    Disjoint.make_sets(graph.vertices)
    # greedily take the lightest edge that doesn't close a cycle
    for edge in sorted(graph.edges, key=lambda edge: edge.weight):
        tree_1 = Disjoint.find_set(edge.node_1)
        tree_2 = Disjoint.find_set(edge.node_2)
        if tree_1 is not tree_2:
            spanning_tree.add(edge)
            Disjoint.union(edge.node_1, edge.node_2)
    return spanning_tree
{a --4-- b,
a --8-- h,
c --7-- d,
c --2-- i,
c --4-- f,
d --9-- e,
f --2-- g,
g --1-- h}
Note: I originally thought I could check the tree structure, but the disjoint-set methods use Path Compression, so all the nodes end up having the same parent (in this case node "h").
# pypifromexpectsimport(be_a,expect,contain_exactly,contain_only,)# this projectfrombowling.data_structures.graphs.depth_first_searchimport(DepthFirstSearch,DFSVertex,DirectedGraph,)frombowling.data_structures.graphs.transposeimportTranspose
The Graph Transpose
A transpose of a directed graph (\(G^T\)) contains the same nodes and the same edges but the direction of each edge is reversed.
Imports for the Transpose
# pythonfromcollectionsimportdefaultdict# this projectfrombowling.data_structures.graphs.depth_first_searchimport(DepthFirstSearch,DFSVertex,DirectedGraph)
The Transpose
class Transpose:
    """Creates the transpose of a directed graph

    Args:
     graph: the directed graph to transpose
    """
    def __init__(self, graph: DirectedGraph) -> None:
        self.graph = graph
        # lazily-built caches for the reversed adjacency and vertices
        self._adjacent = None
        self._vertices = None
        return
The Adjacency Sets
@property
def adjacent(self) -> dict:
    """Vertex: set of adjacent vertices dictionary

    Built lazily by reversing every edge of the original graph's
    adjacency: ``vertex -> neighbor`` becomes ``neighbor -> vertex``.
    """
    if self._adjacent is None:
        self._adjacent = defaultdict(set)
        for vertex, neighbors in self.graph.adjacent.items():
            for neighbor in neighbors:
                # fix: removed a leftover debug print (which also printed
                # self.vertices rather than the neighbor being processed)
                self._adjacent[neighbor].add(vertex)
    return self._adjacent
The Vertices
@property
def vertices(self) -> set:
    """The vertices in the directed graph (shared with the original)"""
    if self._vertices is None:
        self._vertices = self.graph.vertices
    return self._vertices
A Strongly Connected Component of a Directed Graph is a sub-graph where each pair of vertices has an edge in both directions (if a has an edge to b then b has an edge to a).
The Steps to Build Them
Call DFS(G) to compute the finishing times.
Compute \(G^T\)
Call DFS(\(G^T\) ), going through the vertices in decreasing finish time in the main loop.
Output the Depth-First forest created in step 3 as the Strongly Connected Components.
The Builder
class StronglyConnectedComponents:
    """Build the strongly connected components sub-trees

    Args:
     graph: directed graph to process
    """
    def __init__(self, graph: DirectedGraph) -> None:
        self.graph = graph
        # lazily-built transpose and component forest
        self._transpose = None
        self._forest = None
        return
The Transpose
@property
def transpose(self) -> Transpose:
    """The transpose of the original graph (built on first access)"""
    if self._transpose is None:
        self._transpose = Transpose(self.graph)
    return self._transpose
Find the Child
Unlike with a Binary Search Tree, our vertices don't keep track of their child nodes, just the predecessor node so this is a helper to get all the children of a node.
deffind_child(self,vertices:set,predecessor:DFSVertex,path:list)->list:"""Find the child nodes of the given predecessor Note: this is a helper to find the children in a Strongly Connected Component For it to make sense the vertices should have the forest already built Args: vertices: source of nodes predecessor: node in the graph to find the child of path: list to append child nodes to """forvertexinvertices:ifvertex.predecessorispredecessor:path.append(vertex)self.find_child(vertices,vertex,path)returnpath
The Call
@property
def forest(self) -> dict:
    """Creates the forest with the strongly connected components

    Returns:
     adjacency dict for the strongly connected components
    """
    if self._forest is None:
        # do the search to get the finish times
        searcher = DepthFirstSearch(self.graph)
        searcher()

        # change the transpose vertices to be in reversed finish order
        vertices = sorted(self.graph.vertices,
                          key=lambda vertex: vertex.finished,
                          reverse=True)
        self.transpose._vertices = dict(zip(vertices, (None,) * len(vertices)))

        # do a depth first search on the graph transpose
        searcher.graph = self.transpose
        searcher()

        # at this point the strongly connected components are set up in
        # self.transpose, but you have to do some figuring to get the trees

        # get the roots of the trees in the forest
        roots = (vertex for vertex in self.transpose.vertices
                 if vertex.predecessor is None)

        # build a tree for each root by finding its children
        trees = (self.find_child(self.transpose.vertices, root, [root])
                 for root in roots)

        # build a new adjacency dict: the neighbors are all the original
        # adjacent nodes minus the tree's own nodes
        self._forest = dict()
        for tree in trees:
            neighbors = set()
            for node in tree:
                neighbors = neighbors.union(self.graph.adjacent[node])
            neighbors = neighbors - set(tree)
            self._forest[tuple(tree)] = neighbors
    return self._forest
Since the Depth-First Search created a forest of strongly connected components, we can see how many trees are in the forest by finding the roots (the nodes with no predecessor).
# show the root tuple of each strongly-connected-component tree
for root in forest:
    print(root)
abe
cd
fg
h
[autoreload of bowling.data_structures.graphs.transpose failed: Traceback (most recent call last):
File "/home/bravo/.virtualenvs/Bowling-For-Data/site-packages/IPython/extensions/autoreload.py", line 245, in check
superreload(m, reload, self.old_objects)
File "/home/bravo/.virtualenvs/Bowling-For-Data/site-packages/IPython/extensions/autoreload.py", line 394, in superreload
module = reload(module)
File "/usr/lib/pypy3/lib-python/3/imp.py", line 314, in reload
return importlib.reload(module)
File "/usr/lib/pypy3/lib-python/3/importlib/__init__.py", line 169, in reload
_bootstrap._exec(spec, module)
File "<frozen importlib._bootstrap>", line 639, in _exec
File "<builtin>/frozen importlib._bootstrap_external", line 737, in exec_module
File "<builtin>/frozen importlib._bootstrap_external", line 873, in get_code
File "<builtin>/frozen importlib._bootstrap_external", line 804, in source_to_code
File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
File "/home/bravo/Bowling-For-Data/bowling/data_structures/graphs/transpose.py", line 126
self._forest[tuple(tree])] = neighbors
^
SyntaxError: closing parenthesis ']' does not match opening parenthesis '('
]
This differs from the CLRS solution. The ordering depends on the order in which the edges are encountered (here, the order in which the edges were added to the graph), so the sort won't always come out the same. The important property is that no item later in the list needs to come before any item that precedes it.
# pythonfrom__future__importannotationsfromenumimportEnum# from pypifromattrsimportdefine# this projectsfrombowling.data_structures.graphsimportgraph
DFS Vertex
The Vertices used in the Depth-First Search also keep track of the "times" so it has two more attributes than our original vertex - discovered and finished. Although, as with color and all the other attributes, we could just let the program stick the attributes into the object, I thought it'd be better to create a new class to make it more obvious that there's a difference. I couldn't get the sets to add the vertices when I used inheritance so I'm going to start from scratch instead of inheriting from the original Vertex.
@define(eq=False)
class DFSVertex(graph.Vertex):
    """The Depth-First Search Vertex

    Args:
     identifier: something to identify the node
     color: the 'discovery' state of the node
     distance: The number of edges to the root
     predecessor: The 'parent' of the node in a tree
     discovered: when the node was discovered
     finished: when the node's adjacent nodes were checked
    """
    # the search "times"; None until the depth-first search stamps them
    discovered: int | None = None
    finished: int | None = None

    def __str__(self) -> str:
        # extend the parent's representation with the search timestamps
        return (super().__str__() +
                f", discovered: {self.discovered}, finished: {self.finished}")
defadd(self,source:DFSVertex,target:DFSVertex)->None:"""Add an edge from source to target Args: source: the vertex where the edge starts target: the vertex where the edge ends """self.vertices.add(source)self.vertices.add(target)self.adjacent[source].add(target)return
Depth First Search
The Class
@define
class DepthFirstSearch:
    """depth-first-searcher

    Args:
     graph: the graph to process
    """
    graph: graph.Graph
    # shared "clock" incremented when a vertex is discovered or finished
    time: int = 0
The Call
def __call__(self) -> None:
    """Performs the depth-first-search"""
    # put every vertex back into the undiscovered state
    for node in self.graph.vertices:
        node.color = graph.Color.WHITE
        node.predecessor = None
    self.time = 0

    # start a new tree at each vertex that is still undiscovered;
    # the color check is evaluated lazily, so vertices reached by an
    # earlier visit_adjacent call are skipped
    undiscovered = (node for node in self.graph.vertices
                    if node.color is graph.Color.WHITE)
    for node in undiscovered:
        self.visit_adjacent(node)
    return
Visiting Adjacent Nodes
def visit_adjacent(self, vertex: DFSVertex) -> None:
    """Visit the nodes adjacent to the given node

    Args:
     vertex: vertex whose adjacency to visit
    """
    # stamp the discovery time and mark the vertex as in-progress
    self.time = self.time + 1
    vertex.discovered = self.time
    vertex.color = graph.Color.GRAY

    for child in self.graph.adjacent[vertex]:
        # only undiscovered neighbors get explored from here
        if child.color is not graph.Color.WHITE:
            continue
        child.predecessor = vertex
        self.visit_adjacent(child)

    # everything reachable from here is done - stamp the finish time
    vertex.color = graph.Color.BLACK
    self.time = self.time + 1
    vertex.finished = self.time
    return
Testing It
# pypifromexpectsimportbe,equal,expect# software under testfrombowling.data_structures.graphsimportgraphfrombowling.data_structures.graphs.depth_first_searchimport(DepthFirstSearch,DFSVertex,DirectedGraph)graph_1=DirectedGraph()node_u=DFSVertex(identifier="u")node_v=DFSVertex(identifier="v")node_w=DFSVertex(identifier="w")node_x=DFSVertex(identifier="x")node_y=DFSVertex(identifier="y")node_z=DFSVertex(identifier="z")graph_1.add(node_u,node_v)graph_1.add(node_u,node_x)graph_1.add(node_x,node_v)graph_1.add(node_v,node_y)graph_1.add(node_y,node_x)graph_1.add(node_w,node_y)graph_1.add(node_w,node_z)graph_1.add(node_z,node_z)searcher=DepthFirstSearch(graph=graph_1)searcher()fornodeingraph_1.vertices:expect(node.color).to(be(graph.Color.BLACK))expect(node_u.discovered).to(equal(1))expect(node_u.finished).to(equal(8))expect(node_v.discovered).to(equal(2))expect(node_v.finished).to(equal(7))expect(node_w.discovered).to(equal(9))expect(node_w.finished).to(equal(12))expect(node_x.discovered).to(equal(4))expect(node_x.finished).to(equal(5))expect(node_y.discovered).to(equal(3))expect(node_y.finished).to(equal(6))expect(node_z.discovered).to(equal(10))expect(node_z.finished).to(equal(11))expect(node_u.predecessor).to(be(None))expect(node_v.predecessor).to(be(node_u))expect(node_y.predecessor).to(be(node_v))expect(node_x.predecessor).to(be(node_y))expect(node_z.predecessor).to(be(node_w))expect(node_w.predecessor).to(be(None))
# pythonfrom__future__importannotationsfromqueueimportQueue# pypifromattrsimportdefine# this projectfrombowling.data_structures.graphs.graphimportGraph,Vertexfrombowling.data_structures.graphsimportgraph
Constants
# sentinel distance for vertices the search hasn't reached yet
INFINITY = float("inf")
The Class
@define
class BreadthFirstSearch:
    """Creates a shortest-path tree from a source node to all reachable nodes

    Note:
     The vertices should be BFSVertex instances

    Args:
     graph: the graph with the adjacency dict for all the vertices
    """
    graph: Graph
The Call
def __call__(self, source: BFSVertex) -> None:
    """Does the breadth first search to create the shortest paths tree"""
    # put every vertex except the source back into the unreached state
    for node in set(self.graph.adjacent) - {source}:
        node.color = graph.Color.WHITE
        node.distance = INFINITY
        node.predecessor = None

    # the source is discovered by definition, at distance zero
    source.color = graph.Color.GRAY
    source.distance = 0
    source.predecessor = None

    pending = Queue()
    pending.put(source)
    while not pending.empty():
        current = pending.get()
        for node in self.graph.adjacent[current]:
            # white means unreached: claim it and queue it for expansion
            if node.color is graph.Color.WHITE:
                node.color = graph.Color.GRAY
                node.distance = current.distance + 1
                node.predecessor = current
                pending.put(node)
        # all of current's neighbors have been seen
        current.color = graph.Color.BLACK
    return
Testing
# pypi
from collections import defaultdict
from expects import be, be_none, equal, expect
from pathlib import Path

import networkx

# software under test
from bowling.data_structures.graphs import graph
# fix: BFSVertex is used below but was never imported
from bowling.data_structures.graphs.breadth_first_search import (
    BreadthFirstSearch,
    BFSVertex)
import bowling.data_structures.graphs.breadth_first_search as bfs

# build a small graph: v1 -> v2 -> s <- v3
test = graph.Graph()
source = BFSVertex(identifier="s")
v1 = BFSVertex(identifier=1)
v2 = BFSVertex(identifier=2)
v3 = BFSVertex(identifier=3)

test.add(v1, v2)
test.add(v2, source)
test.add(v3, source)

search = BreadthFirstSearch(test)
search(source)

# every vertex is reachable, so all should end up fully explored (black)
for node in (source, v1, v2, v3):
    expect(node.color).to(be(graph.Color.BLACK))

# the source is its own root at distance zero
expect(source.predecessor).to(be_none)
expect(source.distance).to(equal(0))

# v2 and v3 are one edge from the source, v1 is two (via v2)
expect(v3.predecessor).to(be(source))
expect(v3.distance).to(be(1))

expect(v2.predecessor).to(be(source))
expect(v2.distance).to(be(1))

expect(v1.predecessor).to(be(v2))
expect(v1.distance).to(be(2))
CLRS Example
We'll start with the same Graph from the previous example and add more nodes.