> 技术文档 > 模仿线程池,写了个 Docker 容器池

模仿线程池,写了个 Docker 容器池


代码链接:

zazhiii/docker-pool: 容器池执行器,用于管理 Docker 容器的复用与生命周期。ContainerPoolExecutor is a lightweight container pool manager designed to manage Docker containers for code execution.

背景

在设计OJ系统的代码运行端时,我的思路是将代码放到 Docker 容器中运行。在某些场景下(例如竞赛的时候)去测评用户提交的代码的频率是很高的,若每次执行代码都去创建销毁一个容器,会严重占用资源,且效率很低。

所以参考线程池的思想,我设计了一个容器池,实现容器的复用。

目前实现的功能:

  • 最大容器限制
  • 复用容器
  • 空闲容器超时自动停止
  • 实现泛型,可管理自定义容器

下面介绍大致实现

容器对象封装

定义一个通用父类

这个父类描述容器的基本行为和属性。

属性:docker客户端、容器ID、容器名称、上一次使用时间等等。

方法:创建命令、运行命令、启动/停止容器、检查容器状态等。

@Getter @AllArgsConstructor public abstract class DockerContainer { protected DockerClient dockerClient; protected String containerId; // 容器ID  protected String containerName; @Setter protected long lastUsedTime; // 最后使用时间  public void start() { dockerClient.startContainerCmd(containerId).exec(); } public void stop() { dockerClient.stopContainerCmd(containerId).exec(); } public ExecCreateCmdResponse createCmd(String... cmd) { return dockerClient.execCreateCmd(containerId)  .withAttachStdout(true)  .withAttachStderr(true)  .withAttachStdin(true)  .withCmd(cmd)  .exec(); } public ResultCallback.Adapter<Frame> execCmdAsync(ExecCreateCmdResponse execResponse, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr, InputStream stdin) { return dockerClient.execStartCmd(execResponse.getId())  .withStdIn(stdin)  .exec(new ResultCallback.Adapter<Frame>() {@Overridepublic void onNext(Frame frame) { try { if (frame.getStreamType() == StreamType.STDOUT) { stdout.write(frame.getPayload()); } else if (frame.getStreamType() == StreamType.STDERR) { stderr.write(frame.getPayload()); } } catch (IOException e) { throw new RuntimeException(\"Error writing to output streams\", e); }}  }); } public void execCmd(ExecCreateCmdResponse execResponse, ByteArrayOutputStream stdout, ByteArrayOutputStream stderr, InputStream stdin ) throws InterruptedException { this.execCmdAsync(execResponse, stdout, stderr, stdin)  .awaitCompletion(); } public boolean execCmdWithTimeout(  ExecCreateCmdResponse execResponse,  ByteArrayOutputStream stdout,  ByteArrayOutputStream stderr,  InputStream stdin,  int timeout,  TimeUnit timeUnit  ) throws InterruptedException { return this.execCmdAsync(execResponse, stdout, stderr, stdin)  .awaitCompletion(timeout, timeUnit); } public InspectContainerResponse inspectContainer() { return dockerClient.inspectContainerCmd(containerId).exec(); } public boolean isRunning() { return Boolean.TRUE.equals(inspectContainer().getState().getRunning()); } }

实现自定义容器

继承通用父类,再实现容器想要的特定功能。

@Getter public class CodeExecContainer extends DockerContainer{ private final String containerWorkingDir; private final String hostWorkingDir; public CodeExecContainer(DockerClient dockerClient, String containerId, String containerName, String containerWorkingDir, String hostWorkingDir) { super(dockerClient, containerId, containerName, System.currentTimeMillis()); this.containerWorkingDir = containerWorkingDir; this.hostWorkingDir = hostWorkingDir; } /** * 编译 Java 代码 * @param filepath Java 文件路径 * @return 编译错误信息,如果编译成功则返回空字符串 */ public String compileJavaCode(String filepath) { ExecCreateCmdResponse resp = this.createCmd(\"javac\", filepath); ByteArrayOutputStream stderr = new ByteArrayOutputStream(); try {  this.execCmdAsync(resp, new ByteArrayOutputStream(), stderr, null).awaitCompletion(10, java.util.concurrent.TimeUnit.SECONDS); } catch (InterruptedException e) {  throw new RuntimeException(\"编译 Java 代码时被中断\", e); } return stderr.toString(StandardCharsets.UTF_8); } /** * 执行 Java 代码并计时 * @param workingDir 容器工作目录 * @param fileName Java 文件名 * @param stdin 标准输入流 * @param timeout 超时时间 * @param timeunit 超时时间单位 * @return CodeRunResult 包含执行结果、时间和内存使用情况 */ public CodeRunResult runJavaCode(String workingDir,  String fileName, InputStream stdin, long timeout, TimeUnit timeunit ) { ExecCreateCmdResponse resp = this.createCmd(\"time\", \"-f\", \"__TIME__:%U %S %E %M\",\"java\", workingDir + File.separator + fileName); boolean awaited = false; ByteArrayOutputStream stdout = new ByteArrayOutputStream(); ByteArrayOutputStream stderr = new ByteArrayOutputStream(); try {  awaited = this.execCmdAsync(resp, stdout, stderr, stdin).awaitCompletion(timeout, timeunit); } catch (InterruptedException e) {  throw new RuntimeException(\"执行 Java 代码时被中断\", e); } if(!awaited) {  return CodeRunResult.timeout(); } String err = stderr.toString(StandardCharsets.UTF_8); String output = stdout.toString(StandardCharsets.UTF_8); if(err.startsWith(\"__TIME__:\")) {  String[] parts = err.substring(9).trim().split(\" \");  long timeUsed = (long)Double.parseDouble(parts[2].split(\":\")[1]) * 1000;  long memoryUsed = Long.parseLong(parts[3]) / 1024; // MB  return CodeRunResult.success(output, timeUsed, memoryUsed); } err = err.split(\"__TIME__:\")[0].trim(); // 只保留时间信息之前的错误信息  return CodeRunResult.error(err); } }

容器工厂

容器工厂接口

定义创建容器的一些规范。

public interface DockerContainerFactory<T extends DockerContainer> { T createDockerContainer(String containerName); T createDockerContainer(); }

自定义容器工厂

实现容器工厂接口,完成想要创建的自定义容器的逻辑。

package com.zazhi.docker_pool.pool; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.api.model.Bind; import com.github.dockerjava.api.model.HostConfig; import com.github.dockerjava.api.model.Volume; import com.zazhi.docker_pool.pojo.CodeExecContainer; import java.io.File; import java.util.UUID; /** * @author zazhi * @date 2025/7/2 * @description: CodeExecContainerFactory 类用于创建 CodeExecContainer 实例 */ public class CodeExecContainerFactory implements DockerContainerFactory<CodeExecContainer> { private DockerClient dockerClient; private String hostWorkingDir; private String containerWorkingDir; private int memoryLimitMb; private String imageName; public CodeExecContainerFactory(DockerClient dockerClient, String hostWorkingDir, String containerWorkingDir, int memoryLimitMb, String imageName) { this.dockerClient = dockerClient; this.hostWorkingDir = hostWorkingDir; this.containerWorkingDir = containerWorkingDir; this.memoryLimitMb = memoryLimitMb; this.imageName = imageName; } public CodeExecContainer createDockerContainer(String containerName) { String hostWorkingDir = this.hostWorkingDir + File.separator + UUID.randomUUID(); // 使用 UUID 确保每个容器的工作目录唯一  CreateContainerResponse createRes = dockerClient.createContainerCmd(imageName)  .withHostConfig( HostConfig.newHostConfig() .withBinds(new Bind(hostWorkingDir, new Volume(containerWorkingDir))) // 挂载路径注意要挂载绝对路径,否则会有点问题  .withMemory(memoryLimitMb * 1024 * 1024L) // 设置最大内存限制  .withMemorySwap(memoryLimitMb * 1024 * 1024L) // 禁止交换分区 (swap) )  .withName(containerName)  .withWorkingDir(containerWorkingDir) // 设置工作目录  .exec(); return new CodeExecContainer(dockerClient, createRes.getId(), containerName,  containerWorkingDir, hostWorkingDir); } public CodeExecContainer createDockerContainer() { CreateContainerResponse createRes = dockerClient.createContainerCmd(imageName)  .withHostConfig( HostConfig.newHostConfig() .withBinds(new Bind(hostWorkingDir, new Volume(containerWorkingDir))) // 挂载路径注意要挂载绝对路径,否则会有点问题  .withMemory(memoryLimitMb * 1024 * 1024L) // 设置最大内存限制  .withMemorySwap(memoryLimitMb * 1024 * 1024L) // 禁止交换分区 (swap) )  .withWorkingDir(containerWorkingDir) // 设置工作目录  .exec(); return new CodeExecContainer(dockerClient, createRes.getId(), \"\",  containerWorkingDir, hostWorkingDir); } }

容器池实现

核心是阻塞队列,使用容器时从队列取出容器,用完再归还。

用一个定时任务轮训容器列表,空闲太久的则将其停止。

package com.zazhi.docker_pool.pool; import com.zazhi.docker_pool.pojo.CodeExecContainer; import com.zazhi.docker_pool.pojo.DockerContainer; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author zazhi * @date 2025/7/2 * @description: ContainerPoolExecutor 类用于管理容器池的执行器 */ public class ContainerPoolExecutor<T extends DockerContainer> { private final int maximumPoolSize; private final long keepStartTime; private final DockerContainerFactory<T> dockerContainerFactory; private final LinkedBlockingQueue<T> containerQueue = new LinkedBlockingQueue<>(); private final AtomicInteger containerCount; public ContainerPoolExecutor(int maximumPoolSize, long keepStartTime, TimeUnit unit, DockerContainerFactory<T> dockerContainerFactory) { this.maximumPoolSize = maximumPoolSize; this.keepStartTime = unit.toMillis(keepStartTime); this.dockerContainerFactory = dockerContainerFactory; this.containerCount = new AtomicInteger(0); // 启动定时任务停止空闲容器  ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); scheduler.scheduleAtFixedRate(this::cleanIdleContainers,  1, 1, TimeUnit.SECONDS); } public T acquireContainer() throws InterruptedException { T container = containerQueue.poll(); if (container == null) {  if (containerCount.get() < maximumPoolSize) {  // 创建新的容器  container = dockerContainerFactory.createDockerContainer();  containerCount.incrementAndGet();  } else {  // 等待直到有可用的容器  container = containerQueue.poll();  } } if(container == null){  throw new InterruptedException(\"No available container in the pool\"); } if(!container.isRunning()){  container.start(); } container.setLastUsedTime(System.currentTimeMillis()); return container; } public void releaseContainer(T container) { if (container != null) {  containerQueue.offer(container); } } private void cleanIdleContainers(){ long currentTime = System.currentTimeMillis(); containerQueue.forEach(container -> {  if (currentTime - container.getLastUsedTime() > keepStartTime) {  container.stop();  } }); } }

如何使用/测试?

首先要有Docker环境,其次要有一个容器镜像,跟想要实现的自定义容器匹配。我这里的是执行代码的容器,镜像可以从Dockerfile构建

FROM ubuntu:22.04 # 设置工作目录 WORKDIR /app RUN apt-get update && apt-get install -y \\ time \\ openjdk-17-jdk #构建的容器会默认保持运行状态,一个后台进程保持容器运行 CMD [\"tail\", \"-f\", \"/dev/null\"]
  1. 用docker-java的api获取一个docker client
  2. 创建线程工厂
  3. 创建容器池
  4. 获取容器
  5. 归还容器
@Test void testContainerPool() { DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().build(); DockerHttpClient httpClient = new ApacheDockerHttpClient.Builder()  .dockerHost(config.getDockerHost())  .sslConfig(config.getSSLConfig())  .maxConnections(100)  .connectionTimeout(Duration.ofSeconds(30))  .responseTimeout(Duration.ofSeconds(45))  .build(); DockerClient dockerClient = DockerClientImpl.getInstance(config, httpClient); ContainerPoolExecutor<CodeExecContainer> pool = new ContainerPoolExecutor<>(  5, // maximumPoolSize  10, // keepAliveTime  TimeUnit.SECONDS,  new CodeExecContainerFactory(dockerClient,\"G:\\\\code_exec_docker_v\", // 主机工作目录   \"/app\", // 容器工作目录   512, // 内存限制 (MB)    \"jg\" // 镜像名称  ) ); try { CodeExecContainer container = pool.acquireContainer(); pool.releaseContainer(container); } catch (InterruptedException e) { throw new RuntimeException(e); } try { Thread.sleep(10000000); } catch (InterruptedException e) { throw new RuntimeException(e); } }

容器为懒惰创建的,有从容器池中获取容器才会去创建容器。

容器空闲一定时间后会被停止。

一些设计思考

为什么是停止容器而不是销毁容器?

先看线程池中的线程,线程没有容器这种暂停/恢复的机制,且创建的成本比创建容器的成本小,在线程池中使用的是结束/创建线程的方式。

不同于线程资源,容器是可以停止的,停止之后不会占用CPU 和内存资源,仅占用少量磁盘空间,停止之后通过启动就可以快速复用。此外创建容器的开销是比较大的,所以使用停止/启动的方式来节省CPU、内存资源,而不是创建/销毁。

为什么不设计临时容器、核心容器这样的概念呢?

在线程池中临时线程就是一定时间不用他了就把他销毁了。

基于上面一个问题,既然容器不会销毁,那么就没必要设计临时容器这样的概念了。

TODO

  • 似乎停止容器和取出容器之间有一定线程安全问题,待测试修复。
  • 容器取出来需要手动归还,万一忘记还了怎么办,变成野容器了,能否实现自动归还呢?