1.5 TensorFlow的队列

队列(queue)是一种最为常用的数据输入/输出方式,其通过先进先出的线性数据结构,一端只负责增加TensorFlow的队列中的数据元素,而数据的输出和删除在队列的另一端实现。能够增加数据元素的队列一端被称为队尾,而输出和删除数据元素的一端被称为队首。

TensorFlow应用队列作为数据的一种基本输入/输出方式,可以将新的数据插入到队列的队尾,而在队首将数据输出和删除。当然,在TensorFlow中可以这样认为,队列在TensorFlow中处于一种有状态节点的地位,随着其他节点在图中状态的改变,队列这个“节点”的状态可以随之改变。

1.5.1 队列的创建

表1-1为队列常用的方法汇总。

表1-1 队列常用的方法汇总

一般而言,创建一个队列首先要选定数据出入类型。例如,是使用FIFOQueue函数设定数据为先入先出,还是使用RandomShuffleQueue这种随机元素出列的方式。

函数的第一个参数是队列中数据的个数,第二个参数是队列中元素的类型。之后要对队列中元素进行初始化和运行操作,需要特别注意的是,TensorFlow中任何操作都是在“会话”中进行的,因此其基本操作都要由会话(Session)完成。

enqueue_many函数将上文中创建的FIFOQueue函数进行了填充,因为q被设置成包含3个元素的函数,因此其一次性被填充进3个数据。但是实际上,此时的数据填充并没有完成,而是进行了一个预备工作,真正的工作要在会话中完成,因此还需要运行会话中的run函数。

例1-1为创建队列实例演示,在实例中首先设定一个“先入先出”的队列,之后被填充数据。dequeue函数将其中的数据弹出,此时为了能够让这个队列操作完成,这步操作被命名为init2,下面的init3同样是在对话中完成的。之后通过对话操作对这3个步骤进行处理。

【例1-1】创建队列实例演示。

size函数获取了当前队列的数据个数,之后通过一个for循环将队列中的数据弹出。最终打印结果如下:

由结果可以看到,第一次init的3个数值中0.3被dequeue弹出,取而代之的是enqueue函数进去的1这个数值。

提示:dequeue是一个可以阻塞队列的函数,如果其中没有数据被弹出,则会阻塞队列直到数据填充之后被弹出。

从例1-1中可以看出,队列操作是在主线程的对话中依次完成的。这样做的好处是不易阻塞队列,出了bug容易查找等。例如,数据执行入队操作后从硬盘上将数据输入到内存中供后续使用,但是这样的操作会造成数据读取和输入较慢,处理相对困难。

TensorFlow中提供了QueueRunner函数用以解决异步操作问题,如例1-2所示。其可创建一系列的线程同时进入主线程内进行操作,数据读取与操作是同步的,即主线程在进行训练模型工作的同时将数据从硬盘读入。

【例1-2】利用QueueRunner函数解决异步操作问题。

在程序中首先创建了1个数据处理函数,add_op的操作是将整数1叠加到变量counter上。为了执行这个操作,qr创建了一个队列管理器QueueRunner,其调用了2个线程去完成此项任务。create_threads函数对线程进行了启动,此时线程已经开始运行。

而在for循环中,主程序同时也对队列进行操作,即不停地将数据从队列中弹出,结果如下:

从例1-2中可以看出,程序首先是正确输出的,但是在后半部分程序执行时会报错。

如果换一种表述形式,结果会怎样呢?代码如下:

可以看到此时的会话并没有报错,但是程序也没有结束,而是被挂起。造成这种情况的原因是add操作和入队操作没有同步,即TensorFlow在队列设计时为了优化I/O系统,队列的操作一般使用批处理,这样入队线程没有发送结束的信息,而程序主线程期望将程序结束,因此造成线程阻塞,程序被挂起。

提示:TensorFlow中一般遇到程序被挂起的情况指的是数据输入与处理没有同步,即需要数据时却没有数据被输入到队列中,那么线程就会被整体挂起。而此时tf也不会报错而是一直处于等待状态。

1.5.2 线程同步与停止

可以看到,TensorFlow的会话是支持多线程的,多线程可以很方便地在一个会话下共同工作,并行地相互执行。但是通过程序演示也看到,这种同步会造成某个线程想要关闭对话时,对话被强行关闭而未完成工作的线程也被强行关闭。

TensorFlow为了解决多线程的同步和处理问题,提供了Coordinator和QueueRunner函数来对线程进行控制和协调。在使用上,这两个类必须同时工作,共同协作来停止会话中所有线程,并向等待所有工作的线程终止程序报告。例1-3为线程同步与停止实例演示。

【例1-3】线程同步与停止实例演示。

在程序中,create_threads函数被添加了一个新的参数:线程协调器,用于协调线程之间的关系。当启动线程后,线程协调器在最后负责对所有线程接收和处理,当一个线程结束时,线程协调器会对所有的线程发出通知,协助其完毕。

1.5.3 队列中数据的读取

TensorFlow支持很多种样例输入的方式。第一种方法最容易,即使用placeholder,但这需要手动传递numpy.array类型的数据;第二种方法就是使用二进制文件和输入队列的组合形式。这种方法不仅节省了代码量,避免了进行data augmentation和读文件操作,可以处理不同类型的数据,而且也不再需要人为地划分开“预处理”和“模型计算”。在使用TensorFlow进行异步计算时,队列是一种强大的机制。

正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队尾,也可以把队首的元素删除,如FIFOQueue和RandomShuffleQueue等对象。

在TensorFlow中对Tensor进行异步计算非常重要。例如,一个典型的输入结构是使用一个RandomShuffleQueue来作为模型训练输入的,多线程准备训练样本,并且把这些样本压入队列,一个训练线程执行一个训练操作,此操作会从队列中移除最小批次的样本(mini-batches),这种结构具有许多优点。

TensorFlow的Session对象是可以支持多线程的,因此多线程可以很方便地使用同一个会话(Session)并且并行执行操作。然而,在Python程序实现这样的并行运算却并不容易。所有线程都必须能被同步终止,异常必须能被正确捕获并报告。当会话终止的时候,队列必须能被正确地关闭。所幸TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。从设计上这两个类必须被一起使用。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常。QueueRunner类用来协调多个工作线程同时将多个Tensor压入同一个队列中。

图1-6所示为队列读取数据流程。

图1-6 队列读取数据流程

在图1-6中,首先由一个单线程把文件名压入队列,两个Reader同时从队列中取文件名并读取数据,然后Decoder将读出的数据解码后压入样本队列,最后单个或批量取出样本(图1-6中没有展示样本出列)。

此处,搭建数据读取图需要5个步骤:

(1)将我们的数据转成相应的文件放入磁盘。

(2)把文件名压入队列。

(3)从队列中取文件名并读取数据。

(4)Decoder将读出的数据解码。

(5)解码后压入样本队列,最后单个或批量取出样本。

下面,就使用代码一步步来解释上述过程。

(1)构造训练数据,使用tf.python_io.TFRecordWriter创建一个专门存储TensorFlow数据的writer,扩展名为'.tfrecord',最后存储的文件是序列化的文件:

(2)把文件名压入队列,从队列中取文件名并读取数据,Decoder将读出的数据解码都放在一个函数里面,与刚才的writer不同,这个reader是符号化的,只有在Session中运行才会执行:

(3)解码后压入样本队列,最后单个或批量取出样本,是使用tf.train.shuffle_batch来实现取出样本的: