Collin Nam


java线程的取消与关闭

Frank 2019-04-26 1328浏览 0条评论
首页/ 正文
分享到: / / / /

目录

1.任务取消     

 1.1通过volatile类型的域来保存取消状态

1.2通过 future 的cancel取消线程

1.3使用interrupt()方法中断当前线程 

1.4使用shutdown()和shutdownNow()

  1.4.1 shutdown()

  1.4.2 shutdownNow()

1.5使用stop方法终止线程 

2.停止基于线程的服务

    2.1关闭ExecutorService 

    2.2毒丸对象

    3.3只执行一次的服务

3.JVM关闭钩子


要使任务和线程安全、快速、可靠的停止下来并不是一件容易的事情。java没有提供任何机制来安全的终止线程。但它提供了中断(Interruption),这是一种协作机制能够使一个线程终止另一个线程的当前工作。

1.任务取消     

 1.1通过volatile类型的域来保存取消状态

一般run()方法执行完,线程就会正常结束,然而,常常有些线程是伺服线程。它们需要长时间的运行,只有在外部某些条件满足的情况下,才能关闭这些线程。使用一个变量来控制循环,例如:最直接的方法就是设一个boolean类型的标志,并通过设置这个标志为true或false来控制while循环是否退出,代码示例:

 一个仅运行一秒的素数生成器
* 线程的取消与关闭  通过cancel方法将设置标志 并且主循环会检车这个标志,
* 为了使这个过程可靠的工作  标志cancelled必须为  volatile
package com.cocurrent.demo;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * @Author: nanJunYu
 * @Description: 一个仅运行一秒的素数生成器
 * 线程的取消与关闭  通过cancel方法将设置标志 并且主循环会检车这个标志,
 * 为了使这个过程可靠的工作  标志cancelled必须为  volatile
 * @Date: Create in  2018/9/11 15:21
 */
public class CancelVolatile implements Runnable {

    private final List<BigInteger> pri = new ArrayList<BigInteger>();

    private volatile boolean cancelled;

    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                pri.add(p);
            }
        }
    }

    private void cancel() {
        cancelled = true;
    }

    private synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(pri);
    }

    public static void main(String[] args) {
        CancelVolatile cancelVolatile = new CancelVolatile();
        new Thread(cancelVolatile).start();
        try {
            SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            cancelVolatile.cancel();
        }
        System.out.println(cancelVolatile.get());
    }
}

CancelVolatile中的取消机制最终会使得搜索的素数任务退出,但是退出过程中需要花费一定的时间,然而,如果任务调度了一些阻塞方法,(BlockingQueue.put)那么可能产生一个问题——任务可能永远不会检查取消标示,永远不会结束。为取保线程能退出,我们通常使用中断,当我们调用interrupt,并不意味着立即停止目标线程正在运行的线程,而只是传递了一个请求中断的信息,它会在线程下一个合适的的时刻中断自己。wait、sleep、join、将严格处理这种请求,当他们收到一个中断请求,或饿着开始执行时发现中断状态时,将抛出异常。

使用静态的interrupted时应该小心,因为它会清除当前线程的中断状态,如果返回true,除非你你想屏蔽这个中断,否则必须对它进行处理,抛出异常或者再次调用interrupt来恢复中断状态

try {
			...
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}

通常,我们用中断来取消任务比检查标记更好,是最合适的取消任务方式,我们看一个更加健壮的获得素数的类。

public class PrimeProducer extends Thread{
	
	private final BlockingQueue<BigInteger> queue;
	
	public PrimeProducer(BlockingQueue<BigInteger> queue) {
		this.queue = queue;
	}
 
	@Override
	public void run() {
		try {
			BigInteger p = BigInteger.ONE;
			while(!Thread.currentThread().isInterrupted()) //①用线程的状态来检查
				queue.put(p = p.nextProbablePrime());
			
		} catch (InterruptedException e) {
			//中断将线程退出
		}
	}
	public void cancel() { interrupt(); }
 
}

1.2通过 future 的cancel取消线程

package com.cocurrent.demo;

import java.util.concurrent.*;

/**
 * @Author: nanJunYu
 * @Description:通过 future 的cancel取消线程
 * @Date: Create in  2018/9/12 16:41
 */
public class FutureCancel {
    public static void timeRun(Runnable runnable, Long timeout, TimeUnit timeUnit) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future = executorService.submit(runnable);
        try {
            future.get(timeout,timeUnit);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            future.cancel(true);
        }
    }
}

 

1.3使用interrupt()方法中断当前线程 

在程序中,我们是不能随便中断一个线程的,因为这是极其不安全的操作,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断导致数据不一致混乱的问题。正因此,JAVA里将Thread的stop方法设置为过时,以禁止大家使用。

一个线程什么时候可以退出呢?当然只有线程自己才能知道。

所以我们这里要说的Thread的interrrupt方法,本质不是用来中断一个线程。是将线程设置一个中断状态。

当我们调用线程的interrupt方法,它有两个作用:

1、如果此线程处于阻塞状态(比如调用了wait方法,io等待),则会立马退出阻塞,并抛出InterruptedException异常,线程就可以通过捕获InterruptedException来做一定的处理,然后让线程退出。

2、如果此线程正处于运行之中,则线程不受任何影响,继续运行,仅仅是线程的中断标记被设置为true。所以线程要在适当的位置通过调用isInterrupted方法来查看自己是否被中断,并做退出操作。

package com.cocurrent.demo;

/**
 * @Author: nanJunYu
 * @Description:线程的中断方法测试
 * @Date: Create in  2018/9/11 17:16
 */
public class InterruptedDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Work());
        thread.start();

        Thread.sleep(1000);
        thread.interrupt();
        System.out.println("Main thread stopped.");

    }

    public static class Work implements Runnable {

        public void run() {
            System.out.println("我在做一些事情。。。");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread curr = Thread.currentThread();
                //再次调用interrupt方法中断自己,将中断状态设置为“中断”
                curr.interrupt();
                System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
                System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
                System.out.println("Static Call: " + Thread.interrupted());//clear status
                System.out.println("---------After Interrupt Status Cleared----------");
                System.out.println("Static Call: " + Thread.interrupted());
                System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
                System.out.println("Worker IsInterrupted: " + curr.isInterrupted());
            }

            System.out.println("Worker stopped.");

        }
    }
}

1.4使用shutdown()和shutdownNow()

  1.4.1 shutdown()

将线程池状态置为SHUTDOWN,并不会立即停止:

  • 停止接收外部submit的任务
  • 内部正在跑的任务和队列里等待的任务,会执行完
  • 等到第二步完成后,才真正停止

 shutdown方法源码

 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

  1.4.2 shutdownNow()

将线程池状态置为STOP企图立即停止,事实上不一定:

  • 跟shutdown()一样,先停止接收外部提交的任务
  • 忽略队列里等待的任务
  • 尝试将正在跑的任务interrupt中断
  • 返回未执行的任务列表

shutdownNow源码

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

从源码可以很清晰的看出两者的区别,shutdown使用了以后会置状态为SHUTDOWN,而shutdownNow为STOP。此外,shutdownNow会返回任务列表。

1.5使用stop方法终止线程 

程序中可以直接使用thread.stop()来强行终止线程,但是stop方法是很危险的,就象突然关闭计算机电源,而不是按正常程序关机一样,可能会产生不可预料的结果,不安全主要是:thread.stop()调用之后,创建子线程的线程就会抛出ThreadDeatherror的错误,并且会释放子线程所持有的所有锁。一般任何进行加锁的代码块,都是为了保护数据的一致性,如果在调用thread.stop()后导致了该线程所持有的所有锁的突然释放(不可控制),那么被保护数据就有可能呈现不一致性,其他线程在使用这些被破坏的数据时,有可能导致一些很奇怪的应用程序错误。因此,并不推荐使用stop方法来终止线程

2.停止基于线程的服务

    2.1关闭ExecutorService 

ExecutorService提供两个关闭的方法,shutdown 和shutdownNow。shutdownNow首先关闭当前执行线程,然后返回未执行的任务,而shutdown则等待执行完成。两种方法差别在于安全性和响应性。

直接看例子了,这没什么好说的。

package com.cocurrent.demo;

import java.util.concurrent.*;

/**
 * @Author: nanJunYu
 * @Description:
 * @Date: Create in  2018/9/12 17:42
 */
public class ExecutorServiceCancelB {

    public static class MyCallable implements Callable {

        private int flag = 0;

        public MyCallable(int flag) {
            this.flag = flag;
        }

        public String call() throws Exception {

            if (this.flag == 0) {
                return "flag = 0";
            }
            if (this.flag == 1) {
                try {
                    while (true) {
                        System.out.println("looping.");
                        Thread.sleep(2000);
                    }
                } catch (InterruptedException e) {
                    System.out.println("Interrupted");
                }
                return "false";
            } else {
                throw new Exception("Bad flag value!");
            }
        }
    }

    public static void main(String[] args) {

        // 定义3个Callable类型的任务
        MyCallable task1 = new MyCallable(0);
        MyCallable task2 = new MyCallable(1);
        MyCallable task3 = new MyCallable(2);

        // 创建一个执行任务的服务
        ExecutorService es = Executors.newFixedThreadPool(3);
        try {
            // 提交并执行任务,任务启动时返回了一个Future对象,
            // 如果想得到任务执行的结果或者是异常可对这个Future对象进行操作
            Future future1 = es.submit(task1);
            // 获得第一个任务的结果,如果调用get方法,当前线程会等待任务执行完毕后才往下执行
            System.out.println("task1: " + future1.get());

            Future future2 = es.submit(task2);

            shutdownAndAwaitTermination(es);

            // 等待5秒后,再停止第二个任务。因为第二个任务进行的是无限循环
            Thread.sleep(5000);
            System.out.println("task2 cancel: " + future2.cancel(true));

            // 获取第三个任务的输出,因为执行第三个任务会引起异常
            // 所以下面的语句将引起异常的抛出
            Future future3 = es.submit(task3);
            System.out.println("task3: " + future3.get());
        } catch (Exception e) {
            System.out.println(e.toString());
        }
        // 停止任务执行服务
        //es.shutdown();

    }

    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            pool.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

    2.2毒丸对象

另一种关闭生产者-消费者服务的方式就是使用“毒丸”对象,其实就是指往对象里面放一个标志对象,当得到这个对象就立即停止,这就需要在执行方法里面判断,消费者读到毒丸后就不会再执行,同样生产者提交毒丸后,就不能再提交任务。

只有生产者和消费者都已知的情况下,才可以使用“毒丸”,当生产者和消费者和数量较大时,方法变的难以使用。

直接看书中的例子,这个不是很常用,粘过来,以备用吧。

public class IndexingService {
	private static final File POISON = new File("");
	private final IndexerThread consumer = new IndexerThread();
	private final CrawlerThread producer = new CrawlerThread();
	private final BlockingQueue<File> queue;
	private final FileFilter fileFilter;
	private final File root;
 
	class CrawlerThread extends Thread {
		class IndexerThread extends Thread {
			public void start() {
				producer.start();
				consumer.start();
			}
			/* Listing 7.18 */
		} /* Listing 7.19 */
	}
 
	public void stop() {
		producer.interrupt();
	}
 
	public void awaitTermination() throws InterruptedException {
		consumer.join();
	}
}

3.3只执行一次的服务

如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一个私有的Executor来简化服务的生命周期,其中该Executor的生命周期由这个方法来控制。

下面程序checkMail方法能在多台主机上并行地检查新邮件,它创建一个私有的Executor,并向每台主机提交一个任务,当所有邮件任务都执行完成后,关闭并结束。

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
			throws InterruptedException {
 
		ExecutorService exec = Executors.newCachedThreadPool();
		final AtomicBoolean hasNewMail = new AtomicBoolean(false);
 
		try {
			for (final String host : hosts) {
				exec.execute(new Runnable() {
 
					@Override
					public void run() {
						if (check(host))
							hasNewMail.set(true);
					}
				});
			}
		} finally {
			exec.shutdown();
			exec.awaitTermination(timeout, unit);
		}
		return hasNewMail.get();
	}

3.JVM关闭钩子

   3.1关闭钩子

关闭钩子本质上是一个线程(也称为Hook线程),用来监听JVM的关闭。通过使用Runtime的addShutdownHook(Thread hook)可以向JVM注册一个关闭钩子。Hook线程在JVM 正常关闭才会执行,在强制关闭时不会执行。

对于一个JVM中注册的多个关闭钩子它们将会并发执行,所以JVM并不能保证它的执行顺行。当所有的Hook线程执行完毕后,如果此时runFinalizersOnExit为true,那么JVM将先运行终结器,然后停止。Hook线程会延迟JVM的关闭时间,这就要求在编写钩子过程中必须要尽可能的减少Hook线程的执行时间。另外由于多个钩子是并发执行的,那么很可能因为代码不当导致出现竞态条件或死锁等问题,为了避免该问题,强烈建议在一个钩子中执行一系列操作。

明白了其原理之后,也需要知道其使用场景: 
1、内存管理 
在某些情况下,我们需要根据当前内存的使用情况,人为的调用System.gc()来尝试回收堆内存中失效的对象。此时就可以用到Runtime中的totalMemory()freeMemory()等方法。示例如下:

 /**
     * @Author: nanJunYu
     * @Description:1、内存管理 在某些情况下,我们需要根据当前内存的使用情况,人为的调用System.gc()来尝试回收堆内存中失效的对象。
     * 此时就可以用到Runtime中的totalMemory()、freeMemory()等方法。示例如下:
     * @Date: Create in  2018/9/12 20:48
     * @params:
     * @return:
     */
    public static void autoClean() {
        Runtime runtime = Runtime.getRuntime();
        if ((runtime.totalMemory() - runtime.freeMemory()) / (double) runtime.maxMemory() > 0.90) {
            System.out.println("执行清理工作");
        } else {
            System.out.println(runtime.freeMemory());
        }
    }

 

2、执行命令

 class Test { 
        public static void main(String args[]){ 
                Runtime r = Runtime.getRuntime(); 
                Process p = null; 
                try{ 
                        p = r.exec("notepad"); 
                } catch (Exception e) { 

                } 
        } 
}

注意:通过exec()方式执行命令时,该命令在单独的进程(Process)中。

 

3、通过Hook实现临时文件清理

实例:

 /**
    * @Author: nanJunYu
    * @Description:通过Hook实现临时文件清理
    * @Date: Create in  2018/9/12 20:54
    * @params:
    * @return:
    */
    public void clearTemporaryFile() {
        Runtime runtime = Runtime.getRuntime();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                System.out.println("auto clean temporary file");
            }
        }));
    }

 

 

                                                                                   

最后修改:2019-04-26 16:00:42 © 著作权归作者所有
如果觉得我的文章对你有用,请随意赞赏
扫一扫支付

上一篇

发表评论

说点什么吧~

评论列表

还没有人评论哦~赶快抢占沙发吧~