热衷学习,热衷生活!😄
沉淀、分享、成长,让自己和他人都能有所收获!😄
一、为什么要使用线程池?
线程池提供了一种限制和管理资源(线程、任务)的方式。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度:当任务到达时,任务可以不需要等待创建线程就能立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无线的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
二、ThreadPoolExecutor类分析
Java
线程池主要由Executor
框架实现,Executor
框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor
框架让并发编程变得更加简单。
线程池实现类ThreadPoolExecutor
是Executor
框架最核心的类,我们就从这个类的学习线程池的实现原理。
核心属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private final BlockingQueue<Runnable> workQueue; private final HashSet<Worker> workers = new HashSet<>(); private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition(); private int largestPoolSize; private long completedTaskCount; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; }
|
构造方法
参数最多的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if(corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if(workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
|
可以根据这个构造方法自定义线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。
在《阿里巴巴Java开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池创建,不允许在应用中自行显示创建线程,这是因为使用线程池创建线程可以减少在创建和销毁线程上所消耗的时间以及系统资源的开销。
《阿里巴巴Java开发手册》中还强制不能使用Executors
去创建线程池,而是通过上面的ThreadPoolExecutor
的构造方式创建,这样的处理方式可以让写的同学更加明确线程池的运行规则,避免资源耗尽的风险。
下面简单分析一下每个参数的含义和作用:
corePoolSize
:核心线程数量。
maximumPoolSize
:最大线程数量,也就是线程池的容量。
keepAliveTime
:线程空闲等待时间,也和工作线程的生命周期有关。
unit
:线程空闲时间的单位,最终会转为成纳秒。
workQueue
:等待队列或者叫任务队列。
ThreadFactory
:创建线程的工厂,默认使用Executors.defaultThreadFactory()
作为线程池工厂实例。
handler
:线程池的执行执行处理器,更多的时候成为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包含核心线程和非核心线程)并且继续提交任务。提供了4种拒绝策略实现:
AbortPolicy
:直接拒绝策略,也就是不会执行任务,直接抛出RjectedExecutionExcetion
错误,默认的拒绝策略。
DiscardPolicy
:抛弃策略,也就是直接忽略提交的任务。
DiscardOldestPolicy
:抛弃最老任务策略,也就是通过poll()
方法取出任务队列头的任务抛弃,然后执行当前提交的任务。
CallerRunsPolicy
:调用者执行策略,也就是当前调用Executor#execute()
的线程直接调用任务Runnable#run()
,一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化成同步调用。
状态控制
状态控制主要围绕原子整数成员变量crl
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
private static boolean isRunning(int c) { return c < SHUTDOWN; }
private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }
private void decrementWorkerCount() { ctl.addAndGet(-1); }
|
接下来分析一下线程池的状态变量,工作线程数量位的长度是COUNT_BITS
,它的值是Integer.SIZE - 3
,也就是正整数29:
我们知道,整数包装类型Integer实例的大小是4byte,一共是32位,也就是一共有32位用于存放0和1。
在ThreadPoolExecutor实现中,使用32位的整数包装类型存放工作线程数和线程状态,其中低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种,工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。
接着看工作线程上线数量掩码COUNT_MASK
,它的值是(1 < COUNT_BITS - 1
),也就是1左移29位,再减去1,如果补全32位,它的位示图如下:
然后就是线程池的状态常量,比如RUNNING
状态:
1 2 3 4 5
|
private final static int RUNNING = -1 << COUNT_BITS;
|
线程池状态的运行状态常量:
状态名称 |
位图 |
十进制值 |
描述 |
RUNNING |
111-0000... |
-536870912 |
运行中状态,可以接受新的任务和执行任务队列中的任务。 |
SHUTDOWN |
000-0000... |
0 |
关闭状态,不再接收新的任务,但是会执行任务队列中的任务。 |
STOP |
001-0000... |
536870912 |
停止状态,不再接受新的任务,也不会执行任务队列中的任务,中断所有执行中的任务。 |
TIDYING |
010-0000... |
1073741824 |
整理中状态,所有任务都已经执行完毕,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated() |
TERMINATED |
011-0000... |
1610612736 |
终止状态,钩子方法terminated() 执行完毕。 |
这里还有一个比较特殊的技巧,由于运行状态值存放在高3位,所以直接可以通过十进制来比较判断线程池的状态:
RUNNING
< SHUTDOWN
< STOP
< TIDYING
< TERMINATED
下面的3个方法就是使用这种技巧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
private static boolean isRunning(int c) { return c < SHUTDOWN; }
|
线程状态流转关系如下图:
execute方法源码分析
线程异步执行任务的方法实现是ThreadPoolExecutor#execute()
方法,我们从源码的实现来学习,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public void execute(Runnable command) { if (command == null) throw new NullPointerExcetion(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWoker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning() && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
上面代码的流程如下:
- 如果当前工作线程总数小于核心线程数
corePoolSize
,则直接创建核心线程去执行任务(任务实例会传入直接用于构造工作线程实例)。
- 如果当前工作线程总是大于等于核心线程数
corePoolSize
,判断线程状态是否是运行中状态,如果是运行中状态则会尝试用非阻塞方法(offer()
)向任务队列投放任务,如果投放成功会二次检查线程池运行状态,如果线程池是非运行中状态或者从任务队列移除当前的任务失败,则会调用拒绝策略,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null
。
- 如果任务队列投放任务失败了(任务队列满了),则会创建创建非核心线程传入任务实例执行。
- 如果非核心线程创建失败,则会调用拒绝策略。
这里有一个疑惑点:为什么要二次检查线程池的状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null
?这个可以看API
的解释:
如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程,因为所有存活的工作线程有可能在最后一次检查之后就终结了或者执行当前任务的时候线程池是否已经shutdown
了,所以我们需要二次检车线程池的状态,必须时要把任务从任务队列中移除或者在没有可用的工作线程的前提的下创建一个工作线程。
execute()
方法执行流程图如下:
addWorker方法源码分析
addWorker()
方法用于添加工作线程,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| pirvate boolean addWorker(Runnable firstTask, boolean core) { retry; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndOIncreamentWorkerCount(c)) breack retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unLock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock(); mainLock.lock(); try { if (w != null) workers.remove(); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unLock(); } }
|
addWorker()
方法是用来添加执行任务,这个流程可以分为两部分来看,第一部分是用于记录线程数量,第二部分是在独占锁里创建执行线程并启动。流程如下:
- 首先判断当前线程池的状态是否是
SHUTDOWN
、STOP
、TIDYING
、TERMINATED
中的一个。并且当前状态为SHUTDOWN
、且传入的任务为NULL
、同时任务队列不为空。那么就返回false
。
- 不满足上一点然后判断线程数是否超过核心线程数或者最大线程数(根据传入的
core
判断),如果超过则返回false
。
- 然后通过
CAS
操作增加线程池数量,成功跳出循环体。
- 线程池数量记录成功之后,创建工作实例,使用独占锁创建工作线程并加入到工作线程集合,并记录添加状态,添加成功则启动工作线程,记录启动状态,如果最后启动失败则调用
addWorkerFailed()
方法移除线程等操作。
流程图如下:
内部类Worker源码分析
线程池中的每一个具体的工作线程被包装为内部类Worker
实例,Worker
继承与AQS
,实现了Runnable
接口,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| private final class Worker extends AbstractQueuedSynchronized implements Runnable { private static final long serialVersionUID = 6138294804551838833L; final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); this.firstTask = firseTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
|
Worker
的构造函数里面的逻辑十分重要,通过ThreadFactory
创建Thread
实例同时引入Worker
实例,因为Worker
本身实现了Runnable
,所以可以作为任务提交到线程中执行。只要Worker
持有的线程实例w
调用Thread#start()
方法就能执行Worker#run()
。简化一下逻辑如下:
1 2 3 4 5 6 7 8
| Worker worker = createWorker();
ThreadFactory threadFactory = getThreadFactory();
Thread thread = threadFactory.newThread(worker);
thread.start();
|
Worker
继承AQS
,这里使用了AQS
的独占锁模式,这里有个技巧是构造Worker
的时候,把AQS
资源状态通过setState(-1)
设置成-1,这是因为Wokrer
实例刚创建时AQS
中state
的默认值是0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()
方法。
runWorker方法源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
runWorker
方法的核心流程如下:
Worker
先执行解锁操作,允许线程中断。
- 通过
while
循环调用getTask()
方法获取任务对象,首轮循环可能是外部传入的收个任务对象。
- 如果线程池状态变为
STOP
状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程不是中断状态。
- 执行任务实例
Runnable#run()
方法,任务执行之前和之后分别会调用beforeExecute()
和afterExecute()
。
while
循环跳出说明任务全部执行完毕,然后会调用processWorkerExit()
方法处理工作线程退出后的工作。
千言万语不如一图,流程图如下:
getTask方法源码解析
getTask()
方法是工作线程在while
死循环中获取任务队列中的任务对象的方法,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| private Runnable getTask() { boolean timeOut = false; for(;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())){ decrementWorkerCount(); return null; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedCount = true; } catch (InterruptedException retry) { timedOut = false; } } }
|
这个方法中,有两处十分复杂的if
逻辑,先来看第一处,对于第一处if
可能导致工作线程数量减去1直接返回null
的场景有:
- 线程池状态为
SHUTDOWN
,一般是调用了shutdown()
方法,并且任务队列为空。
- 线程池状态为
STOP
。
对于第二处if
逻辑有点复杂,先拆解一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| boolean b1 = wc > maximumPoolSize;
boolean b2 = timed && timedOut;
boolean b3 = wc > 1;
boolean b4 = workQueue.isEmpty(); if (r) { if (compareAndDecrementWorkerCount(c)){ return null; }else{ continue; } }
|
这段逻辑大多数情况下是针对非核心线程的。在execute()
方法中,线程总数大于核心线程并且小于最大线程数时,会调用addWorker(task, false)
方法添加非核心线程,而这里的逻辑恰好是想法的操作,用于减少非核心线程数,使得工作县城总数总是接近于核心线程数。如果对于核心线程,上一轮循环获取对象为null
,这一轮循环很容易满足timed && timedOut
为true
,这个时候getTask()
返回null
导致runWorker()
方法跳出循环,最后执行processWorkerExit()
方法处理工作,而该非核心线程对应的Worker
则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut
设置为true
的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程,那么可以总结出keepAliveTime
的意义:
- 当允许核心线程超时,也就是
allowCoreThreadTimeOut
设置为true的时候,此时keepAliveTime
表示空闲的工作线程存活周期。
- 默认情况下不允许核心线程超时,此时
keepAliveTime
表示空闲的非核心线程存活周期。
三、手写一个线程池
通过上面对ThreadPoolExecutor
的学习,我们可以手写一个简单的线程池,包含了线程的核心逻辑,包含了提交任务,添加任务,获取任务,执行任务核心逻辑。
这个手写线程池的逻辑也非常简单,只体现核心流程,包括:
- 有n个一直执行的线程。
- 把线程提交给线程池运行。
- 如果线程池已满,则把线程放入队列中。
- 最后当有空闲时,则获取队列中线程进行运行。
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| public class ThreadPoolTrader implements Executor {
private final AtomicInteger ctl = new AtomicInteger(0);
private volatile int corePoolSize; private volatile int maximumPoolSize; private final BlockingQueue<Runnable> workQueue;
public ThreadPoolTrader(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; }
@Override public void execute(Runnable command) { int c = ctl.get(); if (c < corePoolSize) { if (!addWorker(command)) { reject(); } return; } if (!workQueue.offer(command)) { if (!addWorker(command)) { reject(); } } }
private boolean addWorker(Runnable firstTask) { if (ctl.get() >= maximumPoolSize) return false; Worker worker = new Worker(firstTask); worker.thread.start(); ctl.incrementAndGet(); return true;
}
private final class Worker implements Runnable {
final Thread thread; Runnable firstTask;
Worker(Runnable firstTask) { this.thread = new Thread(this); this.firstTask = firstTask; }
@Override public void run() { Runnable task = firstTask; try { while (task != null || (task = getTask()) != null) { System.out.println("当前执行任务的线程:" + Thread.currentThread().getName()); task.run(); if (ctl.get() > maximumPoolSize) { break; } task = null; } } finally { ctl.decrementAndGet(); } } }
private Runnable getTask() { for (; ; ) { try { System.out.println("workQueue.size:" + workQueue.size()); return workQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } }
private void reject() { throw new RuntimeException("Error!ctl.count:" + ctl.get() + " workQueue.size:" + workQueue.size()); }
public static void main(String[] args) { ThreadPoolTrader threadPoolTrader = new ThreadPoolTrader(2, 2, new ArrayBlockingQueue<Runnable>(10)); for (int i = 0; i < 10; i++) { int finalI = i; threadPoolTrader.execute(() ->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任务编号:" + finalI); }); } } }
|
上面的代码测试如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| 当前执行任务的线程:Thread-1 当前执行任务的线程:Thread-0 任务编号:1 任务编号:0 workQueue.size:8 workQueue.size:8 当前执行任务的线程:Thread-0 当前执行任务的线程:Thread-1 任务编号:3 任务编号:2 workQueue.size:6 当前执行任务的线程:Thread-1 workQueue.size:6 当前执行任务的线程:Thread-0 任务编号:5 workQueue.size:4 当前执行任务的线程:Thread-0 任务编号:4 workQueue.size:3 当前执行任务的线程:Thread-1 任务编号:6 workQueue.size:2 当前执行任务的线程:Thread-0 任务编号:7 workQueue.size:1 当前执行任务的线程:Thread-1 任务编号:8 任务编号:9 workQueue.size:0 workQueue.size:0
|
四、创建线程池的四种方式
Java 创建线程池的四种方式