Author Archives: Yecol

GraphLab:新的面向机器学习的并行框架

GraphLab是一种新的面向机器学习的并行框架。

1.1 GraphLab简介

在海量数据盛行的今天,大规模并行计算已经随处可见,尤其是MapReduce框架的出现,促进了并行计算在互联网海量数据处理中的广泛应用。而针对海量数据的机器学习对并行计算的性能、开发复杂度等提出了新的挑战。

机器学习的算法具有下面两个特点:数据依赖性强,运算过程各个机器之间要进行频繁的数据交换;流处理复杂,整个处理过程需要多次迭代,数据的处理条件分支多。

而MapReduce是典型的SIMD模型,Map阶段集群的各台机器各自完成负载较重的计算过程,数据并行度高,适合完成类似矩阵运算、数据统计等数据独立性强的计算,而对于机器学习类算法并行性能不高。

另一个并行实现方案就是采用纯MPI(Native MPI)的方式。纯MPI实现通过精细的设计将并行任务按照MPI协议分配到集群机器上,并根据具体应用,在计算过程中进行机器间的数据通信和同步。纯MPI的优点是,可以针对具体的应用,进行深度优化,从而达到很高的并行性能。但纯MPI存在的问题是,针对不同的机器学习算法,需要重写其数据分配、通信等实现细节,代码重用率低,机器拓展性能差,对编程开发人员的要求高,而且优化和调试成本高。因而,纯MPI不适合敏捷的互联网应用。

为解决机器学习的流处理,Google提出了Pregel框架,Pregel是严格的BSP模型,采用“计算-通信-同步”的模式完成机器学习的数据同步和算法迭代。Goolge曾称其80%的程序使用MapReduce完成,20%的程序使用Pregel实现。因而,Pregel是很成熟的机器学习流处理框架,但Google一直没有将Pregel的具体实现开源,外界对Pregel的模仿实现在性能和稳定性方面都未能达到工业级应用的标准。

2010年,CMU的Select实验室提出了GraphLab框架,GraphLab是面向机器学习的流处理并行框架[1]。同年, GraphLab基于最初的并行概念实现了1.0版本,在机器学习的流处理并行性能方面得到很大的提升,并引起业界的广泛关注,在2012年GraphLab升级到2.1版本,进一步优化了其并行模型,尤其对自然图的并行性能得到显著改进。

在本章的余下章节,将详细介绍GraphLab的并行框架和具体的源码实现。

1.2 GraphLab并行框架

GraphLab将数据抽象成Graph结构,将算法的执行过程抽象成Gather、Apply、Scatter三个步骤。其并行的核心思想是对顶点的切分,以下面的例子作为一个说明。

1. Graph对并行思想

示例中,需要完成对V0邻接顶点的求和计算,串行实现中,V0对其所有的邻接点进行遍历,累加求和。而GraphLab中,将顶点V0进行切分,将V0的边关系以及对应的邻接点部署在两台处理器上,各台机器上并行进行部分求和运算,然后通过master顶点和mirror顶点的通信完成最终的计算。

1.2.1 数据模型:GRAPH

顶点是其最小并行粒度和通信粒度,边是机器学习算法中数据依赖性的表现方式。

对于某个顶点,其被部署到多台机器,一台机器作为master顶点,其余机器上作为mirror。Master作为所有mirror的管理者,负责给mirror安排具体计算任务;mirror作为该顶点在各台机器上的代理执行者,与master数据的保持同步。

对于某条边,GraphLab将其唯一部署在某一台机器上,而对边关联的顶点进行多份存储,解了边数据量大的问题。

同一台机器上的所有edge和vertex构成local graph,在每台机器上,存在本地id到全局id的映射表。vertex是一个进程上所有线程共享的,在并行计算过程中,各个线程分摊进程中所有顶点的gather->apply->scatter操作。

下面这个例子说明,GraphLab是怎么构建Graph的。

图2 Graph的构建形式

1.2.2 执行模型:GATHER-APPLY-SCATTER

每个顶点每一轮迭代经过gather->apple->scatter三个阶段。

1)       Gather阶段

工作顶点的边 (可能是所有边,也有可能是入边或者出边)从领接顶点和自身收集数据,记为gather_data_i,各个边的数据graphlab会求和,记为sum_data。这一阶段对工作顶点、边都是只读的。

2)       Apply阶段

Mirror将gather计算的结果sum_data发送给master顶点,master进行汇总为total。Master利用total和上一步的顶点数据,按照业务需求进行进一步的计算,然后更新master的顶点数据,并同步mirror。Apply阶段中,工作顶点可修改,边不可修改。

3)       Scatter阶段

工作顶点更新完成之后,更新边上的数据,并通知对其有依赖的邻结顶点更新状态。这scatter过程中,工作顶点只读,边上数据可写。

在执行模型中,graphlab通过控制三个阶段的读写权限来达到互斥的目的。在gather阶段只读,apply对顶点只写,scatter对边只写。并行计算的同步通过master和mirror来实现,mirror相当于每个顶点对外的一个接口人,将复杂的数据通信抽象成顶点的行为。

下面这个例子说明GraphLab的执行模型:

                                                                           图3. Gather-Apply-Scatter

1.3 GraphLab的源码实现

Graphlab的实现可以分为四层:基础组件层,抽象层,引擎层,应用层。

 

4. GraphLab源码结构

1.3.1 基础组件层

提供Graphlab数据传输、多线程管理等基础并行结构的组件模块,下面将主要介绍其通信、数据序列化、数据交换、多线程管理四个功能模块。

1)       通信(dc_tcp_comm.cpp)

Graphlab基于TCP协议的长连接在机器之间进行数据通信。在Graphlab初始化阶段,所有机器建立连接,将socket数据存储在std::vector<socket_info> sock 结构中。

Graphlab使用单独的线程来接收和发送数据,其中接收或发送都可以配置多个线程,默认每个线程中负责与64台机器进行通信。在接收连接中,tcp_comm基于libevent采用epoll的方式获取连接到达的通知,效率高。

需要补充的是,Graphlab在数据通信中,并没有采用MPI的接口,但在源码中封装了MPI_tools,其用途是在distributed_control::init时,获取系统参数(包括机器IP和端口)提供两种方式,一种是系统配置中初始化,一种是通过MPI接口实现(dc_init_from_mpi::init_param_from_mpi)。

2)       数据序列化(oarchive & iarchive)

Oarchive通过重载操作符>>将对象序列化后写入ostream中,在Graphlab中对于POD( Plain Old Data)和非POD数据区分对待, POD类型的数据直接转为为char*写入ostream, 而非POD数据需要用户实现save方法,否则将抛出异常。iarchive的过程与oarchive的过程相反。

所有通过rpc传输的数据都通过oarchive和iarchive转化为stream,比如vertex_program, vertex_data。

 

图5. 数据序列化

3)       数据传输流(buffered_stream_send2.cpp)

Oarchive,iarchive是数据序列化的工具, 在实际的传输过程中,数据并没有立即发送出去,而是缓存在buffered_stream_send。

4)       Pthread_tools:

Thread类封装了lpthread的方法

提供thread_group管理线程队列

封装了锁、信号量、条件变量等同步方法。

1.3.2 抽象层

1)      dc_dist_object是GraphLab对所有分布式对象的一个抽象,其目标是将分布式处理的数据对象对用户抽象成普通对象,以希望在使用的时候不需要关心其分布式细节。

2)      buffer_exchange是基于dc_dist_object对需要在顶点间交换的数据提供一个容器。

3)      distribute_controller是基于dc_dist_object实现的一个整个分布式系统的控制器,提供了机器数据、顶点关系等全局信息。

1.3.3引擎层

1.3.3.1同步引擎

                         图6. 同步引擎

1) Excange message阶段,master接受来⾃自mirror的消息;

2) Receive Message阶段,master接收上一轮Scatter发送的消息和mirror发送的消息,将有message的master激活, 对于激活的顶点,master通知mirror激活,并将vectex_program同步到mirrors;

3) Gather阶段,多线程并行gather, 谁先完成,多线程并行localgraph中的顶点,mirror将gather的结果到master;

4) Apply阶段,master执行apply(apply()),并将apply的结果同步到mirror (sync_vertex_data()).

5)Scatter阶段,master和mirror基于新的顶点数据,更新边上数据,并以signal的形式通知相邻顶点。

下面这个例子形象地说明了同步引擎的工作过程:

 

图7. 顶点2的GraphLab执行过程

1.3.3.2异步引擎

8. mastermirror状态转移过程

异步引擎中,每个顶点是消息驱动的状态机。

1) 在每一轮执行开始时,Master从全局的调度器(Sceduler)获取消息,获取消息后,master获得锁,并进入Locking状态。同时,master通知mirror获取锁,进入Locking状态。

2) master和mirror分别进行Gathering操作,mirror将gathering结果汇报给master,由master完成汇总。

3) master完成applying之后,将结果同步到mirror上。

4) master和mirror独立的执行scattering,执行完成之后释放锁进入None状态,等待新的任务到来。

5) mirror在scattering状态时,可能再次接收到来自master的locking请求,这种情况下,mirror在完成scattering之后将不会释放锁,而直接进入下一轮任务中。

【转载】http://androidfuture.com/blog/?p=215

Graphlab实现分析:图的存储一

前一段时间参与了一个迭代计算平台的开发,对于内存计算和图计算产生了比较浓厚的兴趣,这期间也阅读了spark和pregel的相关论文,了解一下BSP模型,但总觉得看论文太抽象了,于是选择阅读graphlab源码,作为深入了解图计算的一个契机。接下去如果有时间的话,会详细记录下我对graphlab的一些肤浅的理解。

——————————————————————————————-

在graphlab中,采用邻接矩阵来表示顶点之间的相邻关系,给定一个图G(V, E),使用一个一维数组存储V的顶点信息,使用一个稀疏矩阵来存储E的边信息。

在graphlab中,图是分布在多个机器之上,每个机器中存储着图的一部分,在这里我们讨论graphlab中,每个节点是如何实现图的本地存储。

在graphlab的图相关接口中有两个接口,分别是获取顶点的in edges和out edges。那么在graphlab中需要考虑如何有效地存储一个图的边集合,并可以快速地对顶点的in edges和out edges进行快速索引,并尽可能地减少空间开销。

Graphlab中采用的思路是同时采用稀疏矩阵的csr(compressed sparse row)和csc(compressed sparse column)存储格式来存储图的边集合,并高效地实现获取顶点的in edges和out edges的接口。

Graphlab分别实现了图的静态存储和动态存储,静态存储是指一旦完成对图的顶点和边的存储之后,不会添加新的顶点和边。而动态存储,可以动态地往图中新增顶点和边,这两者都没有删除顶点和边的操作。静态存储和动态存储的思路都是同时采用稀疏矩阵的csr和csc格式来存储边集合,不过csr和csr采用的数据结构不一样,静态存储采用数组实现,动态存储采用链表实现。在本篇博客中,只对静态存储进行介绍,动态存储会在下一篇博客中进行介绍。

本篇博客首先会介绍一下稀疏矩阵的csr和csc格式以及计数排序,然后会举一个实际的例子来分析graphlab图的静态存储,最后介绍一下graphlab实现图静态存储的相关类。

1 稀疏矩阵csr和csc格式和计数排序简介

1.1 csr和csc格式介绍

csr是使用三个数组来表示一个稀疏矩阵,稀疏矩阵用A表示,三个数组分别是values、rowptrs和columns;values中按行顺序存储着A中的非零单元的值。Columns中存储着values数组中的单元的列索引,values(k) = A(i, j),则columns[k] = j。Rowptrs中存储着行在values中的起始地址,如果values(k) = A(i, j),则rowptrs(i) <= k <rowptrs(i + 1),行i中的非零单元的数目为rowptrs(i + 1) – rowptrs(i)。

比如稀疏矩阵A = clip_image002

假设下标都从0开始,那么行是{0,1,2},列也是{0,1,2};稀疏矩阵A的csr格式就可以用如下三个数组表示:

image

csc格式类似于,只不过是把行换成了列,csc可以用values,columnptrs和rows表示矩阵A。values中按列顺序存储着A中的非零值;rows中存储着values数组中单元的行索引,values(k) = A(i, j),则rows(k) = i;columnptrs中存储着列在values中的起始地址,values(k) = A(i,j),则columns(j) <= k < columns(j + 1),j列的非零单元数目为columns(j + 1) – columns(j)。

image

关于csr的详细描述见:http://web.eecs.utk.edu/~dongarra/etemplates/node373.html

1.2 计数排序

计数排序的思路如下:假设n个输入元素中的每一个都是介于0-k的整数,此处k为某个整数。对每一个输入元素x,统计小于x的数目s,那么可以通过s来确定x在最终输出数组中的位置。

在graphlab中,计数排序的输入是一个未经排序的原始数组A;输出是两个数组,分别是P和I;P数组长度等于原始数组的长度,是按从小到大对原始数组进行排序后生成的序列数组,P[i]表示排序后的第i个值在原始数组中的下标;I数组表示值为i的整型在排序后的数组中的起始位置,I数组的长度为max{A[i]} + 1(+1的原因是从0开始计数)。

Graphlab中计数排序算法的伪码:

clip_image008

比如给定一个原始数组A,数组长度为7,数组中存储着整型值(可能有重复),如下图所示:

clip_image010

运行结果:

在counting_sort函数中12-13行的循环运行完后,原始数组(A)和统计数组(c)如下所示:

clip_image012

c[i]存储着在A中,值小于等于i的元素数目。

第15-16的运行步骤如下,总共有:

clip_image014

最后P数组存储着排序后的数值在原数组中的下标。c数组中的每个单元c[i]中则存储着在A数组中,值小于i的元素数目。i在A中的数目等于:c[i + 1] – c[i],i < k或n – c[i] ,i == k;c[i]表示i值在P数组出现的第一个值的下标。

最终I数组的结果等于stem 6中的c:

clip_image016

这三个数组之间的关系如下:

image

给定一个值2,那么2在A中的数目为:I[3] – I[2] = 1;2在A中的位置为A[P[I[2]] ] = A[1]。

2 使用csr和csc存储图

我们可以将边集合表示为一个邻接矩阵,使用稀疏矩阵的csr和csc格式来存储邻接矩阵。

因为稀疏矩阵的csr存储格式是对row进行压缩,可以根据row来快速对稀疏矩阵的某一行进行检索,所以使用csr来对out_edges进行检索(边(v,w)是顶点v的out edges,顶点v对于边(v,w)相当于行)。同理,稀疏矩阵的csc存储格式是对column进行压缩,可以根据column来快速对稀疏矩阵的某一列进行检索,所以使用csc对in_edges进行检索。

我们先单独分别从csr和csc角度考虑边集合的存储。然后再分析graphlab是如何同时使用csr和csc巧妙地实现对边集合进行存储,并实现对顶点的in edges和out edges快速检索。

2.1 CSR格式存储

image

如上图所示,给定以一个有向图G(V,E),V为顶点集合,E为边集合。一条边包括顶点对(边从source vertex指向targe vertex)和值,边集合可以表示成如下的邻近矩阵,对于边(v,w),将v作为行,w作为列(source vertex对应行,target vertex对应列)。

image

假设E中边的输入顺序如下所示:

image

那么我们就可以用如下三个数组来表示输入的边集合E:

image

那么如何将输入的E转化为按照csr格式存储的稀疏矩阵呢?

1. 将source vertex数组作为输入数组,使用1.2张中的counting_sort进行排序,输出的数组为P和I。因为source vertex相当于邻接矩阵的行,这一步骤等同于将稀疏矩阵的非零单元按照行顺序存储在一个数组中(这里不需要考虑同一行内的各个边的顺序)。那么P是按行的从小到大顺序对原始数组进行排序后生成的序列数组;I等于csr中的rowptrs;

2. 使用P对输入边集合E的target vertex数组和value数组按照行大小进行重新排序,那么排序后的target vertex数组就是csr中的columns,value数组就是csr的values。这里的排序可以使用不同的方式实现,最简单的方法就是引入一个临时数组,按照P数组中的下标对target vertex和value进行排序。

counting_sort具体过程见1.2章(1.2张的例子就是本例),最终E的CSR格式如下图所示。

image

1.edges_values数组:是按行顺序进行排序后边集合的值数组。

2.rowptrs数组:保存行在edges_values中的起始偏移地址, rowptrs[i]是第i行在edges_values中的起始偏移位置;那么第i行的边数目等于rowptrs[i + 1] –rowptrs[i]或edges_values长度 – rowptrs[i ];rowptrs数组的长度为顶点的最大值。

3.columns数组:列索引,columns[i]是edges_values[i]值对应的边的列的值。如edges_values[2]的列为columns[2],等于3。

那么用csr存储的边集合E,给定一个顶点v,可以快速检索v的所有out edges的值。v的值相当于行,那么v的所有out edges的值可以通过如下的方式获取:

clip_image028

拿上面的例子,顶点1的out edges的数目为rowptrs[2] – rowptrs[1] = 2,那么可以得到顶点1的两个out edges在edges_values数组的下标分别为1和2,那么out edges集合为{edges_values[1], edges_values[2]} = {(1,2), (1, 3)}。

2.2 CSC格式存储

使用csc来存储边集合E的边关系和值,与csr基本相同。首先将target vertex数组作为输入数组进行counting_sort,得到P和I,I为csc的columnptrs。使用P对E的source vertex数组和value数组进行排序,生成了csc的rows和values。E以csc格式存储的最终结果如下所示。

image

1.edges_values数组:是按列顺序进行排序后边集合的值数组。

2.columnptrs数组:保存列在edges_values中的起始偏移地址,columnptrs[i]是第i列在edges_values中的起始偏移位置;

3.rows数组:列索引,rows[i]是edges_values[i]值对应的边的列的值。

通过csc获取一个顶点的in edges类似于在csr中获取out edges,不在赘述。

2.3 Graphlab图的静态存储

Graphlab对图的静态存储是同时采用了csr和csc格式。在graphlab中,会首先对边集合按照csr方式进行存储(通过对source vertex进行counting_sort),然后再建立csc格式,通过shuffle方式,在csc和csr之间进行转换。把csr和csc整合到一起,同时实现对顶点的out edges和in edges的快速索引。如下图所示。

image

edges_value同CSR中的rowptrs。

rowptrs同CSR中的rowptrs。

columns同CSR中的columns。

shuffleptrs这个数组用于将按列顺序排列的稀疏矩阵转换为按行顺序排列的稀疏矩阵。Shuffleptrs[i]表示按列顺序排序的边集合的第i条边在edges_value数组中的下标。

rows同CSC中的rows。

columnptrs同CSC中的columnptrs。

如上图所示,在内存中存储边集合E,需要维持边的值数组,csr和csc。CSR有两个整型数组,rowptrs和columns,分别用来存储行偏移地址和列索引。CSC有三个整型数组,shuffleptrs、rows和columnptrs,分别存储着从按列顺序排序的稀疏矩阵到按行顺序排列的稀疏矩阵转换的下标,行索引和列偏移地址,shuffleptrs和rows具有相同的下标,可以合并成一个数组。

具体步骤如下:

E的原始输入由三个相同长度的数组组成,source_arr、target_arr和data_arr,分别存储着边的source vertex、target vertex和边的值。source vertex相当于邻接矩阵的行,target vertex相当于邻近矩阵的列。如果要形成最终的结果,需要以下这些步骤,才能形成上图中的存储。

1. counter_sort(source_arr, P, rowptrs)

2. sort(P, E)

//使用P按照行顺序对E中的三个数组进行排序,P数组是按照行的顺序保存着E的下标,

3. columns = target_arr

4. csr = {rowptrs, columns}

5. counter_sourt(target_arr, P, columnptrs)

6. sort(P, source_arr)

//对source_arr按列顺序进行排列,最后作为行索引

7. rows = source_arr; shuffleptrs = P.

8. csc = {columnptrs, rows, shuffleptrs}

Graphlab中的具体类:

在graphlab中,图的本地静态存储是由local_graph来实现,local_graph中保存图使用了四个数据结构:

std::vector<VertexData> vertices:存储顶点数据的数组,顶点的ID为0到数组的长度。

std::vector<EdgeData> edges:存储边的值的数组,相当于edges_values。

csr_type _csr_storage:表示csr,由csr_storage这个类来实现。

csc_type _csc_storage:表示csc,由csr_storage这个类来实现。

csr_storage中有两个成员变量,分别是:

std::vector<sizetype> value_ptrs;

std::vector<valuetype> values;

当csr_storage表示csr时,value_ptrs等同于rowptrs,是一个uint64_t数组;values等同于columns,也是一个uint64_t数组。

当csr_storage表示csc时,value_ptrs等同于columnptrs,是一个uint64_t数组;values则被定义成std::vector< std::pair<lvid_type, edge_id_type> >,相当于将rows和shuffleptrs存储在同一个vector中。

【转载】自 http://my.oschina.net/zhengyang841117/blog/194826