java并发编程_线程池的使用方法(详解)

作者:jingxian 时间:2023-03-29 22:23:00 

一、任务和执行策略之间的隐性耦合

Executor可以将任务的提交和任务的执行策略解耦

只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题

1.线程饥饿死锁

类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够

定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁

2.线程池大小

java并发编程_线程池的使用方法(详解)

注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池

如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小

3.配置ThreadPoolExecutor线程池

实例:

1.通过Executors的工厂方法返回默认的一些实现

2.通过实例化ThreadPoolExecutor(.....)自定义实现

线程池的队列

1. * 队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张

如:单例和固定大小的线程池用的就是此种

2.有界队列:如果新任务到达,队列满则使用饱和策略

3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队

SynchronousQueue直接将任务移交给工作线程

机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务

如:CacheThreadPool就是使用的这种策略

饱和策略:

setRejectedExecutionHandler来修改饱和策略

1.终止Abort(默认):抛出异常由调用者处理

2.抛弃Discard

3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务

4.CallerRuns:回退任务,有调用者线程自行处理

4.线程工厂ThreadFactoy

每当创建线程时:其实是调用了线程工厂来完成

自定义线程工厂:implements ThreadFactory

可以定制该线程工厂的行为:如UncaughtExceptionHandler等


public class MyAppThread extends Thread {
 public static final String DEFAULT_NAME = "MyAppThread";
 private static volatile boolean debugLifecycle = false;
 private static final AtomicInteger created = new AtomicInteger();
 private static final AtomicInteger alive = new AtomicInteger();
 private static final Logger log = Logger.getAnonymousLogger();

public MyAppThread(Runnable r) {
   this(r, DEFAULT_NAME);
 }

public MyAppThread(Runnable runnable, String name) {
   super(runnable, name + "-" + created.incrementAndGet());
   //设置该线程工厂创建的线程的 未捕获异常的行为
   setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
     public void uncaughtException(Thread t,
                    Throwable e) {
       log.log(Level.SEVERE,
           "UNCAUGHT in thread " + t.getName(), e);
     }
   });
 }

public void run() {
   // Copy debug flag to ensure consistent value throughout.
   boolean debug = debugLifecycle;
   if (debug) log.log(Level.FINE, "Created " + getName());
   try {
     alive.incrementAndGet();
     super.run();
   } finally {
     alive.decrementAndGet();
     if (debug) log.log(Level.FINE, "Exiting " + getName());
   }
 }

public static int getThreadsCreated() {
   return created.get();
 }

public static int getThreadsAlive() {
   return alive.get();
 }

public static boolean getDebug() {
   return debugLifecycle;
 }

public static void setDebug(boolean b) {
   debugLifecycle = b;
 }
}

5.扩展ThreadPoolExecutor

可以被自定义子类覆盖的方法:

1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行

2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行

3.terminated:在线程池关闭时,可以用来释放资源等

二、递归算法的并行化

1.循环

在循环中,每次循环操作都是独立的


//串行化
 void processSequentially(List<Element> elements) {
   for (Element e : elements)
     process(e);
 }
 //并行化
 void processInParallel(Executor exec, List<Element> elements) {
   for (final Element e : elements)
     exec.execute(new Runnable() {
       public void run() {
         process(e);
       }
     });
 }

2.迭代

如果每个迭代操作是彼此独立的,则可以串行执行

如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的


//串行 计算compute 和串行迭代
 public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results) {
   for (Node<T> n : nodes) {
     results.add(n.compute());
     sequentialRecursive(n.getChildren(), results);
   }
 }
 //并行 计算compute 和串行迭代
 public <T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
   for (final Node<T> n : nodes) {
     exec.execute(() -> results.add(n.compute()));
     parallelRecursive(exec, n.getChildren(), results);
   }
 }
 //调用并行方法的操作
 public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
     throws InterruptedException {
   ExecutorService exec = Executors.newCachedThreadPool();
   Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
   parallelRecursive(exec, nodes, resultQueue);
   exec.shutdown();
   exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
   return resultQueue;
 }

实例:


public class ConcurrentPuzzleSolver <P, M> {
 private final Puzzle<P, M> puzzle;
 private final ExecutorService exec;
 private final ConcurrentMap<P, Boolean> seen;
 protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
   this.puzzle = puzzle;
   this.exec = initThreadPool();
   this.seen = new ConcurrentHashMap<P, Boolean>();
   if (exec instanceof ThreadPoolExecutor) {
     ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
     tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
   }
 }

private ExecutorService initThreadPool() {
   return Executors.newCachedThreadPool();
 }

public List<M> solve() throws InterruptedException {
   try {
     P p = puzzle.initialPosition();
     exec.execute(newTask(p, null, null));
     // 等待ValueLatch中闭锁解开,则表示已经找到答案
     PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
     return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
   } finally {
     exec.shutdown();//最终主线程关闭线程池
   }
 }

protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
   return new SolverTask(p, m, n);
 }

protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
   SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
     super(pos, move, prev);
   }
   public void run() {
     //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现;
     //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环
     if (solution.isSet() || seen.putIfAbsent(pos, true) != null){
       return; // already solved or seen this position
     }
     if (puzzle.isGoal(pos)) {
       solution.setValue(this);
     } else {
       for (M m : puzzle.legalMoves(pos))
         exec.execute(newTask(puzzle.move(pos, m), m, this));
     }
   }
 }
}
标签:java,线程池
0
投稿

猜你喜欢

  • Java JVM运行时数据区(Run-Time Data Areas)

    2022-01-22 22:10:35
  • 如何使用JCTools实现Java并发程序

    2023-11-21 07:37:39
  • Unity3D生成一段隧道网格的方法

    2022-02-22 23:46:27
  • 简单谈谈C#中深拷贝、浅拷贝

    2022-06-25 04:36:56
  • 关于maven:pom文件的使用解析

    2022-02-08 06:21:41
  • 解决Spring boot整合mybatis,xml资源文件放置及路径配置问题

    2021-07-07 12:27:20
  • c#语言使用Unity粒子系统制作手雷爆炸

    2021-10-11 11:13:46
  • C#隐藏手机号、邮箱等敏感信息的实现方法

    2023-12-08 17:17:45
  • 深入理解Java设计模式之建造者模式

    2022-12-06 00:57:37
  • 快速理解Java设计模式中的组合模式

    2021-10-18 04:16:29
  • Spring如何消除代码中的if-else/switch-case

    2021-12-12 03:04:47
  • Android RecyclerView的卡顿问题的解决方法

    2023-07-01 20:01:14
  • SpringMVC拦截器配置及运行流程解析

    2023-03-30 15:38:57
  • Spring Security+JWT实现认证与授权的实现

    2022-04-02 00:41:11
  • C#实现XML文件与DataTable、Dataset互转

    2021-06-07 04:00:27
  • Matlab实现贪吃蛇小游戏的示例代码

    2023-07-14 14:13:00
  • Android 大文件切割与合并的实现代码

    2023-05-25 21:58:58
  • SpringBoot整合Druid数据源过程详解

    2023-06-03 19:47:14
  • Java中比较运算符compareTo()、equals()与==的区别及应用总结

    2023-11-28 20:08:28
  • spring boot项目中MongoDB的使用方法

    2021-06-10 13:31:23
  • asp之家 软件编程 m.aspxhome.com