import java.util.concurrent.BlockingQueue;public class Consumer implements Runnable{ private final BlockingQueue queue; public Consumer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ consume(queue.take()); } } catch (InterruptedException ex) {} } void consume(Object x) { System.out.println("cousume"+x.toString()); }}
import java.util.concurrent.BlockingQueue;public class Producer implements Runnable{ private final BlockingQueue queue; public Producer(BlockingQueue q) { queue = q; } public void run() { try { for(int i=0;i<100;i++){ queue.put(produce()); } } catch (InterruptedException ex) {} } String produce() { String temp=""+(char)('A'+(int)(Math.random()*26)); System.out.println("produce"+temp); return temp; }}
//生产者 消费者 模式 ArrayBlockingQueue LinkedBlockingQueue 阻塞队列 BlockingQueuequeue1=new LinkedBlockingQueue (5);// Producer p=new Producer(queue1); Consumer c1=new Consumer(queue1); Consumer c2=new Consumer(queue1); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start();
List<Callable<Long>> callList = new ArrayList<Callable<Long>>();add一些Callable的实现类,多线程求和计算。
int threadCounts = 19;// 使用的线程数 long sum = 0; ExecutorService exec1 = Executors.newFixedThreadPool(threadCounts); List> callList = new ArrayList >(); // 生成很大的List List list = new ArrayList (); for (int i = 0; i <= 1000000; i++) { list.add(i); } int len = list.size() / threadCounts;// 平均分割List // List中的数量没有线程数多(很少存在) if (len == 0) { threadCounts = list.size();// 采用一个线程处理List中的一个元素 len = list.size() / threadCounts;// 重新平均分割List } for (int i = 0; i < threadCounts; i++) { final List subList; if (i == threadCounts - 1) { subList = list.subList(i * len, list.size()); } else { subList = list.subList(i * len, len * (i + 1) > list.size() ? list.size() : len * (i + 1)); } // 采用匿名内部类实现 callList.add(new Callable () { public Long call() throws Exception { long subSum = 0L; for (Integer i : subList) { subSum += i; } System.out.println("分配给线程:" + Thread.currentThread().getName() + "那一部分List的整数和为:\tSubSum:" + subSum); return subSum; } }); } List > futureList = exec1.invokeAll(callList); for (Future future : futureList) { sum += future.get(); } exec1.shutdown(); System.out.println(sum);
AtomicInteger的简单举例
// 阻塞队列,能容纳100个文件 final BlockingQueuequeue = new LinkedBlockingQueue (100); // 线程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("D:\\jpg"); // 完成标志 final File exitFile = new File(""); // 原子整型,读个数 // AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。 final AtomicInteger rc = new AtomicInteger(); // 原子整型,写个数 final AtomicInteger wc = new AtomicInteger(); // 读线程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".jpg"); } }); for (File one : files) scanFile(one); } else { try { // 原子整型的incrementAndGet方法,以原子方式将当前值加 1,返回更新的值 int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); // 添加到阻塞队列中 queue.put(file); } catch (InterruptedException e) { } } } }; // submit方法提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。 exec.submit(read); // 四个写线程 for (int index = 0; index < 4; index++) { // write thread final int num = index; Runnable write = new Runnable() { String threadName = "Write" + num; public void run() { while (true) { try { Thread.sleep(randomTime()); // 原子整型的incrementAndGet方法,以原子方式将当前值加 1,返回更新的值 int index = wc.incrementAndGet(); // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 File file = queue.take(); // 队列已经无对象 if (file == exitFile) { // 再次添加"标志",以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); }