python3 queue多线程通信

2022-07-12 18:57:44
目录
queue分类例子一、生产消费模式例子二、task_done和join例子三、多线程里用queue

queue分类

python3>

    先进先出队列后进先出的栈优先级队列

    他们的导入方式分别是:

    from queue import Queue
    from queue import LifoQueue
    from queue import

    具体方法见下面引用说明。

    例子一、生产消费模式

    Queue>

    例如:

    from queue import Queue
    from threading import Thread
    # 用来表示终止的特殊对象
    _sentinel = object()
    # A thread that produces data
    def producer(out_q):
    for i in range(10):
    print("生产")
    out_q.put(i)
    out_q.put(_sentinel)
    # A thread that consumes data
    def consumer(in_q):
    while True:
    data = in_q.get()
    if data is _sentinel:
    in_q.put(_sentinel)
    break
    else:
    print("消费", data)
    # Create the shared queue and launch both threads
    q = Queue()
    t1 = Thread(target=consumer, args=(q,))
    t2 = Thread(target=producer, args=(q,))
    t1.start()
    t2.start()

    结果:

    本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列。

    import heapq
    import threading
    class PriorityQueue:
    def __init__(self):
    self._queue = []
    self._count = 0
    self._cv = threading.Condition()
    def put(self, item, priority):
    with self._cv:
    heapq.heappush(self._queue, (-priority, self._count, item))
    self._count += 1
    self._cv.notify()
    def get(self):
    with self._cv:
    while len(self._queue) == 0:
    self._cv.wait()
    return heapq.heappop(self._queue)[-1]

    例子二、task_done和join

    使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的task_done()>join():

    from queue import Queue
    from threading import Thread
    class Producer(Thread):
    def __init__(self, q):
    super().__init__()
    self.count = 5
    self.q = q
    def run(self):
    while self.count > 0:
    print("生产")
    if self.count == 1:
    self.count -= 1
    self.q.put(2)
    else:
    self.count -= 1
    self.q.put(1)
    class Consumer(Thread):
    def __init__(self, q):
    super().__init__()
    self.q = q
    def run(self):
    while True:
    print("消费")
    data = self.q.get()
    if data == 2:
    print("stop because data=", data)
    # 任务完成,从队列中清除一个元素
    self.q.task_done()
    break
    else:
    print("data is good,data=", data)
    # 任务完成,从队列中清除一个元素
    self.q.task_done()
    def main():
    q = Queue()
    p = Producer(q)
    c = Consumer(q)
    p.setDaemon(True)
    c.setDaemon(True)
    p.start()
    c.start()
    # 等待队列清空
    q.join()
    print("queue is complete")
    if __name__ == '__main__':
    main()

    结果:

    例子三、多线程里用queue

    设置俩队列,一个是要做的任务队列todo_queue,一个是已经完成的队列done_queue
    每次执行线程,先从todo_queue队列里取出一个值,然后执行完,放入done_queue队列。
    如果todo_queue为空,就退出。

    import logging
    import logging.handlers
    import threading
    import queue
    
    log_mgr = None
    todo_queue = queue.Queue()
    done_queue = queue.Queue()
    class LogMgr:
    def __init__(self, logpath):
    self.LOG = logging.getLogger('log')
    loghd = logging.handlers.RotatingFileHandler(logpath, "a", 0, 1)
    fmt = logging.Formatter("%(asctime)s %(threadName)-10s %(message)s", "%Y-%m-%d %H:%M:%S")
    loghd.setFormatter(fmt)
    self.LOG.addHandler(loghd)
    self.LOG.setLevel(logging.INFO)
    def info(self, msg):
    if self.LOG is not None:
    self.LOG.info(msg)
    class Worker(threading.Thread):
    global log_mgr
    def __init__(self, name):
    threading.Thread.__init__(self)
    self.name = name
    def run(self):
    while True:
    try:
    task = todo_queue.get(False)
    if task:
    log_mgr.info("HANDLE_TASK: %s" % task)
    done_queue.put(1)
    except queue.Empty:
    break
    return
    def main():
    global log_mgr
    log_mgr = LogMgr("mylog")
    for i in range(30):
    todo_queue.put("data"+str(i))
    workers = []
    for i in range(3):
    w = Worker("worker"+str(i))
    workers.append(w)
    for i in range(3):
    workers[i].start()
    for i in range(3):
    workers[i].join()
    total_num = done_queue.qsize()
    log_mgr.info("TOTAL_HANDLE_TASK: %d" % total_num)
    exit(0)
    if __name__ == '__main__':
    main()

    输出日志文件结果:

    到此这篇关于python3 queue多线程通信的文章就介绍到这了,更多相关python queue多线程通信内容请搜索易采站长站以前的文章或继续浏览下面的相关文章希望大家以后多多支持易采站长站!