博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
阻塞队列 Future 线程池 AtomicInteger简单示例
阅读量:7216 次
发布时间:2019-06-29

本文共 5441 字,大约阅读时间需要 18 分钟。

hot3.png

import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable{	   private final BlockingQueue queue;	   	   public Consumer(BlockingQueue q) { 		   queue = q; 		   }	   	   public void run() {	     try {	      for(int i=0;i<100;i++){	      consume(queue.take());	      }	     } catch (InterruptedException ex) {}	   }	   void consume(Object x) {	   System.out.println("cousume"+x.toString());	   }}
import java.util.concurrent.BlockingQueue;public class Producer implements Runnable{	   private final BlockingQueue queue;	   public Producer(BlockingQueue q)	   { 		   queue = q; 	   }	   	   public void run() {	     try {	      for(int i=0;i<100;i++){	      queue.put(produce());	      }	      	     } catch (InterruptedException ex) {}	   }	   	   String produce() {	   String temp=""+(char)('A'+(int)(Math.random()*26));	   System.out.println("produce"+temp);	   return temp;	   }}
//生产者 消费者 模式  ArrayBlockingQueue LinkedBlockingQueue 阻塞队列    	BlockingQueue
queue1=new LinkedBlockingQueue
(5);// Producer p=new Producer(queue1); Consumer c1=new Consumer(queue1); Consumer c2=new Consumer(queue1); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start();

List<Callable<Long>> callList = new ArrayList<Callable<Long>>();add一些Callable的实现类,多线程求和计算。

int threadCounts = 19;// 使用的线程数		long sum = 0;		ExecutorService exec1 = Executors.newFixedThreadPool(threadCounts);		List
> callList = new ArrayList
>(); // 生成很大的List List
list = new ArrayList
(); for (int i = 0; i <= 1000000; i++) { list.add(i); } int len = list.size() / threadCounts;// 平均分割List // List中的数量没有线程数多(很少存在) if (len == 0) { threadCounts = list.size();// 采用一个线程处理List中的一个元素 len = list.size() / threadCounts;// 重新平均分割List } for (int i = 0; i < threadCounts; i++) { final List
subList; if (i == threadCounts - 1) { subList = list.subList(i * len, list.size()); } else { subList = list.subList(i * len, len * (i + 1) > list.size() ? list.size() : len * (i + 1)); } // 采用匿名内部类实现 callList.add(new Callable
() { public Long call() throws Exception { long subSum = 0L; for (Integer i : subList) { subSum += i; } System.out.println("分配给线程:" + Thread.currentThread().getName() + "那一部分List的整数和为:\tSubSum:" + subSum); return subSum; } }); } List
> futureList = exec1.invokeAll(callList); for (Future
future : futureList) { sum += future.get(); } exec1.shutdown(); System.out.println(sum);

AtomicInteger的简单举例

// 阻塞队列,能容纳100个文件        final BlockingQueue
queue = new LinkedBlockingQueue
(100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("D:\\jpg"); // 完成标志 final File exitFile = new File(""); // 原子整型,读个数 // AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。 final AtomicInteger rc = new AtomicInteger(); // 原子整型,写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".jpg"); } }); for (File one : files) scanFile(one); } else { try { // 原子整型的incrementAndGet方法,以原子方式将当前值加 1,返回更新的值 int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); // 添加到阻塞队列中 queue.put(file); } catch (InterruptedException e) { } } } }; // submit方法提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int num = index; Runnable write = new Runnable() { String threadName = "Write" + num; public void run() { while (true) { try { Thread.sleep(randomTime()); // 原子整型的incrementAndGet方法,以原子方式将当前值加 1,返回更新的值 int index = wc.incrementAndGet(); // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); }

转载于:https://my.oschina.net/rouchongzi/blog/172448

你可能感兴趣的文章
CentOS mini 6.5 安装DB2 Express-C 问题处理记录
查看>>
DirectByteBuffer
查看>>
Docker Compose文件详解 V2
查看>>
Memcached的原理与应用(未完)
查看>>
基于 Confluence 6 数据中心的 SAML 单点登录设置你的身份提供者
查看>>
mysql总结
查看>>
Navicat for MySQL版本更新至v11.2.12,修复多项问题|附下载
查看>>
整理 JAVA中的IO流 (字符流和字节流两个大类)
查看>>
uefi与win8 (根据网络资料整理)
查看>>
Eclipse优化
查看>>
Log4j tutorial with Tomcat examples
查看>>
Kong 网关
查看>>
三层结构视频中的DBHelper.cs
查看>>
[转载] 信息系统项目管理师视频教程——18 项目沟通管理
查看>>
在Windows下建立QT开发环境
查看>>
Jedis、JedisPool、ShardedJedis和ShardedJedisPool,java对redis的基本操作
查看>>
[转载] 致命伴侣
查看>>
HTML5 localStorage本地存储实际应用举例
查看>>
Scala访问修饰符
查看>>
实习感悟
查看>>