线程池:
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。 假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。 一个线程池包括以下四个基本组成部分: 1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务; 2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务; 3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等; 4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。 线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。例子:ThreadPool类
/** * @作者 whs * @创建日期 2015年2月5日 * @版本 V 1.0 */package thread.pool;import java.util.Collections;import java.util.Date;import java.util.LinkedList;import java.util.List;import org.apache.log4j.Logger;/** * 线程池 * 创建线程池,销毁线程池,添加新任务 */public final class ThreadPool { private static Logger logger = Logger.getLogger(ThreadPool.class.getName()); private static Logger taskLogger = Logger.getLogger("TaskLogger"); private static boolean debug = taskLogger.isDebugEnabled(); // private static boolean debug = taskLogger.isInfoEnabled(); private static ThreadPool instancePool; // 单例 ,懒汉式 public static final int SYSTEM_BUSY_TASK_COUNT = 150; public static int worker_num = 5; // 默认池中线程数 private static int taskCounter = 0; // 已经处理的任务数 public static boolean systemIsBusy = false; private static ListtaskQueue = Collections.synchronizedList(new LinkedList ()); public PoolWorker[] workThrads; // 线程池中的线程数组 //默认构造函数 初始化 线程池中的线程 数目 5个 private ThreadPool() { workThrads = new PoolWorker[5]; for (int i = 0; i < workThrads.length; i++) { workThrads[i] = new PoolWorker(i); } } //传参时 构造函数 初始化 线程池中的线程 数目 参数pool_worker_num个 private ThreadPool(int pool_worker_num) { worker_num = pool_worker_num; workThrads = new PoolWorker[worker_num]; for (int i = 0; i < workThrads.length; i++) { workThrads[i] = new PoolWorker(i); } } // 单态模式,获得一个默认线程个数的线程池 public static ThreadPool getInstancePool() { return getInstancePool(ThreadPool.worker_num); } // 单例 ,懒汉式 public static ThreadPool getInstancePool(int worker_num1) { if (worker_num1 <= 0) worker_num1 = ThreadPool.worker_num; if (instancePool == null) instancePool = new ThreadPool(worker_num1); return instancePool; } /** * 增加新的任务 * 每增加一个新任务,都要唤醒任务队列 * @param newTask */ public void addTask(Task newTask) { synchronized (taskQueue) { newTask.setTaskId(++taskCounter); //设定当前数组线程的id号 newTask.setSubmitTime(new Date()); //设定当前线程的启动时间 taskQueue.add(newTask); //将该线程添加到线程队列中去 taskQueue.notifyAll(); //唤醒队列, 开始执行 } logger.info("Submit Task<" + newTask.getTaskId() + ">: " + newTask.info()); } /** * 批量增加新任务 * @param taskes */ public void batchAddTask(Task[] taskes) { if (taskes == null || taskes.length == 0) { return; } synchronized (taskQueue) { for (int i = 0; i < taskes.length; i++) { if (taskes[i] == null) { continue; } taskes[i].setTaskId(++taskCounter); taskes[i].setSubmitTime(new Date()); taskQueue.add(taskes[i]); } taskQueue.notifyAll(); //唤醒队列, 开始执行 } for (int i = 0; i < taskes.length; i++) { if (taskes[i] == null) { continue; } logger.info("Submit Task<" + taskes[i].getTaskId() + ">: " + taskes[i].info()); } } /** * 获取并输出 线程池信息 */ public String getInfo() { StringBuffer sb = new StringBuffer(); sb.append("\nTask Queue Size:" + taskQueue.size()); for (int i = 0; i < workThrads.length; i++) { sb.append("\nWorker " + i + " is " + ((workThrads[i].isWaiting()) ? "Waiting." : "Running.")); } return sb.toString(); } // 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁 public synchronized void destroy() { while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧 try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } // 工作线程停止工作,且置为null for (int i = 0; i < worker_num; i++) { workThrads[i].stopWorker(); workThrads[i] = null; } instancePool=null; taskQueue.clear();// 清空任务队列 } /** * 内部类,线程池中的工作线程类 */ private class PoolWorker extends Thread { private int index = -1; private boolean isRunning = true; //该工作线程是否有效, 用于结束该工作线程 private boolean isWaiting = true; //该工作线程是否可以执行新任务 public PoolWorker(int index) { this.index = index; start(); } public void stopWorker() { this.isRunning = false; } public boolean isWaiting() { return this.isWaiting; } /** * 循环执行任务 * 这也许是线程池的关键所在 如果任务队列不空,则取出任务执行,若任务队列空,则等待 */ public void run() { while (isRunning) { Task r = null; synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { /* 任务队列为空,则等待有新任务加入从而被唤醒 */ taskQueue.wait(20); } catch (InterruptedException ie) { logger.error(ie); } } /* 取出任务执行 */ r = (Task) taskQueue.remove(0); } if (r != null) { isWaiting = false; try { if (debug) { r.setBeginExceuteTime(new Date()); taskLogger.debug("Worker<" + index + "> start execute Task<" + r.getTaskId() + ">"); if (r.getBeginExceuteTime().getTime() - r.getSubmitTime().getTime() > 1000) taskLogger.debug("longer waiting time. " + r.info() + ",<" + index + ">,time: + (r.getFinishTime().getTime() - r.getBeginExceuteTime().getTime())"); } /* 该任务是否需要立即执行 */ if (r.needExecuteImmediate()) { new Thread(r).start(); } else { r.run(); } if (debug) { r.setFinishTime(new Date()); taskLogger.debug("Worker<" + index + "> finish task<" + r.getTaskId() + ">"); if (r.getFinishTime().getTime() - r.getBeginExceuteTime().getTime() > 1000) taskLogger.debug("longer execution time. " + r.info() + ",<" + index + ">,time:" + (r.getFinishTime().getTime() - r.getBeginExceuteTime().getTime())); } } catch (Exception e) { e.printStackTrace(); logger.error(e); } isWaiting = true; r = null; } } } } }
Task 类
/** * @作者 whs * @创建日期 2015年2月5日 * @版本 V 1.0 */package thread.pool;/* * 所有任务接口 * 其他任务必须继承访类 */import java.util.Date;public abstract class Task implements Runnable { //private static Logger logger = Logger.getLogger(Task.class); private Date generateTime = null; // 产生时间 private Date submitTime = null; // 提交执行时间 private Date beginExceuteTime = null; // 开始执行时间 private Date finishTime = null; // 执行完成时间 private long taskId; public Task() { this.generateTime = new Date(); } /** * 任务执行入口 */ @Override public void run() { try { Thread.sleep(15*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务 " + taskId + " 完成"); /** * 相关执行代码 * beginTransaction(); * 执行过程中可能产生新的任务 subtask = taskCore(); * commitTransaction(); * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore()); */ } /** * 所有任务的核心 所以特别的业务逻辑执行之处 * @throws Exception */ public abstract Task[] taskCore() throws Exception; /** * 是否用到数据库 */ protected abstract boolean useDb(); /** * 是否需要立即执行 */ protected abstract boolean needExecuteImmediate(); /** * 任务信息 */ public abstract String info(); public Date getGenerateTime() { return generateTime; } public Date getBeginExceuteTime() { return beginExceuteTime; } public void setBeginExceuteTime(Date beginExceuteTime) { this.beginExceuteTime = beginExceuteTime; } public Date getFinishTime() { return finishTime; } public void setFinishTime(Date finishTime) { this.finishTime = finishTime; } public Date getSubmitTime() { return submitTime; } public void setSubmitTime(Date submitTime) { this.submitTime = submitTime; } public long getTaskId() { return taskId; } public void setTaskId(long taskId) { this.taskId = taskId; }}
TaskA 类
/** * @作者 whs * @创建日期 2015年2月6日 * @版本 V 1.0 */package thread.pool;public class TaskA extends Task { @Override public Task[] taskCore() throws Exception { // TODO Auto-generated method stub return null; } @Override protected boolean useDb() { // TODO Auto-generated method stub return false; } @Override protected boolean needExecuteImmediate() { // TODO Auto-generated method stub return false; } @Override public String info() { // TODO Auto-generated method stub return null; }}
TestThreadPool 类
/** * @作者 whs * @创建日期 2015年2月6日 * @版本 V 1.0 */package thread.pool;public class TestThreadPool { public static void main(String[] args) { // 创建线程池,并初始化2个线程工作者 ThreadPool t = ThreadPool.getInstancePool(2); Task[] tasks = new TaskA[3]; for(int i=0;i