123456790 发表于 2022-3-26 10:29:49

Java BlockingQueue阻塞队列是线程安全的吗 是线程安全的

welcome to my blog

问题描述 Java BlockingQueue 阻塞队列的take()和put()方法是线程安全的吗? 多线程下调用take()或者put()方法会出问题吗?

看了BlockingQueue的三个实现类, 发现对应的方法中都使用了锁, 所以不会出现线程安全问题

ArrayBlockingQueue

        //ArrayBlockingQueue的put()方法    public void put(E e) throws InterruptedException {      Objects.requireNonNull(e);      final ReentrantLock lock = this.lock;      //获取锁      lock.lockInterruptibly();      try {            while (count == items.length)                    //队列满了, 进入阻塞状态               notFull.await();            enqueue(e);      } finally {            //释放锁            lock.unlock();      }    }//ArrayBlockingQueue的take()方法    public E take() throws InterruptedException {      final ReentrantLock lock = this.lock;      //获取锁      lock.lockInterruptibly();      try {            while (count == 0)                    //队列为空,进入阻塞状态                notEmpty.await();            //弹出元素            return dequeue();      } finally {              //释放锁            lock.unlock();      }    } LinkedBlockingQueue

        //LinkedBlockingQueue的put()方法    public void put(E e) throws InterruptedException {      if (e == null) throw new NullPointerException();      final int c;      final Node node = new Node(e);      final ReentrantLock putLock = this.putLock;      final AtomicInteger count = this.count;      //获取锁      putLock.lockInterruptibly();      try {            /*             * Note that count is used in wait guard even though it is             * not protected by lock. This works because count can             * only decrease at this point (all other puts are shut             * out by lock), and we (or some other waiting put) are             * signalled if it ever changes from capacity. Similarly             * for all other uses of count in other wait guards.             */            while (count.get() == capacity) {                notFull.await();            }            //元素入队            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();      } finally {                //释放锁            putLock.unlock();      }      if (c == 0)            signalNotEmpty();    }        //LinkedBlockingQueue的take()方法    public E take() throws InterruptedException {      final E x;      final int c;      final AtomicInteger count = this.count;      final ReentrantLock takeLock = this.takeLock;      //获取锁      takeLock.lockInterruptibly();      try {            while (count.get() == 0) {                notEmpty.await();            }            x = dequeue();            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();      } finally {                //释放锁            takeLock.unlock();      }      if (c == capacity)            signalNotFull();      return x;    } PriorityBlockingQueue

//PriorityBlockingQueue的put()方法public void put(E e) {                //锁的操作在offer()方法中      offer(e); // never need to block    }//PriorityBlockingQueue的offer()方法public boolean offer(E e) {      if (e == null)            throw new NullPointerException();      final ReentrantLock lock = this.lock;      //获取锁      lock.lock();      int n, cap;      Object[] es;      while ((n = size) >= (cap = (es = queue).length))            tryGrow(es, cap);      try {            final Comparator
页: [1]
查看完整版本: Java BlockingQueue阻塞队列是线程安全的吗 是线程安全的