返回顶部
首页 > 资讯 > 精选 >JAVA多线程怎么实现用户任务排队并预估排队时长
  • 625
分享到

JAVA多线程怎么实现用户任务排队并预估排队时长

2023-06-22 00:06:35 625人浏览 泡泡鱼
摘要

这篇文章主要介绍“JAVA多线程怎么实现用户任务排队并预估排队时长”,在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”JAVA多线程怎么

这篇文章主要介绍“JAVA多线程怎么实现用户任务排队并预估排队时长”,在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”JAVA多线程怎么实现用户任务排队并预估排队时长”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

JAVA多线程怎么实现用户任务排队并预估排队时长

实现流程

JAVA多线程怎么实现用户任务排队并预估排队时长

初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。

假设初始化5个处理器,代码执行 BlockingQueue.take 时候,每次take都会处理器队列就会减少一个,当处理器队列为空时,take就是阻塞线程,当用户处理某某任务完成时候,调用资源释放接口,在处理器队列put 一个处理器对象,原来阻塞的take ,就继续执行。

排队论简介

排队论是研究系统随机聚散现象和随机系统工作工程的数学理论和方法,又称随机服务系统理论,为运筹学的一个分支。我们下面对排队论做下简化处理,先看下图:

JAVA多线程怎么实现用户任务排队并预估排队时长

代码具体实现

任务队列初始化 TaskQueue

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;import org.springframework.stereotype.Component; import javax.annotation.PostConstruct;import java.util.Optional;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.atomic.AtomicInteger; @Componentpublic class TaskQueue {    //处理器队列    public static BlockingQueue<TaskProcessor> taskProcessors;    //等待任务队列    public static BlockingQueue<CompileTask> waitTasks;    //处理任务队列    public static BlockingQueue<CompileTask> executeTasks;    //线程池    public static ExecutorService exec;    //初始处理器数(计算机cpu可用线程数)    public static Integer processorNum=Runtime.getRuntime().availableProcessors();         @PostConstruct    public static void initEquipmentAndUsersQueue(){        exec = Executors.newCachedThreadPool();        taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);        //将空闲的设备放入设备队列中        setFreeDevices(processorNum);        waitTasks =new LinkedBlockingQueue<CompileTask>();        executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);    }          private static void setFreeDevices(int num) {        //获取可用的设备        for (int i = 0; i < num; i++) {            TaskProcessor dc=new TaskProcessor();            try {                taskProcessors.put(dc);            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }       public static CompileTask getWaitTask(Long clazzId) {        return get(TaskQueue.waitTasks,clazzId);    }     public static CompileTask getExecuteTask(Long clazzId) {        return get(TaskQueue.executeTasks,clazzId);    }      private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {        CompileTask compileTask =null;        if (CollectionUtils.isNotEmpty(users)){            Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();            if(optional.isPresent()){                compileTask =  optional.get();            }        }        return compileTask;    }     public static Integer getSort(Long clazzId) {        AtomicInteger index = new AtomicInteger(-1);        BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;        if (CollectionUtils.isNotEmpty(compileTasks)){            compileTasks.stream()                    .filter(e -> {                        index.getAndIncrement();                        return e.getClazzId().longValue() == clazzId.longValue();                    })                    .findFirst();        }        return index.get();    }     //单位秒    public static int estimatedTime(Long clazzId){        return  estimatedTime(60,getSort(clazzId)+1);    }     //单位秒    public static int estimatedTime(int cellMs,int num){         int a= (num-1)/processorNum;         int b= cellMs*(a+1);        return  b;    }

编译任务类 CompileTask

import lombok.Data;import org.springblade.core.tool.utils.SpringUtil;import org.springblade.GIS.common.enums.DataScheduleEnum;import org.springblade.gis.dynamicds.service.DynamicDataSourceService;import org.springblade.gis.modules.feature.schedule.service.DataScheduleService; import java.util.Date;  @Datapublic class CompileTask implements Runnable {    //当前请求的线程对象    private Long clazzId;    //用户id    private Long userId;    //当前请求的线程对象    private Thread thread;    //绑定处理器    private TaskProcessor taskProcessor;    //任务状态    private Integer status;    //开始时间    private Date startTime;    //结束时间    private Date endTime;     private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);     private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);     @Override    public void run() {        compile();    }         public void compile() {        try {            //取出一个设备            TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();            //取出一个任务            CompileTask compileTask = TaskQueue.waitTasks.take();            //任务和设备绑定            compileTask.setTaskProcessor(taskProcessor);            //放入            TaskQueue.executeTasks.put(compileTask);            System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);            //切换用户数据源            dataSourceService.switchDataSource(userId);            //添加进度            dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());        } catch (InterruptedException e) {            System.err.println( e.getMessage());        }    } }

任务处理器 TaskProcessor 

import lombok.Data; import java.util.Date; @Datapublic class TaskProcessor {         public  static Boolean release(CompileTask task)  {        Boolean flag=false;        Thread thread=task.getThread();        synchronized (thread) {            try {                if(null!=task.getTaskProcessor()){                    TaskQueue.taskProcessors.put(task.getTaskProcessor());                    TaskQueue.executeTasks.remove(task);                    task.setEndTime(new Date());                    long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();                    flag=true;                    System.out.println("用户"+task.getClazzId()+"耗时"+intervalMilli+"ms");                }            } catch (InterruptedException e) {                e.printStackTrace();            }            return flag;        }    } }

Controller控制器接口实现

import io.swagger.annotations.api;import io.swagger.annotations.ApiOperation;import org.springblade.core.tool.api.R;import org.springblade.gis.multithread.TaskProcessor;import org.springblade.gis.multithread.TaskQueue;import org.springblade.gis.multithread.CompileTask;import org.springframework.WEB.bind.annotation.*; import java.util.Date;  @RestController@RequestMapping("task")@Api(value = "数据编译任务", tags = "数据编译任务")public class CompileTaskController {     @ApiOperation(value = "添加等待请求 @author Tarzan Liu")    @PostMapping("compile/{clazzId}")    public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {        CompileTask checkUser=TaskQueue.getWaitTask(clazzId);        if(checkUser!=null){            return  R.fail("已经正在排队!");        }        checkUser=TaskQueue.getExecuteTask(clazzId);        if(checkUser!=null){            return  R.fail("正在执行编译!");        }        //获取当前的线程        Thread thread=Thread.currentThread();        //创建当前的用户请求对象        CompileTask compileTask =new CompileTask();        compileTask.setThread(thread);        compileTask.setClazzId(clazzId);        compileTask.setStartTime(new Date());        //将当前用户请求对象放入队列中        try {            TaskQueue.waitTasks.put(compileTask);        } catch (InterruptedException e) {            e.printStackTrace();        }        TaskQueue.exec.execute(compileTask);        return R.data(TaskQueue.waitTasks.size()-1);    }     @ApiOperation(value = "查询当前任务前还有多少任务等待 @author Tarzan Liu")    @PostMapping("sort/{clazzId}")    public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {        return R.data(TaskQueue.getSort(clazzId));    }     @ApiOperation(value = "查询当前任务预估时长 @author Tarzan Liu")    @PostMapping("estimate/time/{clazzId}")    public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {        return R.data(TaskQueue.estimatedTime(clazzId));    }     @ApiOperation(value = "任务释放 @author Tarzan Liu")    @PostMapping("release/{clazzId}")    public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {        CompileTask task=TaskQueue.getExecuteTask(clazzId);        if(task==null){            return  R.fail("资源释放异常");        }        return R.status(TaskProcessor.release(task));    }     @ApiOperation(value = "执行 @author Tarzan Liu")    @PostMapping("exec")    public R exec() {        Long start=System.currentTimeMillis();        for (Long i = 1L; i < 100; i++) {            compile(i);        }        System.out.println("消耗时间:"+(System.currentTimeMillis()-start)+"ms");        return R.status(true);    }}

接口测试

根据任务id查询该任务前还有多少个任务待执行

JAVA多线程怎么实现用户任务排队并预估排队时长

根据任务id查询该任务预估执行完成的剩余时间,单位秒

JAVA多线程怎么实现用户任务排队并预估排队时长

补充知识

BlockingQueue

BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现Web中的长连接聊天功能,当然其最常用的还是用于实现生产者与消费者模式,大致如下图所示:

JAVA多线程怎么实现用户任务排队并预估排队时长

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

阻塞与非阻塞

入队

offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞

put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞

offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞

被唤醒

等待时间超时

当前线程被中断

出队

poll():如果没有元素,直接返回null;如果有元素,出队

take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞

poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:

被唤醒

等待时间超时

当前线程被中断 

到此,关于“JAVA多线程怎么实现用户任务排队并预估排队时长”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: JAVA多线程怎么实现用户任务排队并预估排队时长

本文链接: https://lsjlt.com/news/301653.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • JAVA多线程怎么实现用户任务排队并预估排队时长
    这篇文章主要介绍“JAVA多线程怎么实现用户任务排队并预估排队时长”,在日常操作中,相信很多人在JAVA多线程怎么实现用户任务排队并预估排队时长问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”JAVA多线程怎么...
    99+
    2023-06-22
  • JAVA多线程之实现用户任务排队并预估排队时长
    目录实现流程排队论简介代码具体实现接口测试补充知识BlockingQueue阻塞与非阻塞 实现流程 初始化一定数量的任务处理线程和缓存线程池,用户每次调用接口,开启一个线程处理。 ...
    99+
    2024-04-02
  • 怎么在Java中实现多线程排序
    这期内容当中小编将会给大家带来有关怎么在Java中实现多线程排序,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。java基本数据类型有哪些Java的基本数据类型分为:1、整数类型,用来表示整数的数据类型。2...
    99+
    2023-06-14
  • java中的消息队列怎么利用多线程实现
    java中的消息队列怎么利用多线程实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。定义一个队列缓存池: //static修饰的成员变量和成员方法独立于该类的任何对象...
    99+
    2023-05-31
    java 多线程 ava
  • python怎么利用多线程+队列技术爬取中介网互联网网站排行榜
    本篇内容介绍了“python怎么利用多线程+队列技术爬取中介网互联网网站排行榜”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!目标站点分析本次...
    99+
    2023-06-30
  • Java多线程中怎么利用Future实现携带结果的任务
    本篇文章为大家展示了Java多线程中怎么利用Future实现携带结果的任务,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Future 介绍Future表示异步计算的结果,它提供了检查计算是否完成的方...
    99+
    2023-06-22
  • Java中的多线程回显服务器怎么利用Socket实现
    Java中的多线程回显服务器怎么利用Socket实现?相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。具体如下:需要两个类,一个是EchoServer,代表服务器。另外一个是Ech...
    99+
    2023-05-31
    java socket 多线程
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作