hippo4j 是一个动态管理和监控线程池的开源框架,它有两种运行模式:轻量级依赖配置中心以及无中间件依赖版本。

文档地址参见 https://hippo4j.cn/docs/user_docs/intro

其中无中间件依赖版本支持的功能更丰富,代码也更复杂一些,本文以该版本为例分析 hippo4j 的代码,以供参考。

1. 定义了一个新的线程池类 DynamicThreadPoolExecutor

解析:
直接使用 JUC 的 ThreadPoolExecutor 的问题:Spring 容器关闭的时候可能任务队列里的任务还没处理完,有丢失任务的风险。
为了解决该问题,可以实现 InitializingBean 和 DisposableBean接口,在 bean 初始化、容器关闭时做相应处理。这里的 AbstractDynamicExecutorSupport 即是这样实现的。
部分代码如下:

    @Override
    public void destroy() {
        shutdownSupport();
    }

    public void shutdownSupport() {
        if (log.isInfoEnabled()) {
            log.info("Shutting down ExecutorService" + (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : ""));
        }
        if (this.executor != null) {
            if (this.waitForTasksToCompleteOnShutdown) {
                this.executor.shutdown();
            } else {
                for (Runnable remainingTask : this.executor.shutdownNow()) {
                    cancelRemainingTask(remainingTask);
                }
            }
            awaitTerminationIfNecessary(this.executor);
        }
    }

    protected void cancelRemainingTask(Runnable task) {
        if (task instanceof Future) {
            ((Future<?>) task).cancel(true);
        }
    }

    private void awaitTerminationIfNecessary(ExecutorService executor) {
        if (this.awaitTerminationMillis > 0) {
            try {
                if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
                    if (log.isWarnEnabled()) {
                        log.warn("Timed out while waiting for executor" +
                                (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
                    }
                }
            } catch (InterruptedException ex) {
                if (log.isWarnEnabled()) {
                    log.warn("Interrupted while waiting for executor" +
                            (this.threadPoolId != null ? " '" + this.threadPoolId + "'" : "") + " to terminate.");
                }
                Thread.currentThread().interrupt();
            }
        }
    }

可以看到在 bean destroy 时,先检查是否需要等待任务完成再关闭线程池。
DynamicThreadPool 继承了 AbstractDynamicExecutorSupport,主要实现了任务计时和超时触发报警。

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if (executeTimeOut == null || executeTimeOut <= 0) {
            return;
        }
        startTimeThreadLocal.set(SystemClock.now());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Long startTime;
        if ((startTime = startTimeThreadLocal.get()) == null) {
            return;
        }
        try {
            long endTime = SystemClock.now();
            long executeTime;
            boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
            if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
                ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
                if (notifyAlarmHandler != null) {
                    notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
                }
            }
        } finally {
            startTimeThreadLocal.remove();
        }
    }

2. DynamicThreadPoolPostProcessor

该类继承了 BeanPostProcessor,在 Bean 初始化前后对 ThreadPoolExecutor 及其子类进行一些处理,主要用来获取线程池对象注册到 server 或 框架内部定义的容器中(如果server没有对应的配置)。主要代码如下:

    protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
        String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
        ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
        Map<String, String> queryStrMap = new HashMap(3);
        queryStrMap.put(TP_ID, threadPoolId);
        queryStrMap.put(ITEM_ID, properties.getItemId());
        queryStrMap.put(NAMESPACE, properties.getNamespace());
        boolean isSubscribe = false;
        ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
        ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
        try {
            Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L);
            if (result.isSuccess() && result.getData() != null) {
                String resultJsonStr = JSONUtil.toJSONString(result.getData());
                if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
                    // Create a thread pool with relevant parameters.
                    // 省略。。。
                }
            } else {
                // DynamicThreadPool configuration undefined in server
                // 省略。。。
        
            }
        } catch (Exception ex) {
            // 省略。。。
        } finally {
            // 省略。。。
        }
        // 线程池对象注册到 hippo4j 框架内部定义的容器中,实际就是个map存储
        GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
        return newDynamicThreadPoolExecutor;
    }

3. AbstractRefreshListener 及其子类。

实现了 ApplicationListener,监听事件并做处理,包含四个实现:

  • WebExecutorRefreshListener,监听 Web 容器(Tomcat/Jetty/Undertow) 的线程池配置变更。
  • PlatformsRefreshListener,监听报警平台配置变更。
  • DynamicThreadPoolRefreshListener,监听线程池本身的配置变更。
  • AdapterExecutorsRefreshListener,监听第三方框架线程池变更(Dubbo/RocketMQ 等)。
    代码比较简单,不再单独解析了。

4. 继承 ApplicationRunner/CommandLineRunner 的类

负责容器启动后回调,执行初始化工作,例如 DynamicThreadPoolMonitorExecutor,定时收集监控结果,主要代码如下:

@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolMonitorExecutor implements ApplicationRunner {

    private final BootstrapConfigProperties properties;

    private ScheduledThreadPoolExecutor collectExecutor;

    private List<ThreadPoolMonitor> threadPoolMonitors;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 省略。。。
        // Get dynamic thread pool monitoring component.
        threadPoolMonitors = new ArrayList<>();
        collectExecutor = new ScheduledThreadPoolExecutor(
                new Integer(1),
                ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build());
        // 配置文件中已有的 collectType 添加到 threadPoolMonitors,此处省略。。。
       
        // 自定义扩展的 monitor 添加到 threadPoolMonitors,此处省略。。。
        
        // Execute dynamic thread pool monitoring component.
        collectExecutor.scheduleWithFixedDelay(
                () -> scheduleRunnable(),
                properties.getInitialDelay(),
                properties.getCollectInterval(),
                TimeUnit.MILLISECONDS);
        if (GlobalThreadPoolManage.getThreadPoolNum() > 0) {
            log.info("Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", getThreadPoolNum());
        }
    }

    private void scheduleRunnable() {
        for (ThreadPoolMonitor each : threadPoolMonitors) {
            try {
                each.collect();
            } catch (Exception ex) {
                log.error("Error monitoring the running status of dynamic thread pool. Type: {}", each.getType(), ex);
            }
        }
    }
}

原文地址:http://www.cnblogs.com/ylty/p/16782030.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性