commit e05d1d1ba8c7c20adb5fb500ead2c814a9cdc94b Author: alex Date: Fri Mar 28 22:59:25 2025 +0800 add ThreadPool / Scheduler code diff --git a/.gitignore b/.gitignore new file mode 100755 index 0000000..fc40060 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea +.gradle +bin +build \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..aecfd42 --- /dev/null +++ b/README.md @@ -0,0 +1,25 @@ +## 基础架构服务实现 DEMO + +### 线程池实现 + +1. ThreadSea + +Deepseek 生成版本, 参考了 JDK 的实现, 但是没有完全实现, 只是实现了核心功能: 核心/非核心线程调度, 任务队列, 线程池状态管理, 线程池关闭, 线程池监控等. + +2. ThreadGrok + +Grok 版本, 在 Deepseek 基础上再次精简, 只保留了核心/非核心线程调度功能, 去掉了线程池状态管理, 线程池关闭, 线程池监控等. + +3. ThreadLake + +自己实现的版本, 同样只保留了核心/非核心线程调度功能 + +### 调度器实现 + +1. SeqenceScheduler + +调度任务, 使用线程池并行顺序化执行任务 + +2. CombineScheduler + +调度任务, 使用线程池合并同时间的任务一起执行 diff --git a/build.gradle b/build.gradle new file mode 100755 index 0000000..ce9379f --- /dev/null +++ b/build.gradle @@ -0,0 +1,42 @@ +buildscript { + ext { + kotlin_version = '1.8.0' + } + + repositories { + mavenLocal() + maven { url 'https://maven.aliyun.com/repository/gradle-plugin' } + maven { url 'https://maven.aliyun.com/repository/central' } + maven { url 'https://maven.aliyun.com/repository/public' } + mavenCentral() + } + + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + } +} + +apply plugin: 'idea' +apply plugin: 'java' +apply plugin: 'kotlin' + +ext { + spring_version = "2.6.8" +} + +group = 'pangu' +version = '0.1.0' + +repositories { + mavenLocal() + maven { url 'https://maven.aliyun.com/repository/gradle-plugin' } + maven { url 'https://maven.aliyun.com/repository/central' } + maven { url 'https://maven.aliyun.com/repository/public' } + mavenCentral() +} + +dependencies { + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + implementation "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" + testImplementation group: 'junit', name: 'junit', version: '4.12' +} diff --git a/gradle.properties b/gradle.properties new file mode 100755 index 0000000..7fc6f1f --- /dev/null +++ b/gradle.properties @@ -0,0 +1 @@ +kotlin.code.style=official diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100755 index 0000000..249e583 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100755 index 0000000..6ebe425 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Tue May 21 20:15:48 CST 2024 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..1b6c787 --- /dev/null +++ b/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100755 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle new file mode 100755 index 0000000..e37a0cd --- /dev/null +++ b/settings.gradle @@ -0,0 +1,2 @@ +rootProject.name = 'InfrastructureDemo' + diff --git a/src/main/kotlin/com/pangu/core/scheduler/CombineScheduler.kt b/src/main/kotlin/com/pangu/core/scheduler/CombineScheduler.kt new file mode 100644 index 0000000..2b69251 --- /dev/null +++ b/src/main/kotlin/com/pangu/core/scheduler/CombineScheduler.kt @@ -0,0 +1,37 @@ +package com.pangu.core.scheduler + +import java.util.* +import java.util.concurrent.* + + +class CombineScheduler(private val combineJob: ICombineJob) { + private val triggers = PriorityBlockingQueue() + private val triggerJob = ConcurrentHashMap() + private val executor = ThreadPoolExecutor(6, 12, 60, TimeUnit.SECONDS, LinkedBlockingQueue()) + private val jobPool = mutableSetOf() + + fun addJob(trigger: Trigger, job: IJob) { + triggers.add(trigger) + triggerJob[trigger] = job + } + + fun execute() { + while (true) { + var trigger: Trigger = triggers.peek() ?: return + var span = trigger.nextExecuteTime - Date().time + while (span <= 0) { + trigger = triggers.take() + trigger.execute() + triggers.add(trigger) + jobPool.add(triggerJob[trigger]!!) + trigger = triggers.peek() + span = trigger.nextExecuteTime - Date().time + } + if (jobPool.isNotEmpty()) { + executor.execute(combineJob.combine(*jobPool.toTypedArray())) + jobPool.clear() + } + Thread.sleep(span) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/scheduler/ICombineJob.kt b/src/main/kotlin/com/pangu/core/scheduler/ICombineJob.kt new file mode 100644 index 0000000..952d445 --- /dev/null +++ b/src/main/kotlin/com/pangu/core/scheduler/ICombineJob.kt @@ -0,0 +1,5 @@ +package com.pangu.core.scheduler + +fun interface ICombineJob { + fun combine(vararg jobs: IJob): Runnable +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/scheduler/IJob.kt b/src/main/kotlin/com/pangu/core/scheduler/IJob.kt new file mode 100644 index 0000000..cc3fad4 --- /dev/null +++ b/src/main/kotlin/com/pangu/core/scheduler/IJob.kt @@ -0,0 +1,4 @@ +package com.pangu.core.scheduler + +fun interface IJob : Runnable { +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/scheduler/SequenceScheduler.kt b/src/main/kotlin/com/pangu/core/scheduler/SequenceScheduler.kt new file mode 100644 index 0000000..954f42a --- /dev/null +++ b/src/main/kotlin/com/pangu/core/scheduler/SequenceScheduler.kt @@ -0,0 +1,32 @@ +package com.pangu.core.scheduler + +import java.util.* +import java.util.concurrent.* + + +class SequenceScheduler { + private val triggers = PriorityBlockingQueue() + private val triggerJob = ConcurrentHashMap() + private val executor = ThreadPoolExecutor(6, 12, 60, TimeUnit.SECONDS, LinkedBlockingQueue()) + + fun addJob(trigger: Trigger, job: IJob) { + triggers.add(trigger) + triggerJob[trigger] = job + } + + fun execute() { + while (true) { + var trigger: Trigger = triggers.peek() ?: return + var span = trigger.nextExecuteTime - Date().time + while (span <= 0) { + trigger = triggers.take() + trigger.execute() + triggers.add(trigger) + executor.execute(triggerJob[trigger]!!) + trigger = triggers.peek() + span = trigger.nextExecuteTime - Date().time + } + Thread.sleep(span) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/scheduler/Trigger.kt b/src/main/kotlin/com/pangu/core/scheduler/Trigger.kt new file mode 100644 index 0000000..4a61c57 --- /dev/null +++ b/src/main/kotlin/com/pangu/core/scheduler/Trigger.kt @@ -0,0 +1,32 @@ +package com.pangu.core.scheduler + +import java.util.* + +class Trigger( + var interval: Long = 0, + var startTime: Long = Date().time, + var endTime: Long = 0, +) : Comparable { + private var nextTime = startTime + + init { + val now = System.currentTimeMillis() + while (now > nextTime) { + nextTime += interval + } + } + + val nextExecuteTime: Long + get() = nextTime + + fun execute(): Boolean { + nextTime += interval + return endTime == 0L || nextTime <= endTime + } + + + override fun compareTo(other: Trigger): Int { +// println("this interval $interval, nextExecuteTime $nextExecuteTime; other interval ${other.interval}, nextExecuteTime ${other.nextExecuteTime}") + return if (this.nextExecuteTime > other.nextExecuteTime) 1 else -1 + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/threads/ThreadGrok.kt b/src/main/kotlin/com/pangu/core/threads/ThreadGrok.kt new file mode 100644 index 0000000..5cd384e --- /dev/null +++ b/src/main/kotlin/com/pangu/core/threads/ThreadGrok.kt @@ -0,0 +1,216 @@ +package com.pangu.core.threads + +import java.util.* +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +/** + * 自定义线程池实现类,类似于Java中的ThreadPoolExecutor + * + * @param corePoolSize 核心线程数,即使空闲也会保留的线程数量 + * @param maxPoolSize 最大线程数,线程池允许创建的最大线程数量 + * @param keepAliveTime 非核心线程空闲时的存活时间 + * @param timeUnit 存活时间的时间单位 + * @param taskQueue 用于存放待执行任务的工作队列 + */ +class ThreadGrok( + private val corePoolSize: Int = 1, + private val maxPoolSize: Int = 1, + private val keepAliveTime: Long = 1000L, + private val timeUnit: TimeUnit = TimeUnit.MILLISECONDS, + private val taskQueue: BlockingQueue = LinkedBlockingQueue() +) { + // 当前工作线程数的原子计数器 + private val workerCount = AtomicInteger(0) + + // 工作线程集合,使用同步包装保证线程安全 + private val workers = Collections.synchronizedSet(HashSet()) + + // 总的完成任务数量 + private var supportCompletedTaskCount = AtomicLong(0) + + // 是否允许核心线程超时退出,volatile保证可见性 + @Volatile + var allowCoreThreadTimeOut = false + + val poolSize: Int get() = workerCount.get() + val completedTaskCount: Long get() = (workers.sumOf { it.completedTasks } + supportCompletedTaskCount.get()) + val taskCount: Int get() = taskQueue.size + + // 初始化时进行参数校验 + init { + if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) { + throw IllegalArgumentException("Invalid configuration for ThreadOcean") + } + } + + /** + * 执行任务的方法 + * @param task 要执行的任务 + */ + fun execute(task: Runnable) { + val wc = workerCount.get() + + // 1. 如果当前线程数小于核心线程数,直接创建新线程(核心线程) + if (wc < corePoolSize) { + addWorker(task, true) + return + } + + // 2. 尝试将任务加入工作队列 + if (taskQueue.offer(task)) { + // 任务加入队列成功,检查是否需要创建一个空任务的线程来处理队列 + // 修复:不再将同一个任务传给新线程,而是传null作为首任务 + addWorker(null, false) + return + } + + // 3. 如果队列已满且线程数小于最大线程数,创建新线程(非核心线程) + if (wc < maxPoolSize) { + addWorker(task, false) + return + } + + // 4. 如果队列已满且线程数已达最大值,可以抛出拒绝异常(这里被注释掉了) + throw RejectedExecutionException("Task rejected from ThreadGrok") + } + + /** + * 添加工作线程 + * @param firstTask 新线程首先执行的任务,可以为null + * @param core 是否为核心线程 + */ + private fun addWorker(firstTask: Runnable?, core: Boolean) { + val wc = workerCount.get() + val limit = if (core) corePoolSize else maxPoolSize + + // 检查是否超过限制 + if (wc >= limit) { + return + } + + // 使用CAS增加工作线程计数 + if (!workerCount.compareAndSet(wc, wc + 1)) { + return + } + + var worker: Worker? = null + var workerStarted = false + try { + // 创建新工作线程 + worker = Worker(firstTask) + workers.add(worker) + worker.start() + workerStarted = true + } finally { + // 如果线程启动失败,进行清理 + if (!workerStarted) { + workers.remove(worker) + workerCount.decrementAndGet() + } + } + } + + /** + * 从队列中获取任务 + * @return 获取到的任务,如果返回null表示工作线程应该退出 + */ + private fun getTask(): Runnable? { + /** + * *** 设计原因 *** + * 1. 综合决策机制:线程是否应该退出不仅取决于超时,还需要考虑: + * - 当前线程数是否超过最大值 + * - 是否至少保留一个线程处理队列中的任务 + * - 任务队列是否为空 + * 2. 保证服务质量:即使某个线程获取任务超时,也不会立即退出,而是在下一次循环中综合评估。这样可以避免在临时任务少的情况下过早释放线程。 + * 3. 智能化线程管理:通过(wc > 1 || taskQueue.isEmpty())条件,确保当队列有任务时至少保留一个线程可用,防止所有线程都因超时而退出。 + * + * *** 实际应用场景 *** + * 当线程池处于低负载状态时: + * - 如果允许核心线程超时,那么核心线程在keepAliveTime时间内没有新任务会尝试退出 + * - 但不会让所有线程都退出(特别是当队列中还有任务时) + * - 这种机制确保了线程池既能在低负载时释放资源,又能在任务到来时及时响应 + * 这种设计比简单地"超时即退出"更复杂,但能更好地平衡资源利用和服务响应能力。 + */ + var timedOut = false // 上次poll是否超时 + + while (true) { + val wc = workerCount.get() + // 判断是否需要进行超时控制 + val timed = allowCoreThreadTimeOut || wc > corePoolSize + + // 检查线程是否需要回收: + // 1. 线程数超过最大值 或 (允许超时且已超时) + // 2. 线程数大于1 或 工作队列为空 + if ((wc > maxPoolSize || (timed && timedOut)) && (wc > 1 || taskQueue.isEmpty())) { + // 使用CAS减少工作线程计数 + if (workerCount.compareAndSet(wc, wc - 1)) { + return null // 返回null表示工作线程应该退出 + } + continue + } + + try { + // 根据是否允许超时选择不同的获取任务方式 + val r = if (timed) { + // 带超时的poll + taskQueue.poll(keepAliveTime, timeUnit) + } else { + // 不带超时的take + taskQueue.take() + } + if (r != null) return r + timedOut = true // 获取任务超时 + } catch (e: InterruptedException) { + timedOut = false + } + } + } + + /** + * 工作线程内部类,封装了实际执行任务的线程 + */ + private inner class Worker(var firstTask: Runnable?) : Runnable { + @Volatile + var completedTasks: Long = 0 // 已完成任务计数 + + // 实际执行任务的线程 + val thread: Thread = Thread(this) + + // 启动工作线程 + fun start() = thread.start() + + override fun run() { + var task = firstTask + firstTask = null + var completedAbruptly = true // 标记是否异常完成 + + try { + // 循环获取并执行任务 + while (task != null || (getTask().also { task = it }) != null) { + try { + task?.run() // 执行任务 + completedTasks++ + } catch (_: Exception) { + // 捕获任务执行过程中的异常,防止影响线程 + } finally { + task = null // 清理任务引用 + } + } + completedAbruptly = false // 正常退出循环 + } finally { + // 清理工作 + if (completedAbruptly) { + workerCount.decrementAndGet() + } + // 累加完成的任务数量到总的完成任务数量 + supportCompletedTaskCount.addAndGet(completedTasks) + workers.remove(this) + } + } + } +} diff --git a/src/main/kotlin/com/pangu/core/threads/ThreadLake.kt b/src/main/kotlin/com/pangu/core/threads/ThreadLake.kt new file mode 100644 index 0000000..e0d375c --- /dev/null +++ b/src/main/kotlin/com/pangu/core/threads/ThreadLake.kt @@ -0,0 +1,193 @@ +package com.pangu.core.threads + +import java.util.* +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.RejectedExecutionException +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +/** + * 自定义线程池实现类,支持核心线程、最大线程和任务队列配置 + * + * @param corePoolSize 核心线程数,即使空闲也会保留的线程数量 + * @param maxPoolSize 最大线程数,线程池允许创建的最大线程数量 + * @param keepAliveTime 非核心线程空闲时的存活时间(单位由timeUnit指定) + * @param timeUnit 存活时间的时间单位 + * @param taskQueue 用于存放待执行任务的工作队列 + */ +class ThreadLake( + private val corePoolSize: Int = 1, + private val maxPoolSize: Int = 1, + private val keepAliveTime: Long = 1000L, + private val timeUnit: TimeUnit = TimeUnit.MILLISECONDS, + private val taskQueue: BlockingQueue = LinkedBlockingQueue() +) { + // 当前工作线程数的原子计数器 + private val workerCount = AtomicInteger(0) + + // 工作线程集合,使用同步包装保证线程安全 + private val workers = Collections.synchronizedSet(HashSet()) + + // 已完成任务总数的计数器(用于已终止线程的任务统计) + private var supportCompletedTaskCount = AtomicLong(0) + + /** + * 获取已完成任务总数(包含当前活跃线程和已终止线程完成的任务) + */ + val completedTaskCount: Long get() = (workers.sumOf { it.completedTasks } + supportCompletedTaskCount.get()) + + // 是否允许核心线程超时退出,volatile保证可见性 + @Volatile + var allowCoreThreadTimeOut = false + + // 初始化时进行参数校验 + init { + if (corePoolSize < 0 || maxPoolSize <= 0 || maxPoolSize < corePoolSize || keepAliveTime < 0) { + throw IllegalArgumentException("Invalid configuration for ThreadOcean") + } + } + + /** + * 执行任务的方法 + * @param task 要执行的任务 + * @throws RejectedExecutionException 当线程池已满且队列已满时抛出 + */ + fun execute(task: Runnable) { + val wc = workerCount.get() + + // 1. 如果当前线程数小于核心线程数,直接创建新线程(核心线程) + if (wc < corePoolSize) { + addWorker(task, true) + return + } + + // 2. 尝试将任务加入工作队列 + if (taskQueue.offer(task)) { + // 如果队列中有空闲线程,任务会被自动处理 + return + } + + // 3. 如果队列已满且线程数小于最大线程数,创建新线程(非核心线程) + if (wc < maxPoolSize) { + addWorker(task, false) + return + } + + // 4. 如果队列已满且线程数已达最大值,抛出拒绝异常 +// throw RejectedExecutionException("Task rejected from ThreadLake") + } + + /** + * 添加工作线程 + * @param task 新线程首先执行的任务 + * @param isCore 是否为核心线程 + */ + private fun addWorker(task: Runnable, isCore: Boolean) { + val wc = workerCount.get() + val limit = if (isCore) corePoolSize else maxPoolSize + + // 检查是否超过限制 + if (wc >= limit) { + return + } + + // 使用CAS增加工作线程计数 + if (!workerCount.compareAndSet(wc, wc + 1)) { + return + } + + // 创建并启动工作线程 + val worker = Worker(task) + workers.add(worker) + worker.start() + } + + /** + * 从队列中获取任务 + * @return 获取到的任务,如果返回null表示工作线程应该退出 + */ + private fun getTask(): Runnable? { + /** + * *** 设计原因 *** + * 1. 综合决策机制:线程是否应该退出不仅取决于超时,还需要考虑: + * - 当前线程数是否超过最大值 + * - 是否至少保留一个线程处理队列中的任务 + * - 任务队列是否为空 + * 2. 保证服务质量:即使某个线程获取任务超时,也不会立即退出,而是在下一次循环中综合评估。这样可以避免在临时任务少的情况下过早释放线程。 + * 3. 智能化线程管理:通过(wc > 1 || taskQueue.isEmpty())条件,确保当队列有任务时至少保留一个线程可用,防止所有线程都因超时而退出。 + * + * *** 实际应用场景 *** + * 当线程池处于低负载状态时: + * - 如果允许核心线程超时,那么核心线程在keepAliveTime时间内没有新任务会尝试退出 + * - 但不会让所有线程都退出(特别是当队列中还有任务时) + * - 这种机制确保了线程池既能在低负载时释放资源,又能在任务到来时及时响应 + * 这种设计比简单地"超时即退出"更复杂,但能更好地平衡资源利用和服务响应能力。 + */ + var timedOut = false // 上次poll是否超时 + + while (true) { + val wc = workerCount.get() + // 判断是否需要进行超时控制 + val timed = allowCoreThreadTimeOut || wc > corePoolSize + + // 检查线程是否需要回收: + // 1. 线程数超过最大值 或 (允许超时且已超时) + // 2. 线程数大于1 或 工作队列为空 + if ((wc > maxPoolSize || (timed && timedOut)) && (wc > corePoolSize || taskQueue.isEmpty())) { + return null // 返回null表示工作线程应该退出 + } + + try { + // 根据是否允许超时选择不同的获取任务方式 + val r = if (timed) { + // 带超时的poll + taskQueue.poll(keepAliveTime, timeUnit) + } else { + // 不带超时的take(核心线程默认使用) + taskQueue.take() + } + if (r != null) return r + timedOut = true // 获取任务超时 + } catch (e: InterruptedException) { + timedOut = false + } + } + } + + /** + * 工作线程内部类,封装了实际执行任务的线程 + * 继承自Thread类,直接作为线程使用 + */ + private inner class Worker(var firstTask: Runnable?) : Thread() { + // 当前Worker完成的任务数量(volatile保证可见性) + @Volatile + var completedTasks: Long = 0 + + override fun run() { + var task = firstTask + firstTask = null + + try { + // 循环获取并执行任务 + while (task != null || (getTask().also { task = it }) != null) { + try { + task?.run() // 执行任务 + completedTasks++ + } catch (_: Exception) { + // 捕获任务执行过程中的异常,防止线程意外退出 + } finally { + task = null // 清理任务引用 + } + } + } finally { + // 线程退出前的清理工作 + workerCount.decrementAndGet() + workers.remove(this) + // 将当前Worker完成的任务数累加到总计数器 + supportCompletedTaskCount.addAndGet(completedTasks) + } + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/pangu/core/threads/ThreadOcean.kt b/src/main/kotlin/com/pangu/core/threads/ThreadOcean.kt new file mode 100644 index 0000000..9378be3 --- /dev/null +++ b/src/main/kotlin/com/pangu/core/threads/ThreadOcean.kt @@ -0,0 +1,361 @@ +package com.pangu.core.threads + +import java.util.* +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.ReentrantLock +import java.util.logging.Level +import java.util.logging.Logger + +class ThreadOcean( + private val corePoolSize: Int = 1, + private val maxPoolSize: Int = 1, + private val keepAliveTime: Long = 1000L, + private val timeUnit: TimeUnit = TimeUnit.MILLISECONDS, + private val taskQueue: BlockingQueue = LinkedBlockingQueue() +) { + + private val COUNT_BITS = 29 + private val CAPACITY = (1 shl COUNT_BITS) - 1 + + // 运行状态存储在高3位 + private val RUNNING = -1 shl COUNT_BITS + private val SHUTDOWN = 0 shl COUNT_BITS + private val STOP = 1 shl COUNT_BITS + private val TIDYING = 2 shl COUNT_BITS + private val TERMINATED = 3 shl COUNT_BITS + + // 使用AtomicInteger同时保存线程池状态和工作线程数 + private val ctl = AtomicInteger(ctlOf(RUNNING, 0)) + private val workers = Collections.synchronizedSet(HashSet()) + private val mainLock = ReentrantLock() + private val termination = mainLock.newCondition() + + @Volatile + private var isShutdown = false + + @Volatile + var allowCoreThreadTimeOut = false + + init { + require(corePoolSize >= 0 && maxPoolSize > 0 && maxPoolSize >= corePoolSize && keepAliveTime >= 0) { + "Invalid configuration for ThreadOcean" + } + } + + val poolSize: Int get() = workerCountOf(ctl.get()) + val activeCount: Int get() = workers.count { it.isLocked } + val completedTaskCount: Long get() = workers.sumOf { it.completedTasks } + val queueSize: Int get() = taskQueue.size + + @Throws(RejectedExecutionException::class) + fun execute(task: Runnable) { + if (isShutdown) { + throw RejectedExecutionException("ThreadOcean has been shut down") + } + + val c = ctl.get() + when { + // 工作线程数 < corePoolSize,创建新核心线程 + workerCountOf(c) < corePoolSize -> { + if (addWorker(task, true)) return + // 如果添加失败,重新检查状态 + } + // 任务可以入队 + taskQueue.offer(task) -> { + val recheck = ctl.get() + // 双重检查,防止在入队后线程池关闭或线程死亡 + if (isShutdown && remove(task)) { + throw RejectedExecutionException("ThreadOcean has been shut down") + } + if (workerCountOf(recheck) == 0) { + addWorker(null, false) + } + return + } + // 队列已满,尝试创建非核心线程 + workerCountOf(c) < maxPoolSize -> { + if (addWorker(task, false)) return + } + } + throw RejectedExecutionException("Task rejected from ThreadOcean") + } + + private fun addWorker(firstTask: Runnable?, core: Boolean): Boolean { + retry@ while (true) { + val c = ctl.get() + if (runStateAtLeast(c, SHUTDOWN)) { + return false + } + + val wc = workerCountOf(c) + val limit = if (core) corePoolSize else maxPoolSize + if (wc >= limit) { + return false + } + + if (compareAndIncrementWorkerCount(c)) { + break + } + } + + var worker: Worker? = null + var workerStarted = false + try { + worker = Worker(firstTask) + workers.add(worker) + worker.start() + workerStarted = true + } finally { + if (!workerStarted) { + addWorkerFailed(worker) + } + } + return workerStarted + } + + private fun addWorkerFailed(w: Worker?) { + mainLock.lock() + try { + w?.let { workers.remove(it) } + decrementWorkerCount() + tryTerminate() + } finally { + mainLock.unlock() + } + } + + private fun getTask(): Runnable? { + var timedOut = false + + retry@ while (true) { + val c = ctl.get() + + // 检查线程池状态 + if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || taskQueue.isEmpty())) { + decrementWorkerCount() + return null + } + + val wc = workerCountOf(c) + + // 是否允许超时(非核心线程或允许核心线程超时) + val timed = allowCoreThreadTimeOut || wc > corePoolSize + + if ((wc > maxPoolSize || (timed && timedOut)) && (wc > 1 || taskQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) { + return null + } + continue@retry + } + + try { + val r = if (timed) { + taskQueue.poll(keepAliveTime, timeUnit) + } else { + taskQueue.take() + } + if (r != null) return r + timedOut = true + } catch (e: InterruptedException) { + timedOut = false + } + } + } + + fun shutdown() { + mainLock.lock() + try { + advanceRunState(SHUTDOWN) + interruptIdleWorkers() + } finally { + mainLock.unlock() + } + tryTerminate() + } + + fun shutdownNow(): List { + val tasks: List + mainLock.lock() + try { + advanceRunState(STOP) + interruptWorkers() + tasks = drainQueue() + } finally { + mainLock.unlock() + } + tryTerminate() + return tasks + } + + private fun interruptIdleWorkers() { + mainLock.lock() + try { + workers.forEach { worker -> + if (!worker.isLocked) { + worker.interrupt() + } + } + } finally { + mainLock.unlock() + } + } + + private fun interruptWorkers() { + mainLock.lock() + try { + workers.forEach { it.interrupt() } + } finally { + mainLock.unlock() + } + } + + private fun drainQueue(): List { + val taskList = ArrayList() + taskQueue.drainTo(taskList) + if (!taskQueue.isEmpty()) { + taskQueue.forEach { taskList.add(it) } + taskQueue.clear() + } + return taskList + } + + private fun tryTerminate() { + while (true) { + val c = ctl.get() + if (isRunning(c) || + runStateAtLeast(c, TIDYING) || + (runStateOf(c) == SHUTDOWN && !taskQueue.isEmpty()) + ) { + return + } + + if (workerCountOf(c) != 0) { + interruptIdleWorkers() + return + } + + mainLock.lock() + try { + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { + try { + // 终止钩子,可以在这里添加清理逻辑 + } finally { + ctl.set(ctlOf(TERMINATED, 0)) + termination.signalAll() + } + return + } + } finally { + mainLock.unlock() + } + } + } + + fun awaitTermination(timeout: Long, unit: TimeUnit): Boolean { + var nanos = unit.toNanos(timeout) + mainLock.lock() + try { + while (true) { + if (runStateAtLeast(ctl.get(), TERMINATED)) { + return true + } + if (nanos <= 0) { + return false + } + nanos = termination.awaitNanos(nanos) + } + } finally { + mainLock.unlock() + } + } + + private inner class Worker(var firstTask: Runnable?) : Runnable { + @Volatile + var completedTasks: Long = 0 + val thread: Thread = Thread(this) + + @Volatile + var isLocked = false + + fun start() = thread.start() + fun interrupt() = thread.interrupt() + fun join(millis: Long) = thread.join(millis) + + override fun run() { + runWorker(this) + } + } + + private fun runWorker(w: Worker) { + var task = w.firstTask + w.firstTask = null + var completedAbruptly = true + try { + while (task != null || (getTask().also { task = it }) != null) { + w.isLocked = true + try { + task?.run() + w.completedTasks++ + } catch (e: Exception) { + Logger.getLogger(ThreadOcean::class.java.name).log(Level.SEVERE, "Task execution failed", e) + } finally { + task = null + w.isLocked = false + } + } + completedAbruptly = false + } finally { + processWorkerExit(w, completedAbruptly) + } + } + + private fun processWorkerExit(w: Worker, completedAbruptly: Boolean) { + if (completedAbruptly) { + decrementWorkerCount() + } + + mainLock.lock() + try { + workers.remove(w) + tryTerminate() + } finally { + mainLock.unlock() + } + } + + private fun remove(task: Runnable?): Boolean { + return taskQueue.remove(task) + } + + + private fun runStateOf(c: Int) = c and (CAPACITY.inv()) + private fun workerCountOf(c: Int) = c and CAPACITY + private fun ctlOf(rs: Int, wc: Int) = rs or wc + + private fun runStateAtLeast(c: Int, s: Int) = c >= s + private fun isRunning(c: Int) = c < SHUTDOWN + + private fun advanceRunState(targetState: Int) { + while (true) { + val c = ctl.get() + if (runStateAtLeast(c, targetState) || + ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))) + ) { + break + } + } + } + + private fun compareAndIncrementWorkerCount(expect: Int): Boolean { + return ctl.compareAndSet(expect, expect + 1) + } + + private fun compareAndDecrementWorkerCount(expect: Int): Boolean { + return ctl.compareAndSet(expect, expect - 1) + } + + private fun decrementWorkerCount() { + ctl.addAndGet(-1) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/pangu/test/SchedulerTest.kt b/src/test/kotlin/com/pangu/test/SchedulerTest.kt new file mode 100644 index 0000000..60d84f4 --- /dev/null +++ b/src/test/kotlin/com/pangu/test/SchedulerTest.kt @@ -0,0 +1,59 @@ +package com.pangu.test + +import com.pangu.core.scheduler.* +import junit.framework.TestCase +import java.text.SimpleDateFormat +import java.util.* + + +class SchedulerTest : TestCase() { + fun testSequenceScheduler() { + val formatter = SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + val onetimeScheduler = SequenceScheduler() + + val calendar = Calendar.getInstance() + // 设置自定义时间 + calendar[2025, Calendar.MARCH, 28, 0, 0] = 0 + calendar.set(Calendar.MILLISECOND, 0) + val startTime = calendar.timeInMillis + onetimeScheduler.addJob(Trigger(15000, startTime)) { + println("15s - ${formatter.format(Date())} hello") + } + onetimeScheduler.addJob(Trigger(20000, startTime)) { + println("20s - ${formatter.format(Date())} hello") + } + onetimeScheduler.addJob(Trigger(25000, startTime)) { + println("25s - ${formatter.format(Date())} hello") + } + onetimeScheduler.execute() + } + + fun testCombineScheduler() { + val formatter = SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + val onetimeScheduler = CombineScheduler { jobs -> + Runnable { + println("---> Combine executed start <---") + jobs.forEach { + it.run() + } + println("---> Combine executed end <---\n") + } + } + + val calendar = Calendar.getInstance() + // 设置自定义时间 + calendar[2025, Calendar.MARCH, 28, 0, 0] = 0 + calendar.set(Calendar.MILLISECOND, 0) + val startTime = calendar.timeInMillis + onetimeScheduler.addJob(Trigger(5000, startTime)) { + println("05s - ${formatter.format(Date())} hello") + } + onetimeScheduler.addJob(Trigger(10000, startTime)) { + println("10s - ${formatter.format(Date())} hello") + } + onetimeScheduler.addJob(Trigger(15000, startTime)) { + println("15s - ${formatter.format(Date())} hello") + } + onetimeScheduler.execute() + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/pangu/test/ThreadPoolTest.kt b/src/test/kotlin/com/pangu/test/ThreadPoolTest.kt new file mode 100644 index 0000000..ee22da9 --- /dev/null +++ b/src/test/kotlin/com/pangu/test/ThreadPoolTest.kt @@ -0,0 +1,93 @@ +package com.pangu.test + +import com.pangu.core.threads.ThreadGrok +import com.pangu.core.threads.ThreadLake +import com.pangu.core.threads.ThreadOcean +import junit.framework.TestCase +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.LockSupport + + +class ThreadPoolTest : TestCase() { + fun testThreadOcean() { + val i = AtomicLong(1000000L) + val threadOcean = ThreadOcean( + 2, 5, 1000L, TimeUnit.MILLISECONDS, + LinkedBlockingQueue(100) + ) + + val start = System.currentTimeMillis() + repeat(1000) { _ -> + threadOcean.execute { + repeat(1000) { _ -> + i.decrementAndGet() + } + } + Thread.sleep(10) + } + while (true) { + if (threadOcean.completedTaskCount == 1000L) { + break + } + Thread.sleep(10) + } + println("结果: $i, 执行时间: ${System.currentTimeMillis() - start}ms") + threadOcean.shutdown() + println("") + } + + fun testThreadLake() { + val i = AtomicLong(1000000L) + val threadLake = ThreadLake( + 2, 5, 1000L, TimeUnit.MILLISECONDS, + LinkedBlockingQueue(40) + ) + + val start = System.currentTimeMillis() + + repeat(1000) { _ -> + threadLake.execute { + repeat(1000) { _ -> + i.decrementAndGet() + LockSupport.parkNanos(1) + } + } + Thread.sleep(10) + } + while (true) { + if (threadLake.completedTaskCount == 1000L) { + break + } + Thread.sleep(10) + } + println("结果: $i, 执行时间: ${System.currentTimeMillis() - start}ms") + } + + fun testThreadGrok() { + val i = AtomicLong(1000000L) + val threadGrok = ThreadGrok( + 2, 5, 1000L, TimeUnit.MILLISECONDS, + LinkedBlockingQueue(100) + ) + + val start = System.currentTimeMillis() + repeat(1000) { _ -> + threadGrok.execute { + repeat(1000) { _ -> + i.decrementAndGet() + LockSupport.parkNanos(1) + } + } + Thread.sleep(10) + } + while (true) { + if (threadGrok.completedTaskCount == 1000L) { + break + } + Thread.sleep(10) + } + println("结果: $i, 执行时间: ${System.currentTimeMillis() - start}ms") + } +} \ No newline at end of file