线程间通信是使这些独立的线程个体成为系统间相交互的群体,大大提高CPU的利用率的同时也会对各线程处理过程进行有效把控

  • 如何使用wait/notify实现线程间通信
  • 生产者/消费者模式的实现
  • join方法的使用
  • ThreadLocal类的使用

wait/notify机制

线程间是可以相互通信和协作的,不使用任何特殊API,使用最原始的结构实现如下

传统方法,不使用wait机制

//不使用wait/notify
class A {
    private volatile int i;

    public void addI() {
        i++;
    }

    public int getI() {
        return this.i;
    }

    public static void main(String[] args) {
        A a = new A();
        Thread thread = new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                a.addI();
                System.out.println("i递增" + i + "个元素");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread thread1 = new Thread(() -> {
            try {
                for (; ; ) {
                    if (a.getI() == 8) {
                        System.out.println("i=8,exit!");
                        throw new InterruptedException();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.setName("A");
        thread1.setName("B");
        thread.start();
        thread1.start();
    }
}

注意try-catch代码块的位置,如果在循环里面那么永远也出不来
image.png

wait

需要实现的机制应该不是两个线程主动去访问花费大量时间操作同一个变量,而是当条件满足时主动告知消费者去进行消费
image.png

  • wait/notify是Object类的方法
    1.只能在同步代码块中调用wait方法,虽然可以不在同步代码块中写,但是没有对象锁会报RuntimeException。
    image.png
    2.同样的原因notify也需要在同步代码块中使用,需要先获得对象锁
    3.notify所在的线程notify之后并不会立刻释放锁给wait线程,而是执行完notify线程的剩余代码之后再释放。
    4.一次notify只能唤醒一个wait线程,其他等待的wait线程并不会被唤醒,顺序按照先wait先notify,但是具体还是取决于JVM实现
    5.notifyAll方法执行后,会唤醒所有线程,顺序不固定,取决于JVM的具体实现
  • 举例
    注意notify和wait的对象需要是同一个
    image.png

线程状态切换

image.png
1.new创建线程对象后调用start方法,系统会为此线程分配CPU资源,此时线程处于runnable状态,准备运行阶段
2.runnable状态和running状态可以切换,因为可能被其他线程抢占了CPU资源导致从running变为runnable

  • 调用start之后存在的状态:
    1.调用sleep方法之后超过指定的休眠时间
    2.线程成功获得同步监视器,即对象锁
    3.线程正在wait,其他线程notify
    4.处于挂起suspend的线程调用了resume恢复

3.线程阻塞blocked。遇到阻塞时,例如阻塞I/O,线程会由runnable变为blocked阻塞状态,等待I/O操作的结果。系统会把宝贵的CPU时间片分配给其他线程,当I/O结束后再进入runnable状态继续执行后面的任务

  • 阻塞存在的情况
    1.调用sleep主动放弃占用的CPU处理器资源,但并没有释放对象锁
    2.调用了阻塞式I/O,在方法返回前,该线程被阻塞
    3.试图获得对象锁,但此对象锁被其他线程持有
    4.等待notify
    5.调用了suspend挂起

4.run运行后进入销毁,线程执行完毕

wait、sleep、notify、notifyAll锁总结

执行同步代码块的过程中遇到异常导致线程终止,锁会被释放

  • wait
    wait会立即释放锁,释放锁后再对线程进行interrupt打断会出现InterruptedException异常
  • sleep
    sleep不会立即释放锁
  • notify
    notify不立即释放锁
    执行完notify方法后,按照执行wait方法的顺序唤醒其他线程,具体取决于JVM实现
  • notifyAll
    执行完notifyAll会唤醒通知所有线程,顺序具体取决于JVM实现

wait(long)方法

Object的wait(long)方法会在设定秒数内wait,如果没有被notify,那么其会自我notify。但前提是能获取对象锁,才能继续执行剩余的代码,否则只能继续等待对象锁

生产者/消费者模式的实现

注意notify和wait的操作对象需要是同一个

一生产一消费

因为String直接操作常量池,而new出来的对象也会改变,所以使用StringBuffer,其相关操作也是synchronized,锁方法所在this对象
image.png。最好不要使用String类作为对象锁,因为常量池的存在

//一生产,一消费
class B {
    private volatile CopyOnWriteArrayList<String> strings = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {

        CopyOnWriteArrayList str = new B().strings;
        //生产者
        Thread supplier = new Thread(() -> {
            while (true) {
                synchronized (str) {
                    try {
                        if (0!=str.size()) {
                            str.wait();
                        }
                        String value = System.currentTimeMillis() + "_" + System.nanoTime();
                        str.add(value);
                        str.notify();
                        System.out.println("加入的值是" + str);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //消费者
        Thread consumer = new Thread(() -> {
            while (true) {
                synchronized (str) {
                    if (0==str.size()) {
                        try {
                            str.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("得到的值是" + str);
                    str.clear();
                    str.notify();
                    System.out.println("最后的值是" + str);

                }
            }
        });
        supplier.start();
        consumer.start();
    }
}

最后的结果如下
image.png

多生产多消费:假死情况

即所有线程均进入wating状态,程序不再执行任何业务功能,常出现在生产者/消费者模式中。
因为多生产和多消费者使用notify只能唤醒同一个,那么如果唤醒的是同一类,即同为生产者或者同为消费者,如果没有相关业务逻辑判断容易造成假死。解决办法可以加入相关逻辑判断或者使用NotifyAll

一生产多消费

//略

多生产一消费

//略

多生产

通过管道进行线程间通信

Java有各种输入/输出流,其中管道流(pipe stream)是一种特殊的流,用于在不同的线程间直接传送数据。
一个线程发送数据到管道,另一个线程从输入管道中读取数据,不需要借助于临时文件。
image.png

  • 字节流通信
class WriteData {
    public void writeMethod(PipedOutputStream out) {
        try (PipedOutputStream out1 = out) {
            System.out.println("write :");
            for (int i = 0; i < 300; i++) {
                String outData = "" + (i + 1);
                out1.write(outData.getBytes(StandardCharsets.UTF_8));
                System.out.print(outData);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class ReadData {
    public void readMethod(PipedInputStream input) {
        try (PipedInputStream input1 = input) {
            System.out.println("read :");
            byte[] bytes = new byte[20];
            int read = input1.read(bytes);
            while (read != -1) {
                System.out.println(new String(bytes, 0, read));
                read = input1.read(bytes);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

class By {

    public static void main(String[] args) {
        WriteData writeData = new WriteData();
        ReadData readData = new ReadData();
        PipedInputStream inputStream = new PipedInputStream();
        PipedOutputStream outputStream = new PipedOutputStream();
        try {
            inputStream.connect(outputStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        Thread threadWrite = new Thread() {
            private WriteData write = writeData;
            private PipedOutputStream out = outputStream;

            @Override
            public void run() {
                write.writeMethod(out);
            }
        };
        Thread threadRead = new Thread() {
            private ReadData read = readData;
            private PipedInputStream input = inputStream;

            @Override
            public void run() {
                read.readMethod(input);
            }
        };
        threadWrite.start();
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println();
        threadRead.start();
    }
}

结果如下图所示
image.png

  • 字符流通信
    image.png
    同样地,也需要使用connect连接Reader和Writer,其他略

join()方法的使用

如果主线程创建的子线程之后,想等待子线程执行完毕之后再继续执行处理主线程代码可以使用join方法。

/**
 * @author jiangtao
 * @date 2021/10/7 21:28
 */
public class Join {
    public static void main(String[] args) {

        Thread thread = new Thread(() -> {
            long v = (long) (Math.random() * 10000);
            System.out.println(v / 1000);
            try {
                Thread.sleep(v);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.start();
        System.out.println(LocalTime.now());
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(LocalTime.now());
    }
}

结果如下
image.png

  • 其中join内部是调用wait方法实现等待的需要notify唤醒,和synchronized持有对象的锁同步不同
    image.png
  • 因为内部使用了wait放弃对象监视器即释放锁,所以和interrupt()方法使用会报InterruptedException异常

join(long)方法

设定等待的时间后,不管等待的线程是否执行完毕,只要时间达到,当前线程如果没有获得锁会一直尝试获得锁,且重新获得锁后会继续运行。类似与sleep(long),但与其有本质的不同
注意join(long)之后,等待的时间后如果线程还未执行完毕需要获得锁之后才能继续执行剩余代码,会争抢锁

join(long)和sleep(long)区别

  • join内部使用wait则会释放对象锁,sleep不会
  • join和sleep的使用场景目的不同

ThreadLocal的使用

线程Thread的共享变量可以在Thread类中static实现。
而每个线程Thread的私有变量可以通过ThreadLocal类来处理实现。
即ThreadLocal的static类ThreadLocalMap维护一个Map用于存放每个实例Thread对应的私有实例属性,其K为Thread对象,V为存储的值。
但是这些值还是存储在Thread类中,只是这些实例变量属性没有private,而为默认的,则其为default权限,意味着同包下的类可以访问。
image.png
image.png
此时如果想自己更改写一个同名包则可以直接引用其属性变量,IDE编译也不会报错

package java.lang;

/**
 * @author jiangtao
 * @date 2021/10/7 23:10
 */
public class Tset {
    public static void main(String[] args) {
        Thread thread = new Thread();
        int threadLocalRandomSecondarySeed = thread.threadLocalRandomSecondarySeed;
        System.out.println(threadLocalRandomSecondarySeed);
    }
}

最重结果如下ERROR,且报java.lang.SecurityException错
image.png
其源码里面ClassLoader是禁止加载java开头的包的
image.png

ThreadLocal初始化方法get返回null

image.png
其无参构造方法调用initialValue()方法,其为protect方法且返回null,可以通过子类继承ThreadLocal类后重写该方法,让其不返回null
注意的是ThreadLocal类存在的线程中的K子线程和父线程不一样,不通用,不能相互存取

InheritableThreadLocal类子父线程共享属性

可以让子父类线程共享值。
其子类InheritableThreadLocal只重写了3个方法
image.png
且没有重写set方法,但ThreadLocal类的set方法调用了其他2个方法
image.png
而其他2个方法调用的存储关系inheritableThreadLocals存储到Thread的实例变量,存储的对象属性已经改变


这个家伙很懒,啥也没有留下😋