# Distributed Machine Learning: Parallel Implementation of PageRank Algorithm (PySpark)

## 1. Two serial iterative solution algorithms for PageRank

In an earlier post, "Numerical Analysis: Power Iteration and PageRank Algorithm (Numpy Implementation)", we used the power method to solve PageRank.
Given a directed graph, we can write its Markov transition probability matrix $$M$$, in which the $$i$$th column corresponds to the out-neighbors of node $$i$$ and is normalized so that the column sums to 1. For the three-node example used in this post (edges 1→2, 1→3, 2→3, 3→1):

$M=\left(\begin{array}{lll} 0 & 0 & 1 \\ \frac{1}{2} & 0 & 0 \\ \frac{1}{2} & 1 & 0 \end{array}\right)$

Then we define the Google matrix as

$G=\frac{q}{n} E+(1-q) M$

Here $$q$$ is the probability that the surfer jumps from the current page to a random page (usually 0.15), $$1-q$$ is the probability of following a link on the current page, and $$E$$ is the $$n\times n$$ matrix whose entries are all 1 ($$n$$ is the number of nodes).

The PageRank algorithm can be regarded as finding the eigenvector associated with the dominant eigenvalue of the Google matrix (which is 1, because $$G$$ is a stochastic matrix). Let the Rank vector be $$x$$, where $$x_i$$ is the Rank value of page $$i$$. Starting from an initial vector, we can solve for it with the power method:

$x_{t+1}=G x_{t}$

(normalize after each iteration)
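As a minimal sketch of the power method above, the following plain-Python snippet builds $$M$$ and $$G$$ for the three-node example and iterates $$x_{t+1}=Gx_t$$. The helper names (`matvec`, `power_method`), the choice of 100 iterations, and normalizing by the sum of the entries are assumptions made for illustration; they are not taken from the earlier post.

```python
from typing import List

Vector = List[float]
Matrix = List[List[float]]

# Column-stochastic transition matrix M of the three-node example.
M: Matrix = [
    [0.0, 0.0, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 1.0, 0.0],
]
n = len(M)
q = 0.15

# Google matrix G = (q/n) E + (1-q) M, where E is the all-ones matrix.
G: Matrix = [[q / n + (1 - q) * M[i][j] for j in range(n)] for i in range(n)]


def matvec(A: Matrix, x: Vector) -> Vector:
    """Dense matrix-vector product A x."""
    return [sum(a_ij * x_j for a_ij, x_j in zip(row, x)) for row in A]


def power_method(G: Matrix, n_iterations: int = 100) -> Vector:
    """Hypothetical helper: iterate x <- G x, normalizing after each step."""
    size = len(G)
    x = [1.0 / size] * size  # uniform initial Rank vector
    for _ in range(n_iterations):
        x = matvec(G, x)
        total = sum(x)
        x = [x_i / total for x_i in x]  # normalize so the entries sum to 1
    return x


x_power = power_method(G)
print(x_power)
```

Because every column of $$G$$ sums to 1, the normalization step changes almost nothing here; it mainly guards against accumulated rounding error.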

Most real-world graphs are sparse, so $$M$$ is a sparse matrix. In the power method, the term $$(1-q)Mx_t$$ can be computed with a reduceByKey() operation keyed by node number, in which each node $$i$$ sums the contributions it receives. Computing $$\frac{q}{n}Ex_t$$, however, requires a reduce() over the Ranks of all nodes, which is more cumbersome and adds a global aggregation step to every iteration.

PageRank also has a second solution method, called the "iterative algorithm", whose update has the following form:

$x_{t+1} = \frac{q}{n}\mathbf{1} + (1-q)Mx_t$

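When $$x_t$$ is a probability distribution (its entries sum to 1), every entry of $$Ex_t$$ equals 1, so $$\frac{q}{n}Ex_t$$ is simply the constant vector with entries $$\frac{q}{n}$$. This iterative form therefore avoids computing $$\frac{q}{n}Ex_t$$ and has lower communication overhead. We use this iterative form in the rest of the post.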
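To make the relationship between the two forms concrete, here is a small plain-Python sketch on the three-node example. The helper names (`iterative_step`, `google_step`) are hypothetical. It checks that one step of the iterative form agrees with $$Gx_t$$ when $$x_t$$ sums to 1, and then runs 10 iterations from the uniform vector.

```python
from typing import List

Vector = List[float]
Matrix = List[List[float]]

M: Matrix = [
    [0.0, 0.0, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 1.0, 0.0],
]
n = len(M)
q = 0.15


def matvec(A: Matrix, x: Vector) -> Vector:
    """Dense matrix-vector product A x."""
    return [sum(a_ij * x_j for a_ij, x_j in zip(row, x)) for row in A]


def iterative_step(x: Vector) -> Vector:
    """One update x <- (q/n) 1 + (1-q) M x; the matrix E is never formed."""
    return [q / n + (1 - q) * mx_i for mx_i in matvec(M, x)]


def google_step(x: Vector) -> Vector:
    """One power-method update x <- G x with G = (q/n) E + (1-q) M."""
    G = [[q / n + (1 - q) * M[i][j] for j in range(n)] for i in range(n)]
    return matvec(G, x)


x0 = [1.0 / n] * n  # uniform start, entries sum to 1
step_a = iterative_step(x0)
step_b = google_step(x0)
max_gap = max(abs(a - b) for a, b in zip(step_a, step_b))
print(step_a, max_gap)  # the gap is at floating-point rounding level

# Run 10 iterations of the iterative form from the uniform start.
x = x0
for _ in range(10):
    x = iterative_step(x)
print(x)
```

The iterative step only needs the sparse product $$Mx_t$$ plus a constant, which is what makes it convenient to distribute.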

## 2. Two methods of graph partitioning

At present, the main approach to parallelizing graph algorithms is to partition a large graph into multiple subgraphs, distribute these subgraphs to different machines for parallel computation, and synchronize across machines when necessary to obtain the result. Academia and industry have proposed a variety of methods for partitioning large graphs into subgraphs; the two main ones are edge cut and vertex cut.

### 2.1 Edge Partitioning

As shown in the figure below, edge partitioning cuts some of the edges in the graph. In the Pregel graph computing framework, each partition contains some nodes and the outgoing edges of those nodes; in the GraphLab graph computing framework, each partition contains some nodes, their outgoing and incoming edges, and their neighbor nodes. The advantage of edge partitioning is that it preserves the neighbor information of each node; the disadvantage is that it is prone to unbalanced partitions. For example, all the edges of a high-degree node are placed in one partition, leaving few edges in the other partitions. In addition, as shown in the rightmost panel of the figure below, edge partitioning may store some edges redundantly.

### 2.2 Vertex Partitioning

As shown in the figure below, vertex partitioning cuts some of the vertices in the graph to obtain multiple partitions, each of which contains a subset of the edges and the nodes incident to those edges. Frameworks such as PowerGraph and GraphX use vertex partitioning, and a node that has been cut exists in multiple partitions. The advantages and disadvantages of vertex partitioning are the opposite of those of edge partitioning: edges can be distributed evenly across machines, but the neighbor relationships of a node are not preserved.

In summary, edge partitioning distributes nodes across machines (potentially unbalanced), while vertex partitioning distributes edges across machines (more balanced).

The algorithm we use next follows a Pregel-like scheme, that is, edge partitioning. It is a simplified version that does not handle dangling nodes.
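The difference between the two schemes can be sketched in a few lines of plain Python. The assignment rules below (vertex id modulo the number of partitions for edge cut, round-robin over the edge list for vertex cut) are simplifying assumptions for illustration only; real frameworks use hashing or more elaborate heuristics, and the function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

Edge = Tuple[int, int]

edges: List[Edge] = [(1, 2), (1, 3), (2, 3), (3, 1)]
n_parts = 3


def edge_cut(edges: List[Edge], n_parts: int) -> List[Dict[int, List[int]]]:
    """Pregel-style edge cut: each vertex lives in exactly one partition,
    together with its outgoing edges; edges may cross partitions."""
    parts: List[Dict[int, List[int]]] = [defaultdict(list) for _ in range(n_parts)]
    for src, dst in edges:
        parts[src % n_parts][src].append(dst)
    return [dict(p) for p in parts]


def vertex_cut(edges: List[Edge], n_parts: int) -> List[List[Edge]]:
    """Vertex cut: each edge lives in exactly one partition; a vertex is
    replicated in every partition that holds one of its edges."""
    parts: List[List[Edge]] = [[] for _ in range(n_parts)]
    for i, edge in enumerate(edges):
        parts[i % n_parts].append(edge)
    return parts


def replicas(parts: List[List[Edge]]) -> Dict[int, Set[int]]:
    """For each vertex, the set of partitions that contain a copy of it."""
    where: Dict[int, Set[int]] = defaultdict(set)
    for p, part in enumerate(parts):
        for src, dst in part:
            where[src].add(p)
            where[dst].add(p)
    return dict(where)


edge_parts = edge_cut(edges, n_parts)
vertex_parts = vertex_cut(edges, n_parts)
print(edge_parts)    # each vertex with its out-neighbors, one owner per vertex
print(vertex_parts)  # edges spread over the partitions
print(replicas(vertex_parts))
```

In the edge-cut result every vertex has a single owner, so its full out-neighbor list is available locally. In the vertex-cut result vertex 3 appears in all three partitions, which is the replication that vertex cut accepts in exchange for balanced edge counts.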

## 3. Parallelization of Iterative Algorithms

We initialize the Rank vector with a uniform distribution (it can also be initialized with all 1s, but then it is no longer a probability distribution) and set the number of partitions to 3. Each iteration of the algorithm proceeds as follows.

In the flatMap() step, node $$i$$ computes its contribution $$(x_t)_i/|\mathcal{N}_i|$$ and sends it to every node in its neighbor set $$\mathcal{N}_i$$. The contributions received by each node are then summed with reduceByKey() (keyed by node number) to obtain the vector $$\hat{x}$$, which corresponds to $$Mx_t$$ in the serial algorithm.

The Rank vector is then updated according to $$x_{t+1} = \frac{q}{n} + (1-q)\hat{x}$$, and the next iteration begins.
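The flatMap() and reduceByKey() steps can be mimicked without Spark using plain dictionaries. The sketch below is only a single-process stand-in for the dataflow (all helper names are hypothetical), run on the three-node graph with edges 1→2, 1→3, 2→3, 3→1. It performs one iteration starting from the uniform Rank vector.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

q = 0.15
# Adjacency list: node -> out-neighbors (the set N_i).
adj_list: Dict[int, List[int]] = {1: [2, 3], 2: [3], 3: [1]}
n = len(adj_list)


def flat_map_contribs(ranks: Dict[int, float]) -> List[Tuple[int, float]]:
    """Stand-in for flatMap(): node i emits (j, x_i / |N_i|) for each neighbor j."""
    messages = []
    for node, neighbors in adj_list.items():
        share = ranks[node] / len(neighbors)
        for neighbor in neighbors:
            messages.append((neighbor, share))
    return messages


def reduce_by_key(messages: List[Tuple[int, float]]) -> Dict[int, float]:
    """Stand-in for reduceByKey(add): sum the contributions received by each node."""
    totals: Dict[int, float] = defaultdict(float)
    for node, value in messages:
        totals[node] += value
    return dict(totals)


def pagerank_step(ranks: Dict[int, float]) -> Dict[int, float]:
    """One iteration: x_hat = M x_t, then x_{t+1} = q/n + (1-q) x_hat."""
    x_hat = reduce_by_key(flat_map_contribs(ranks))
    return {node: q / n + (1 - q) * value for node, value in x_hat.items()}


ranks = {node: 1.0 / n for node in adj_list}  # uniform initialization
messages = flat_map_contribs(ranks)
x_hat = reduce_by_key(messages)
ranks = pagerank_step(ranks)
print(messages)  # (receiver, contribution) pairs
print(x_hat)     # per-node sums, i.e. the entries of M x_t
print(ranks)     # Rank vector after one iteration
```

In this sketch a node that receives no message would not appear in the result; that case does not arise in the example graph, where every node has at least one incoming link.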

## 4. Programming implementation

We implement the parallel PageRank algorithm in PySpark. The code is as follows:

```python
from operator import add
from typing import Iterable, Tuple

from pyspark.resultiterable import ResultIterable
from pyspark.sql import SparkSession

n_slices = 3  # number of partitions (slices)
n_iterations = 10  # number of iterations
q = 0.15  # the default value of q is 0.15


def computeContribs(neighbors: ResultIterable[int], rank: float) -> Iterable[Tuple[int, float]]:
    # Calculate the contribution (rank / num_neighbors) of a vertex and send it to its neighbors.
    num_neighbors = len(neighbors)
    for vertex in neighbors:
        yield (vertex, rank / num_neighbors)


if __name__ == "__main__":
    # Initialize the Spark session.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

    # Each link is a (source_id, dest_id) pair.
    links = spark.sparkContext.parallelize(
        [(1, 2), (1, 3), (2, 3), (3, 1)],
        n_slices
    )

    # Drop duplicate links and convert them to an adjacency list: (vertex, neighbors).
    adj_list = links.distinct().groupByKey().cache()

    # Count the number of vertices.
    n_vertexes = adj_list.count()

    # Initialize the rank of each vertex to 1.0 / n_vertexes.
    ranks = adj_list.map(lambda vertex_neighbors: (vertex_neighbors[0], 1.0 / n_vertexes))

    # Repeatedly update the vertex ranks with the PageRank iteration.
    for t in range(n_iterations):
        # Join each vertex's neighbors with its rank, then send rank / num_neighbors to every neighbor.
        contribs = adj_list.join(ranks).flatMap(lambda vertex_neighbors_rank: computeContribs(
            vertex_neighbors_rank[1][0], vertex_neighbors_rank[1][1]  # type: ignore[arg-type]
        ))

        # Recompute the rank of each vertex from the contributions it received.
        ranks = contribs.reduceByKey(add).mapValues(lambda rank: q / n_vertexes + (1 - q) * rank)

    # Collect the ranks of all vertices and print them to the console.
    for (vertex, rank) in ranks.collect():
        print("%s has rank: %s." % (vertex, rank))

    spark.stop()
```


The results are as follows:

```
1 has rank: 0.38891305880091237.
2 has rank: 0.214416470596171.
3 has rank: 0.3966704706029163.
```


This Rank vector is close to the Rank vector $$R=(0.38779177,0.21480614,0.39740209)^{T}$$ obtained with the serial power method (the components differ by at most about $$1.1\times 10^{-3}$$ after only 10 iterations), indicating that our parallel implementation runs correctly.

## References

•  Malewicz G, Austern M H, Bik A J C, et al. Pregel: a system for large-scale graph processing[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of data. 2010: 135-146.

•  Low Y, Gonzalez J, Kyrola A, et al. Distributed graphlab: A framework for machine learning in the cloud[J]. arXiv preprint arXiv:1204.6078, 2012.

•  Gonzalez J E, Low Y, Gu H, et al. PowerGraph: Distributed Graph-Parallel Computation on Natural Graphs[C]//10th USENIX Symposium on Operating Systems Design and Implementation (OSDI 12). 2012: 17-30.

•  Xu Lijie, Fang Yafen. Design and Implementation of Big Data Processing Framework Apache Spark [M]. Electronic Industry Press, 2021.

Tags: Spark

Posted by sethadam1 on Fri, 03 Jun 2022 18:37:44 +0530