背景
我的项目中,有一个需求是监听文件系统的变化,然后进行其他的操作,一旦在监听器中直接处理业务逻辑,就很容易引发很多bug(其实主要是早期设计不好,同步代码和锁还有莫名其妙的东西满天飞,业务逻辑都堆在一起了),比较难以修改和使用,最终我决定采用反应器模式来重写这部分逻辑。项目主要用到了JNotify和lombok这两个东西,前者是监听文件变化的,后者就不多说了,我想应该都知道。
概述
什么是反应器模式呢?反应器基本上有这样几个部分:
- 分发器:分发一个事件到处理器中进行处理,一般来说是一个线程,他可以对事件的类型之类的进行判断,然后分配给合适的处理器,在典型的reactor模式中,这都是在同一个线程内进行的,分发器的主体应该含有一个事件循环,读取容器中存在的事件然后分发和处理他们。
- 监听器或连接器或者类似的东西:收集事件,然后将他们放入分发器的容器中,稍后这些事件就会被分发器发现并且分发到处理器中。
- 处理器:具体进行某种具体的业务处理。
那么对于我的项目来说,首先应该收集文件事件到队列中,然后在分发器出队事件放入处理器,整个过程相比直接在listener中处理具体逻辑的做法,这是更加有序而可控的,对于这样一个业务来说,稳定性十分重要。
大致结构
我使用了一个单独的Thread作为分发器的线程,内部的run方法放置事件循环,然后同时继承listener,监听事件,看起来就像是这样:
@Component
public class ReactorDispatcher implements JNotifyListener,Runnable {
private CycleBarrier monitor = new CycleBarrier(2);
private Deque<FSEvent> queue = new ArrayDeque<>();
private int watcherId;
private Thread thread;
private boolean state = true;
// 这里使用了lombok
// data注解会自动为下面的字段添加Getter和Setter
// allArgsConstructor会给这个类添加一个默认的含有全部字段的构造方法
@Data
@AllArgsConstructor
private class FSEvent {
private String path;
private String name;
private int type;
}
// 初始化方法,spring完成装配后会首先执行他。
@PostConstruct
public void initialize() {
try {
// 使用JNotify开始监听文件系统
watcherId = JNotify.addWatcher(new File("files").getAbsolutePath(),
JNotify.FILE_CREATED|JNotify.FILE_DELETED);
// 创建线程,其实是不是也可以作为守护线程处理呢,
// 不过守护线程的特点我不熟悉,就还是用来普通的方法。
thread = new Thread(this);
thread.setName("reactor - fs listener");
thread.start();
}catch(Exception e) {
}
}
// 销毁方法,清理用。
@PreDestory
public void destory() {
try {
JNotify.removeWatcher(watcherId);
state = false;
} catch(Exception e) {
}
}
/**
* 这几种方法是监听器的方法,他会在文件发生变化的时候被回调。
* 通过这些方法可以将监听到的内容变为事件对象,然后存入队列。
*/
public void fileCreated(int type,String path,String name) {
FSEvent event = new FSEvent(JNotify.FILE_CREATED,path,name);
this.emit(event);
}
/**
* 这几种方法是监听器的方法,他会在文件发生变化的时候被回调。
* 通过这些方法可以将监听到的内容变为事件对象,然后存入队列。
*/
public void fileRemoved(int type,String path,String name) {
FSEvent event = new FSEvent(JNofiy.FILE_DELETED,path,name);
this.emit(event);
}
// 省略不必要的监听方法,其实JNotify还有两个监听方法,这里就不在列举了。
// 业务处理
private void resolveEvent(FSEvent event) {
// 在这里进行业务处理,ReactorDispatcher还会进行批量注入,将实现某个接口的实例作为list
// 注入进来(但是这段代码里面我没写,因为这个是需要根据实际情况设计的)即通过接
// 口注入一组业务逻辑的处理器
// 然后在这里根据type进行分发,虽然标准的reactor是单线程的,但是我的业务处理很多
// 时候是需要消耗相当的时间,所以我在这里用了线程池,所有的event都是异步处置,根据不同的
// 场景,时间消耗较低的可以直接单线程。当然以nio为基础的话,这里完全可以单线程的。
// 不过nio处理相对复杂一点,容易出现回调地狱这种问题,因此在没有好的处理方式之前,
// 还不如直接线程池异步处理,只要小心锁的问题就是了。
}
public void emit(FSEvent event) {
if(queue.isEmpty()) {
quque.addFirst(event);
try {
monitor.await();
} catch(Exception e) {
}
} else {
quque.addFirst(event);
}
}
public void run() {
while(state) {
try{
// 阻塞线程,直到Event出现。
monitor.await();
if(queue.isEmpty()) {
continue;
}
// removeLast 在没有下一个的时候会抛出空指针异常,需要catch一下,不然线程有
// 异常后会直接退出,连报错都没有。之前遇到线程莫名自动退出的问题,就是因为run
// 里面出现了异常。
while(!quque.isEmpty()) {
FSEvent event = queue.removeLast();
this.resolveEvent(event);
}
mointor.reset();
}catch(Exception e) {
}
}
}
}可以看到,相比直接在listener进行处理,收集事件然后统一进行分配,这样更加容易控制,而且详细的逻辑可以不用和这个监听和分发的结构混在一起,整体结构更加清晰明确。当然和web的reactor模式相比,我编写的有比较明显的差别,这种差别还是来自具体的应用场景,主要的思路还是差不多的。

大概的思路就像上图这样。
如果分发器线程没有事件,一直在循环,什么都不做,是会消耗cpu资源的.
所以增加了一个CycleBarrier,这个东西的作用,是在await的次数达到指定次数后,会释放锁,让线程继续执行,这里设置为2,当分发器线程没有event的时候,会进行一个await,这个时候分发器被锁住,线程阻塞,一直到出现event,在event出现后,CycleBarrier的await达到2,锁被释放.
分发器可以继续执行。其实反应器模式应该就是在无事件的时候阻塞的,之前我没有想好该怎么处理,后来读某个源码的时候看到了CountDownLatch和CycleBarrier,就突然想到可以用在这里。
2020 – 11 – 18 修改。





