当前位置:首页 > 资讯 > 正文

进程间通信 --- 队列, 管道. Manager模块 进程池: Pool

进程间通信 --- 队列, 管道.  Manager模块  进程池: Pool

一, 进程间通信 --- 队列, 管道.

  ps: IPC == 进程之间的通信

  1, 队列:

    1, 正常队列: from queue import Queue
      queue.Queue 学名(FIFO) 先进先出 -- 维护秩序的时候用的比较多,
      q = Queue()        实例化队列对象, 可以传一个最大元素个数  
      put()            放入 如果超出该队列最大元素个数 则 堵塞
      put_nowait()      放入, 如果超出,会报错.
      get()           拿出 等同于list.pop, 如果没有值 会一直堵塞.
      get_nowait()            拿出, 只不过当没有值的时候 会报错.
      qsize()             查询队列中元素的个数.
      full()           判断队列是否满了 返回布尔值
      empty()         判断段队列是否为空 返回布尔值

    ps: 栈 先进后出 -- 常用于算法.

    2, 进程中的队列: from multiprocessing import Queue

      方法同上, 唯一不同的是 full() empty()在进程中的值不够准确

  

3, 生产者消费者模型:解决创造(生产)数据 和处理(消费)数据的效率不平衡的问题

      把创造数据 和 处理 数据放在不同的进程中, 根据他们的效率来调整进程的个数

      1, 生产数据快, 消费数据慢, 内存空间的浪费
      2, 消费数据快, 生产数据慢, 效率低下.
      ps: consumer 消费者 producer 生产者.

  3, 让消费者自动停下来:
        1, 在所有的生产者生产结束后, 向队列中放一个结束符
        2, 有几个consumer(消费者) 就向队列中放几个结束符
        3, 在消费者消费的过程中, 接收到结束符, 就结束消费进程

  

 4, 此方法 看起来比较low 不够自动化, 还得让 程序员先去确认有几个 消费者, 要知道消费者 本身就是一个不确定因素, 所以大神们 写出来一个模块叫做 JoinableQueue

      4, JoinableQueue 比Queue 多了两个方法,并且内部多了一个计数器.:  此模块用来解决消费者被堵塞的问题

        ps: 这里的消费者要作为主进程的一个守护进程存在.
        1, join() 堵塞等消费者消费完毕之后结束堵塞状态
        2, task_done() 让队列中的计数器-1

  

ps: 1, 只有在 multiprocessing 中的队列, 才能帮助你实现PIC
        2, 永远不可能出现 数据不安全的情况, 多个进程不会同时get同一个数据
        3, 由于先进先出的特点+ 进程通信的功能+ 数据进程安全, 经常用它来完成进程之间的通信

  2, 管道: 队列就是基于管道实现的.
    队列 数据是安全的.
    管道 数据不安全的
    队列 = 管道 + 锁

    方法:
      from multiprocessing import Pipe
      1, left,right = Pipe() 此实例化会产生两个端点对象
      2, (left/right).recv() 接受另外一端发来的消息
      3, (left/right).send() 向另外一端发送消息

        ps: 因为 管道是在文件中进行传递消息, 所以 不用要求 必须是bytes类型的数据, 所以 reve/send 两个方法没有太多的要求.
      4, (left/right).close() 关闭某一端
    注意: 在不同的进程中, 每用 left/right 都会在管道中开一个接口,所以,必须要把没用的接口全部关闭

  EOF异常的触发:
      1, 在一个进程中,如果不在用 left/right 端口则应该用close()关闭该端口
      2, 如果其他端点都被关闭, 只剩下一个端点在 reve()的时候,该端点就能够知道不会再有新的消息传来
      3, 此时 recv() 不会在进行堵塞, 则抛出 EOFError异常
      4, 注意: close() 并没有将整个管道关闭, 实际,是给操作系统对管道端点的计数处理的功能执行-1的操作

  二, 进程之间的数据共享: Manager模块
  此模块中包含一些平常常用的数据类型,可以将原本不可以在进程与进程之间共享的数据, 进行共享
  因为此共享是基于管道来实现, 所以, 此模块中的数据类型除了队列,是不安全的.

  

  

三, 进程池: Pool    (面向高计算型的场景, 采用多进程.)

 

  进程池出现的原因:

    进程不能不限制的开启, 会给os的调度增加负担,且 正真能被同时执行的进程最多也就和 CPU个数相等, 所以此时就出现了进程池的概念, 同时打开和CUP个数相等的进程, 每执行完一个再进来一个进程, 这样限制了同时打开的进程的个数, 有利于os系统的调度, 同时也能节省内存.

  apply(func) 将某个函数 执行 同步, 有返回值.

  apply_async() 带着'async'都是异步 返回一个队列对象.
    1, 如果是异步的提交任务, 那么任务提交之后进程池和主进程也异步了, 主进程不会自动等待进程池中的任务执行完毕
    2, 如果需要主进程等待, 则用 join(), 但是join()是依赖close()
    3, 如果函数有返回值,
      可以通过ret.get() 来获取返回值.
      但如果一边提交一遍获取返回值会让程序变成同步的
      所以想要保留异步的效果, 应该将返回对象保存在列表里, 所有任务提交完成之后,再来取结果
      这种方式也可以去掉join.
  close() 关闭进程池, 让任务不能继续提交.
  join() 堵塞等待子进程结束

  回调函数: -- 在主进程中执行.
    在发起任务的时候, 指定callback参数, 在每个进程执行完任务之后, 返回值会直接作为参数传递给callback的函数, 然后执行 callback函数中的代码