A Simple Example of Integrating Spring with Disruptor

Author: Muroidea  Date: 2021-12-16 11:01:41

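This post does not introduce Disruptor itself; it only describes the business scenario. There are two applications, A and B, and application A sends data to application B. The data arrives quickly, so if it is pushed over HTTP and written to the database directly within the request, efficiency is poor and application A may come under considerable pressure. A message queue (MQ) is too heavyweight for this case, so Disruptor was chosen. Reactor would also work.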

BaseQueueHelper.java


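/**
 * Template for high-throughput queue processing with lmax.disruptor. Events may be published
 * before init() is called; they are held and published during initialization.
 *
 * Worker threads are started only when init() is called. Resources are released automatically
 * when the JVM exits.
 *
 * @author xielongwang
 * @create 2018-01-18 3:49 PM
 * @email xielong.wang@nvr-china.com
 * @description
 */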
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {

  /**
   * All queue helpers, recorded so that their resources can be released together when the JVM exits
   */
  private static final List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
  /**
   * Disruptor instance
   */
  private Disruptor<E> disruptor;
  /**
   * RingBuffer
   */
  private RingBuffer<E> ringBuffer;
  /**
   * Data published before init() was called
   */
  private List<D> initQueue = new ArrayList<D>();

  /**
   * Queue size
   *
   * @return queue length; must be a power of 2
   */
  protected abstract int getQueueSize();

  /**
   * Event factory
   *
   * @return EventFactory
   */
  protected abstract EventFactory<E> eventFactory();

  /**
   * Event consumers
   *
   * @return WorkHandler[]
   */
  protected abstract WorkHandler[] getHandler();

  /**
   * Initialize the Disruptor and start the consumer threads.
   * Synchronized so that a concurrent publishEvent() cannot modify initQueue while it is flushed.
   */
  public synchronized void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool-%d").build();
    // ProducerType.SINGLE assumes one publisher at a time; publishEvent() is synchronized to ensure that
    disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();

    // Publish the data that was queued before init()
    for (D data : initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
        @Override
        public void translateTo(E event, long sequence, D data) {
          event.setValue(data);
        }
      }, data);
    }
    initQueue.clear();

    // Register the resource-cleanup shutdown hook once, when the first helper is initialized
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          @Override
          public void run() {
            for (BaseQueueHelper baseQueueHelper : queueHelperList) {
              baseQueueHelper.shutdown();
            }
          }
        });
      }
      queueHelperList.add(this);
    }
  }

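  /**
   * Wait strategy for the consumer threads; implement this to choose how they wait for events.
   * YieldingWaitStrategy improves responsiveness but keeps CPU usage above 70% even when idle, so use it with care.
   * SleepingWaitStrategy responds more slowly but uses less CPU, which suits scenarios such as logging.
   *
   * @return WaitStrategy
   */
  protected abstract WaitStrategy getStrategy();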

  /**
   * Publish a message to the queue. Publishing before init() is supported: the data is held
   * and published to the ring buffer as soon as the queue is created.
   */
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
      @Override
      public void translateTo(E event, long sequence, D data) {
        event.setValue(data);
      }
    }, data);
  }

  /**
   * Shut down the queue
   */
  public void shutdown() {
    // Guard against shutdown() being called before init()
    if (disruptor != null) {
      disruptor.shutdown();
    }
  }
}
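
The pre-init buffering done by publishEvent() can be looked at in isolation. The following is a minimal sketch that uses no Disruptor classes: DeferredPublisher and its Consumer sink are hypothetical stand-ins for BaseQueueHelper and the RingBuffer, and only show the pattern of holding data until init() and then flushing it in arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: DeferredPublisher is a hypothetical stand-in for BaseQueueHelper,
// and the Consumer sink stands in for the RingBuffer. No Disruptor API is used.
final class DeferredPublisher<D> {
    private final List<D> pending = new ArrayList<>();
    private Consumer<D> sink; // stays null until init() is called

    /** Holds data while no sink is installed; forwards it directly afterwards. */
    synchronized void publish(D data) {
        if (sink == null) {
            pending.add(data);
            return;
        }
        sink.accept(data);
    }

    /** Installs the sink and flushes everything held so far, in arrival order. */
    synchronized void init(Consumer<D> target) {
        sink = target;
        for (D data : pending) {
            sink.accept(data);
        }
        pending.clear();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        DeferredPublisher<String> publisher = new DeferredPublisher<>();
        publisher.publish("a");      // held, because init() has not run yet
        publisher.init(received::add);
        publisher.publish("b");      // forwarded immediately
        System.out.println(received); // prints [a, b]
    }
}
```

Both methods take the same lock, so a publish() that races with init() either lands in the pending list before the flush or goes straight to the sink after it.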

EventFactory.java


/**
 * @author xielongwang
 * @create 2018-01-18 6:24 PM
 * @email xielong.wang@nvr-china.com
 * @description
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java


public class MyHandlerException implements ExceptionHandler<Object> {

  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

  /*
   * (non-Javadoc) Exception raised while an event is being processed
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
   * , long, java.lang.Object)
   */
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    // Assuming SLF4J: a Throwable passed as the last argument is logged with its stack trace
    logger.error("process data error sequence ==[{}] event==[{}]", sequence, event, ex);
  }

  /*
   * (non-Javadoc) Exception raised during startup
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
   * Throwable)
   */
  @Override
  public void handleOnStartException(Throwable ex) {
    logger.error("start disruptor error!", ex);
  }

  /*
   * (non-Javadoc) Exception raised during shutdown
   *
   * @see
   * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
   * .Throwable)
   */
  @Override
  public void handleOnShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error!", ex);
  }
}

SeriesData.java (the message that application A sends to application B)


public class SeriesData {
  private String deviceInfoStr;

  public SeriesData() {
  }

  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }

  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  @Override
  public String toString() {
    return "SeriesData{" +
        "deviceInfoStr='" + deviceInfoStr + '\'' +
        '}';
  }
}

SeriesDataEvent.java


public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}

SeriesDataEventHandler.java


public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;

  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
      // Skip empty events; continuing would dereference a null value below
      return;
    }
    // Business processing
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java


@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  // Spring injects every SeriesDataEventHandler bean into this list
  @Autowired
  private List<SeriesDataEventHandler> seriesDataEventHandler;

  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }

  @Override
  protected com.lmax.disruptor.EventFactory<SeriesDataEvent> eventFactory() {
    return new EventFactory();
  }

  @Override
  protected WorkHandler[] getHandler() {
    return seriesDataEventHandler.toArray(new SeriesDataEventHandler[0]);
  }

  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    // Start the Disruptor once Spring has injected the handlers
    this.init();
  }
}
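
QUEUE_SIZE is 1024 because Disruptor requires the ring buffer size to be a power of 2. One convenient consequence is that an ever-increasing sequence number can be mapped to a slot with a bit mask instead of a modulo. The sketch below only illustrates that arithmetic; RingIndexDemo is a hypothetical class written for this post and is not Disruptor's own code.

```java
// Sketch only: RingIndexDemo is hypothetical and is not part of the Disruptor API.
final class RingIndexDemo {

    /** Returns true when size is a positive power of two (exactly one bit set). */
    static boolean isPowerOfTwo(int size) {
        return size > 0 && Integer.bitCount(size) == 1;
    }

    /** Maps a sequence number to a slot index using a bit mask instead of %. */
    static int slotFor(long sequence, int size) {
        if (!isPowerOfTwo(size)) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        // For a power-of-two size, (size - 1) has all low bits set, so the mask
        // keeps exactly the bits that sequence % size would keep.
        return (int) (sequence & (size - 1));
    }

    public static void main(String[] args) {
        System.out.println(slotFor(1024, 1024)); // prints 0, the sequence wraps around
        System.out.println(slotFor(1027, 1024)); // prints 3
    }
}
```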

ValueWrapper.java


public abstract class ValueWrapper<T> {
  private T value;

  public ValueWrapper() {}

  public ValueWrapper(T value) {
    this.value = value;
  }

  public T getValue() {
    return value;
  }

  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java


@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
// Register several handler instances so that multiple consumers run
public class DisruptorConfig {

/**
  * smsParamEventHandler1
  *
  * @return SeriesDataEventHandler
  */
 @Bean
 public SeriesDataEventHandler smsParamEventHandler1() {
   return new SeriesDataEventHandler();
 }

/**
  * smsParamEventHandler2
  *
  * @return SeriesDataEventHandler
  */
 @Bean
 public SeriesDataEventHandler smsParamEventHandler2() {
   return new SeriesDataEventHandler();
 }

/**
  * smsParamEventHandler3
  *
  * @return SeriesDataEventHandler
  */
 @Bean
 public SeriesDataEventHandler smsParamEventHandler3() {
   return new SeriesDataEventHandler();
 }

/**
  * smsParamEventHandler4
  *
  * @return SeriesDataEventHandler
  */
 @Bean
 public SeriesDataEventHandler smsParamEventHandler4() {
   return new SeriesDataEventHandler();
 }

/**
  * smsParamEventHandler5
  *
  * @return SeriesDataEventHandler
  */
 @Bean
 public SeriesDataEventHandler smsParamEventHandler5() {
   return new SeriesDataEventHandler();
 }
}
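
Because the five handlers are passed to handleEventsWithWorkerPool(), they act as competing consumers: each event is handled by exactly one of them rather than by all five. The sketch below imitates that behaviour with a plain thread pool and a shared queue from java.util.concurrent. WorkerPoolDemo is a hypothetical stand-in and uses no Disruptor classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: WorkerPoolDemo is hypothetical and imitates competing consumers
// with JDK classes; it does not use or reproduce the Disruptor worker pool.
final class WorkerPoolDemo {

    /** Lets `workers` threads drain one shared queue and returns the total number of items handled. */
    static int process(int workers, int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            queue.add(i);
        }
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                // poll() hands each item to exactly one worker; null means the queue is empty
                while (queue.poll() != null) {
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println(process(5, 100)); // prints 100: every item is handled once, not five times
    }
}
```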

Test


  // Inject SeriesDataEventQueueHelper, the message producer
  @Autowired
  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
  public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
    long startTime1 = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
      logger.info("receiver data is empty !");
      return new DataResponseVo<String>(400, "failed");
    }
    // Hand the data to the Disruptor queue and return without waiting for the consumers
    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
    long startTime2 = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
    return new DataResponseVo<String>(200, "success");
  }

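Application A sends data to application B through the /data endpoint. B then hands the message to the Disruptor queue via seriesDataEventQueueHelper, and the consumers process it. The whole process does not block application A. Message loss is acceptable in this scenario. The Disruptor queue can be monitored by extending SeriesDataEventQueueHelper.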
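
As one possible starting point for such monitoring, the sketch below computes how full the queue is from two numbers. It assumes Disruptor 3.x, where these values could be read from RingBuffer.getBufferSize() and RingBuffer.remainingCapacity(). QueueUsage and its method names are hypothetical, and how the helper would expose its ring buffer is left open.

```java
// Sketch only: QueueUsage is a hypothetical helper. The inputs are assumed to come
// from RingBuffer.getBufferSize() and RingBuffer.remainingCapacity() in Disruptor 3.x.
final class QueueUsage {

    /** Fraction of slots currently occupied, between 0.0 and 1.0. */
    static double usedRatio(int bufferSize, long remainingCapacity) {
        if (bufferSize <= 0 || remainingCapacity < 0 || remainingCapacity > bufferSize) {
            throw new IllegalArgumentException("inconsistent buffer size or remaining capacity");
        }
        return (bufferSize - remainingCapacity) / (double) bufferSize;
    }

    /** True when the occupied fraction has reached the given threshold, e.g. 0.9 for 90%. */
    static boolean isNearlyFull(int bufferSize, long remainingCapacity, double threshold) {
        return usedRatio(bufferSize, remainingCapacity) >= threshold;
    }

    public static void main(String[] args) {
        System.out.println(usedRatio(1024, 256));         // prints 0.75
        System.out.println(isNearlyFull(1024, 256, 0.9)); // prints false
    }
}
```

A scheduled task or a health endpoint could call such a method periodically and log a warning when the threshold is reached, which would show when the consumers are falling behind the producer.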

Source: http://blog.csdn.net/u014087707/article/details/79340463

Tags: spring, disruptor