使用Callable进行多线程处理数据。当处理大批量的数据时,若某个线程返回需要的值则终止其他线程并输出结果。下面是我自己结合 Callable具有返回值的线程,和 CyclicBarrier 这里我叫着等待线程(当所有的线程处理完毕才能继续)。

操作步骤:

1、先把所有的线程创建出来后放入线程池中,通过线程池的submit进行提交运行

2、采用无限循环策略,来检测哪些线程已经处理完毕,然后对比数据是否是需要的数据,若不是需要的数据则,继续循环

3、创建一个set集合,用于存放哪些线程对象已经处理完毕,保证不重复,为了判断当没有获取到想要的数据时好结束无限循环

4、若找到所需要的数据,则取消其他未执行完毕的线程,然后表明状态为true,已找到,然后跳出该次循环。

5、然后接着就是处理外出的无限循环了。表明状态为true或者已处理完 全部的线程都要跳出该次循环。

创建一个CyclicBarrier守护线程,在无限循环的外面,个数为2,一个是守护线程等待,一个是运行的即跑腿的。(好像不弄这个跑腿的不行),创建一个空的线程表明已经到达。 外层还有个等待,目的就是等跑腿的执行完回来,大家一起结束,跑腿的不会了就不往下执行

语言表达能力欠佳还望多多见谅。下面代码足以说明了,有个线程延迟10s,若是找到则1s就完成。

package com.xie.test;

import java.util.*;
import java.util.concurrent.*;

/**
 * @Description 测试callback返回
 * @Date 2022-11-09 10:36
 * @Author xie
 */
public class TestCallable {

    public static void main(String[] args) throws Exception {
        TestCallable testCallable = new TestCallable();
        testCallable.myTest();
    }

    public void myTest() throws Exception {
        int taskSize = 100;
        List<Future<Integer>> futureTasks = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(taskSize);
        for (int i = 0; i < taskSize; i++) {
            MyCallable callable = new MyCallable(i);
            Future f = pool.submit(callable);
            futureTasks.add(f);
        }
        // 关闭线程池
        pool.shutdown();
        System.out.println("线程关闭了");
        int sum = 0;
        // 获取所有并发任务的运行结果
        int count = 0;
        CyclicBarrier cb = new CyclicBarrier(2);
        Set<Future> futureSet = new LinkedHashSet<>();//防止重复,记录完成的对象
        while (true) {
            boolean flag = false;
            for (Future f : futureTasks) {
                if (f.isDone()) {
                    futureSet.add(f);
                    // 从Future对象上获取任务的返回值,并输出到控制台
                    Map<String, Integer> map = (Map<String, Integer>) f.get();
                    //System.out.println(">>>" + map.get("a"));
                    //int aa = Integer.valueOf(f.get().toString());
                    int aa = Integer.valueOf(map.get("a"));
                    if (aa == 4) {
                        System.out.println(">>>" + f.get().toString());
                        sum = aa;
                        //同时将其他的线程全部取消了
                        for (Future f1 : futureTasks) {
                            f1.cancel(true);
                        }
                        flag = true;
                        break;
                    }

                }


            }
            count = futureSet.size();
            if (flag || count == futureTasks.size()) {
                new Thread(new TempThread(cb)).start();
                break;
            }
        }
        System.out.println("线程池中执行了多少:" + count);
        cb.await();
        System.out.println("wode" + sum);
    }

    class MyCallable implements Callable<Map> {
        int type;

        public MyCallable(int type) {
            this.type = type;
        }

        @Override
        public Map call() throws Exception {
            Random random = new Random();
            int a = random.nextInt(10) + 1;
            //if (type == 2) {
            //    Thread.sleep(10000);
            //} else {
            //    Thread.sleep(1000);
            //}

            Thread.sleep(random.nextInt(1000) + 1);
            System.out.println(Thread.currentThread().getName() + ":随机值" + a + ",类型:" + type);

            Map<String, Integer> map = new HashMap();
            map.put("type", type);
            map.put("a", a);

            return map;
        }
    }

    class TempThread implements Runnable {
        CyclicBarrier cb;

        public TempThread(CyclicBarrier cb) {
            this.cb = cb;
        }

        @Override
        public void run() {
            try {
                cb.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

}

 

ps:https://blog.csdn.net/wjs040/article/details/96863119

原文地址:http://www.cnblogs.com/mask-xiexie/p/16877779.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性