转自:https://blog.csdn.net/MonkeyDCoding/article/details/81369610
0.源代码
github-简易高并发框架注:本篇博客知识来自于网课。
1.问题来源以及w
对于一个题库系统。考试组要有批量的离线文档要生成。题库组批量的题目要进行排重,且要根据条件批量修改题目内容。对于痛点:
批量任务完成缓慢
所有的问题都围绕着“查询”,即查询进度影响总体性能我们希望尽量使用友好(如果用多线程来提高性能,我们希望能屏蔽细节)因此我们需要一个可以提供查询进度通用的框架。2.我们该怎么做?
这里先要明确“任务”(Task)和“工作”(Job)的关系。对于一个工作,他内部可能须有许多的任务,任务是他的子元素(属性、字段)。用并发安全的类确保每个工作的属性和工作下的每个任务信息,也意味着工作和任务的注册机制。
需要并发安全的类保存每个任务的处理结果(TaskResult)。需要提供查询接口,供外部的使用。这里我们不处理对于工作的检查。有兴趣的可以实现。3.总体流程
这里不按照流程讲解,而是按照类关系从下而上讲解。
4.目录结构
5.TaskResultType
package me.hcFramework.pool.vo;//这个类只是用来作为标志的信息。public enum TaskResultType { SUCCESS, //表示任务成功 FAILSURE, //表示任务失败 EXCEPTION; //表示发生了异常,这里我们不去详尽判断,只用这个标示来笼统表示 }6.TaskResultpackage me.hcFramework.pool.vo;/** * * @param <R> 业务方法处理后的业务结果数据的类型 * * 对属性使用final修饰是为了使其不可改 */public class TaskResult<R> { //用户业务是否成功完成 private final TaskResultType resultType; //业务方法处理后的业务结果数据 private final R returnType; //如果失败,则失败原因 private final String reason; //针对任务失败的构造方法 public TaskResult(TaskResultType resultType , R returnType , String reason) { this.resultType = resultType; this.returnType = returnType; this.reason = reason; } //针对任务成功的构造方法 public TaskResult(TaskResultType resultType , R returnType) { this.resultType = resultType; this.returnType = returnType; this.reason = "success"; } //因为我们希望字段不可改,设置为了final。所以只提供getters public TaskResultType getResultType() { return resultType; } public R getReturnType() { return returnType; } public String getReason() { return reason; } @Override public String toString() { return "TaskResult [resultType=" + resultType + ", returnType=" + returnType + ", reason=" + reason + "]"; }}在这里其实可以发生一点小改动。即:把错误信息归并到TaskResultType中。这样一个TaskResultType包括成功,错误/异常以及其原因就完整了。这里不过多介绍。7.JobInfo
package me.hcFramework.pool.vo;import java.util.LinkedList;import java.util.List;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.atomic.AtomicInteger;/** * 可以看作是一堆Task的打包+信息控制 * 与TaskResult一样,一旦设置好了就不许再次更改 */public class JobInfo<R> { //唯一性标志 private final String jobName; //任务处理器,要求业务人员实现接口 private final ITaskProcessor<?, ?> taskProcessor; //工作(Job)中任务(Task)的数量 private final int jobLength; //以下两个类保证操作原子性 //任务总执行成功个数 private AtomicInteger successCount; //已执行的任务总数 private AtomicInteger taskProcessCount; //每个任务的处理结果,供查询调用 private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue; public JobInfo(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) { this.jobName = jobName; this.jobLength = jobLength; this.taskProcessor = taskProcessor; this.successCount = new AtomicInteger(0); this.taskProcessCount = new AtomicInteger(0); this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength); } //提供工作的整体进度信息 public String getTotalProcess() { return "success[" + successCount.get()+"]/current[" + taskProcessCount.get() + "],Total=[" + jobLength + "]"; } //取得工作中每个任务的详情 public List<TaskResult<R>> getTaskDetail() { List<TaskResult<R>> taskDetailList = new LinkedList<TaskResult<R>>(); TaskResult<R> taskResult; //pollFirst()方法返回双端队列的第一个元素,返回的元素会从列表中移除 while((taskResult = taskDetailQueue.pollFirst()) != null) { taskDetailList.add(taskResult); } return taskDetailList; } //放入工作详情,只需要保证最终个数正确即可,不需要加锁 public void addTaskResult(TaskResult<R> result) { if(TaskResultType.SUCCESS == result.getResultType()) { successCount.getAndIncrement(); } taskProcessCount.getAndIncrement(); taskDetailQueue.add(result); } public String getJobName() { return jobName; } public ITaskProcessor<?, ?> getTaskProcessor() { return taskProcessor; } public int getJobLength() { return jobLength; } public int getSuccessCount() { return successCount.get(); } public int getTaskProcessCount() { return taskProcessCount.get(); } @Override public String toString() { return "JobInfo [jobName=" + jobName + ", taskProcessor=" + taskProcessor + ", jobLength=" + jobLength + ", successCount=" + successCount + ", taskProcessCount=" + taskProcessCount + ", taskDetailQueue=" + taskDetailQueue + "]"; }}关于LinkedBlockingDeque的说明:他是线程安全的。他是双端队列,任何一端都可以进行元素的出入。8.ITaskProcessor
package me.hcFramework.pool.vo;/** * 定义接口,所有需要完成的任务都需要实现此接口进行 * * @param <T> 业务方法需要的数据 * @param <R> 业务方法处理后的业务结果数据的类型 */public interface ITaskProcessor<T ,R> { TaskResult<R> taskExecute(T data);}9.真正的黑箱子:PendingJobPoolpackage me.hcFramework.pool;import java.util.List;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import me.hcFramework.pool.vo.ITaskProcessor;import me.hcFramework.pool.vo.JobInfo;import me.hcFramework.pool.vo.TaskResult;import me.hcFramework.pool.vo.TaskResultType;/** * * 这是框架主体类 */public class PendingJobPool { //key = 每个工作的名字 jobInfo.jobName //工作的存放容器,用于完成工作的注册 private static ConcurrentHashMap<String,JobInfo<?>> jobInfoMap = new ConcurrentHashMap<String,JobInfo<?>>(); //单例模式组合拳:类内部实例化+私有构造方法+静态get方法 private static PendingJobPool pool = new PendingJobPool(); private PendingJobPool(){ } public static PendingJobPool getPool() { //这里是为了完善逻辑,且为日后框架加入检查功能预留空间 //当然这里也可成为后续版本AOP的切点 //checkJob.initCheck(jobInfoMap); return pool; } //根据工作名称,拿工作的实体 @SuppressWarnings("unchecked") public <R> JobInfo<R> getJob(String jobName) { JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName); if(null == jobInfo) { throw new RuntimeException(jobName + "是非法任务!"); } return jobInfo; } //获得处理详情,这里不对与jobName作出检查 public <R> List<TaskResult<R>> getTaskDetail(String jobName) { JobInfo<R> jobInfo = getJob(jobName); return jobInfo.getTaskDetail(); } //获得处理进度 public String getTaskProgess(String jobName) { return getJob(jobName).getTotalProcess(); } //获得当前已处理多少个任务 public int getDoneCount(String jobName) { return getJob(jobName).getTaskProcessCount(); } /** * 注册方法:注册工作(job) * @param jobName 名字 * @param jobLength 工作中任务的长度 * @param taskProcessor 业务处理器 */ public <R> void registerJob(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) { JobInfo<R> jobInfo = new JobInfo<R>(jobName, jobLength, taskProcessor); //putIfAbsent()如果map中没有该工作,则放入且返回null;如果已有会返回对象 if(jobInfoMap.putIfAbsent(jobName, jobInfo) != null) { throw new RuntimeException(jobName + "已经注册过了"); } } /** * 提交任务 * @param jobName 任务所对应的工作名 * @param t 任务数据 */ public <T ,R> void putTask(String jobName , T t) { JobInfo<R> jobInfo = getJob(jobName); PendingTask<T ,R> task = new PendingTask<T ,R>(jobInfo , t); taskExecutor.execute(task); } //取得当前机器上的CPU数量 private static final int THREAD_COUNTS = Runtime.getRuntime().availableProcessors(); //阻塞队列,线程池使用,用以存放待处理的任务 private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000); //线程池,固定大小,有界队列 private static ExecutorService taskExecutor = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, TimeUnit.SECONDS, taskQueue); public void closePool() { taskExecutor.shutdown(); } //交给我们框架执行的任务 private static class PendingTask<T , R> implements Runnable { private JobInfo<R> jobInfo; private T processData; public PendingTask(JobInfo<R> jobInfo , T processData) { this.jobInfo = jobInfo; this.processData = processData; } @SuppressWarnings("unchecked") @Override public void run() { ITaskProcessor<T, R> taskProcessor = (ITaskProcessor<T, R>) jobInfo.getTaskProcessor(); TaskResult<R> result = null; try{ result = taskProcessor.taskExecute(processData); if(result== null) { result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "is null"); }else if(result.getResultType() == null) { //如果你看懂这个判断,就会觉得很厉害同时又会感到羞辱 if(result.getReason() == null) { result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "reason is null"); } else { result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "type is null"); } } } catch (Exception e) { result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "task exception" + e.getMessage()); } finally { jobInfo.addTaskResult(result); } } }}如果读者了解Spring的实现,会知道bean的注册过程其实也就是放入了Map中。或者读者也曾经开发过一些需要注册功能的应用,无疑都是使用了Map。除了Map的高性能,真的可以说是:聪明人都只用一种聪明法。10.测试
自己实现ITaskProcessor接口public class MyTask implements ITaskProcessor<Integer, Integer>{ @Override public TaskResult<Integer> taskExecute(Integer data) { Random r = new Random(); int flag = r.nextInt(500); try { Thread.sleep(flag); } catch (InterruptedException e) { e.printStackTrace(); } if(flag <= 300) {//正常处理的情况 Integer returnValue = data.intValue() + flag; return new TaskResult<Integer>(TaskResultType.SUCCESS, returnValue); } else if(flag > 300 && flag <= 400) {//处理失败的情况 return new TaskResult<Integer>(TaskResultType.FAILSURE, -1 , "Failsure"); } else { try { throw new RuntimeException("异常发生了!!"); } catch(Exception e) { return new TaskResult<Integer>(TaskResultType.EXCEPTION, -1 ,e.getMessage()); } } }}Test类public class AppTest { private final static String JOB_NAME="计算数值"; //private final static String JOB_OTHER_NAME = "字符串"; private final static int JOB_LENGTH = 150; private static class QueryResult implements Runnable { private PendingJobPool pool; private String jobName; public QueryResult(PendingJobPool pool , String jobName) { this.pool = pool; this.jobName = jobName; } @Override public void run() { while(pool.getDoneCount(jobName) <= JOB_LENGTH) { List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName); if(!taskDetail.isEmpty()) { System.out.println(pool.getTaskProgess(jobName)); System.out.println(taskDetail); } if(pool.getDoneCount(jobName) == JOB_LENGTH) { break; } try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { PendingJobPool pool = PendingJobPool.getPool(); MyTask myTask = new MyTask(); pool.registerJob(JOB_NAME, JOB_LENGTH, myTask); Random r = new Random(); for(int i = 0 ; i < JOB_LENGTH ; i++) { pool.putTask(JOB_NAME, r.nextInt(1000)); } new Thread(new QueryResult(pool, JOB_NAME)).start(); }}Test类中实现了一个用来查询的线程。--------------------- 作者:MonkeyDCoding 来源:CSDN 原文:https://blog.csdn.net/MonkeyDCoding/article/details/81369610 版权声明:本文为博主原创文章,转载请附上博文链接!