使用 synchronized 关键字 和 java.util.concurrent.atomic 包来控制线程执行的顺序。使用 java.util.concurrent 集合和类,包括 CyclicBarrier 和 CopyOnWriteArrayList。

同步Synchronization

确保一次只有一个线程可以访问导致问题的代码,同步该代码块。
每个对象都有一个内置的锁,因为每个对象只有一个锁,所以在任何时候只有一个线程可以持有该锁。在第一个线程释放锁之前,其他线程无法获得锁。
通过在块或方法上使用 synchronized 关键字定义锁。当线程进入未被占用的同步块或方法时获取该锁。

synchronized关键字锁对象(引用变量)

Object o = new Object();

synchronized (o) { // Get the lock of Object o
    // Code guarded by the lock
}

synchronized+this关键字锁代码块

// Get the lock of the object this code belongs to
synchronized (this) {
   // Code guarded by the lock
}

synchronized 锁方法

锁属于在其上声明方法的对象,并在方法结束时释放,即锁的是调用该方法的对象

public synchronize void method() {
    // Code guarded by the lock
}

其等价于

public void method() {
    synchronized (this) {
        // Code guarded by the lock
    }
}

synchronized 锁静态static方法

静态代码也可以是同步的,但是它不是用来获取类实例的锁,而是在类的每个实例都关联的单个class类

public void method() {
    synchronized (this) {
        // Code guarded by the lock
    }
}

其等价于

class MyClass {
    synchronized static void method() {
        /** .. */
    }
}
  • 示例
static int n = 0;
    static int n1=0;
    static void add() {
        n++;
        System.out.println(n);
    }
    static synchronized void add1() {
        n1++;
        System.out.println(n1+"synchronized");
    }
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        Runnable r = () -> add();
        Runnable r1=()->add1();
        for(int i = 0; i < 4; i++) {
            service.execute(r);
            service.execute(r1);
        }
        service.shutdown();
    }

image.png
加了synchronized同步的静态方法执行不存在并发问题,没有加锁的方法存在并发问题

Atomic Classes原子类

java.concurrent.atomic包含执行原子操作的类,这些操作在单个步骤中执行,不受多线程的干预。
image.png
每个类都有以原子方式执行增量或更新等操作的方法。
但并不能保证并发问题,原子类只能保证数据的一致性

  • 例如AtomicInteger类
static AtomicInteger n2 = new AtomicInteger(0);
    static void add2() {
        System.out.println(n2.incrementAndGet()+"Atomic");
    }
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(4);
        Runnable r = () -> add();
        Runnable r1 = () -> add1();
        Runnable r2 = () -> add2();
        for (int i = 0; i < 4; i++) {
            service.execute(r);
            service.execute(r1);
            service.execute(r2);
        }
        service.shutdown();

image.png
得到所有的数字,但不是按顺序排列的,因为原子类只能保证数据的一致性

并发集合

java.util.concurrent包提供了一些线程安全的类,等价于java.util的集合类。不需要显式地同步操作,且性能优于synchronized关键字

Map<String, Integer> map = new HashMap<>();
...
void putIfNew(String key, Integer val) {
    if(map.get(key) == null) {
        map.put(key, val);
    }
}

可以写为

Map<String, Integer> map = new ConcurrentHashMap<>();
...
map.putIfAbsent(key, val);

Java的JUC并发包提供了许多并发集合,根据它们实现的接口来对它们进行分类:

  • BlockingQueue
  • BlockingDeque
  • ConcurrentMap
  • ConcurrentNavigableMap
    由于这些接口是从java.util.Collection接口扩展而来的,它们继承了我们已经知道的方法,因此在JUC接口中这里您只能找到支持并发的添加方法。对于列表,您可以使用类copyonwriterraylist。

BlockingQueue接口

一个线程安全队列,如果队列是空的,则等待插入元素(带有可选的超时) ; 如果队列已满,则等待删除元素:

  • 插入
void put(E e)
boolean offer(E e, long timeout, TimeUnit unit)
  • 移除
E take()
E poll(long timeout, TimeUnit unit)

实现类

image.png

  • ArrayBlockingQueue
    由数组支持的有界阻塞队列
  • DelayQueue
    无限制的阻塞队列元素,其中一个元素只能在其延迟到期后才能使用
  • LinkedBlockingQueue
    基于链接节点的可选有界阻塞队列
  • LinkedTransferQueue
    基于连接节点
  • PriorityBlockingQueue
    无界阻塞队列
  • SynchronousQueue
    阻塞队列,其中每个插入操作必须等待另一个线程的相应删除操作,反之亦然

BlockingDeque接口

继承自Deque接口
它表示一个线程安全的 deque (一个双端队列,一个队列,你可以从两端插入和获取元素)。

ConcurrentMap接口

线程安全的map,实现类有ConcurrentHashMap
image.png

默认default方法

  • 原子地使用 BiFunction 计算指定键的值
V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)

是接口的默认default实现方法
image.png

  • 只有当k不存在于映射中时,原子地计算其值
V computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
  • 只有键出现在映射中时,才能原子计算它的值
V computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
  • 返回键的值,如果键不存在,则返回默认值
V getOrDefault(Object key, V defaultValue)

ConcurrentNavigableMap接口

线程安全的 NavigableMap (像 TreeMap 一样) ,由 ConcurrentSkipListMap 类实现,根据键的自然顺序排序,或者由其构造函数中提供的比较器进行排序
image.png

CopyOnWriteArrayList类

image.png
线程安全的 List,类似于 ArrayList,但是当它被修改时(使用 add ()或 set ()等方法) ,就会创建一个底层数组的新副本(因此得名)
当迭代 ArrayList 时,像 remove ()、 add ()或 set ()这样的方法可以抛出一个 java.util.ConcurrentModificationException。对于 copyonwriteraylist,不会引发此异常,因为迭代器使用的是未修改的列表副本。但这也意味着不支持调用 Iterator 上的 remove ()方法(它抛出一个 UnsupportedOperationException)。

  • 列表的最后一个元素在每次迭代中都会改变,但原始值仍在迭代器中打印。迭代器使用的是未修改的列表副本,所以值虽然改变但迭代里面的也是打印出的原始值
List<Integer> list = new CopyOnWriteArrayList<>();
list.add(10); list.add(20); list.add(30);
Iterator<Integer> it = list.iterator();
while(it.hasNext()) {
    int i = it.next();
    System.out.print(i + " ");
    // No exception thrown
    list.set(list.size() -1, i * 10);
    // it.remove(); throws an exception
}
System.out.println(list);

image.png
image.png

  • 使用 copyonwriearraylist,对读操作没有锁定,因此这个操作更快。出于这个原因,copyonwriearraylist 在更新和插入很少、并发读取很多的情况下非常有用。

ConcurrentSkipListSet类

线程安全的 NavigableSet (比如 TreeSet)

CopyOnWriteArraySet类

一个线程安全的 Set,并在内部使用一个 CopyOnWriteArrayList 对象执行所有操作(因此这些类非常类似)

Collections工具类包装同步普通集合

image.png
image.png

static <T> Collection<T> synchronizedCollection(Collection<T> c)
static <T> List<T> synchronizedList(List<T> list)
static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
static <T> Set<T> synchronizedSet(Set<T> s)

在通过 Iterator 或 Stream 遍历这些集合时,需要手动同步这些集合

Collection c =
   Collections.synchronizedCollection(myCollection);
...
synchronized (c) {
   Iterator i = c.iterator();
   ...
}

如果其在迭代中被修改,它们也会抛出异常。
只有在必须在并发环境中处理现有集合时才使用这些方法(否则,从一开始就使用 java.util.concurrent包集合)。

CyclicBarrier

Synchronized 关键字帮助我们协调多个线程对共享资源的访问。
协调复杂的并发任务需要花费大量的精力。Java 提供了高级类来更容易地实现这些类型的同步任务。
其中一个类是 CyclicBarrier。它提供了一个同步点(一个障碍点) ,在这里线程可能需要等待,直到所有其他线程也到达这个点。

构造函数

  • 创建一个cyclibarrier具有指定数量等待它的线程的
CyclicBarrier(int threads)
  • 创建一个CyclicBarrier (int parties,Runnable barrierAction)具有指定数量的线程等待的
CyclicBarrier(int parties, Runnable barrierAction)

方法

阻塞一个线程,直到所有其他线程都调用了 await ()(到达 barrier) ,或者可选地,直到指定的等待时间过去(发生这种情况时,会抛出 TimeoutException)。
如果当前线程在等待时被中断,这些方法会抛出一个 InterruptedException; 如果另一个线程被中断或超时,或者屏障被重置(使用 reset ()方法) ,或者屏障操作由于异常而失败,则会抛出一个 BrokenBarrierException。

  • 举例
class CyclicBarrier1 {
    static void checkStep(
            CyclicBarrier barrier, String step) {
        // Do something to prepare the step
        System.out.println(step + " is ready");
        try {
            // Wait the other threads
            barrier.await();
        } catch (Exception e) { /** ... */}
    }
    public static void main(String[] args) {
        String[] steps = {"Read the recipe",
                "Gather the ingredients",
                "Wash hands"};
        System.out.println("Preparing everything:");
        Runnable allSet = () ->
                System.out.println("Everything's ready. Let's begin.");
        ExecutorService executor =
                Executors.newFixedThreadPool(steps.length);
        CyclicBarrier barrier =
                new CyclicBarrier(steps.length, allSet);
        for (String step : steps) {
            executor.submit(
                    () -> checkStep(barrier, step)
            );
        }
        executor.shutdown();
    }
}

image.png

总结

  • 同步与锁一起工作。每个对象都有一个内置锁,因为每个对象只有一个锁,所以任何时候只有一个线程可以持有该锁。通过在块或方法中使用synchronized关键字来获取锁。
  • 静态代码也可以同步,但不是使用它来获取类实例的锁,而是在每个类都关联的class获取。
  • java.util.concurrent.atomic包包含用于执行原子操作的类(如AtomicInteger),这些操作在一个步骤中执行,不需要超过个线程的干预。
  • BlockingQueue表示线程安全队列,BlockingDeque表示线程安全队列。如果队列/队列为空,则等待插入元素(可选超时),如果队列/队列已满,则等待删除元素。
  • ConcurrentMap表示线程安全映射,它由ConcurrentHashMap类实现。
  • ConcurrentNavigableMap表示线程安全的NavigableMap(如TreeMap),由ConcurrentSkipListMap类实现。
  • CopyOnWriteArrayList表示线程安全列表,类似于ArrayList,但是当它被修改(使用add()或set()等方法)时,将创建基础数组的新副本(因此得名)。
  • CyclicBarrier,提供一个同步点(障碍点),线程可能需要在这里等待,直到所有其他线程到达该点。

这个家伙很懒,啥也没有留下😋