分布式机器学习调研

Tensorflow目前支持两种分布式训练模式:

  • 同步训练(Synchronous training):各worker之间的训练步调(steps)一致;
  • 异步训练(Asynchronous training):各worker之间的步调(steps)不严格同步。

MultiWorkerMirroredStrategy 是最常见的用于多机的同步训练实现。

在 multi-worker 训练下,数据集划分对于确保模型收敛性有重要意义。

  • 问题一:目前的数据集划分方式是什么?
  • 问题二:每个batch中,不同worker间是否有使用到相同的训练样本?

基于数据流的分布式机器学习系统借鉴了基于DAG的大数据处理系统的灵活性,将计算任务描述成为一个有向无环的数据流图,图中的节点表示对数据的操作(计算或者通信),图中的边表示操作的依赖关系。系统自动提供对数据流图的分布式执行,因此用户所要关心的只是如何设计出适当的数据流图来表达想要执行的算法逻辑。

下面我们将以数据流系统的代表Tensorflow中的数据流图为例介绍典型的数据流图。在Tensorflow中每个节点描述了一种运算符(比如矩阵的运算和特定的优化算子等),输入节点也可以看作是一种特殊的运算符,而边代表了节点之间的数据以及它们的流动方向。当系统接收到用户定义的数据流图后,首先对该数据流图进行优化,包括裁剪移除一些无用操作、内存优化等。然后根据分配算法,将图中计算节点分配到实际的运算设备上分布式地执行这些计算任务。系统引擎依据数据流图依次调度图中每个节点来执行相应的任务。

当用户在一个数据流图中同时包含多路的训练逻辑,然后通过加入参数聚合节点来收集它们的模型更新结果,就可以完成分布式的训练逻辑。因此,从数据流的角度看,分布式机器学习只是某种数据流设计而已,并没有什么特别。从这个意义上讲,数据流架构要比参数服务器架构更加通用和灵活。

Tensorflow有两种数据对象,分别是张量(Tensor)和变量(Variable)。模型参数在机器学习训练中不断被更新,因此属于变量。Tensorflow运算过程中所有需要存储下来的变量合在一起被成为图的状态。而数据流图中节点的输出(通常称为中间结果)一般是不可变的数据,属于张量。张量和变量一样都可以参与运算,不同的是张量作为中间值用过以后就释放了,而变量会被系统存储下来,在下一轮迭代中继续使用。

MapReduce逻辑也可以表示成某种数据流图。但是相比而言,Tensorflow系统中数据流图与具体机器学习算法的结合更加紧密,并且底层操作的复用性和灵活性更强,用户可以基于底层操作搭建各式各样的算法模型,而不只是进行数据处理。

Tensorflow系统中,任务是通过会话(Session)来统一管理的。用户通过会话来启动数据流图,管理运行时资源的申请和分配,以及关闭计算任务。在使用Tensorflow系统时,我们以会话贯穿整个计算任务,因此源码中经常会出现下面的模式:

sess = tf.Session()
# 构建数据流图
# 构建数据
# 描述计算资源,以及数据流图和计算资源之间的对应关系
result = sess.run(graph)
sess.close()

在Tensorflow的执行过程中有几个不同的角色在起作用,包活客户端(Client)、主控程序(Master)和工作节点(Worker)。其中:

  • Client用来启动任务,它通过Session的接口与Master取得联系,告知整个系统要运行的任务内容以及所需要的资源情况。
  • Master是主控制节点,它负责分发任务、调度资源,并且把计算结果返回给客户端。Master是一个执行引擎,拿到数据流图的信息后,会结合数据流图和计算资源的情况将具体的计算分配给工作节点来完成。
  • Worker是工作节点,它负责管理和使用计算设备,具体而言就是机器中的GPU、CPU等设备。Master将计算算子发送给Worker,Worker负责用其管理的设备来执行这些算法并返回结果给Master。

Tensorflow对于Client-Master-Worker有很灵活的配置方式,既可以让它们工作在单个节点中,也可以让它们工作在分布式的环境中利用多机多卡进行训练。一般在单机环境下,这三者在同一个进程当中;而在分布式环境下,它们会通过远程过程调用(RPC)的方式连接起来。

在Tensorflow系统中,数据流图既会定义各个计算算子的输入/输出关系,也会定义其使用的计算资源。在默认情况下,Tensorflow会进行自动的设备分配。当用户需要自定义计算资源分配方案时,也可以显式告诉Tensorflow系统如何分配系统资源,手工指定各个操作和Worker之间的关系。

sess = tf.Session()
with tf.device("/gpu:1"):
    matrix1 = tf.constant([[3,3]])
    matrix2 = tf.constant([[2], [2]])
    product1 = tf.matmul(matrix1, matrix2)
with tf.device("/gpu:0"):
    matrix3 = tf.constant([[3,3]])
    matrix4 = tf.constant([[2], [2]])
    product2 = tf.matmul(matrix3, matrix4)
final = tf.matadd(product1, product2)
result = sess.run(final)
sess.close()

即便要进行多设备的分布式训练,用户也不需要处理多个设备之间的交互问题。只需定义好每个设备的数据流图,系统就会根据数据流图中算子本身的输入输出自动识别出多设备之间的数据依赖关系。设备之间的通信由Tensorflow系统来负责,当计算需要跨设备或者跨网络时,系统会自动加入发送(Send)和接收(Recv)算子。这样不但从用户体验上很友好,系统上也更容易统一优化,让通信环节更加高效。

Cluster-Job-Task:为了支持分布式机器学习,Tensorflow使用Cluster、Job和Task来定义一个计算集群。其中Cluster是最顶层的定义,包含任务将要运行的完整设置。具体而言,Cluster里面会定义有哪几种Job的角色,每个Job的角色下有多少实际的Task在工作。比如一个Job中可以有参数服务器(PS)这类存储和管理变量的任务,也可以有Worker这类进行具体模型训练的任务。每个Job角色会由一个或者多个实体的Worker来实际担当。举个例子,下面的代码片段中定义了一个Cluster,其中有两种Job:一种叫作worker,分别由三个具体的Task(worker0、worker1、worker2)组成;另一种叫作ps,由ps0这一个Task组成。

tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222",
        "worker1.example.com:2222",
        "worker2.example.com:2222",
    ],
    "ps": [
        "ps0.example.com:2222"
    ]
})

定义了集群之后需要进一步定义用于分布式学习的数据流图。Tensorflow提供了两种分布式训练模式,分别是In-Graph模式和Between-Graph模式。

In-Graph模式下整个任务由一张数据流图来描述:将模型训练中的运算部分手动复制后,作为不同的Task放在一个图里面,最后加入一个节点用来汇总信息。不同Task的