java里的Thread可以启动一个Runnable:
- class MyRunnable implements Runnable {
- public void run() {
- System.out.println("OK");
- }
- }
- void testRunnable() {
- Thread t = new Thread(new MyRunnable());
- t.start();
- }
Thread本身也实现了
Runnable:
- class MyThread extends Thread {
- public void run() {
- System.out.println("OK");
- }
- }
- void testThread() {
- Thread t = new MyThread();
- t.start();
- }
你可以设置Thread的
优先级:
- Thread t1 = new MyThread();
- t1.setPriority(Thread.MAX_PRIORITY);
- Thread t2 = new MyThread();
- t2.setPriority(Thread.NORM_PRIORITY);
- Thread t3 = new MyThread();
- t3.setPriority(Thread.MIN_PRIORITY);
也可以把一个线程设为
后台线程(daemon):
- Thread t = new MyThread();
- t.setDaemon(true);
- t.start();
一个程序会等待所有非后台线程运行结束后才会退出,而后台线程会在所有非后台线程运行结束后立即终止。
调用Thread.join()方法可以等待某线程的结束:
- Thread t = new MyThread();
- t.start();
- t.join();
- System.out.println("Thread quits");
线程可以通过yield来放弃当前的时间片,通过sleep来睡眠一段时间:
- class MyRunnable implements Runnable {
- public void run() {
- try {
- for (int n = 0; n < 1000000; ++n) {
- if (n % 50000 == 0)
- Thread.yield();
- else if (n % 20000 == 0)
- Thread.sleep(500);
- else if (n % 1000 == 0)
- TimeUnit.MICROSECONDS.sleep(200);
- }
- } catch (InterruptedException e) {}
- }
- }
使用Executors来使用线程池:
- ExecutorService es = Executors.newCachedThreadPool();
- for (int i = 0; i < 5; ++i)
- es.execute(new MyRunnable());
上面的代码等价于创建了5个Thread并调用它们的start。你可以提供工厂类(
ThreadFactory)来定制创建的Thread:
- ExecutorService es = Executors.newCachedThreadPool(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread();
- t.setDaemon(true);
- return t;
- }
- });
- es.execute(new MyRunnable());
cachedThreadPool可以根据需要创建线程,当一个线程结束时并不释放它而是把它回收到池里,以便下次使用。而
fixedThreadPool一次性创建固定数量的线程,如果某任务需要线程时而池里没有剩余线程时,则该任务要等待直接某个线程运行结束才能获得线程。给出一个例子:
- void testFixedThreadPool() {
- class InnerRunnable implements Runnable {
- InnerRunnable(int id) {
- this.id = id;
- }
- public void run() {
- for (int i = 0; i < 10; ++i)
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {}
- System.out.println("thread-" + id + " ends");
- }
- private int id;
- }
- ExecutorService es = Executors.newFixedThreadPool(5);
- for (int i = 0; i < 10; ++i)
- es.execute(new InnerRunnable(i));
- }
另一种线程池是
singleThreadPool,它等同于线程数为1的FixedThreadPool。你可以用
Executors.newSingleThreadExecutor()来创建。
Runnable不能返回值,如果需要返回值,使用Callable,并通过Executors.submit来启动线程:
- class MyThread extends Thread {
- public void run() {
- System.out.println("OK");
- }
- }
- void testCallable() throws InterruptedException, ExecutionException {
- ExecutorService es = Executors.newCachedThreadPool();
- Future<Integer> f = es.submit(new MyCallable());
- int i = f.get();
- System.out.println(i); // 10
- }
线程里的异常是不能被其它线程捕获的。比如下面的代码是不能工作的:
- void testThreadException() {
- try {
- new Thread() {
- public void run() {
- throw new RuntimeException();
- }
- }.start();
- } catch (RuntimeException re) {
- System.out.println("never be called!!");
- }
- }
如果要捕获线程抛出的异常,调用
Thread.setUncaughtExceptionHandler:
- void testUncaughtException() {
- Thread t = new Thread() {
- public void run() {
- throw new RuntimeException();
- }
- };
- t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- public void uncaughtException(Thread t, Throwable e) {
- System.out.println("Catch the exception!!");
- }
- });
- t.start();
- }
为了制造出线程间的共享资源的情况,下面给出一个例子:
- class MyClass{
- void print(int id) {
- for (int i = 0; i < 5; ++i)
- {
- System.out.print(id + " ");
- Thread.yield();
- }
- }
-
- class MyThread extends Thread {
- MyThread(int id) { this.id = id; }
- public void run() {
- print(id);
- }
-
- int id;
- }
-
- static void testSynchronized() {
- MyClass mc = new MyClass();
- for (int i = 0; i < 5; ++i) {
- mc.new MyThread(i).start();
- }
- }
- }
上面的5个线程共享同一个MyClass对象,并调用它的testSynchronized方法。该方法会输出类似于以下的字符串:1 3 0 2 4 0 2 4 0 2 4 0 2 4 0 2 4 1 1 1 1 3 3 3 3 ,可见print方法被其它线程中断。为了同步print方法,可以用
synchroized关键字修饰上面的print方法:
- synchronized void print(int id) {...}
这样输出的字符串会类似于:0 0 0 0 0 4 4 4 4 4 2 2 2 2 2 3 3 3 3 3 1 1 1 1 1 ,说明print已经成了一个原子操作。其实synchronized等价于给整个方法加了一个锁,我们完全可以用锁来实现:
- class MyClass {
- Lock l = new ReentrantLock();
- void print_lock(int id) {
- l.lock();
- for (int i = 0; i < 5; ++i)
- {
- System.out.print(id + " ");
- Thread.yield();
- }
- l.unlock();
- }
- }
我们可以用trylock来防止阻塞:
- void print_trylock(int id) {
- try {
- boolean b = l.tryLock(300, TimeUnit.MILLISECONDS);
- if (b) {
- for (int i = 0; i < 5; ++i)
- {
- System.out.print(id + " ");
- Thread.yield();
- }
- l.unlock();
- }
- } catch (InterruptedException e) {}
- }
很明显还是synchroized来得简单些。synchronized也可以同步某一段代码,而不是整个方法:
- void print_synchronized_part(int id) {
- synchronized(this) {
- for (int i = 0; i < 5; ++i)
- {
- System.out.print(id + " ");
- Thread.yield();
- }
- }
- }
synchronized可以接受一个Object作为参数,要注意需要同步的参数必须是同一个对象,比如两个方法f() { synchronized(object_one) {} }和g() { synchronized(object_one){} }是可以同步的,但f() { synchronized(object_one) {} } 和 g() {synchronized(object_two) {} }是不能同步的。
现在我们再来模拟一下多个线程同时对同一资源进行修改的情况。下面的程序让两个线程分别对共享的整型数加N次和减N次,预期的正确结果是当两个线程运行结束后,整型值应该为0:
- class MyClass{
- int resource;
- void changeValue(boolean increase) {
- for (int i = 0; i < 600000; ++i)
- if (increase)
- ++resource;
- else
- --resource;
- }
- class InnerThread extends Thread {
- public InnerThread(boolean increase) {this.increase = increase;};
- private boolean increase;
- public void run() {
- changeValue(increase);
- }
- }
- static void testAtomicOperation() throws InterruptedException {
- MyClass mc = new MyClass();
- Thread t1 = mc.new InnerThread(true);
- Thread t2 = mc.new InnerThread(false);
- t1.start();
- t2.start();
- t1.join();
- t2.join();
-
- System.out.println(mc.resource); // might be 321296, but it's supposed to be zero
- }
- }
java提供了一个关键字
volatile,它的本意是表示某个变量的修改操作是原子操作,不需要同步的,
但是它是不工作的。你可以在上面的代码里给resource加入volatile修饰,但得到的结果仍可能不是0。
- volatile int resource = 0; // thread unsafe
可以使用
AtomicInteger来表示原子操作。修改上例的部份代码:
- AtomicInteger resource = new AtomicInteger(0);
- void changeValue(boolean increase) {
- for (int i = 0; i < 600000; ++i)
- if (increase)
- resource.addAndGet(1);
- else
- resource.addAndGet(-1);
- }
你还可以在
java.util.concurrent.atomic包里找到其它的原子操作的类,比如
AtomicBoolean和
AtomicLong等。
其实线程冲突的最根本的原因有共享的资源,如果每个线程都有自己的变量,那自然不会有冲突。我们可以用
ThreadLocal定义这样的变量:
- ThreadLocal<Integer> resource = new ThreadLocal<Integer>() {
- @Override
- protected Integer initialValue() {
- return 0;
- }
- };
- void changeValue(boolean increase) {
- for (int i = 0; i < 600000; ++i)
- resource.set(resource.get() + 1);
- System.out.println(resource.get()); // each thread should shows 600000
- }
注意一定要覆盖
ThreadLocal的initialValue方法,否则你的变量(resource)在新线程里会为null。
线程有四种状态:新建(new)、就绪(runnable)、阻塞(blocked)和死亡(dead)。当线程睡眠、等待、申请锁或响应I/O时,可以从就绪状态进入阻塞状态。睡眠和等待是可以被中断的。当被中断时,会得到一个InterruptException。下面中断一个sleep操作:
- void testInterruptSleep() throws InterruptedException {
- Thread t = new Thread() {
- public void run() {
- long time = System.currentTimeMillis();
- try {
- Thread.sleep(100000);
- } catch (InterruptedException e) {
- System.out.println("Interrupt!!");
- } finally {
- time -= System.currentTimeMillis();
- System.out.println("Slept for " + Math.abs(time) + " milliseconds");
- }
- }
- };
- t.start();
- Thread.sleep(2000);
- t.interrupt();
- }
等待(wait)与
睡眠(sleep)的区别在于,等待的线程可以被
通知(notify)或
信号(signal)唤醒。同时线程在等待时会放弃它获得的锁,而睡眠不会,所以wait操作必须和同步一起使用。下面用Future对象来中断一个等待阻塞:
- void testInterruptWait() throws InterruptedException {
- Runnable r = new Runnable() {
- public void run() {
- long time = System.currentTimeMillis();
- try {
- synchronized (this) {
- wait(8888);
- }
- } catch (InterruptedException e) {
- System.out.println("Interrupt!!");
- } finally {
- time -= System.currentTimeMillis();
- System.out.println("Slept for " + Math.abs(time) + " milliseconds");
- }
- }
- };
- ExecutorService es = Executors.newCachedThreadPool();
- Future<?> f = es.submit(r);
- Thread.sleep(2000);
- f.cancel(true); // interrupt it
- }
我们用它去
中断一个处于就绪状态的线程:
- void testInterrupted() throws InterruptedException {
- Runnable r = new Runnable() {
- public void run() {
- while (!Thread.interrupted()) {
- System.out.println("I'm still alive");
- Thread.yield();
- }
- System.out.println("interrupted!!");
- }
- };
- ExecutorService es = Executors.newCachedThreadPool();
- Future<?> f = es.submit(r);
- Thread.sleep(2000);
- es.shutdownNow();
- }
线程可以通过判断Thread.interrupted来确定自己是否被中断。ExecutorService.shutdownNow()可以中断线程池里所有的阻塞线程。
下面给个例子看怎么样使用wait和notify:
- class NotifyTester {
- synchronized void turnOn() throws InterruptedException {
- while (!Thread.interrupted()) {
- while (on)
- wait();
- on = true;
- System.out.println("turn on!!");
- notify();
- }
- }
-
- synchronized void turnOff() throws InterruptedException {
- while (!Thread.interrupted()) {
- while (!on)
- wait();
- on = false;
- System.out.println("turn off!!");
- notify();
- }
- }
-
- class TurnOnThread extends Thread {
- public void run() {
- try {
- turnOn();
- } catch (InterruptedException e) {}
- }
- }
-
- class TurnOffThread extends Thread {
- public void run() {
- try {
- turnOff();
- } catch (InterruptedException e) {}
- }
- }
-
- static void test() {
- NotifyTester nt = new NotifyTester();
- Thread t_on = nt.new TurnOnThread();
- Thread t_off = nt.new TurnOffThread();
-
- t_on.start();
- t_off.start();
-
- try {
- Thread.sleep(9000);
- } catch (InterruptedException e) {}
-
- t_on.interrupt();
- t_off.interrupt();
- }
- boolean on;
- }
notify只是唤醒一个等待的线程,而
notifyAll会唤醒所有等待的线程。我们还可以用Lock和Condition来完成同样的工作:
- class LockSignalTester {
- private Lock l = new ReentrantLock();
- private Condition c = l.newCondition();
- private boolean on;
-
- void turnOn() {
- while (!Thread.interrupted()) {
- l.lock();
- try {
- while (on)
- c.await();
- on = true;
- System.out.println("turn on!!");
- c.signal();
- } catch (InterruptedException ie) {
- } finally {
- l.unlock();
- }
- }
- }
-
- void turnOff(){
- while (!Thread.interrupted()) {
- l.lock();
- try {
- while (!on)
- c.await();
- on = false;
- System.out.println("turn off!!");
- c.signal();
- } catch (InterruptedException ie) {
- } finally {
- l.unlock();
- }
- }
- }
-
- class TurnOnThread extends Thread {
- public void run() { turnOn(); }
- }
-
- class TurnOffThread extends Thread {
- public void run() { turnOff(); }
- }
-
- static void test() {
- NotifyTester nt = new NotifyTester();
- Thread t_on = nt.new TurnOnThread();
- Thread t_off = nt.new TurnOffThread();
-
- t_on.start();
- t_off.start();
-
- try {
- Thread.sleep(9000);
- } catch (InterruptedException e) {}
-
- t_on.interrupt();
- t_off.interrupt();
- }
- }
Condition.await与
Condition.signal/signalAll和
Object.wait与
Object.notify/notifyAll等价。
CountDownLatch可以用来同步多个线程。它会有一个初始数,每次调用countDown的时候数值减一但不阻塞,而调用await的时候会阻塞直到数值被减为0为止:
- class CountDownLatchTester {
- int counter = 0;
- CountDownLatch cdl = new CountDownLatch(5);
-
- class IncreaseThread extends Thread {
- public void run() {
- try {
- Thread.sleep(new Random().nextInt(10) * 1000);
- } catch (InterruptedException e1) {}
- synchronized (this) {
- ++counter;
- }
- try {
- cdl.countDown();
- cdl.await();
- } catch (InterruptedException e) {}
- System.out.println(counter);
- }
- }
-
- static void test() {
- CountDownLatchTester cdlt = new CountDownLatchTester();
- for (int i = 0; i < 5; i++)
- cdlt.new IncreaseThread().start();
- }
- }
上面代码里的5个线程都会睡眠随机的一段时间,之后增加counter的值,再调用
countDown来降低CountDownLatch的值,最后调用
await来等待CountDownLatch的值变为0,即所有其它的线程全部结束。程序的输出是5个线程同时打印出数字“5”。
CountDownLatch只做一次同步操作,如果计数器减为0后,所有的await调用都会立即返回。CyclicBarrier可以循环地进行同步,当计数器降为0后,会自动设置为初值以便下一次的同步。CyclicBarrier没有countDown操作,它的await会将计数器减一并阻塞,如果计数器减为0所有阻塞在await操作上的线程都会被唤醒:
- class CyclicBarrierTester {
- int counter = 0;
- CyclicBarrier cb = new CyclicBarrier(5);
-
- class IncreaseThread extends Thread {
- public void run() {
- for (int i = 0; i < 3; ++i)
- {
- try {
- Thread.sleep(new Random().nextInt(10) * 1000);
- } catch (InterruptedException e1) {}
- synchronized (this) {
- ++counter;
- }
- try {
- cb.await();
- } catch (InterruptedException e) {
- } catch (BrokenBarrierException e) {
- }
- System.out.println(counter);
- }
- }
- }
-
- static void test() {
- CyclicBarrierTester cbt = new CyclicBarrierTester();
- for (int i = 0; i < 5; i++)
- cbt.new IncreaseThread().start();
- }
- }
上例我在thread的run方法里添加了一个循环,从运行结果可以看到这些线程确实进行了三次同步。
DelayQueue可以用来存放Delayed对象。Delayed是一个接口,它可以通过getDelay方法来表示有多久到期或者已经过期了多久。DelayQueue是一个优先级队列,优先级更高的Delayed会最先取出,不管它过期的时间如何。在同等优先级的情况下,会取出某个过期的Delayed。如果当前还有元素但没有元素过期,DelayQueue.take就会阻塞。下面给一个例子:
- class DelayQueueTester {
- DelayQueue<MyDelayed> dq = new DelayQueue<MyDelayed>();
- long currentTime = 0;
-
- class MyDelayed implements Delayed {
- long start = System.currentTimeMillis();
- int priority = new Random().nextInt(5);
-
- @Override
- public int compareTo(Delayed o) {
- return priority - ((MyDelayed)o).priority;
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(start - currentTime, TimeUnit.MILLISECONDS);
- }
- }
-
- static void test() throws InterruptedException {
- DelayQueueTester dqt = new DelayQueueTester();
- for (int i = 0; i < 10; i++)
- {
- TimeUnit.MILLISECONDS.sleep(200);
- dqt.dq.offer(dqt.new MyDelayed());
- }
-
- Thread.sleep(1000);
- dqt.currentTime = System.currentTimeMillis();
-
- while (dqt.dq.size() > 0) {
- MyDelayed d = dqt.dq.take();
- System.out.println(d.priority + "\t" + d.start);
- }
- }
- }
Delay接口的两个方法compareTo和getDelay。前者用于比较优先级,值越小的优先级越高;后者用于返回离到期的时间,正值说明还没到期,负值说明已经过期。从输出上看,优先级高的会先被取出,
但是过期时间更长的并不优先取出,它只保证当前可以取出一个过期元素,但不保证取出的顺序。(书上说的可能不对)
PirorityBlockingQueue当队列里没有元素而试图取元素时,会发生阻塞,同时优先级最高(值最小)的元素会最先被取出:
- class PriorityBlockingQueueTester {
- PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<Integer>();
-
- class ReadThread extends Thread {
- public void run() {
- for (int i = 0; i < 1000; i++)
- try {
- int n = pbq.take();
- System.out.println(n);
- } catch (InterruptedException e) {}
- }
- }
-
- class WriteThread extends Thread {
- public void run() {
- for (int i = 0; i < 200; i++)
- {
- for (int j = 0; j < 5; j++)
- pbq.offer(10 - j);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- }
- }
- }
-
- static void test() throws InterruptedException {
- PriorityBlockingQueueTester pbqt = new PriorityBlockingQueueTester();
- pbqt.new ReadThread().start();
- pbqt.new WriteThread().start();
- }
- }
上面的代码会循环输出“6 7 8 9 10”。
DelayQueue和
PriorityBlockingQueue都实现了接口
BlockingQueue,这种队列都是可以阻塞线程的。其它实现了这个接口的类还有
ArrayBlockingQueue(队列大小固定)、
LinkedBlockingQueue和
SynchronousBlockingQueue(每个put方法都会阻塞直到一个take被调用,这个队列本身没有容量,一次只能传递一个数据)。在BlockingQueue引入java前,PipedWriter和PipedReader用来在线程间传递数据。
ShceduledThreadPoolExecutor可以让一个线程延后一段时间再启动:
- class ScheduledExecutorTester {
- static class MyRunnable implements Runnable {
- public void run() {
- System.out.println("Start time: " + new Date(System.currentTimeMillis()));
- }
- }
-
- static void test() {
- ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(10);
- System.out.println("Sheduled time: " + new Date(System.currentTimeMillis()));
- stpe.schedule(new MyRunnable(), 2, TimeUnit.SECONDS);
- }
- }
上面的schedule方法只执行一次,而
scheduleAtFixedRate方法会在固定的频率启动一个新线程:
- stpe.schedule(new MyRunnable(), 2, 3, TimeUnit.SECONDS);
第二个参数是第一次启动的延迟,第三个参数是第一次启动后,每次启动的间隔数。上面一行的意思是2秒后执行Runnable,之后每隔3秒执行新的Runnable。
另一个类似的方法是
scheduledWithFixedRate,它与scheduleAtFixedRate的区别在于,它在
前一个线程结束后过一段时间间隔,才会启动新的线程。stpe.scheduleWithFixedDelay(new MyRunnable(), 1, 3, TimeUnit.SECONDS);上面这行代码的意思是,第一个线程在1秒后启动,之后在线程结束后,再等3秒启动新线程,如此反复。
信号量(Semaphore)允许多个线程同时访问一个资源(锁只允许一个线程)。用Semaphore.aquire来获得资源访问权,如果获得该资源的线程数已经超出限制的话就阻塞,直到有别的线程调用Semaphore.release来释放该资源。
- class SemaphoreTester {
- static Semaphore s = new Semaphore(5, true);
-
- static class MyThread extends Thread {
- int id;
- MyThread(int id) { this.id = id; }
- public void run() {
- try {
- s.acquire();
- System.out.println("start - " + id);
- Thread.sleep(3000);
- System.out.println("end - " + id);
- s.release();
- } catch (InterruptedException e) {};
- }
- }
-
- static void test() {
- for (int i = 0; i < 20; ++i)
- new MyThread(i).start();
- }
- }
Extranger可以让线程之间交换数据:
- class ExchangerTester {
- static Exchanger<Integer> e = new Exchanger<Integer>();
- static class MyThread extends Thread {
- int id;
- int extrangedId;
- MyThread(int id) { this.id = id; }
- public void run() {
- try {
- extrangedId = e.exchange(id);
- } catch (InterruptedException e) {}
- }
-
- public void print() {
- System.out.println("My id: " + id + " - Exchanged id: " + extrangedId);
- }
- }
-
- static void test() {
- MyThread t1 = new MyThread(1);
- MyThread t2 = new MyThread(2);
- t1.start();
- t2.start();
-
- try {
- t1.join();
- t2.join();
- } catch (InterruptedException e) {}
-
- t1.print(); // My id: 1 - Exchanged id: 2
- t2.print(); // My id: 2 - Exchanged id: 1
- }
- }
阅读(1298) | 评论(0) | 转发(0) |