资源控制在大数据和云计算平台中的应用

时间:2022-05-06
本文章向大家介绍资源控制在大数据和云计算平台中的应用,主要内容包括简介、cgroup简介、JobObjects简介、Docker容器资源控制、云计算中Docker容器的资源收集、YARN容器管理、结束语、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

简介

在大数据迅速发展的今天,很大一部分支持来自于底层技术的不断发展,其中非常重要的一点就是系统资源的管理和控制,大数据平台的核心就是对资源的调度管理,在调度和管理之后如何对这些资源进行控制便成了另一个重要的问题。大数据系统中用户成千上万的作业进程跑在集群中,如果不能对这些进程的资源进行控制,那么大数据平台将变得举步维艰,整个集群便会随时崩溃。同时,大数据作业的调度也是基于资源的配额进行分配,大数据的作业本身就承载了资源配额的属性,但是这些作业是否按照配额进行运行和计算,是否超过了指定的配额导致overuse,是否达不到指定的配额导致资源浪费,这一直以来都是大数据平台面对和要解决的问题。

本文针对大数据平台中资源控制这个层面来详细介绍资源控制在不同操作系统上的具体技术实现,以及大数据平台和资源控制的集成。

资源控制使用的系统功能

平台

功能

简述

Linux

control groups

control groups 简称为cgroup,是Linux内核的一部分,cgroup可以为一组进程定义组群分配资源,这个组群分配资源可以包含CPU时间,内存,网络带宽,并且定义的这些资源分配可以动态修改。

Windows

Job Objects

Job Objects 是Windows内核的一部分,简称“作业对象”,作业对象将一组进程作为一个单元进行管理。作业对象可以命名,是安全的、可共享的对象,对作业对象执行的操作会影响与作业对象关联的所有进程。可以对作业对象设置CPU时间,内存,磁盘访问等限制。

cgroup简介

cgroup是Linux内核的一部分,cgroup可以为一组进程定义组群分配资源,这个组群分配资源可以包含CPU时间,内存,网络带宽,并且定义的这些资源分配可以动态修改。cgroup以一种层级结构(hierarchical)聚合和管理进程,将所有任务进程以文件夹的形式组成一个控制族群树,子控制组自动继承父节点的特定属性,子控制组还可以有自己特定的属性。

cgroup提供一些subsystem作为控制族群树的根节点,所有的任务进程都以这些子系统为入口按树状结构设置资源配额。Red Hat Linux 7.3 提供 12 个 cgroup 子系统,根据名称和功能列出如下。

cgroup各子系统功能

blkio

为I/O设备设定输入/输出限制,比如物理设备(磁盘,固态硬盘,USB 等等)

cpu

提供对CPU使用额度的控制

cpuacct

自动生成cgroup中任务所使用CPU的报告

cpuset

为 cgroup 中的任务分配独立 CPU(在多核系统)内核

devices

允许或者拒绝 cgroup 中的任务访问设备

freezer

挂起或者恢复 cgroup 中的任务

memory

设定 cgroup 中任务使用的内存限制,包括物理内存和虚拟内存,并自动生成由那些任务使用的内存资源报告

net_cls

使用等级识别符(classid)标记网络数据包,可允许 Linux 流量控制程序(tc)识别从具体 cgroup 中生成的数据包

ns

名称空间子系统

perf_event

增加了对每group的监测跟踪的能力,即可以监测属于某个特定的group的所有线程以及运行在特定CPU上的线程

hugetlb

允许限制cgroup 的HubeTLB使用

pids

实现对某个控制组中进程和线程的总数进行限制

可通过以下命令查看操作系统支持的cgroup子系统,同时显示各个子系统挂载的根目录(也可以查看系统文件/proc/mount或者使用命令lssubsys -a):

[root@docker ~]# mount | grep cgroup
tmpfs on /sys/fs/cgroup type tmpfs (ro,nosuid,nodev,noexec,mode=755)
cgroup on /sys/fs/cgroup/systemd type cgroup (rw,nosuid,nodev,noexec,relatime,xattr,release_agent=/usr/lib/systemd/systemd-cgroups-agent,name=systemd)
cgroup on /sys/fs/cgroup/cpu,cpuacct type cgroup (rw,nosuid,nodev,noexec,relatime,cpuacct,cpu)
cgroup on /sys/fs/cgroup/blkio type cgroup (rw,nosuid,nodev,noexec,relatime,blkio)
cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,nosuid,nodev,noexec,relatime,hugetlb)
cgroup on /sys/fs/cgroup/perf_event type cgroup (rw,nosuid,nodev,noexec,relatime,perf_event)
cgroup on /sys/fs/cgroup/pids type cgroup (rw,nosuid,nodev,noexec,relatime,pids)
cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,nosuid,nodev,noexec,relatime,cpuset)
cgroup on /sys/fs/cgroup/net_cls,net_prio type cgroup (rw,nosuid,nodev,noexec,relatime,net_prio,net_cls)
cgroup on /sys/fs/cgroup/memory type cgroup (rw,nosuid,nodev,noexec,relatime,memory)
cgroup on /sys/fs/cgroup/freezer type cgroup (rw,nosuid,nodev,noexec,relatime,freezer)
cgroup on /sys/fs/cgroup/devices type cgroup (rw,nosuid,nodev,noexec,relatime,devices)

cgroup的操作没有提供系统API调用或者命令行,而是直接访问cgroup mount的这个文件系统,举个例子描述下cgroup接口的使用方式。

1. 创建一个目录用于指定需要控制的作业进程,创建之后系统在会每一级自动生成所有的配置文件,可以将该目录认为是一个资源控制组。

[root@docker sym]# mkdir -p /sys/fs/cgroup/memory/ibm/symphony
[root@docker sym]# ls /sys/fs/cgroup/memory/ibm/symphony
cgroup.clone_children  memory.kmem.failcnt             memory.kmem.tcp.limit_in_bytes      memory.max_usage_in_bytes        memory.move_charge_at_immigrate  memory.stat            tasks
cgroup.event_control   memory.kmem.limit_in_bytes      memory.kmem.tcp.max_usage_in_bytes  memory.memsw.failcnt             memory.numa_stat                 memory.swappiness
cgroup.procs           memory.kmem.max_usage_in_bytes  memory.kmem.tcp.usage_in_bytes      memory.memsw.limit_in_bytes      memory.oom_control               memory.usage_in_bytes
memory.failcnt         memory.kmem.slabinfo            memory.kmem.usage_in_bytes          memory.memsw.max_usage_in_bytes  memory.pressure_level            memory.use_hierarchy
memory.force_empty     memory.kmem.tcp.failcnt         memory.limit_in_bytes               memory.memsw.usage_in_bytes      memory.soft_limit_in_bytes       notify_on_release

2. 添加需要的进程到该资源控制组,可以添加多个进程ID

[root@docker sym]# echo 23790 >> /sys/fs/cgroup/memory/ibm/symphony/tasks

3. 设置该资源控制组的物理内存使用配额

[root@docker sym]# echo 1024M > /sys/fs/cgroup/memory/ibm/symphony/memory.limit_in_bytes 
[root@docker sym]# cat /sys/fs/cgroup/memory/ibm/symphony/memory.limit_in_bytes
1073741824

如果不进行设置,默认情况下,继承根目录的内存配置,即系统内存。

[root@docker sym]# cat /sys/fs/cgroup/memory/memory.limit_in_bytes 
9223372036854771712

下面介绍下大数据系统中常用的配额设置。

内存:物理内存的设置文件为memory子系统下的memory.limit_in_bytes,虚拟内存为memory.memsw.limit_in_bytes。如果同时设置这两个参数,需要先设置memory.limit_in_bytes,因为虚拟内存的配额只有在物理内存用完后开始生效。在Linux系统上,只有当物理内存消耗完后才开始消耗虚拟内存,超过配额后再申请的话就会触发OOM kill掉进程。注意:OOM killer也可以关闭,需要向memory.oom_control中写入1,这样当进程尝试申请的内存超过允许,那么它就会被暂停,直到额外的内存被释放。

CPU:对CPU的配额控制是通过CPU子系统下的cpu.cfs_period_us和cpu.cfs_quota_us两个参数控制。cpu.cfs_period_us表示重新分配CPU时间的周期,默认为 100000,即百毫秒。cpu.cfs_quota_us就是在这期间内可使用的 cpu 时间,默认 -1,即无限制。所以默认情况下CPU的使用为100%。如果需要将CPU的使用设置为50%,可以将 cpu.cfs_quota_us设为 50000,cpu.cfs_period_us保持100000,表示每隔100毫秒分配CPU时间,持续使用50毫秒。对CPU的限制不像内存,超过配额后再申请的话就会触发OOM kill掉进程,CPU设置配额后进程不会超过该配额的使用。

JobObjects简介

Windows平台也有对应的内核对象用来控制作业对系统资源的访问,而且控制的范围比Linux广,包括剪切板,关闭Windows的权限,窗口权限等。不同于Linux,Windows通过系统API来实现对作业对象的访问。

Windows JobObjects支持的列表

限制类型

结构体

描述

基本限制

JOBOBJECT_BASIC_LIMIT_INFORMATION

CPU使用时间,物理内存,优先级等。

基本UI限制

JOBOBJECT_BASIC_UI_RESTRICTIONS

对作业中的进程UI基本限制(如指定桌面,限制调用ExitWindows函数,限制剪切板读写操作等)

CPU使用率限制

JOBOBJECT_CPU_RATE_CONTROL_INFORMATION

设置CPU使用率和权重

扩展基本限制

JOBOBJECT_EXTENDED_LIMIT_INFORMATION

增加了虚拟内存和Peak内存的使用限制

通知限制

JOBOBJECT_NOTIFICATION_LIMIT_INFORMATION

作业对象可以关联一个IO完成端口,当使用量超过通知Limit后,系统向该IO端口发送通知

Windows上使用内核作业对象的流程大概如下:

1. 创建内核作业对象:调用::CreateJobObject()创建一个内核对象,刚创建的对象没有和进程关联。

2. 把限制属性设置到作业对象:调用:: SetInformationJobObject()可以设置如上列表中的限制属性到该作业对象。

3. 将进程加入到作业对象:调用:: AssignProcessToJobObject()将进程加入到作业中,如果该进程产生子进程,那么该子进程会自动成为作业的一部分。

4. 关闭作业对象:调用:: CloseHandle()关闭作业对象的句柄。

需要注意以下几点:

1. 一个进程属于一个作业对象之后,不能再assign给另一个作业对象。

2. 在Windows开启UAC的系统中,没有提示权限的进程会被加入到一个默认的兼容性系统作业对象中,所以必须使用CREATE_BREAKAWAY_FROM_JOB参数创建进程使该进程脱离默认的作业对象。

3. 新启动的进程最好使用CREATE_SUSPEND参数这样可以在进程启动之前加入到作业对象中,防止起启动的新的子进程逃离作业对象。

Windows对于内存的管理与Linux不同,Windows上的物理内存指的是WorkingSet,虚拟内存指的是committed memory,在Windows任务管理器中看的话物理内存指的是“工作设置(内存)”,虚拟内存指的是“提交大小”。CPU通过CpuRate设置,CpuRate的含义是线程在每10000个处理器调度周期内被调度的周期数,比如需要限制到20%,就设置CpuRate为2000。

下面直接以C++代码为例来说明如果创建和管理作业对象,同时包含如何与ACE进程对象如何集成。

#include "stdafx.h"
#include <stdio.h>
#include <iostream>
#include <ace/ACE.h>  
#include <ace/Init_ACE.h>  
#include <ace/Process.h> 
#include <windows.h>
using namespace std;
HANDLE getJobObj(const HANDLE& processHandle, const HANDLE& threadHandle)
{
    auto jobName = _T("MyJob");
    HANDLE jobHandle = NULL;
    bool success = false;
    //////////////////////////////////////////////////////////////////////////
    // If process is already in Job, we use OpenJobObject instead of CreateJobObject
    //////////////////////////////////////////////////////////////////////////
    BOOL injob = FALSE;
    IsProcessInJob(processHandle, NULL, &injob);
    if (injob)
    {
        // 如果已经属于一个内核对象,直接打开
        jobHandle = OpenJobObject(JOB_OBJECT_ALL_ACCESS, FALSE, jobName);
    }
    else
    {
        //创建一个新的作业内核对象
        jobHandle = CreateJobObject(NULL, jobName);
    }
     //
    //////////////////////////////////////////////////////////////////////////
    // 关联完成端口
    //////////////////////////////////////////////////////////////////////////
    auto hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    JOBOBJECT_ASSOCIATE_COMPLETION_PORT joacp;
    joacp.CompletionKey = (PVOID)1; // Any value to uniquely identify this job
    joacp.CompletionPort = hIOCP; // Handle of completion port that
    // receives notifications
    SetInformationJobObject(jobHandle,
        JobObjectAssociateCompletionPortInformation,
        &joacp,
        sizeof(joacp));
    //////////////////////////////////////////////////////////////////////////
    //为作业添加一些基本限制
    //////////////////////////////////////////////////////////////////////////
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION extendLimit = { 0 };
    extendLimit.BasicLimitInformation.LimitFlags = 0;
    // 作业物理内存 500M
    extendLimit.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_WORKINGSET;
    extendLimit.BasicLimitInformation.MaximumWorkingSetSize = 1024 * 1024 * 500;
    extendLimit.BasicLimitInformation.MinimumWorkingSetSize = 64;
    std::cout << "Set Physical MEM = " << extendLimit.BasicLimitInformation.MaximumWorkingSetSize / 1024 / 1024 << " M" << std::endl;
    // 作业虚拟内存 700M
    extendLimit.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
    extendLimit.JobMemoryLimit = 1024 * 1024 * 700;
    std::cout << "Set Virtual MEM = " << extendLimit.JobMemoryLimit/1024/1024 << " M"<< std::endl;
    //设定作业限制
    success = SetInformationJobObject(jobHandle, JobObjectExtendedLimitInformation, &extendLimit, sizeof(extendLimit));
    if (!success)
    {
        std::cout << "Set JobObjectExtendedLimitInformation Failed :" << GetLastError() << std::endl;
    }
    // CPU 使用率
    JOBOBJECT_CPU_RATE_CONTROL_INFORMATION cpuRate = { 0 };
    cpuRate.ControlFlags = 0;
    cpuRate.ControlFlags |= JOB_OBJECT_CPU_RATE_CONTROL_ENABLE;
    cpuRate.ControlFlags |= JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
    cpuRate.CpuRate = 100 * 20; // 20%
    success = SetInformationJobObject(jobHandle, JobObjectCpuRateControlInformation, &cpuRate, sizeof(cpuRate));
    if (!success)
    {
        std::cout << "Set JobObjectCpuRateControlInformation Failed :" << GetLastError() << std::endl;
    }
    //将进程添加到作业
    success = AssignProcessToJobObject(jobHandle, processHandle);
    if (!success)
    {
        std::cout << "AssignProcessToJobObject Failed :" << GetLastError() << std::endl;
    }
    //唤醒进程(的主线程)
    ResumeThread(threadHandle);
    //////////////////////////////////////////////////////////////////////////
    // Check IO Completion
    //////////////////////////////////////////////////////////////////////////
    DWORD dwReasonID = 0; //事件ID,参数lpNumberOfBytes
    DWORD dwProcessID = 0;
    OVERLAPPED* lpOverlapped = NULL;
    DWORD dwMilliseconds = 1000 * 3;// INFINITE
    if (!GetQueuedCompletionStatus(hIOCP, &dwReasonID,
        (PULONG_PTR)&jobHandle, &lpOverlapped, dwMilliseconds))
    {
        printf(("IOCPThread:GetQueueCompletionStatus调用失败,错误代码:0x%08xn"),
            GetLastError());
    }
    else
    {
        //checkJobEvent(dwReasonID);
    }
    Sleep(1000 * 10);
    //////////////////////////////////////////////////////////////////////////
    // 关闭IO完成端口
    //////////////////////////////////////////////////////////////////////////
    CloseHandle(hIOCP);
    return jobHandle;
}
class MyProcess : public ACE_Process
{
public:
    MyProcess::MyProcess() : m_jobHandle(NULL){}
    virtual MyProcess::~MyProcess() { if (m_jobHandle != NULL) CloseHandle(m_jobHandle); }
    // return process thread handle
    HANDLE MyProcess::getThreadHandle() { return process_info_.hThread; }
    // Save windows job object handle for close when process gone.
    void MyProcess::jobHandle(HANDLE& handle) { m_jobHandle = handle; }
    // For some other reason, we need detach job handle
    virtual void MyProcess::detach() { m_jobHandle = NULL; }
    // Close job object when terminating.
    virtual void MyProcess::terminate()
    {
        ACE_Process::terminate();
        if (m_jobHandle != NULL)
        {
            CloseHandle(m_jobHandle);
            m_jobHandle = NULL;
        }
    }
private:
    HANDLE m_jobHandle;
};
int _tmain(int argc, _TCHAR* argv[])
{
    ACE::init();
    MyProcess p;
    ACE_Process_Options option;
    option.command_line(_T("C:\Windows\System32\notepad.exe"));
    option.creation_flags(CREATE_SUSPENDED | CREATE_BREAKAWAY_FROM_JOB);
    auto pid = p.spawn(option);
    auto handle = getJobObj(p.gethandle(), p.getThreadHandle());
    p.jobHandle(handle);
    std::cout << "PID:" << pid << endl;
    pid = p.wait();
    std::cout << "Wait:" << pid << endl;
    getchar();
    p.terminate();
    getchar();
    ACE::fini();
    return 0;
}

Docker容器资源控制

目前在Linux生态圈,用Docker发布和运行程序基本已经成为一个标准,同时用Docker管理本地私有云也越来越流行,尤其对于用Kubernetes管理的容器云,如何限制容器资源变得非常重要。

在RedHat上,Docker拥有自己的cgroup控制目录,位于各个子系统下的system.slice的文件夹里面。当我们启动一个docker容器之后,就会产生这个容器ID开头的一个子目录,用来配置这个容器里面的所有进程对系统资源的使用。

[root@rhel711 ~]# docker run -ti --rm ubuntu /bin/sh
[root@rhel711 ~]# docker ps -a
CONTAINER ID        IMAGE               COMMAND             CREATED              STATUS              PORTS               NAMES
e9102a6edb29        ubuntu              "/bin/sh"           About a minute ago   Up About a minute                       trusting_ritchie
[root@rhel711 ~]# ls /sys/fs/cgroup/memory/system.slice/docker-e9102a6edb29491ee92ed0441238d41ca7c3d5d5841757679d17025ffa4b9f39.scope/
cgroup.clone_children       memory.kmem.max_usage_in_bytes      memory.limit_in_bytes            memory.numa_stat            memory.use_hierarchy
cgroup.event_control        memory.kmem.slabinfo                memory.max_usage_in_bytes        memory.oom_control          notify_on_release
cgroup.procs                memory.kmem.tcp.failcnt             memory.memsw.failcnt             memory.pressure_level       tasks
memory.failcnt              memory.kmem.tcp.limit_in_bytes      memory.memsw.limit_in_bytes      memory.soft_limit_in_bytes
memory.force_empty          memory.kmem.tcp.max_usage_in_bytes  memory.memsw.max_usage_in_bytes  memory.stat
memory.kmem.failcnt         memory.kmem.tcp.usage_in_bytes      memory.memsw.usage_in_bytes      memory.swappiness
memory.kmem.limit_in_bytes  memory.kmem.usage_in_bytes          memory.move_charge_at_immigrate  memory.usage_in_bytes

其中task目录中存放的为容器中进程的PID,以我们这个示例来说,我们在容器中启动了 /bin/sh 进程,这个进程ID为2730。

[root@rhel711 ~]# cat /sys/fs/cgroup/memory/system.slice/docker-e9102a6edb29491ee92ed0441238d41ca7c3d5d5841757679d17025ffa4b9f39.scope/tasks 
27230
[root@rhel711 ~]# ps -ef | grep 27230
root     27230  1182  0 02:39 pts/3    00:00:00 /bin/sh

云计算中Docker容器的资源收集

目前通过Docker容器部署大数据平台也比较流行,但是大数据平台需要获取每个节点运行环境的资源配额,对于已经运行在Docker容器里面的进程,如何判断自己拥有多少系统资源也可以通过cgroup文件系统获取。但是Docker容器里面看到的cgroup的文件目录和宿主机不同,docker容器里面没有system.slice文件夹,直接以/sys/fs/cgroup/开头,可以通过命令查看。所以可以通过这个目录下的memory.limit_in_bytes获取容器自身的物理内存配额。对于容器中CPU core数目的获取,可以通过这个公式获取到近似的core数:min(1, (int)ceil(cpu.cfs_quota_us/cpu.cfs_period_us))。

[root@docker sym]# docker exec -ti tensorflow cat /proc/mounts | grep memory
cgroup /sys/fs/cgroup/memory cgroup rw,nosuid,nodev,noexec,relatime,memory 0 0

用Kubernetes部署的容器平台需要提前定义资源配额,否则容器可以使用到宿主机的所有资源,资源配额在YAML文件的resources中定义:

    spec:
      containers:
        - name: sym-compute
          image: sequenceiq/spark
          command: ["/bin/sh", "-c", "start.sh"]
          resources:
            requests:
              memory: 4096M
            limits:
              memory: 4096M

YARN容器管理

作为容器管理的平台,Kubernetes主要用来在容器中部署分布式应用程序,YARN作为一个资源管理平台也支持容器的管理,主要用来以容器的方式运行大数据作业。像Spark将YARN作为资源管理器运行Spark job。

YARN支持对现有容器大小的调整(cgroup和jobobjects都支持修改资源配额),当用户从YARN申请了一些固定大小的容器,想改变容器资源配额的大小的时候不需要释放掉这些容器重新申请,YARN支持动态改变已经分配的容器的大小。

结束语

随着大数据和云计算技术的发展,资源控制和管理作为底层技术已经非常成熟,掌握这些技术便可以在大数据处理中游刃有余。

参考资源

  • JobObjects MSDN(https://msdn.microsoft.com/en-us/library/windows/desktop/ms684161%28v=vs.85%29.aspx)
  • Spark on YARN(http://blog.csdn.net/dev_csdn/article/details/spark.apache.org/docs/latest/running-on-yarn.html)
  • cgroup man pages(https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/resource_management_guide/ch01)

作者:吕金明,系统架构师,2011加入IBM至今,一直从事分布式计算以及大数据相关的研发工作,以及大数据产品的集成,如Spark,Docker, Kubernetes, Tensorflow等开源框架及技术。