TransWikia.com

How to speed up Minimum Spanning Tree algorithm?

Mathematica Asked by crobartie on July 25, 2021

How to speed up MST algorithm? I work with graphs and I have adopted Prim’s algorithm for my needs, from here.

The graphs which I use are very dense (number of vertexes: $V=500$ and every vertex is connected with other $N-$1 vertexes).
For such graph my computer (i5 [email protected]) needs about 40 seconds to compute MST (Prim).

I read this: Performance Tuning in Mathematica

What is the best way to optimize this code?

  1. Compile? I tried this but I can’t force it to work…
  2. Maybe change some procedural parts of code to functional? If it possible, how?
    For me, Prim, seems to be procedural in its roots.
  3. Or should I change algorithm to Kruskal or Boruvka?

For now, I successfully use Parallelize function, so I can compute many different graphs in parallel on 4 cores. But how make it faster when computing each single graph?

This is my code (adopted for my needs;):

getMstPrim[metricMatrix_] :=
 Module[{groupOne, groupTwo, kk, paths, current, rule = {}, n},
  n = Length[metricMatrix[[2]]];
  groupOne = {RandomInteger[{1, n}]} ;                               (* 
  initialaze from any points, groupOne is inside points of the tree *)
  groupTwo = Complement[Range[1, n], groupOne];           (* 
  groupTwo is outside points of the tree *)
  For[kk = 1, kk < n, kk++,
   paths = 
    Table[{metricMatrix[[i, j]], {i, j}}, {i, groupOne}, {j, 
      groupTwo}]; (* 
   current possible connections between group one and two *)
   current = Last[First[Sort[Flatten[paths, 1]]]];   (* 
   sort for shortest weight*)
   groupOne = Union[groupOne, current];                              (* 
   add the current path to goupe one *)
   groupTwo = Complement[groupTwo, current];                   (* 
   remove the current path from goupe two *)
   rule = 
    Append[rule, 
     current]                                                   (* 
   store the paths for final use *)
   ];
  rule
  ]  

metricMatrix is list of lists (~500 x 100).

One Answer

Here we are given a set of points and start by computing distances between them; those will be our edge weights. If you already have edge weights then this preprocessing step is not needed. It is by far the bottleneck in the code below; the rest is a fraction of a second on 1000 vertices. Also I did not try to optimize the preprocessing (let alone the non-bottleneck parts) for speed e.g. using packable arrays and Compile.

So here is what might be a correct implementation of Prim's algorithm.

--- edit ---

I am replacing what I had with a corrected version, based on findings by the original poster. It uses a binary heap so I need code for that as well.

It will return the total length of the spanning tree as well as the list of edges. As before it takes a list of points in the plane, and the graph is by assumption the complete graph with vertices being the points and edge weights being the distance between the points. One can modify for other graphs without too much trouble.

I also coded in a slightly (more than usual) awkward way. The intent is to make it less difficult to use Compile. I do not have time to pursue that myself. As it stands, this seems to be around 4x slower than the Kruskal implementation I showed. At least the asymptotics look about right.

heapbottompointer = 1;
heaptoppointer = 2;
makeHeap[len_, elemsize_] := ConstantArray[0., {len + 1, elemsize}]

Clear[addToHeap, removeFromHeap]

SetAttributes[addToHeap, HoldFirst];
addToHeap[heap_, elem : {_Real, __}] := Module[
  {j1, j2},
  heap[[heapbottompointer, 1]] += 1;
  j1 = heap[[heapbottompointer, 1]];
  heap[[j1 + 1]] = elem;
  While[(j2 = Floor[j1/2]) >= 1 && 
    heap[[j2 + 1, 1]] > heap[[j1 + 1, 1]],
   heap[[{j1 + 1, j2 + 1}]] = heap[[{j2 + 1, j1 + 1}]];
   j1 = j2;
   ];
  ]

SetAttributes[removeFromHeap, HoldFirst];
removeFromHeap[heap_] := Module[
  {prev = 2, j1 = 2, j2 = 3, top = heap[[heaptoppointer]], last, next},
  last = heap[[heapbottompointer, 1]];
  While[j1 <= last,
   If[j2 <= last,
    next = If[heap[[j1 + 1, 1]] <= heap[[j2 + 1, 1]], j1, j2],
    next = j1
    ];
   heap[[prev]] = heap[[next + 1]];
   prev = next + 1;
   {j1, j2} = 2*next + {0, 1};
   ];
  heap[[heapbottompointer, 1]] -= 1;
  heap[[prev]] = heap[[last + 1]];
  j1 = prev - 1;
  While[(j2 = Floor[j1/2]) >= 1 && 
    heap[[j2 + 1, 1]] > heap[[j1 + 1, 1]],
   heap[[{j1 + 1, j2 + 1}]] = heap[[{j2 + 1, j1 + 1}]];
   j1 = j2;
   ];
  top
  ]

Prim[pts_] := Module[
  {edges, n = Length[pts], vert, heap, nedges = 1, tdist = 0., 
   treelist, top, used, verts, addedges},
  edges = Table[EuclideanDistance[pts[[i]], pts[[j]]], {i, n}, {j, n}];
  used = ConstantArray[False, n];
  used[[1]] = True;
  heap = makeHeap[n^2, 3];
  Do[
   addToHeap[heap, {edges[[1, j]], 1., N[j]}], {j, 2, n}];
  treelist = ConstantArray[{0, 0}, n - 1];
  While[heap[[1, 1]] >= 1 && nedges <= n - 1,
   top = removeFromHeap[heap];
   verts = Round[Rest[top]];
   If[used[[verts[[1]]]] && ! used[[verts[[2]]]] || ! 
       used[[verts[[1]]]] && used[[verts[[2]]]],
    If[! used[[verts[[1]]]] && used[[verts[[2]]]], 
     verts = Reverse[verts]];
    used[[verts[[2]]]] = True;
    tdist += top[[1]];
    treelist[[nedges]] = verts;
    nedges++;
    addedges = edges[[verts[[2]]]];
    Do[If[! used[[j]], 
      addToHeap[heap, {addedges[[j]], N[verts[[2]]], N[j]}]], {j, 
      n}];
    ]
   ];
  {tdist, treelist}
  ]

Example:

n = 1000;
pts = RandomReal[{-10, 10}, {n, 2}];

Timing[tree = Prim[pts];]

(* {20.64, Null} *)

--- end edit ---

Alternatively, use Kruskal's algorithm. Same preprocessing as above is used here. Again, I beleive this is a correct implementation but caveat emptor.

Kruskal[pts_] := Module[
  {n = Length[pts], vpairs, jj = 0, hh, pair, dist, c1, c2, c1c2}, 
  Do[hh[k] = {k}, {k, n}];
  vpairs = 
   Sort[Flatten[
     Table[{(pts[[k]] - pts[[l]]).(pts[[k]] - pts[[l]]), {k, l}}, {k, 
       1, n - 1}, {l, k + 1, n}], 1]];
  First[Last[Reap[While[jj < Length[vpairs], jj++;
      {dist, pair} = vpairs[[jj]];
      {c1, c2} = {hh[pair[[1]]], hh[pair[[2]]]};
      If[c1 =!= c2, Sow[Apply[Rule, vpairs[[jj, 2]]]];
       c1c2 = Union[c1, c2];
       Do[hh[c1c2[[k]]] = c1c2, {k, Length[c1c2]}];
       If[Length[hh[pair[[1]]]] == n, Break[]];];]]]]]

Timing[krus = Kruskal[pts];]

(* Out[65]= {5.090000, Null} *)

--- edit 2021-05-16 ---

A proper implementation of the "lazy" version of Prim's algorithm will use a priority queue. Which is what I used above. The "eager" version, which tends to be faster, needs an indexed priority queue. A proper version of Kruskal's algorithm (which I did not quite achieve above) will use a data structure called a merge-find set. I have reference implementations of these data structures, along with minimum spanning tree examples, in the Wolfram Function Repository.

PriorityQueue

IndexedQueue

MergeFindSet

None of these suffer from being blindingly fast. The merge-find set implementation of Kruskal's algorithm indeed seems to be slightly slower than the more naive implementation in this note. I do not know offhand what might be the bottleneck(s). Testing, as those the function pages will show, indicates at least that the asymptotics are as expected.

--- end edit ---

Correct answer by Daniel Lichtblau on July 25, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP