使用 synchronized 关键字 和 java.util.concurrent.atomic 包来控制线程执行的顺序。使用 java.util.concurrent 集合和类,包括 CyclicBarrier 和 CopyOnWriteArrayList。
同步Synchronization
确保一次只有一个线程可以访问导致问题的代码,同步该代码块。
每个对象都有一个内置的锁,因为每个对象只有一个锁,所以在任何时候只有一个线程可以持有该锁。在第一个线程释放锁之前,其他线程无法获得锁。
通过在块或方法上使用 synchronized 关键字定义锁。当线程进入未被占用的同步块或方法时获取该锁。
synchronized关键字锁对象(引用变量)
Object o = new Object();
synchronized (o) { // Get the lock of Object o
// Code guarded by the lock
}
synchronized+this关键字锁代码块
// Get the lock of the object this code belongs to
synchronized (this) {
// Code guarded by the lock
}
synchronized 锁方法
锁属于在其上声明方法的对象,并在方法结束时释放,即锁的是调用该方法的对象
public synchronize void method() {
// Code guarded by the lock
}
其等价于
public void method() {
synchronized (this) {
// Code guarded by the lock
}
}
synchronized 锁静态static方法
静态代码也可以是同步的,但是它不是用来获取类实例的锁,而是在类的每个实例都关联的单个class类
public void method() {
synchronized (this) {
// Code guarded by the lock
}
}
其等价于
class MyClass {
synchronized static void method() {
/** .. */
}
}
- 示例
static int n = 0;
static int n1=0;
static void add() {
n++;
System.out.println(n);
}
static synchronized void add1() {
n1++;
System.out.println(n1+"synchronized");
}
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(4);
Runnable r = () -> add();
Runnable r1=()->add1();
for(int i = 0; i < 4; i++) {
service.execute(r);
service.execute(r1);
}
service.shutdown();
}
加了synchronized同步的静态方法执行不存在并发问题,没有加锁的方法存在并发问题
Atomic Classes原子类
java.concurrent.atomic包含执行原子操作的类,这些操作在单个步骤中执行,不受多线程的干预。
每个类都有以原子方式执行增量或更新等操作的方法。
但并不能保证并发问题,原子类只能保证数据的一致性
- 例如AtomicInteger类
static AtomicInteger n2 = new AtomicInteger(0);
static void add2() {
System.out.println(n2.incrementAndGet()+"Atomic");
}
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(4);
Runnable r = () -> add();
Runnable r1 = () -> add1();
Runnable r2 = () -> add2();
for (int i = 0; i < 4; i++) {
service.execute(r);
service.execute(r1);
service.execute(r2);
}
service.shutdown();
得到所有的数字,但不是按顺序排列的,因为原子类只能保证数据的一致性
并发集合
java.util.concurrent包提供了一些线程安全的类,等价于java.util的集合类。不需要显式地同步操作,且性能优于synchronized关键字
即
Map<String, Integer> map = new HashMap<>();
...
void putIfNew(String key, Integer val) {
if(map.get(key) == null) {
map.put(key, val);
}
}
可以写为
Map<String, Integer> map = new ConcurrentHashMap<>();
...
map.putIfAbsent(key, val);
Java的JUC并发包提供了许多并发集合,根据它们实现的接口来对它们进行分类:
- BlockingQueue
- BlockingDeque
- ConcurrentMap
- ConcurrentNavigableMap
由于这些接口是从java.util.Collection接口扩展而来的,它们继承了我们已经知道的方法,因此在JUC接口中这里您只能找到支持并发的添加方法。对于列表,您可以使用类copyonwriterraylist。
BlockingQueue接口
一个线程安全队列,如果队列是空的,则等待插入元素(带有可选的超时) ; 如果队列已满,则等待删除元素:
- 插入
void put(E e)
boolean offer(E e, long timeout, TimeUnit unit)
- 移除
E take()
E poll(long timeout, TimeUnit unit)
实现类
- ArrayBlockingQueue
由数组支持的有界阻塞队列 - DelayQueue
无限制的阻塞队列元素,其中一个元素只能在其延迟到期后才能使用 - LinkedBlockingQueue
基于链接节点的可选有界阻塞队列 - LinkedTransferQueue
基于连接节点 - PriorityBlockingQueue
无界阻塞队列 - SynchronousQueue
阻塞队列,其中每个插入操作必须等待另一个线程的相应删除操作,反之亦然
BlockingDeque接口
继承自Deque接口
它表示一个线程安全的 deque (一个双端队列,一个队列,你可以从两端插入和获取元素)。
ConcurrentMap接口
线程安全的map,实现类有ConcurrentHashMap
默认default方法
- 原子地使用 BiFunction 计算指定键的值
V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
是接口的默认default实现方法
- 只有当k不存在于映射中时,原子地计算其值
V computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
- 只有键出现在映射中时,才能原子计算它的值
V computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
- 返回键的值,如果键不存在,则返回默认值
V getOrDefault(Object key, V defaultValue)
ConcurrentNavigableMap接口
线程安全的 NavigableMap (像 TreeMap 一样) ,由 ConcurrentSkipListMap 类实现,根据键的自然顺序排序,或者由其构造函数中提供的比较器进行排序
CopyOnWriteArrayList类
线程安全的 List,类似于 ArrayList,但是当它被修改时(使用 add ()或 set ()等方法) ,就会创建一个底层数组的新副本(因此得名)
当迭代 ArrayList 时,像 remove ()、 add ()或 set ()这样的方法可以抛出一个 java.util.ConcurrentModificationException。对于 copyonwriteraylist,不会引发此异常,因为迭代器使用的是未修改的列表副本。但这也意味着不支持调用 Iterator 上的 remove ()方法(它抛出一个 UnsupportedOperationException)。
- 列表的最后一个元素在每次迭代中都会改变,但原始值仍在迭代器中打印。迭代器使用的是未修改的列表副本,所以值虽然改变但迭代里面的也是打印出的原始值
List<Integer> list = new CopyOnWriteArrayList<>();
list.add(10); list.add(20); list.add(30);
Iterator<Integer> it = list.iterator();
while(it.hasNext()) {
int i = it.next();
System.out.print(i + " ");
// No exception thrown
list.set(list.size() -1, i * 10);
// it.remove(); throws an exception
}
System.out.println(list);
- 使用 copyonwriearraylist,对读操作没有锁定,因此这个操作更快。出于这个原因,copyonwriearraylist 在更新和插入很少、并发读取很多的情况下非常有用。
ConcurrentSkipListSet类
线程安全的 NavigableSet (比如 TreeSet)
CopyOnWriteArraySet类
一个线程安全的 Set,并在内部使用一个 CopyOnWriteArrayList 对象执行所有操作(因此这些类非常类似)
Collections工具类包装同步普通集合
static <T> Collection<T> synchronizedCollection(Collection<T> c)
static <T> List<T> synchronizedList(List<T> list)
static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
static <T> Set<T> synchronizedSet(Set<T> s)
在通过 Iterator 或 Stream 遍历这些集合时,需要手动同步这些集合
Collection c =
Collections.synchronizedCollection(myCollection);
...
synchronized (c) {
Iterator i = c.iterator();
...
}
如果其在迭代中被修改,它们也会抛出异常。
只有在必须在并发环境中处理现有集合时才使用这些方法(否则,从一开始就使用 java.util.concurrent包集合)。
CyclicBarrier
Synchronized 关键字帮助我们协调多个线程对共享资源的访问。
协调复杂的并发任务需要花费大量的精力。Java 提供了高级类来更容易地实现这些类型的同步任务。
其中一个类是 CyclicBarrier。它提供了一个同步点(一个障碍点) ,在这里线程可能需要等待,直到所有其他线程也到达这个点。
构造函数
- 创建一个cyclibarrier具有指定数量等待它的线程的
CyclicBarrier(int threads)
- 创建一个CyclicBarrier (int parties,Runnable barrierAction)具有指定数量的线程等待的
CyclicBarrier(int parties, Runnable barrierAction)
方法
阻塞一个线程,直到所有其他线程都调用了 await ()(到达 barrier) ,或者可选地,直到指定的等待时间过去(发生这种情况时,会抛出 TimeoutException)。
如果当前线程在等待时被中断,这些方法会抛出一个 InterruptedException; 如果另一个线程被中断或超时,或者屏障被重置(使用 reset ()方法) ,或者屏障操作由于异常而失败,则会抛出一个 BrokenBarrierException。
- 举例
class CyclicBarrier1 {
static void checkStep(
CyclicBarrier barrier, String step) {
// Do something to prepare the step
System.out.println(step + " is ready");
try {
// Wait the other threads
barrier.await();
} catch (Exception e) { /** ... */}
}
public static void main(String[] args) {
String[] steps = {"Read the recipe",
"Gather the ingredients",
"Wash hands"};
System.out.println("Preparing everything:");
Runnable allSet = () ->
System.out.println("Everything's ready. Let's begin.");
ExecutorService executor =
Executors.newFixedThreadPool(steps.length);
CyclicBarrier barrier =
new CyclicBarrier(steps.length, allSet);
for (String step : steps) {
executor.submit(
() -> checkStep(barrier, step)
);
}
executor.shutdown();
}
}
总结
- 同步与锁一起工作。每个对象都有一个内置锁,因为每个对象只有一个锁,所以任何时候只有一个线程可以持有该锁。通过在块或方法中使用synchronized关键字来获取锁。
- 静态代码也可以同步,但不是使用它来获取类实例的锁,而是在每个类都关联的class获取。
- java.util.concurrent.atomic包包含用于执行原子操作的类(如AtomicInteger),这些操作在一个步骤中执行,不需要超过个线程的干预。
- BlockingQueue表示线程安全队列,BlockingDeque表示线程安全队列。如果队列/队列为空,则等待插入元素(可选超时),如果队列/队列已满,则等待删除元素。
- ConcurrentMap表示线程安全映射,它由ConcurrentHashMap类实现。
- ConcurrentNavigableMap表示线程安全的NavigableMap(如TreeMap),由ConcurrentSkipListMap类实现。
- CopyOnWriteArrayList表示线程安全列表,类似于ArrayList,但是当它被修改(使用add()或set()等方法)时,将创建基础数组的新副本(因此得名)。
- CyclicBarrier,提供一个同步点(障碍点),线程可能需要在这里等待,直到所有其他线程到达该点。
Comments | 0 条评论