码农技术炒股之路——数据库管理器、正则表达式管理器

时间:2022-06-17
本文章向大家介绍码农技术炒股之路——数据库管理器、正则表达式管理器,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

        我选用的数据库是Mysql。选用它是因为其可以满足我的需求,而且资料多。因为作为第三方工具,难免有一些配置问题。所以本文也会讲解一些和Mysql配置及开发相关的问题。(转载请指明出于breaksoftware的csdn博客)

数据库管理器

        Mysql的安装我就不说了。我先说说和我习惯相关的一个问题:我希望在我Windows系统上可以通过Navicat for Mysql连接到我Ubuntu上的Mysql服务器。这块问题的解决可以参见《允许ubuntu下mysql远程连接》

        然后需要准备Python下进行Mysql开发的一些环境

apt-get install python-dev
apt-get install libmysqld-dev
apt-get install libmysqlclient-dev
updatedb
locate mysql_config
pip install MySQL-python -i http://pypi.douban.com/simple

        由于我们要进行分表,所以数据库连接数要进行增大。于是需要修改mysql的配置

max_connections=1000

        基础环境配置好后,我们就可以开始进行数据库管理器的设计和实现了。

数据库连接类

        数据库连接我们使用PooledDB连接池,使用这个库的最大好处是我们可以不用考虑很多底层的重连和多线程问题。

from DBUtils.PooledDB import PooledDB
class mysql_conn():
    def __init__(self, host_name, port_num, user_name, password, db_name, charset_name = "utf8"):
        self._host = host_name
        self._port = int(port_num)
        self._user = user_name
        self._passwd = password
        self._db = db_name
        self._charset = charset_name
        self._pool = None
        self._table_info = {}
        self.re_connect()

        re_connect方法要考虑数据库不存在的情况。

    def re_connect(self):
        self._try_close_connect()
        
        try:
            self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)
            LOG_INFO("connect %s success" %(self._db))
            self.refresh_tables_info()
            return
        except MySQLdb.Error, e :
            if e.args[0] == 1049:
                self._create_db()
            else:
                LOG_WARNING("%s connect error %s" % (self._db, str(e)))
                return
        except Exception as e:
            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
            return 

        如果数据库不存在,MySQLdb.Error对象的值是1049,这种场景我们就需要创建数据库。如果发生其他错误,就直接报错

    def _create_db(self):
        conn = None
        cursor = None
        try:
            conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)
            cursor = conn.cursor()
            sql = """create database if not exists %s""" %(self._db)
            #LOG_INFO(sql)
            cursor.execute(sql)
            conn.select_db(self._db);
            conn.commit()
        except MySQLdb.Error, e :
            LOG_WARNING("%s execute error %s" % (sql, str(e)))
        finally:
            try:
                if cursor:
                    cursor.close()
                if conn:
                    conn.close()
            except:
                pass

        创建完数据后,要关闭连接。然后再走一遍数据库连接过程,但是这次就用不判断数据库是否存在了

        try:
            self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)
            LOG_INFO("connect %s success" %(self._db))
            self.refresh_tables_info()
            return
        except Exception as e:
            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
            return
        
        if None == self._pool:
            LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
            return

        连接完数据库后,我们需要通过refresh_tables_info获取该库中表的信息。为什么我们需要获取这个信息呢?因为我希望在调用数据库操作时,mysql_conn类已经知晓一些字段的类型和长度,这样就可以将用户传入的数据进行相应的格式化,而从让调用者不用太多关心表字段类型。

    def refresh_tables_info(self):
        self._table_info = self._get_tables_info()

    def _get_tables_info(self):
        tables_info = {}
        tables_sql = "show tables"
        tables_name = self.execute(tables_sql, select = True)
        for table_name_item in tables_name:
            table_name = table_name_item[0]
            if 0 == len(table_name):
                continue
            columns_sql = "show columns from " + table_name 
            table_info = self.execute(columns_sql, select = True)
            table_name = table_name_item[0]
            columns_info = self._get_table_info(table_info)
            if len(columns_info):
                tables_info[table_name] = columns_info
        return tables_info

    def _get_table_info(self, table_info):
        columns_info = {}
        for item in table_info:
            column_name = item[0]
            column_type_info = item[1]
            (type, len) = self._get_column_type_info(column_type_info)
            columns_info[column_name] = {"type":type, "length":len}
        return columns_info

    def _get_column_type_info(self, type_info):
        re_str = '(w*)((d*),?.*)'
        kw = re.findall(re_str, type_info)
        if len(kw):
            if len(kw[0]) > 1:
                return (kw[0][0], kw[0][1])
        return (None, None)

        连接完数据库后,我们需要对表进行一系列操作,比如表查询

    def select(self, table_name, fields_array, conditions, pre = "", extend = ""):
        fields_str = "," . join(fields_array)
        conds = []
        for (column_name, column_data_info) in conditions.items():
            column_type = self._get_column_type(table_name, column_name)
            column_data = column_data_info[0]
            operation = column_data_info[1]
            if isinstance(column_data, list):
                new_datas = []
                for item in column_data:
                    new_data = self._conv_data(item, column_type)
                    try:
                        new_datas.append(new_data)
                    except:
                        LOG_WARNING("%s %s conv error" %(item, column_type))
                temp_str = "," . join(new_datas)
                cond = column_name + " " + operation  + " (" + temp_str + ")"
                conds.append(cond)
            else:
                new_data = self._conv_data(column_data, column_type)
                try:
                    cond = column_name + " " + operation + " " + new_data
                    conds.append(cond)
                except:
                    LOG_WARNING("%s %s conv error" %(column_data, column_type))
        conds_str = " and " . join(conds)

        sql = "select " + pre + " " + fields_str + " from " + table_name
        if len(conds_str) > 0:
            sql = sql + " where " + conds_str
        
        if len(extend) > 0:
            sql = sql + " " + extend
        
        data_info = self.execute(sql, select = True)
        return data_info

        select方法中table_name是表名;fields_array是需要查询的字段数组;conditions是查询条件的Key/Value对,其中Key是字段名称,Value是个数组,数组的第一个元素是表达式右值,第二个元素是表达式的操作符。比如条件a>1 and b < 2,则conditions是{"a":["1",">"],"b":["2","<"] }。这儿需要考虑表达式右值是一个数组的场景,比如 a in (1,2,3)这样的条件,所以组装条件时做了特殊处理。

        在处理表中数据的时候,比如查询语句的条件中有表中字段信息,再比如更新、插入数据语句中也有相关信息,这个时候都需要调用_get_column_type方法获取字段类型,然后通过_conv_data方法将数据进行格式化——当然目前这个函数还不能涵盖所有类型。

    def _get_column_type(self, table_name, column_name):
        if 0 == len(self._table_info):
            self.refresh_tables_info()
        if table_name not in self._table_info.keys():
            LOG_WARNING("table_%s info in not exist" %(table_name))
            return "None"
        if column_name not in self._table_info[table_name].keys():
            LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))
            return "None"
        return self._table_info[table_name][column_name]["type"]
    
    def _conv_data(self, data, type):
        if type == "varchar" or type == "char":
            return '"%s"' % (data)
        elif type == "float" or type == "double":
            try:
                conv_data = float(data)
                return "%.8f"  % (conv_data)
            except Exception as e:
                LOG_WARNING("conv %s to %s error" % (data, type))
                return "0"
        elif type == "tinyint" or type == "bigint" or type == "int":
            return "%d" % (int(data))

        数据的更新操作和插入操作我就不把代码贴出来了。大家可以到之后公布的源码地址里看。

        最后说明下操作执行的方法

    def execute(self, sql, select = False, commit=False):
        try:
            data = ()
            conn = self._pool.connection()
            cursor = conn.cursor()
            data = cursor.execute(sql)
            if select:
                data = cursor.fetchall()
            if commit:
                conn.commit()
            cursor.close()
        except Exception as e:
            LOG_WARNING("excute sql error %s" % (str(e)))
            LOG_ERROR_SQL("%s" % (sql))
        finally:
            cursor.close()
            conn.close()

        return data

        一些操作我们需要数据库服务马上去执行,如创建数据库和创建表操作,因为我们在创建后立即会去使用或者查询相关信息。如果不及时执行,将影响之后的结果。这个场景下我们需要把commit参数设置为True。当然这种方式不要滥用,否则会影响数据库执行效率。

        还有一些操作我们需要关心返回结果,比如select指令。这个时候就需要通过fetchall获取全部数据并返回。而创建表等操作则不需要fetchall结果。

连接管理类   

        因为我们数据库是分库的,而上述每个连接只管理一个数据库的操作,所以我们需要一个连接管理器去管理这些连接。

        连接管理类是个单例,它通过modify_conns方法连接每个数据库

@singleton
class mysql_manager():
    def __init__(self):
        self._conns = {}

    def modify_conns(self, conns_info):
        for (conn_name, conn_info) in conns_info.items():
            conn_info_hash = frame_tools.hash(json.dumps(conn_info))
            if conn_name in self._conns.keys():
                if conn_info_hash in self._conns[conn_name].conns_dict.keys():
                    continue
            else:
                self._conns[conn_name] = mysql_conn_info()

            for key in conf_keys.mysql_conn_keys:
                if key not in conn_info.keys():
                    continue
            conn_obj = mysql_conn(conn_info["host"], conn_info["port"], conn_info["user"], conn_info["passwd"], conn_info["db"], conn_info["charset"])
            self._conns[conn_name].conns_dict[conn_info_hash] = conn_obj
            self._conns[conn_name].valid = 1
        self._print_conns()

        如果数据库连接信息发生改变,则需要将发生改变的数据库连接置为无效,然后再重新连接并记录

    def add_conns(self, conns_info):
        self.modify_conns(conns_info)

    def remove_conns(self, conns_info):
        for (conn_name, conn_info) in conns_info.items():
            conn_info_hash = frame_tools.hash(json.dumps(conn_info))
            if conn_name in  self._conns.keys():
                if conn_info_hash in self._conns[conn_name].conns_dict.keys():
                    self._conns[conn_name].valid = 0
        self._print_conns()

        连接管理类通过get_mysql_conn方法暴露连接对象

    def get_mysql_conn(self, conn_name):
        if conn_name not in self._conns.keys():
            return None
        conn_info = self._conns[conn_name]
        valid = self._conns[conn_name].valid
        if 0 == valid:
            return None
        conns_dict_keys = self._conns[conn_name].conns_dict.keys()
        if len(conns_dict_keys) == 0:
            return None
        key = conns_dict_keys[-1]
        ret_conn = self._conns[conn_name].conns_dict[key]
        return ret_conn

        它还暴露了一个刷新所有数据库中表信息的方法,用于在系统任务中定期更新内存中信息,保证数据稳定写入。

    def refresh_all_conns_tables_info(self):
        for (conn_name, conn_info) in self._conns.items():
            conn = self.get_mysql_conn(conn_name)
            if None != conn:
                conn.refresh_tables_info()

连接管理配置

        我共设计了三种数据库。一种是保存股票基础数据的数据库,其配置是

[stock_db]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock
charset=utf8

        一个是保存每日实时数据的数据库

[daily_temp]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=daily_temp
charset=utf8

        最后一种是按股票代码分类的库,这种库有300个,设计原因我在《码农技术炒股之路——架构和设计》有说明

[stock_part]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock_part
charset=utf8
range_max=300

        注意range_max这个参数,如果配置中有该参数,则代表其是一个数据库组

class mysql_conf_parser:

    def parse(self, job_conf_path):

        cp = ConfigParser.SafeConfigParser()
        cp.read(job_conf_path)
        sections = cp.sections()
        conns_info = {}
        for section in sections:
            conn_info = {}
            for key in conf_keys.mysql_conn_keys:
                if False == cp.has_option(section, key):
                    LOG_WARNING()
                    continue
                conn_info[key] = cp.get(section, key)
            if cp.has_option(section, "range_max"):
                range_max = int(cp.get(section, "range_max"))
                db_name_base = conn_info["db"] 
                for index in range(0, range_max):
                    conn_info["db"] = db_name_base + "_" + str(index)
                    section_index_name = section + "_" + str(index)
                    conns_info[section_index_name] = copy.deepcopy(conn_info)
            else:
                conns_info[section] = conn_info
        return conns_info

        最终我们将建成下图所示数据库

正则表达式管理器

        当我们从数据源获取数据后,需要使用一系列正则将原始数据转换成一组数据。然后才可以将这些数据写入数据库。举个例子,我们先看下正则管理器的配置

[string_comma_regular]
regular_expression_0 = data:[(.*)]
regular_expression_1 = "[^"]+"
regular_expression_2 = [^,"]+

[hq_sinajs_cn_list]
regular_expression_0 = var hq_str_([^;]*);
regular_expression_1 = ([^,="shz]+)

[quotes_money_163]
regular_expression_0 = ([^rn]+)
regular_expression_1 = ([^,'rn]+)

        每一节都是一个正则名称,其下都是以“regular_expression_”开始的关键字。正则执行的顺序从序号小的向序号大的方向执行。我们通过下面的配置解释器读取配置

import ConfigParser

class regulars_manager_conf_parser:
    
    def parse(self, regulars_conf_path):
        cp = ConfigParser.SafeConfigParser()
        cp.read(regulars_conf_path)
        sections = cp.sections()
        regulars_info = {}
        for section in sections:
            regular_info = []
            regular_name_pre = "regular_expression_"
            for index in range(0, len(cp.options(section))):
                regular_name = regular_name_pre + str(index)
                if cp.has_option(section, regular_name):
                    regular_info.append(cp.get(section, regular_name))
                else:
                    break
            regulars_info[section] = regular_info
        return regulars_info

        正则表达式管理通过下面方法维护信息

@singleton
class regular_split_manager():
    def __init__(self):
        self._regulars = {}

    def modify_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            self._regulars[regular_name] = regulars_info

    def add_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            self._regulars[regular_name] = regular_info
    
    def remove_regulars(self, regulars_info):
        for (regular_name, regular_info) in regulars_info.items():
            if regular_name in self._regulars.keys():
                del self._regulars[regular_name]

        通过get_split_data方法可以将数据通过指定的正则名称进行分解,且分解到最后一步

    def get_split_data(self, data, regular_name):
        data_array = []
        self._recursion_regular(data, regular_name, 0, data_array)   
        return data_array

    def _get_regular(self, regular_name, deep):
        if regular_name not in self._regulars.keys():
            LOG_WARNING("regular manager has no %s" % (regular_name))
            return ""
        if deep >= len(self._regulars[regular_name]):
            return ""
        return self._regulars[regular_name][deep]

    def _recursion_regular(self, data, regular_name, deep, data_array):
        regular_str = self._get_regular(regular_name, deep)
        split_data = re.findall(regular_str, data)
        regualer_next_str = self._get_regular(regular_name, deep + 1)
        split_array = []
        if len(regualer_next_str) > 0:
            for item in split_data:
                self._recursion_regular(item, regular_name, deep + 1, data_array)
        else:
            for item in split_data:
                split_array.append(item)
            if len(split_array) > 0:
                data_array.append(split_array)

        有了上述各种管理器,我们已经把主要的准备工作做好。下一篇我将介绍最核心的任务调取管理器,它才是上述管理器最终的使用方。