必威体育Betway必威体育官网
当前位置:首页 > IT技术

tf.train.Coordinator

时间:2019-10-15 12:43:18来源:IT技术作者:seo实验室小编阅读:74次「手机版」
 

coordinator

tensorflow中协调器 tf.train.Coordinator 和入队线程启动器 tf.train.start_queue_runners

2018年04月01日 18:51:57

阅读数:1285

TensorFlow的session对象是支持多线程的,可以在同一个会话(Session)中创建多个线程,并行执行。在Session中的所有线程都必须能被同步终止,异常必须能被正确捕获并报告,会话终止的时候, 队列必须能被正确地关闭。

TensorFlow提供了两个类来实现对Session中多线程的管理:tf.Coordinator和 tf.QueueRunner,这两个类往往一起使用。

Coordinator类用来管理在Session中的多个线程,可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常,该线程捕获到这个异常之后就会终止所有线程。使用 tf.train.Coordinator()来创建一个线程管理器(协调器)对象。

QueueRunner类用来启动tensor的入队线程,可以用来启动多个工作线程同时将多个tensor(训练数据)推送入文件名称队列中,具体执行函数是 tf.train.start_queue_runners , 只有调用 tf.train.start_queue_runners 之后,才会真正把tensor推入内存序列中,供计算单元调用,否则会由于内存序列为空,数据流图会处于一直等待状态。

tf中的数据读取机制如下图:

  1. 调用 tf.train.slice_input_producer,从 本地文件里抽取tensor,准备放入Filename Queue(文件名队列)中;
  2. 调用 tf.train.BATch,从文件名队列中提取tensor,使用单个或多个线程,准备放入文件队列;
  3. 调用 tf.train.Coordinator() 来创建一个线程协调器,用来管理之后在Session中启动的所有线程;
  4. 调用tf.train.start_queue_runners, 启动入队线程,由多个或单个线程,按照设定规则,把文件读入Filename Queue中。函数返回线程ID的列表,一般情况下,系统有多少个核,就会启动多少个入队线程(入队具体使用多少个线程在tf.train.batch中定义);
  5. 文件从 Filename Queue中读入内存队列的操作不用手动执行,由tf自动完成;
  6. 调用sess.run 来启动数据出列和执行计算;
  7. 使用 coord.should_stop()来查询是否应该终止所有线程,当文件队列(queue)中的所有文件都已经读取出列的时候,会抛出一个 OutofRangeERROR 的异常,这时候就应该停止Sesson中的所有线程了;
  8. 使用coord.request_stop()来发出终止所有线程的命令,使用coord.join(threads)把线程加入主线程,等待threads结束。

以上对列(Queue)和 协调器(Coordinator)操作示例:

[Python] view plain copy print?

  1. # -*- coding:utf-8 -*-  
  2. import tensorflow as tf  
  3. import numpy as np  
  4.   
  5. # 样本个数  
  6. sample_num=5  
  7. # 设置迭代次数  
  8. epoch_num = 2  
  9. # 设置一个批次中包含样本个数  
  10. batch_size = 3  
  11. # 计算每一轮epoch中含有的batch个数  
  12. batch_total = int(sample_num/batch_size)+1  
  13.   
  14. # 生成4个数据和标签  
  15. def generate_data(sample_num=sample_num):  
  16.     labels = np.asarray(range(0, sample_num))  
  17.     images = np.random.random([sample_num, 224, 224, 3])  
  18.     print('image size {},label size :{}'.format(images.shape, labels.shape))  
  19.     return images,labels  
  20.   
  21. def get_batch_data(batch_size=batch_size):  
  22.     images, label = generate_data()  
  23.     # 数据类型转换为tf.float32  
  24.     images = tf.cast(images, tf.float32)  
  25.     label = tf.cast(label, tf.int32)  
  26.   
  27.     #从tensor列表中按顺序或随机抽取一个tensor准备放入文件名称队列  
  28.     input_queue = tf.train.slice_input_producer([images, label], num_epochs=epoch_num, shuffle=False)  
  29.   
  30.     #从文件名称队列中读取文件准备放入文件队列  
  31.     image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=2, capacity=64, allow_smaller_final_batch=False)  
  32.     return image_batch, label_batch  
  33.   
  34. image_batch, label_batch = get_batch_data(batch_size=batch_size)  
  35.   
  36.   
  37. with tf.Session() as sess:  
  38.   
  39.     # 先执行初始化工作  
  40.     sess.run(tf.global_variables_initializer())  
  41.     sess.run(tf.local_variables_initializer())  
  42.   
  43.     # 开启一个协调器  
  44.     coord = tf.train.Coordinator()  
  45.     # 使用start_queue_runners 启动队列填充  
  46.     threads = tf.train.start_queue_runners(sess, coord)  
  47.   
  48.     try:  
  49.         while not coord.should_stop():  
  50.             print '************'  
  51.             # 获取每一个batch中batch_size个样本和标签  
  52.             image_batch_v, label_batch_v = sess.run([image_batch, label_batch])  
  53.             print(image_batch_v.shape, label_batch_v)  
  54.     except tf.errors.OutOfRangeError:  #如果读取到文件队列末尾会抛出此异常  
  55.         print("done! now lets kill all the threads……")  
  56.     finally:  
  57.         # 协调器coord发出所有线程终止信号  
  58.         coord.request_stop()  
  59.         print('all threads are asked to stop!')  
  60.     coord.join(threads) #把开启的线程加入主线程,等待threads结束  
  61.     print('all threads are stopped!')  

# -*- coding:utf-8 -*-
import tensorflow as tf
import numpy as np

# 样本个数
sample_num=5
# 设置迭代次数
epoch_num = 2
# 设置一个批次中包含样本个数
batch_size = 3
# 计算每一轮epoch中含有的batch个数
batch_total = int(sample_num/batch_size)+1

# 生成4个数据和标签
def generate_data(sample_num=sample_num):
    labels = np.asarray(range(0, sample_num))
    images = np.random.random([sample_num, 224, 224, 3])
    print('image size {},label size :{}'.format(images.shape, labels.shape))
    return images,labels

def get_batch_data(batch_size=batch_size):
    images, label = generate_data()
    # 数据类型转换为tf.float32
    images = tf.cast(images, tf.float32)
    label = tf.cast(label, tf.int32)

    #从tensor列表中按顺序或随机抽取一个tensor准备放入文件名称队列
    input_queue = tf.train.slice_input_producer([images, label], num_epochs=epoch_num, shuffle=False)

    #从文件名称队列中读取文件准备放入文件队列
    image_batch, label_batch = tf.train.batch(input_queue, batch_size=batch_size, num_threads=2, capacity=64, allow_smaller_final_batch=False)
    return image_batch, label_batch

image_batch, label_batch = get_batch_data(batch_size=batch_size)


with tf.Session() as sess:

    # 先执行初始化工作
    sess.run(tf.global_variables_initializer())
    sess.run(tf.local_variables_initializer())

    # 开启一个协调器
    coord = tf.train.Coordinator()
    # 使用start_queue_runners 启动队列填充
    threads = tf.train.start_queue_runners(sess, coord)

    try:
        while not coord.should_stop():
            print '************'
            # 获取每一个batch中batch_size个样本和标签
            image_batch_v, label_batch_v = sess.run([image_batch, label_batch])
            print(image_batch_v.shape, label_batch_v)
    except tf.errors.OutOfRangeError:  #如果读取到文件队列末尾会抛出此异常
        print("done! now lets kill all the threads……")
    finally:
        # 协调器coord发出所有线程终止信号
        coord.request_stop()
        print('all threads are asked to stop!')
    coord.join(threads) #把开启的线程加入主线程,等待threads结束
    print('all threads are stopped!')

输出:

[python] view plain copy print?

  1. ************  
  2. ((3, 224, 224, 3), array([0, 1, 2], dtype=int32))  
  3. ************  
  4. ((3, 224, 224, 3), array([3, 4, 0], dtype=int32))  
  5. ************  
  6. ((3, 224, 224, 3), array([1, 2, 3], dtype=int32))  
  7. ************  
  8. done! now lets kill all the threads……  
  9. all threads are asked to stop!  
  10. all threads are stopped!  

************
((3, 224, 224, 3), array([0, 1, 2], dtype=int32))
************
((3, 224, 224, 3), array([3, 4, 0], dtype=int32))
************
((3, 224, 224, 3), array([1, 2, 3], dtype=int32))
************
done! now lets kill all the threads……
all threads are asked to stop!
all threads are stopped!

以上程序在 tf.train.slice_input_producer 函数中设置了 num_epochs 的数量, 所以在文件队列末尾有结束标志,读到这个结束标志的时候抛出 OutofRangeError 异常,就可以结束各个线程了。

如果不设置 num_epochs 的数量,则文件队列是无限循环的,没有结束标志,程序会一直执行下去。

相关阅读

NTFS文件系统详解 之 文件定位

一如既往的叨叨   首先要对硬盘分区(MBR、GPT)和文件系统(NTFS、FAT32等)有一定的认识,要知道MBR扇区以及DBR扇区的基本结构,如

文件大小单位转换函数-getFileSize($bytes)

function getFileSize($bytes){ if ($bytes >= pow(2,40)) { # code... $return = round($bytes/pow(1024, 4),2); $suffi

Process对象.waitFor()的阻塞问题(坑)

有时需要在程序中调用可执行程序或脚本命令: Process process = Runtime.getRuntime().exec(shPath); int exitCode = process .w

Broadcast的Intentfilter过滤策略

一、注册方式作为Android四大组件之一的广播有两种注册方式:静态注册和动态注册。在注册之前,我们应该有自己的BroadcastReceiver,即

编码格式简介(ANSI、GBK、GB2312、UTF-8、GB18030和 UN

转发:http://blog.jobbole.com/30526/来源:潜行者m 的博客编码一直是让新手头疼的问题,特别是 GBK、GB2312、UTF-8 这三个比较常见的

分享到:

栏目导航

推荐阅读

热门阅读