基于SCF实现批量备份Elasticsearch索引到COS

时间:2022-07-22
本文章向大家介绍基于SCF实现批量备份Elasticsearch索引到COS,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

在采用ELK架构的日志应用场景中,通常我们会按天或者按小时创建索引,从而避免单个索引的数据量太大。如果我们需要对过去一段时间的老索引进行冷备份,比如通过快照备份到S3或者腾讯云的对象存储服务COS中,然后降低索引的副本数量或者直接从ES删除索引,可以采取的方式有多种。一种是手动调用ES的API,一次性备份所有的老索引到COS中,但是如果数据量较大时只使用一个快照可能因为数据传输中断而导致快照执行失败;另外一种方式是自己编写脚本,通过crontab定时执行脚本对过去某一天或几天的索引打快照,执行成功后再对其它的索引打快照;本文尝试使用SCF(腾讯云无服务器云函数)对按小时新建的索引,持续批量的打快照到COS。

实施步骤

1. 创建COS仓库

我们把快照保存到腾讯云的COS对象存储中,首先需要调用ES的API创建一个COS repository:

PUT _snapshot/cos-repo
{
    "type": "cos",
    "settings": {
        "app_id": "xxxxxxx",
        "access_key_id": "xxxxxx",
        "access_key_secret": "xxxxxxx",
        "bucket": "xxxxxx",
        "region": "ap-guangzhou",
        "compress": true,
        "chunk_size": "500mb",
        "base_path": "/"
    }
}

更详细的基于COS备份和恢复ES数据的步骤可以参考使用 COS 进行备份及恢复.

2. 创建SCF云函数

如图,基于名为"ES写入函数"的模板,创建一个新的函数:

点击"下一步"进入函数编辑界面,直接复制如下函数代码粘贴到编辑框,修改ES的vip和用户名密码,以及索引前缀名称等信息:

# -*- coding: utf8 -*-
import datetime
from elasticsearch import Elasticsearch

# ES集群的用户名密码信息
ESServer = Elasticsearch(["xxxx:9200"],http_auth=('elastic', 'xx'))
# 索引前缀
esPrefix = "my-index-"
# 从过去哪一天的索引开始打快照
beforeOfDay = 60

# COS仓库名称
cosRepository = "cos-repo"
# 快照名称前缀
cosSnapshotPrefix = "snapshot-"

# 临时索引名称,不需要修改
snapshotTempIndex = "temp-snapshot"
currentDate = ""

def check_or_create_snapshot():
    existed = ESServer.indices.exists(snapshotTempIndex)
    if existed == True:
        getTempDoc = ESServer.get(snapshotTempIndex, doc_type="_doc", id="1")
        print getTempDoc
        currentDate = getTempDoc["_source"]["currentDate"]
        print "current date: " + currentDate
    else:
        today = datetime.datetime.now()
        offset = datetime.timedelta(days=-beforeOfDay)
        re_date = (today + offset).strftime('%Y.%m.%d')
        currentDate = re_date

    currentSnapshot = cosSnapshotPrefix + currentDate
    params = {}
    params["ignore_unavailable"] = "true"
    getResult = ESServer.snapshot.get(cosRepository, currentSnapshot, params = params)
    snapshots = getResult["snapshots"]
    if len(snapshots) != 0:
        if snapshots[0]["state"] == "SUCCESS":
            print currentSnapshot +" executed finished!"
            newDate = getNewDate(currentDate)
            createSnapshot(newDate)
        else:
            print currentSnapshot +" is running!"
    else:
        createSnapshot(currentDate)

def createSnapshot(currentDate):
    currentSnapshot = cosSnapshotPrefix + currentDate
    body = {}
    body["indices"] = esPrefix + currentDate + "-*"
    body["ignore_unavailable"] = "true"
    body["include_global_state"] = "false"
    createResult = ESServer.snapshot.create(cosRepository, currentSnapshot,body)
    if createResult["accepted"] == True:
        print "create [" + currentSnapshot + "] success!"
        indexTempDate(currentDate)
        return
    else:
        print "create [" + currentSnapshot +"] failed!" + str(createResult)


def getNewDate(currentDate):
    dateTime_p = datetime.datetime.strptime(currentDate,'%Y.%m.%d')
    offset = datetime.timedelta(days=+1)
    re_date = (dateTime_p + offset).strftime('%Y.%m.%d')
    print "new date: "+ re_date
    return re_date

def indexTempDate(currentDate):
    indexBody ={}
    indexBody["currentDate"] = currentDate
    headers= {}
    headers["Content-Type"] = "application/json"
    indexResult = ESServer.index(index=snapshotTempIndex, doc_type="_doc", body=indexBody,id="1")
    print "index temp date success!"

def main_handler(event,context):
    check_or_create_snapshot()

点击"完成"即可完成云函数的创建。

上述代码的主要逻辑为:

  1. 第一次执行函数时,根据指定的beforeOfDay,确定从哪一天开始打快照,如示例中从60天前的索引开始,创建一个名为snapshot-2020.xx.xx的快照,备份的索引为my-index-2020.xx.xx-*, 也即60天前的当天创建的所有索引;同时创建一个名为temp-snapshot的临时索引,记录下"2020.xx.xx"
  2. 之后每次执行函数时会先从temp-snapshot索引中获取到当前正在执行哪一天的快照,判断快照是否执行成功,如果执行成功则进行对下一天的索引打快照,同时更新temp-snapshot索引

3. 配置云函数

创建完云函数后,需要进行配置才能使用,如下图,可以配置函数的私有网络VPC和Subnet(选择和ES相同的VPC和Subnet):

4. 测试云函数

配置完云函数后,可以对函数代码进行测试,保证能够正常运行;如果需要进行编辑,可以直接编辑然后点击"保存并测试":

5. 配置触发器

配置触发器,每5分钟执行一次函数: