使用Disruptor一个生产者多个消费者不重复消费数据与BlockingQueue性能对比ITeye - AG环亚娱乐集团

使用Disruptor一个生产者多个消费者不重复消费数据与BlockingQueue性能对比ITeye

2019-01-10 20:57:16 | 作者: 香彤 | 标签: 运用,顾客,数据 | 浏览: 2463

        作为比较接近正式使用的方式,我用一个生产者多个消费者来进行了Disruptor的不重复消费的性能测试。在这里我主要是介绍下我在测试过程中使用的代码,并对出现的情况做下说明。这些情况有可能是我自己的代码原因引起的,在此也给自己留一个记录,如果看到的同学有异议,麻烦给我说一下。关于Disruptor的其他介绍可以参阅我的另一篇文章,那是我参考他人的文章写的。

        使用maven的方式引入需要的jar

       

<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.7</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.20</version>
    </dependency>
</dependencies>

 

        在这里我使用的是WorkHandler接口来实现的消费者类,消费者的代码如下:

package com.demo.disruptor.consumer;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.test.BlockingQueueTest;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Custom consumer
 */
public class LogEventWorkHandlerConsumer implements WorkHandler LogEvent {
 private long startTime;
 public LogEventWorkHandlerConsumer() {
 this.startTime = System.currentTimeMillis();
 @Override
 public void onEvent(LogEvent logEvent) throws Exception {
 //悉数转化为大写,用于耗时测验
 logEvent.setContent(logEvent.getContent().toUpperCase());
 //判别是否现已有了开端时刻
 if(logEvent.getStartTime() == null || logEvent.getStartTime() == 0){
 logEvent.setStartTime(startTime);
 }else {
 startTime = logEvent.getStartTime();
 //判别是否现已到最后
 if (logEvent.getLogId() +1 == BlockingQueueTest.eventNum) {
 long endTime = System.currentTimeMillis();
 System.out.println(" costTime1 = " + (endTime - startTime) + "ms");
 //System.out.println("顾客1-seq logEvent:" + logEvent.toString());
      LogEventWorkHandlerConsumer2和LogEventWorkHandlerConsumer3的代码与LogEventWorkHandlerConsumer相同。

 

    生产者类代码如下:

package com.demo.disruptor.producer;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
import java.util.Date;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Publishes events from the producer through a translator; this is the approach normally used
 */
public class LogEventProducerWithTranslator {
 private EventTranslatorVararg eventTranslatorVararg = new EventTranslatorVararg LogEvent () {
 public void translateTo(LogEvent logEvent, long l, Object... objects) {
 logEvent.setLogId((Long) objects[0]);
 logEvent.setContent((String)objects[1]);
 logEvent.setDate((Date)objects[2]);
 private RingBuffer LogEvent ringBuffer;
 public LogEventProducerWithTranslator(RingBuffer LogEvent ringBuffer) {
 this.ringBuffer = ringBuffer;
 public void onData(long logId, String content, Date date){
 //System.out.println("出产者:" + logId);
 ringBuffer.publishEvent(eventTranslatorVararg,logId,content,date);

    创建的实体对象:

package com.demo.disruptor.dto;
import com.alibaba.fastjson.JSONObject;
import java.util.Date;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/9
 * @description Custom event object
 */
public class LogEvent {
 private long logId;
 private String content;
 private Date date;
 private Long startTime;
 public long getLogId() {
 return logId;
 public void setLogId(long logId) {
 this.logId = logId;
 public String getContent() {
 return content;
 public void setContent(String content) {
 this.content = content;
 public Date getDate() {
 return date;
 public void setDate(Date date) {
 this.date = date;
 public Long getStartTime() {
 return startTime;
 public void setStartTime(Long startTime) {
 this.startTime = startTime;
 @Override
 public String toString() {
 return JSONObject.toJSONString(this);
}
一个用于定义测试次数的类:
package com.demo.disruptor.test;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
// Holds the event count shared by the producer loops and the consumers'
// "last event" checks in both benchmarks.
public class BlockingQueueTest {

    // Number of events published per run; the last event has logId == eventNum - 1.
    public static int eventNum = 5_000_000;
}
用于定义异常处理的代码:
package com.demo.disruptor.exception;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.ExceptionHandler;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/12
 * @description
 */
public class LogEventExceptionHandler implements ExceptionHandler LogEvent {
 @Override
 public void handleEventException(Throwable throwable, long l, LogEvent logEvent) {
 System.out.println("handleEventException....");
 @Override
 public void handleOnStartException(Throwable throwable) {
 System.out.println("handleOnStartException....");
 @Override
 public void handleOnShutdownException(Throwable throwable) {
 System.out.println("handleOnShutdownException....");

    下面是用于测试耗时的主程序:

package com.demo.disruptor.test;
import com.demo.disruptor.consumer.*;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.exception.LogEventExceptionHandler;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.*;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Date;
import java.util.concurrent.*;
/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
 */
public class DisruptorTest {
 //5000000:554,627,545,602,550,578,675,626,587,692 604
 //10000000:1657,1471,1234,1231,1302,1083,1186,1064,1044,1073 1235
 //50000000:5017,5255,5048,5009,5410,4609,5979,5184,5060,4771 5134
 public static void main(String [] args) throws TimeoutException, InterruptedException, InsufficientCapacityException {
 LogEventFactory factory = new LogEventFactory();
 //ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
 int ringBufferSize = 65536;
 final Disruptor LogEvent disruptor = new Disruptor LogEvent (factory,ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
 EventHandlerGroup LogEvent eventHandlerGroup = disruptor.handleEventsWithWorkerPool(new LogEventWorkHandlerConsumer(),new LogEventWorkHandlerConsumer2(),new LogEventWorkHandlerConsumer3());
 final RingBuffer LogEvent ringBuffer = disruptor.getRingBuffer();
 disruptor.setDefaultExceptionHandler(new LogEventExceptionHandler());
 disruptor.start();
 /*new Thread(new Runnable() {
 @Override
 public void run() {
 RingBuffer LogEvent ringBuffer = disruptor.getRingBuffer();
 //进行事情的发布
 LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
 for(int i = 0; i BlockingQueueTest.eventNum; i++){
 logEventProducer.onData(i, "c" + i, new Date());
 }).start();*/
 //启用独自线程进行发布
 Thread thread = new Thread(new Runnable() {
 @Override
 public void run() {
 //进行事情的发布
 LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
 for(int i = 0; i BlockingQueueTest.eventNum; i++){
 producerWithTranslator.onData(i, "c" + i, new Date());
 thread.start();
 while (true){
 long sequence = ringBuffer.getMinimumGatingSequence();
 //System.out.println(sequence);
 if(sequence + 1 == BlockingQueueTest.eventNum){
 break;
 //检查现在运转的线程
 //Thread.currentThread().getThreadGroup().list();
 //从代码上了解应该仅仅把顾客线程封闭了
 disruptor.halt();
 //Thread.currentThread().getThreadGroup().list();
 //在封闭disruptor后假如建议新线程,那么新线程不会封闭,由于需求顾客消防数据导致处于阻塞状态,线程会一向挂起
 /*Thread thread1 = new Thread(new Runnable() {
 @Override
 public void run() {
 //进行事情的发布
 LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
 for(int i = 0; i BlockingQueueTest.eventNum; i++){
 producerWithTranslator.onData(i, "d" + i, new Date());
 thread1.setName("thread1");
 thread1.start();
 while (true){
 long sequence = ringBuffer.getMinimumGatingSequence();
 System.out.println(sequence);
 if(sequence + 1 == BlockingQueueTest.eventNum){
 break;
 Thread.currentThread().getThreadGroup().list();*/
 System.out.println(eventHandlerGroup.asSequenceBarrier().getCursor());
}

 这个地方由于增加了关闭程序,所以性能上面比没有增加关闭操作及判断要慢一些,上面的测试数据是没有关闭操作的测试数据。

    下面是BlockingQueue的代码:

/**
 * @author liaoyubo
 * @version 1.0 2017/10/11
 * @description
public class BlockingQueueMultiTest {
 //5000000:1195,1189,1147,1181,1135,1174,1216,1133,1145,1093 1161
 //10000000:2633,1972,2417,2330,2255,2429,2354,2178,2235,2402 2321
 //50000000:11841,11902,13048,12262,9769,10531,12721,12229,12283,13595 12018
 public static void main(String[] args) throws InterruptedException {
 final BlockingQueue LogEvent queue = new ArrayBlockingQueue LogEvent (65536);
 //System.out.println("开端时刻:" + startTime);
 new Thread(new Runnable() {
 @Override
 public void run() {
 int i = 0;
 while (i BlockingQueueTest.eventNum) {
 LogEvent logEvent = new LogEvent();
 logEvent.setLogId(i);
 logEvent.setContent("c" + i);
 logEvent.setDate(new Date());
 try {
 queue.put(logEvent);
 } catch (InterruptedException e) {
 e.printStackTrace();
 i++;
 }).start();
 // 创立缓冲池
 final ExecutorService executorService = Executors.newCachedThreadPool();
 final long startTime = System.currentTimeMillis();
 executorService.execute(new Thread(new Runnable() {
 @Override
 public void run() {
 while (true) {
 try {
 LogEvent logEvent = queue.take();
 logEvent.setContent(logEvent.getContent().toUpperCase());
 //System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
 if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
 break;
 } catch (InterruptedException e) {
 e.printStackTrace();
 long endTime = System.currentTimeMillis();
 System.out.println("BlockingQueue1 花费时刻:" + (endTime-startTime) + "ms");
 }));
 executorService.execute(new Thread(new Runnable() {
 @Override
 public void run() {
 while (true) {
 try {
 LogEvent logEvent = queue.take();
 logEvent.setContent(logEvent.getContent().toUpperCase());
 //System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
 if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
 break;
 } catch (InterruptedException e) {
 e.printStackTrace();
 long endTime = System.currentTimeMillis();
 System.out.println("BlockingQueue2 花费时刻:" + (endTime-startTime) + "ms");
 }));
 executorService.execute(new Thread(new Runnable() {
 @Override
 public void run() {
 while (true) {
 try {
 LogEvent logEvent = queue.take();
 logEvent.setContent(logEvent.getContent().toUpperCase());
 //System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
 if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
 break;
 } catch (InterruptedException e) {
 e.printStackTrace();
 long endTime = System.currentTimeMillis();
 System.out.println("BlockingQueue3 花费时刻:" + (endTime-startTime) + "ms");
 }));
}

     上面的Disruptor和BlockingQueue分别使用了3个线程来消费生产的数据,分别用500W、1000W、5000W的生产数据,取运行10次的平均值来查看结果:

版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表AG环亚娱乐集团立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章

阅读排行

  • 1

    oxygenxml.oxygenITeye

    插件,生成器,代码
  • 2

    JVM参数装备大全ITeye

    信息,打印,前后
  • 3

    ThreadITeye

    先后,正常,作业
  • 4
  • 5

    vba upgradeITeye

    文件,程序,晋级
  • 6

    java 多线程ITeye

    线程,作业,内存
  • 7
  • 8

    ClassLoaderITeye

    运用,文件,办法
  • 9
  • 10

    手机号码校验合法性ITeye

    代表,必定,第二位