guava学习(一):观察者模式

时间:2022-07-23
本文章向大家介绍guava学习(一):观察者模式,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

观察者模式是很常见的一种行为型设计模式。在Java原生态的实现方式中,观察者实现Observer接口,被观察者继承Observable。

下面编写一个使用Java api简单的实现。

观察者代码:

public class MyObserver implements Observer {
    public void update(Observable o, Object arg) {        if (o instanceof MyObservable){            System.out.println(arg);        }
    }}

被观察者:

public class MyObservable extends Observable {
    @Override    public void notifyObservers(Object message){        super.setChanged();        super.notifyObservers(message);    }
}

绑定主题类:

public class Subject {
    private Observable observable = new MyObservable();
    public void registerObserver(MyObserver observer) {        observable.addObserver(observer);    }
    public void removeObserver(MyObserver observer) {        observable.deleteObserver(observer);    }
    public void notifyObservers(String message) {        observable.notifyObservers(message);    }}

测试代码

public static void main(String[] args) {        Subject subject = new Subject();        MyObserver observer = new MyObserver();        subject.registerObserver(observer);        subject.notifyObservers("hi, I am subject Observable");}

java的实现方式,如果观察者使用异步来实现消息处理,会使业务代码和非业务代码耦合在一起。

guava封装了Java的观察者模式,并且方便的支持异步。talk is cheap,先看一下代码:

定义2个观察者:

public class AObserver {
    Logger logger = LoggerFactory.getLogger(getClass());
    @Subscribe    public void handleMessage(String msg){        logger.info("a obsesrver receive message:{}", msg);    }}
public class BObserver {
    Logger logger = LoggerFactory.getLogger(getClass());
    @Subscribe    public void handleMessage(String msg){        logger.info("b obsesrver receive message:{}", msg);    }}

EventBusUtil类

public class EventBusUtil {
    public static EventBus getEventBus(){        return EventBusFactory.getAsyncInstance();    }
    public static class EventBusFactory{        private static EventBus asyncEventBus = new AsyncEventBus(LocalThreadPoolExecutor.getExecutor());        private static EventBus syncEventBus = new AsyncEventBus(MoreExecutors.directExecutor());
        public static EventBus getAsyncInstance(){            return asyncEventBus;        }
        public static EventBus getyncInstance(){            return syncEventBus;        }
    }}

注意:MoreExecutors.directExecutor()看起来是线程池,其实是单线程,看源码注解:

测试代码:

public static void main(String[] args){        EventBus eventBus = EventBusUtil.getEventBus();        eventBus.register(new AObserver());        eventBus.register(new BObserver());
        for (int j = 0; j < 2; j ++){            eventBus.post("hi, observer" + j);        }    }

下面看一下guava中的实现:

1)EventBus中的注册,可以注册任意对象作为观察者

/**   * Registers all subscriber methods on {@code object} to receive events.   *   * @param object object whose subscriber methods should be registered.   */  public void register(Object object) {    subscribers.register(object);  }

所有的观察者类中,处理监听事件的方法加了注解@Subscribe,注册的时候,会查找类中加了这个注解的方法然后进行注册,见下面代码中的

findAllSubscribers方法

  /** Registers all subscriber methods on the given listener object. */  void register(Object listener) {  //获取所有加了@Subscribe注解的方法    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {      Class<?> eventType = entry.getKey();      Collection<Subscriber> eventMethodsInListener = entry.getValue();
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers == null) {//还没有注册        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();        eventSubscribers =            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);      }
      eventSubscribers.addAll(eventMethodsInListener);    }  }

2)EventBus中的通知

/**   * Posts an event to all registered subscribers. This method will return successfully after the   * event has been posted to all subscribers, and regardless of any exceptions thrown by   * subscribers.   *   * <p>If no subscribers have been subscribed for {@code event}'s class, and {@code event} is not   * already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.   *   * @param event event to post.   */  public void post(Object event) {    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);    if (eventSubscribers.hasNext()) {      dispatcher.dispatch(event, eventSubscribers);    } else if (!(event instanceof DeadEvent)) {      // the event had no subscribers and was not itself a DeadEvent      post(new DeadEvent(this, event));    }  }

从上面代码可以看出,通过dispatcher.dispatch方法进行通知,这个方法的代码看下面代码:

/** Global event queue. */    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =        Queues.newConcurrentLinkedQueue();
    @Override    void dispatch(Object event, Iterator<Subscriber> subscribers) {      checkNotNull(event);      while (subscribers.hasNext()) {        queue.add(new EventWithSubscriber(event, subscribers.next()));      }
      EventWithSubscriber e;      while ((e = queue.poll()) != null) {        e.subscriber.dispatchEvent(e.event);      }    }

上面的代码能够看出,消息事件event和观察者subscriber封装成一个对象放入并发队列中,然后出队让观察者触发消息处理。

/** Dispatches {@code event} to this subscriber using the proper executor. */  final void dispatchEvent(final Object event) {    executor.execute(        new Runnable() {          @Override          public void run() {            try {              invokeSubscriberMethod(event);            } catch (InvocationTargetException e) {              bus.handleSubscriberException(e.getCause(), context(event));            }          }        });  }

这儿的线程池正是我们在声明EventBus时传入的线程池变量。最后的事件触发使用了java的反射。

@VisibleForTesting  void invokeSubscriberMethod(Object event) throws InvocationTargetException {    try {      method.invoke(target, checkNotNull(event));    } catch (IllegalArgumentException e) {      throw new Error("Method rejected target/argument: " + event, e);    } catch (IllegalAccessException e) {      throw new Error("Method became inaccessible: " + event, e);    } catch (InvocationTargetException e) {      if (e.getCause() instanceof Error) {        throw (Error) e.getCause();      }      throw e;    }  }

代码分析就到这儿,guava详细代码请看这里:

https://github.com/google/guava

文中的示例代码请看这里

https://github.com/jinjunzhu/myguava.git