并发队列:ArrayBlockingQueue实际运用场景和原理

ArrayBlockingQueue实际应用场景

之前在某公司做过一款情绪识别的系统,这套系统通过调用摄像头接口采集人脸信息,将采集的人脸信息做人脸识别和情绪分析,最终经过一定的算法将个人情绪数据转化具体行为指标值。其中采集图片的部分就用到了并发队列ArrayBlockingQueue。

image.png

如上图所示:摄像头有n个,单线程采集的效率会比较慢,所以在采集摄像头的过程中是多线程的,另外采集到的图片需要存储到图片服务器,对图片服务器写也有很高的要求,图片服务器是集群的,也需要用到也多线程的。将图片入库后需要将图片数据打到人脸分析服务器 上去处理,这部分涉及到了分布式消息,所以是黑色虚线 部分用kafka来传递消息。其中红色虚线部分 多线程图片采集将信息传递到多线程图片存储用到了ArrayBlockingQueue ,它是并发安全队列

ArrayBlockingQueue简化类图结构

image.png 从类图可以看出Queue接口提供了add,offer入队列的方法,提供 poll出队列的方法! BlockingQueue接口增加了put 入队列的方法,提供 take出队列的方法! 补充说明:UML类图结构:

  • 继承:实线空箭头。
  • 实现:虚线虚箭头。

并发队列阻塞和非阻塞概念

从上面类图名字可以看到Queue 提供的方法是非阻塞的!而BlockingQueue 提供的put, take 方法是阻塞的!下面按老思路,们用代码说明阻塞 非阻塞下! 非阻塞

import java.util.concurrent.ArrayBlockingQueue;
/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞测试
 * @modified By:
 * 公众号:叫练
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.offer("叫练");
        arrayBlockingQueue.offer("叫练");
        //输出arrayBlockingQueue的长度
        System.out.println(arrayBlockingQueue.size());
    }
}

如上代码:设置ArrayBlockingQueue长度为1,通过offer方法向队列添加2个元素,最后打印arrayBlockingQueue的长度?答案是1,不会阻塞 ,因为offer方法丢弃 了第二个元素"叫练”,们说出队和入队能够让其继续执行的队列们称为非阻塞。如果换成add方法呢?就会报错队列溢出,如下图所示!但是还不是阻塞的。下面们看看什么阻塞!

image.png

阻塞

import java.util.concurrent.ArrayBlockingQueue;
/**
 * @author :jiaolian
 * @date :Created in 2021-02-02 20:16
 * @description:ArrayBlockingQueue阻塞非阻塞测试
 * @modified By:
 * 公众号:叫练
 */
public class ArrayBlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        arrayBlockingQueue.put("叫练");
        arrayBlockingQueue.put("叫练");
        //输出arrayBlockingQueue的长度
        System.out.println(arrayBlockingQueue.size());
    }
}

如上代码:ArrayBlockingQueue长度为1,通过put方法向队列添加2个元素,最后输出arrayBlockingQueue的长度是多少?答案是控制台一直运行,因为在添加第二个"叫练"时程序阻塞了。们说**出队和入队不能够让其继续执行的队列们称为阻塞,**add方法,poll方法,take方法们就不一一举例了,大家可以写代码做下最简单的测试!

好啦,们对几个方法做个总结吧!

  • 入队: offer:队列满了丢弃。 add :队列满了报错。 put :阻塞。
  • 出队: poll :如果队列为空则返回null。 take :阻塞。

ArrayBlockingQueue实现原理浅析


image.png

如上图,ArrayBlockingQueue 是用数组实现的,ReentrantLock独占锁控制数组的入队和出队。notEmpty,notFull是ReentrantLock的两个条件队列,用来控制队列是否进入阻塞状态,是生产者和消费者模型。下面们看看take,put方法流程,其他的方法同理。

  • take方法:多个线程 竞争 独占锁获取 items[ taskIndex ]队首元素 ,其中A线程 成功 获取锁,其他线程阻塞等待A线程执行完成释放锁,如果队列不为空,A线程获取items[ taskIndex ]元素返回移除并释放锁让其他阻塞线程继续竞争;如果队列为空,A线程调用notEmpty.await方法进入条件队列并释放锁 让其他阻塞线程继续竞争, 其他线程发现队列为空也会进入notEmpty条件队列,等待put线程入队通知notEmpty阻塞线程。
  • put方法: 多个线程 竞争 独占锁设置 items[putIndex ]队尾元素, 其中A线程 成功 获取锁,其他线程阻塞等待A线程执行完成释放锁,如果队列不满【队列长度】,A线程添加items[ putIndex ]元素返回并释放锁让其他阻塞线程继续竞争;如果队列满了,A线程调用notFull.await方法进入条件队列并释放锁 让其他阻塞线程继续竞争, 其他线程发现队列为空也会进入 notFull 条件队列,等待take线程出队通知 notFull 阻塞线程 

完全非阻塞队列ConcurrentLinkedQueue

ConcurrentLinkedQueue也实现了Queue接口,提供offer,add,poll方法都是非阻塞的,另外从名字可以看出,底层是链表结构,入队和出队用的是自旋的cas。

List 多线程安全方案:LinkedBlockingQueue

image.png LinkedBlockingQueue和ArrayBlockingQueue 类似,LinkedBlockingQueue是有界的,长度是Integer.MAX_VALUE **,**实现上,LinkedBlockingQueue是链表,而且是双锁,如上图所示,takeLock独占锁控制队列头部,putLock控制队列尾部,互不影响,目的是提高LinkedBlockingQueue的并发度。

总结


今天们介绍了并发队列重要的几个概念,整理出来希望能对你有帮助,写的比不全,同时还有许多需要修正的地方,希望亲们加以指正和点评,年前这段时间会继续输出线程池这些概念等。最后喜欢的请点赞加关注哦。点关注,不迷路, 叫练【公众号】,边叫边练。

参考书籍:《Java并发编程之美》

原文创作:叫练

原文链接:https://www.cnblogs.com/jiaolian/p/14365647.html

文章列表

更多推荐

更多
  • Azure上Linux管理-十、使用 Azure Kubernetes 服务 技术要求,开始使用 AKS,与 Helm 一起工作,使用草稿,管理 Kubernetes,问题,进一步,使用 WSL 和 VS Code,安装依赖,kubectl 安装,使用 Azure CLI 创建集群,AKS 首次部署,创建服务,多
    Apache CN

  • Azure上Linux管理-十一、故障排除和监控您的工作负载 module(load="imuxsock"),技术要求,访问您的系统,Azure 日志分析,性能监控,问题,进一步,不允许远程访问,正在端口上工作,使用 nftables,引导诊断,Linux 登录,配置日志分析服务,安装 Azure
    Apache CN

  • Azure上Linux管理-十二、附录 第一章:探索微软 Azure 云,第二章:Azure 云入门,第三章:Linux 基础管理,第 4 章:管理 Azure,第五章:高级 Linux 管理,第七章:部署虚拟机,第八章:探索连续配置自动化,第 9 章:Azure 中的容器虚
    Apache CN

  • Azure上Linux管理-九、Azure 中的容器虚拟化 cloudconfig,集装箱技术导论,系统生成,Docker,Azure 容器实例,Buildah, Podman, and Skopeo,容器和储存,问题,进一步,容器历史,chroot 环境,OpenVZ,LXC,创建一个带启动的
    Apache CN

  • Azure上Linux管理-七、部署你的虚拟机 ResourceGroup 不存在,创建它:,vnet 不存在,创建 vnet,cloudconfig,Vagrant.config 结束,部署场景,Azure 中的自动部署选项,初始配置,流浪,封隔器,自定义虚拟机和 vhd,问题,进
    Apache CN

  • Azure上Linux管理-八、探索持续配置自动化 了解配置管理,使用 Ansible,使用地球形态,使用 PowerShell DSC,Azure 策略客户端配置,其他解决方案,问题,进一步,技术要求,Ansible 的安装,SSH 配置,裸最小配置,库存文件,Ansible 剧本和模
    Apache CN

  • Azure上Linux管理-五、高级 Linux 管理 技术要求,软件管理,网络,存储,systemd,问题,进一步,RPM 软件管理器,YUM 软件管理,基于 DNF 的软件管理,DPKG 软件管理器,运用 apt 进行软件管理,ZYpp 软件管理,识别网络接口,IP 地址识别,显示路由表
    Apache CN

  • Azure上Linux管理-六、管理 Linux 安全与身份 SELINUX=可以接受以下三个值之一:,permissiveSELinux 打印警告而不是强制执行。,disabled 没有加载 SELinux 策略。,SELINUXTYPE=可以接受以下三个值之一:,targeted 目标进程
    Apache CN

  • Azure上Linux管理-四、管理 Azure 使用 Azure CLI 和 PowerShell 管理 Azure 资源,技术要求,管理存储资源,管理网络资源,管理计算资源,虚拟机资源,问题,进一步,存储帐户,托管磁盘,Azure 文件,Azure Blob,虚拟网络,子网,网络安
    Apache CN

  • Azure上Linux管理-三、Linux 基础管理 Linux Shell,获取帮助,使用文本文件,在文件系统中找到你的方式,流程管理,自由访问控制,问题,执行命令,命令行编辑,与历史一起工作,自动完成,球状,重定向,处理变量,Bash 配置文件,使用手册页,使用信息文档,其他文档,文本
    Apache CN

  • 近期文章

    更多
    文章目录

      推荐作者

      更多