模仿线程池,写了个 Docker 容器池
源代码链接:
zazhiii/docker-pool: 容器池执行器,用于管理 Docker 容器的复用与生命周期。ContainerPoolExecutor is a lightweight container pool manager designed to manage Docker containers for code execution.
背景
在设计OJ系统的代码运行端时,我的思路是将代码放到 Docker 容器中运行。在某些场景下(例如竞赛的时候)去测评用户提交的代码的频率是很高的,若每次执行代码都去创建销毁一个容器,会严重占用资源,且效率很低。
所以参考线程池的思想,我设计了一个容器池,实现容器的复用。
目前实现的功能:
- 最大容器限制
- 复用容器
- 空闲容器超时自动停止
- 实现泛型,可管理自定义容器
下面介绍大致实现
容器对象封装
定义一个通用父类
这个父类描述容器的基本行为和属性。
属性:docker客户端、容器ID、容器名称、上一次使用时间等等。
方法:创建命令、运行命令、启动/停止容器、检查容器状态等。
@Getter @AllArgsConstructor public abstract class DockerContainer { protected DockerClient dockerClient; protected String containerId; // 容器ID protected String containerName; @Setter protected long lastUsedTime; // 最后使用时间 public void start() { dockerClient.startContainerCmd(containerId).exec(); } public void stop() { dockerClient.stopContainerCmd(containerId).exec(); } public ExecCreateCmdResponse createCmd(String... cmd) { return dockerClient.execCreateCmd(containerId) .withAttachStdout(true) .withAttachStderr(true) .withAttachStdin(true) .withCmd(cmd) .exec(); } public ResultCallback.Adapter<Frame> execCmdAsync(ExecCreateCmdResponse execResponse, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr, InputStream stdin) { return dockerClient.execStartCmd(execResponse.getId()) .withStdIn(stdin) .exec(new ResultCallback.Adapter<Frame>() {@Overridepublic void onNext(Frame frame) { try { if (frame.getStreamType() == StreamType.STDOUT) { stdout.write(frame.getPayload()); } else if (frame.getStreamType() == StreamType.STDERR) { stderr.write(frame.getPayload()); } } catch (IOException e) { throw new RuntimeException(\"Error writing to output streams\", e); }} }); } public void execCmd(ExecCreateCmdResponse execResponse, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr, InputStream stdin ) throws InterruptedException { this.execCmdAsync(execResponse, stdout, stderr, stdin) .awaitCompletion(); } public boolean execCmdWithTimeout( ExecCreateCmdResponse execResponse, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr, InputStream stdin, int timeout, TimeUnit timeUnit ) throws InterruptedException { return this.execCmdAsync(execResponse, stdout, stderr, stdin) .awaitCompletion(timeout, timeUnit); } public InspectContainerResponse inspectContainer() { return dockerClient.inspectContainerCmd(containerId).exec(); } public boolean isRunning() { return Boolean.TRUE.equals(inspectContainer().getState().getRunning()); } }
实现自定义容器
继承通用父类,再实现容器想要的特定功能。
@Getter public class CodeExecContainer extends DockerContainer{ private final String containerWorkingDir; private final String hostWorkingDir; public CodeExecContainer(DockerClient dockerClient, String containerId, String containerName, String containerWorkingDir, String hostWorkingDir) { super(dockerClient, containerId, containerName, System.currentTimeMillis()); this.containerWorkingDir = containerWorkingDir; this.hostWorkingDir = hostWorkingDir; } /** * 编译 Java 代码 * @param filepath Java 文件路径 * @return 编译错误信息,如果编译成功则返回空字符串 */ public String compileJavaCode(String filepath) { ExecCreateCmdResponse resp = this.createCmd(\"javac\", filepath); ByteArrayOutputStream stderr = new ByteArrayOutputStream(); try { this.execCmdAsync(resp, new ByteArrayOutputStream(), stderr, null).awaitCompletion(10, java.util.concurrent.TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(\"编译 Java 代码时被中断\", e); } return stderr.toString(StandardCharsets.UTF_8); } /** * 执行 Java 代码并计时 * @param workingDir 容器工作目录 * @param fileName Java 文件名 * @param stdin 标准输入流 * @param timeout 超时时间 * @param timeunit 超时时间单位 * @return CodeRunResult 包含执行结果、时间和内存使用情况 */ public CodeRunResult runJavaCode(String workingDir, String fileName, InputStream stdin, long timeout, TimeUnit timeunit ) { ExecCreateCmdResponse resp = this.createCmd(\"time\", \"-f\", \"__TIME__:%U %S %E %M\",\"java\", workingDir + File.separator + fileName); boolean awaited = false; ByteArrayOutputStream stdout = new ByteArrayOutputStream(); ByteArrayOutputStream stderr = new ByteArrayOutputStream(); try { awaited = this.execCmdAsync(resp, stdout, stderr, stdin).awaitCompletion(timeout, timeunit); } catch (InterruptedException e) { throw new RuntimeException(\"执行 Java 代码时被中断\", e); } if(!awaited) { return CodeRunResult.timeout(); } String err = stderr.toString(StandardCharsets.UTF_8); String output = stdout.toString(StandardCharsets.UTF_8); if(err.startsWith(\"__TIME__:\")) { String[] parts = err.substring(9).trim().split(\" \"); long timeUsed = (long)Double.parseDouble(parts[2].split(\":\")[1]) * 1000; long memoryUsed = Long.parseLong(parts[3]) / 1024; // MB return CodeRunResult.success(output, timeUsed, memoryUsed); } err = err.split(\"__TIME__:\")[0].trim(); // 只保留时间信息之前的错误信息 return CodeRunResult.error(err); } }
容器工厂
容器工厂接口
定义创建容器的一些规范。
public interface DockerContainerFactory<T extends DockerContainer> { T createDockerContainer(String containerName); T createDockerContainer(); }
自定义容器工厂
实现容器工厂接口,完成想要创建的自定义容器的逻辑。
package com.zazhi.docker_pool.pool; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.HostConfig; import com.github.dockerjava.api.model.Volume; import com.zazhi.docker_pool.pojo.CodeExecContainer; import java.io.File; import java.util.UUID; /** * @author zazhi * @date 2025/7/2 * @description: CodeExecContainerFactory 类用于创建 CodeExecContainer 实例 */ public class CodeExecContainerFactory implements DockerContainerFactory<CodeExecContainer> { private DockerClient dockerClient; private String hostWorkingDir; private String containerWorkingDir; private int memoryLimitMb; private String imageName; public CodeExecContainerFactory(DockerClient dockerClient, String hostWorkingDir, String containerWorkingDir, int memoryLimitMb, String imageName) { this.dockerClient = dockerClient; this.hostWorkingDir = hostWorkingDir; this.containerWorkingDir = containerWorkingDir; this.memoryLimitMb = memoryLimitMb; this.imageName = imageName; } public CodeExecContainer createDockerContainer(String containerName) { String hostWorkingDir = this.hostWorkingDir + File.separator + UUID.randomUUID(); // 使用 UUID 确保每个容器的工作目录唯一 CreateContainerResponse createRes = dockerClient.createContainerCmd(imageName) .withHostConfig( HostConfig.newHostConfig() .withBinds(new Bind(hostWorkingDir, new Volume(containerWorkingDir))) // 挂载路径注意要挂载绝对路径,否则会有点问题 .withMemory(memoryLimitMb * 1024 * 1024L) // 设置最大内存限制 .withMemorySwap(memoryLimitMb * 1024 * 1024L) // 禁止交换分区 (swap) ) .withName(containerName) .withWorkingDir(containerWorkingDir) // 设置工作目录 .exec(); return new CodeExecContainer(dockerClient, createRes.getId(), containerName, containerWorkingDir, hostWorkingDir); } public CodeExecContainer createDockerContainer() { CreateContainerResponse createRes = dockerClient.createContainerCmd(imageName) .withHostConfig( HostConfig.newHostConfig() .withBinds(new Bind(hostWorkingDir, new Volume(containerWorkingDir))) // 挂载路径注意要挂载绝对路径,否则会有点问题 .withMemory(memoryLimitMb * 1024 * 1024L) // 设置最大内存限制 .withMemorySwap(memoryLimitMb * 1024 * 1024L) // 禁止交换分区 (swap) ) .withWorkingDir(containerWorkingDir) // 设置工作目录 .exec(); return new CodeExecContainer(dockerClient, createRes.getId(), \"\", containerWorkingDir, hostWorkingDir); } }
容器池实现
核心是阻塞队列,使用容器时从队列取出容器,用完再归还。
用一个定时任务轮训容器列表,空闲太久的则将其停止。
package com.zazhi.docker_pool.pool; import com.zazhi.docker_pool.pojo.CodeExecContainer; import com.zazhi.docker_pool.pojo.DockerContainer; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author zazhi * @date 2025/7/2 * @description: ContainerPoolExecutor 类用于管理容器池的执行器 */ public class ContainerPoolExecutor<T extends DockerContainer> { private final int maximumPoolSize; private final long keepStartTime; private final DockerContainerFactory<T> dockerContainerFactory; private final LinkedBlockingQueue<T> containerQueue = new LinkedBlockingQueue<>(); private final AtomicInteger containerCount; public ContainerPoolExecutor(int maximumPoolSize, long keepStartTime, TimeUnit unit, DockerContainerFactory<T> dockerContainerFactory) { this.maximumPoolSize = maximumPoolSize; this.keepStartTime = unit.toMillis(keepStartTime); this.dockerContainerFactory = dockerContainerFactory; this.containerCount = new AtomicInteger(0); // 启动定时任务停止空闲容器 ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::cleanIdleContainers, 1, 1, TimeUnit.SECONDS); } public T acquireContainer() throws InterruptedException { T container = containerQueue.poll(); if (container == null) { if (containerCount.get() < maximumPoolSize) { // 创建新的容器 container = dockerContainerFactory.createDockerContainer(); containerCount.incrementAndGet(); } else { // 等待直到有可用的容器 container = containerQueue.poll(); } } if(container == null){ throw new InterruptedException(\"No available container in the pool\"); } if(!container.isRunning()){ container.start(); } container.setLastUsedTime(System.currentTimeMillis()); return container; } public void releaseContainer(T container) { if (container != null) { containerQueue.offer(container); } } private void cleanIdleContainers(){ long currentTime = System.currentTimeMillis(); containerQueue.forEach(container -> { if (currentTime - container.getLastUsedTime() > keepStartTime) { container.stop(); } }); } }
如何使用/测试?
首先要有Docker环境,其次要有一个容器镜像,跟想要实现的自定义容器匹配。我这里的是执行代码的容器,镜像可以从Dockerfile构建
FROM ubuntu:22.04 # 设置工作目录 WORKDIR /app RUN apt-get update && apt-get install -y \\ time \\ openjdk-17-jdk #构建的容器会默认保持运行状态,一个后台进程保持容器运行 CMD [\"tail\", \"-f\", \"/dev/null\"]
- 用docker-java的api获取一个docker client
- 创建线程工厂
- 创建容器池
- 获取容器
- 归还容器
@Test void testContainerPool() { DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); DockerHttpClient httpClient = new ApacheDockerHttpClient.Builder() .dockerHost(config.getDockerHost()) .sslConfig(config.getSSLConfig()) .maxConnections(100) .connectionTimeout(Duration.ofSeconds(30)) .responseTimeout(Duration.ofSeconds(45)) .build(); DockerClient dockerClient = DockerClientImpl.getInstance(config, httpClient); ContainerPoolExecutor<CodeExecContainer> pool = new ContainerPoolExecutor<>( 5, // maximumPoolSize 10, // keepAliveTime TimeUnit.SECONDS, new CodeExecContainerFactory(dockerClient,\"G:\\\\code_exec_docker_v\", // 主机工作目录 \"/app\", // 容器工作目录 512, // 内存限制 (MB) \"jg\" // 镜像名称 ) ); try { CodeExecContainer container = pool.acquireContainer(); pool.releaseContainer(container); } catch (InterruptedException e) { throw new RuntimeException(e); } try { Thread.sleep(10000000); } catch (InterruptedException e) { throw new RuntimeException(e); } }
容器为懒惰创建的,有从容器池中获取容器才会去创建容器。
容器空闲一定时间后会被停止。
一些设计思考
为什么是停止容器而不是销毁容器?
先看线程池中的线程,线程没有容器这种暂停/恢复的机制,且创建的成本比创建容器的成本小,在线程池中使用的是结束/创建线程的方式。
不同于线程资源,容器是可以停止的,停止之后不会占用CPU 和内存资源,仅占用少量磁盘空间,停止之后通过启动就可以快速复用。此外创建容器的开销是比较大的,所以使用停止/启动的方式来节省CPU、内存资源,而不是创建/销毁。
为什么不设计临时容器、核心容器这样的概念呢?
在线程池中临时线程就是一定时间不用他了就把他销毁了。
基于上面一个问题,既然容器不会销毁,那么就没必要设计临时容器这样的概念了。
TODO
- 似乎停止容器和取出容器之间有一定线程安全问题,待测试修复。
- 容器取出来需要手动归还,万一忘记还了怎么办,变成野容器了,能否实现自动归还呢?