Event Store框架探究

时间:2022-05-03
本文章向大家介绍Event Store框架探究,主要内容包括摘要:、挑战:、详细设计:、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

摘要:

  游戏开发中,经常会越到千奇百怪的Bug。后台程序都是以demon 方式运行,要么GDB,要么Log。一些确定性的bug可以直接使用GDB调试,比如特定请求会Crash。如果是运行一段时间,Bug才会出现,无明显规律,那么也只能使用Log了。但是从成千上万条日志中Grep、分析、定位,然后修改代码、测试,这个过程效率极其低,有的时候挫折感倍强,想骂娘都。经过一些总结后,我们希望程序能够具有完整跟踪用户行为的功能。用户的行为被完整的记录下来,针对领域对象提供类似“快照”的功能,当程序出现问题时,我们可以从某个正确的“快照”为起点,回放用户的操作,这样Bug可以被重现,修复bug后也可以通过回放用户操作来验证正确与否。

挑战:

event store的相关概念:

  这样的”用户行为回放系统”以前还真没搞过, 最近对这个功能非常感兴趣,查阅了一些资料,前辈们在这方面还是有不少研究的。而且我自己也在实践DDDD,非常关注DDD社区的活动。DDD社区最近流行的设计思想CQRS强调命令与查询分离,并且成熟的几套框架都有集成Even Store。何为Event Store,其就是用来记录用户行为的对象。DDD强调关注点集中在和领域问题相关的几个领域对象上。用户的操作实际就对应着领域对象的修改。那么将领域对象的每次修改抽象成一个event事件,把这些event都存储在event Store, 当需要重新构造领域对象时, 遍历相应的event增量式的构造出领域对象。

推荐:Martin Fowler 的http://martinfowler.com/bliki/CQRS.html

snapshot 快照功能:

  如果把领域对象的所有操作都记录为event,随着时间推移,event可能积累的很多,当构造领域对象时可能需要花费较大的开销。在领域对象修改到一定量时,snapshot 为领域对象设置快照,那么恢复领域对象时只需从最近的快照开始回放event即可。snapshot保存了领域对象关键的修改点,它是对回放event构造领域对象的优化。

如何序列话对象:

  主要设计到两种对象,一个是event对象,一个是领域实体entity。event记录了用户行为,被event_store按时间(又称version)顺序记录,entity序列化发生在会entity设置快照时。

  有序列化当然有反序列化,实体对象必须能够从序列化的数据中(即snapshot数据)回建对象。并把event在实体对象上回放,也需要将event从序列化数据中回建。

详细设计:

简化模型:

  我们用entity_user对象模拟玩家的操作,假设有两个接口,inc_gold 和 inc_level,即玩家增涨金钱、增长等级。

类设计:

  1. 序列化基类serializer_i, encode用来序列化对象,decode用于反序列化对象
class serializer_i
{
public:
    virtual ~serializer_i(){}
    virtual int decode(const json_value_t& jval_) = 0;
    virtual string encode() const          = 0;
};
  1. 领域实体对象基类 entity_i,领域实体对象必须用于唯一的id以方便进行索引,并且如前文所述,实体对象必须是可以序列化、反序列话的,当对其进行snapshot时需要对其进行序列化,当构造该对象时需要从快照数据中反序列化构建对象。
class entity_i: public serializer_i
{
public:
    entity_i(uint64_t id_):
        m_id(id_)
    {}
    virtual ~entity_i(){}
    uint64_t id() const { return m_id; }

protected:
    uint64_t m_id;
};
  1. 事件基类 event_i,所有对领域对象的修改都是通过Raise一个特定事件完成的,由于C++是强类型的语言并且支持重载,领域对象针对每个event都有一个特定的apply接口。event也继承自serializer_i,当其被存储到eventStore中时需要序列化,当要回放event恢复entity时需要反序列化event。
class event_i: public serializer_i
{
public:
    virtual ~event_i(){}
};
  1. 事件派发器event_dispather_i,实际上就是用来实现反射功能,反序列化event时需要根据不同类型的event调用不同的entity的apply接口,此对象能够保证event会被正确的被apply调用。
class event_dispather_i
{
public:
    ~event_dispather_i(){}
    virtual int dispath(const string& json_) = 0;
};
  1. 事件仓库基类,其提供功能有三
class event_store_i
{
public:
    virtual ~event_store_i(){}
    virtual int save_event(uint64_t entity_id_, const event_i& event_) = 0;
    virtual int snapshot_entity(const entity_i& entity_) = 0;
    virtual int constuct_snapshot_last(entity_i& entity_, event_dispather_i& event_dispacher_, int version_ = 1) = 0;
};

  1. 存储event事件

  2.  保存领域对象实体的快照数据

  3.  通过某个版本的快照,回建领域对象

  1. 结构图如下

7. 示例代码

  1. User实体对象:User维护两个成员变量gold和level,用来表示当前用户的金钱和等级,inc_gold和inc_level是两个Cmd接口,验证参数有效Raise一个inc_gold_event事件,参见示例代码:
class event_store_i
{
public:
    virtual ~event_store_i(){}
    virtual int save_event(uint64_t entity_id_, const event_i& event_) = 0;
    virtual int snapshot_entity(const entity_i& entity_) = 0;
    virtual int constuct_snapshot_last(entity_i& entity_, event_dispather_i& event_dispacher_, int version_ = 1) = 0;
};

  为简化操作,我们假设金钱是经常变更的,而等级变化较慢,level变化为关键变化,每当level改变我们都会为实体建立新的快照:

int entity_user_t::inc_level(int32_t level_)
{
    if (level_ <= 0)
        return -1;
    inc_level_event_t event(level_);
    apply(event);
    m_event_store->save_event(this->id(), event);
    m_event_store->snapshot_entity(*this);
    return 0;
}

而对应的apply接口则非常的简单,因为参数已经进过验证,apply是实实在在的改变对象状态的内部方法:

void entity_user_t::apply(const inc_level_event_t& event_)
{
    m_level += event_.level;
}
  1. event 对象除了用于特定的数据字段,最主要的当属decode和encode接口。这里为了方便调试我使用了json序列化和反序列化方式,json的decode和encode有不小的开销,基于二进制的序列化和反序列化可以达到很高的实时性,存在很大的优化空间。
struct inc_level_event_t: public event_i
{
    inc_level_event_t():
        level(0)
    {}
    inc_level_event_t(int32_t level_):
    level(level_)
    {}
    int decode(const json_value_t& jval_)
    {
        json_instream_t in("inc_level_event_t");
        in.decode("level", jval_["level"], level);
        return 0;
    }
    string encode() const
    {
        rapidjson::Document::AllocatorType allocator;
        rapidjson::StringBuffer            str_buff;
        json_value_t                       ibj_json(rapidjson::kObjectType);
        json_value_t                       ret_json(rapidjson::kObjectType);
        
        json_outstream_t out(allocator);
        out.encode("level", ibj_json, level);
        ret_json.AddMember("inc_level_event_t", ibj_json, allocator);
        
        rapidjson::Writer<rapidjson::StringBuffer> writer(str_buff, &allocator);
        ret_json.Accept(writer);
        string output(str_buff.GetString(), str_buff.GetSize());
        return output;
    }
    int32_t level;
};
  1. event store 的实现,为了简便起见,本示例框架并没有将序列化的数据落盘,而是直接存储在内存中。真是的eventStore可以采用写文件的方式或者Sqlite也是很好的方案。
class event_store_mem_t: public event_store_i
{
    typedef vector<string>                event_record_t;
    typedef map<uint64_t, event_record_t> entity_event_map_t;
    
    struct  snapshot_info_t
    {
        long   event_version;
        string data;
    };
    typedef vector<snapshot_info_t>       entity_record_t;
    typedef map<uint64_t, entity_record_t>entity_snapshot_map_t;
public:
    event_store_mem_t();
    ~ event_store_mem_t();
    
    int save_event(uint64_t entity_id_, const event_i& event_);
    int snapshot_entity(const entity_i& entity_);
    int constuct_snapshot_last(entity_i& entity_, event_dispather_i& event_dispacher_, int version_ = 1);

private:
    entity_event_map_t    m_entity_events;
    entity_snapshot_map_t m_entity_snapshot;
};

下面时基于内存的eventStore的实现:

int event_store_mem_t::save_event(uint64_t entity_id_, const event_i& event_)
{
    m_entity_events[entity_id_].push_back(event_.encode());
    return 0;
}

int event_store_mem_t::snapshot_entity(const entity_i& entity_)
{
    snapshot_info_t info;
    info.event_version = m_entity_events[entity_.id()].size();
    info.data          = entity_.encode();
    m_entity_snapshot[entity_.id()].push_back(info);
    return 0;
}

int event_store_mem_t::constuct_snapshot_last(entity_i& entity_, event_dispather_i& event_dispacher_, int version_)
{
    int index = m_entity_snapshot.size() - version_;
    if (index >=0 && index < (int)m_entity_snapshot.size())
    {
        snapshot_info_t& info = m_entity_snapshot[entity_.id()][index];
        
        json_dom_t document;
        if (document.Parse<0>(info.data.c_str()).HasParseError())
        {
            throw msg_exception_t("json format not right");
        }
        if (false == document.IsObject() && false == document.Empty())
        {
            throw msg_exception_t("json must has one field");
        }

        entity_.decode(document.MemberBegin()->value);
        
        for (size_t i = info.event_version; i < m_entity_events[entity_.id()].size(); ++i)
        {
            event_dispacher_.dispath(m_entity_events[entity_.id()][i]);
        }
    }
    return 0;
}
  1. 恕不絮烦,详细实现代码参见google code

  svn co  https://ffown.googlecode.com/svn/trunk/example/event_store

8. 待改进之处