C++ FFLIB之ffcount:通用数据分析系统

时间:2022-05-03
本文章向大家介绍C++ FFLIB之ffcount:通用数据分析系统,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

摘要:

数据分析已经变得不可或缺,几乎每个公司都依赖数据分析进行决策。在我从事的网游领域,数据分析是策划新功能、优化游戏体验最重要的手段之一。网游领域的数据分析有如下特点(开发角度):

  • 数据量大;网游用户量大,用户行为多,存储数据量较大。
  • 实时性要求高;比如新上的游戏功能,玩家体验和反馈希望尽快的被分析出来。
  • 需求变化快。网游的需求变化日新月异,故要求数据分析系统能够快速的响应需求变化。

常见的数据分析系统

数据分析系统应该分为数据存储和数据分析,常见的数据分析架构有:

  • 直接在逻辑服务中定制数据分析;这种情况往往使用mysql或这mongodb作为数据存储,优点是定制化的数据存储更加节省空间,缺点是mysql和mongodb的数据存储服务器往往成本更高,并且若增加新需求,定制化需要的开发量极大,并且维护老的数据分析代码往往十分困难,因为是高度定制化的,往往会绑定在特殊的应用背景下。
  • 使用scribe做数据存储,使用hadoop分析数据。Facabook scribe server 可以利用hadoop分布式文件系统来存储大数据,电子商务或者sns网站往往使用这种可扩展的成熟的方案,缺点是部署和维护成本较高,中小型团队要建立hadoop集群无论从人力还是物力都相对困难。

确定需求:

  • 数据存储尽量简单和低成本,由于日志数据的读取效率要求并不高,所以使用普通机器一般磁盘存储即可,而不需要另外使用mysql及其他nosql等。
  • 数据分析尽量简单易开发,目前来讲,sql查询是最方便最基础的方式,所以数据应该是sql结构化的。
  • hadoop的部署对于中小团队仍然是望而生畏的,故要求数据分析系统部署要简单,配置容易。

ffcount 的架构

内部工作机制

时序图说明内部工作机制:

示例C++客户端代码:

#include "count/ffcount.h"
#include "rpc/broker_application.h"
#include "base/daemon_tool.h"
#include "base/arg_helper.h"

using namespace ff;
#include <stdio.h>

#define NUM 0
int main(int argc, char* argv[])
{
    arg_helper_t arg_helper(argc, argv);
    if (false == arg_helper.is_enable_option("-l"))
    {
        printf("usage: app -l tcp://127.0.0.1:10241n");
        return 1;
    }
    
    assert(0 == singleton_t<msg_bus_t>::instance().open(arg_helper.get_option_value("-l")) && "can't connnect to broker");
    
    assert(singleton_t<msg_bus_t>::instance().get_service_group("event_log_service") && "event_log_service group not exist");

    assert(singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0) && "event_log_service 0 not exist");
    
    event_log_t el("test"/*dbname*/,"dumy"/*tablename*/, "A,B,C"/*fields name*/);el.def(100, "p"T'p", 5.4);
    singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(el);
    for (int i = 0; i < NUM; ++i)
    {
        char buff[64];
        snprintf(buff, sizeof(buff), "dumy_%d", i%8);
        event_log_t el(buff, "A,B,C");el.def(100, "pp", 5.4);
        singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(el);
    }
    
    event_queryt_t::in_t in_msg;
    in_msg.db_name = "test";
    in_msg.sql = "select * from dumy";
    
    struct lambda_t
    {
        static void callback(event_queryt_t::out_t& msg_)
        {
            printf("=====>>>>> callback dump data [%s]<<<<<<=======n", msg_.err_msg.c_str());
            ffdb_t::dump(msg_.ret_data, msg_.col_names);
            
            event_log_t el("test", "dumy", "A,B,C");el.def(100, "p"T'p", 5.4);
            singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(el);
            for (int i = 0; i < NUM; ++i)
            {
                char buff[64];
                snprintf(buff, sizeof(buff), "dumy_%d", i%8);
                event_log_t el(buff, "A,B,C");el.def(100, "pp", 5.4);
                singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(el);
            }
            sleep(1);
            
            event_queryt_t::in_t in_msg;
            //in_msg.str_time = "2013/2";//! 查询1月的数据
            in_msg.db_name = "test";
            in_msg.sql = "select * from dumy order by logtime desc limit 5";
            singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(in_msg, &lambda_t::callback);
        }
    };
    
    singleton_t<msg_bus_t>::instance().get_service_group("event_log_service")->get_service(0)->async_call(in_msg, &lambda_t::callback);
    signal_helper_t::wait();
    singleton_t<msg_bus_t>::instance().close();

    return 0;
}

示例php客户端

<?php

function ffcount_query($host, $port, $str_time, $db_name, $sql)
{
    //以下为引用的内容:

    // 1. 初始化
    $ch = curl_init();
    // 2. 设置选项,包括URL
    $url = "http://".$host.":".$port."/".$str_time."/".$db_name."/".rawurlencode($sql);
    //echo $url."n";
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    curl_setopt($ch, CURLOPT_HEADER, 0);
    // 3. 执行并获取HTML文档内容
    $output = curl_exec($ch);
    // 4. 释放curl句柄
    curl_close($ch);
    if ($output === FALSE) {
        //echo "cURL Error: " . curl_error($ch);
        $ret = array("err_msg" =>"http request failed by curl", "col_names"=>array(), "ret_data"=>array());
    }
    else
    {
        $ret = json_decode($output);
        if (!$ret)
        {
            $ret = array("err_msg" =>$output, "col_names"=>array(), "ret_data"=>array());
        }
    }
    return $ret;
}

$host = "127.0.0.1";
$port = 8080;
$str_time = "2013/2";
$db_name = "test";
$sql = "select * from dumy";
$ret = ffcount_query($host, $port, $str_time, $db_name, $sql);

print_r($ret);
?>

示例C++ server启动:

./app_count -l tcp://127.0.0.1:10241 -http tcp://127.0.0.1:8080

总结:

  • ffcount 根本上提供的是数据日志存储
  • ffcount 使用sql来组织日志文件,从而拥有了sql数据分析能力
  • ffcount 数据文件按照每月归档
  • ffcount 自动创建表和字段,默认创建autoid和logtime两字段,前者为自增主键,后者为timestamp类型,默认为当前时间
  • ffcount 支持http查询,数据存储接口已经有C++ 类库接口

build server:

git clone https://github.com/fanchy/fflib

cd fflib/example/book/count && make && ./app_count -l tcp://127.0.0.1:10241 -http tcp://127.0.0.1:8080

build client:

cd fflib/example/book/count_client && make && ./app_client -l tcp://127.0.0.1:10241

php client:

cd fflib/example/book/count_client/php && php test.php