数据并行和任务并行
OpenCL并行加减乘除示例——数据并行与任务并行
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/zhouxuanyuye/article/details/79949409
OpenCL并行加减乘除示例——数据并行与任务并行
关键词:OpenCL; data parallel; task parallel
数据并行化计算与任务并行化分解可以加快程序的运行速度。
如下基本算术例子,输入数组A和数组B,得到输出数组C,C的结果如图中output所示。
图1、加减乘除例子
我们可以通过以下代码计算结果,这块代码我们暂且称为功能函数:
/* Sequential reference version: fills all 16 elements of C from A and B.
   A and B are the input arrays; they are not declared in this snippet. */
float C[16];
int i;
/* Each pass handles four consecutive elements, one per arithmetic operation. */
for(i=0; i<4; i++)
{
C[i*4+0] = A[i*4+0] + B[i*4+0]; // task A: addition
C[i*4+1] = A[i*4+1] - B[i*4+1];// task B: subtraction
C[i*4+2] = A[i*4+2] * B[i*4+2];// task C: multiplication
C[i*4+3] = A[i*4+3] / B[i*4+3];// task D: division
}
1、数据并行(data parallel)
可以发现每一个for循环都由加减乘除4个任务组成,分别为task A、task B、task C和task D。按时间顺序从0时刻开始执行i=0到i=3的4个计算单元,运行完成时间假设为T。
图2. 顺序执行图
从图2我们也可以看出,对于每个程序块,A、B的数据来源都不同,图中的颜色对应task的颜色。由于数据之间没有依赖关系,所以在程序设计时可以让i=0,1,2,3四个程序块同时运行,即把不同的数据交给相同的处理函数同时处理,理想情况下可使运行时间缩减到T/4,如图3所示。这种对不同的数据使用相同核函数的方法,称为数据并行。
图3. 数据并行方法图
数据并行使用的OpenCL API函数是:clEnqueueNDRangeKernel()
以下是参考程序:
host.cpp:
#include "stdafx.h"
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <CL/cl.h>
#include <time.h>
#define MAX_SOURCE_SIZE (0x100000)
//data parallel
int main()
{
cl_platform_id platform_id = NULL;
cl_device_id device_id = NULL;
cl_context context = NULL;
cl_command_queue command_queue = NULL;
cl_mem Amobj = NULL;
cl_mem Bmobj = NULL;
cl_mem Cmobj = NULL;
cl_program program = NULL;
cl_kernel kernel = NULL;
cl_uint ret_num_devices;
cl_uint ret_num_platforms;
cl_int ret;
int i, j;
float *A;
float *B;
float *C;
A = (float *)malloc(4 * 4 * sizeof(float));
B = (float *)malloc(4 * 4 * sizeof(float));
C = (float *)malloc(4 * 4 * sizeof(float));
FILE *fp;
const char fileName[] = "./dataParallel.cl";
size_t source_size;
char *source_str;
/* Load kernel source file */
fp = fopen(fileName, "r");
if (!fp) {
fprintf(stderr, "Failed to load kernel.гдn");
exit(1);
}
source_str = (char *)malloc(MAX_SOURCE_SIZE);
source_size = fread(source_str, 1, MAX_SOURCE_SIZE, fp);
fclose(fp);
/* Initialize input data */
printf("Initialize input data");
for (i = 0; i < 4; i++) {
for (j = 0; j < 4; j++) {
A[i * 4 + j] = i * 4 + j + 1;
B[i * 4 + j] = j * 4 + i + 1;
}
}
printf("n");
printf("A array data:n");
for (i = 0; i < 4; i++) {
for (int j=0; j<4; j++){
printf("%.2ft",A[i*4+j]);
}
printf("n");
}
printf("B array data:n");
for (i = 0; i < 4; i++) {
for (int j=0; j<4; j++){
printf("%.2ft",B[i*4+j]);
}
printf("n");
}
clock_t start, finish;
double duration;
printf("DataParallel kernels tart to executen");
start = clock();
/* Get Platform/Device Information */
ret = clGetPlatformIDs(1, &platform_id, &ret_num_platforms);
ret = clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_DEFAULT, 1, &device_id,
&ret_num_devices);
/* Create OpenCL Context */
context = clCreateContext(NULL, 1, &device_id, NULL, NULL, &ret);
/* Create command queue */
command_queue = clCreateCommandQueue(context, device_id, 0, &ret);
/* Create Buffer Object */
Amobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4 * 4 * sizeof(float), NULL,
&ret);
Bmobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4 * 4 * sizeof(float), NULL,
&ret);
Cmobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4 * 4 * sizeof(float), NULL,
&ret);
/* Copy input data to the memory buffer */
ret = clEnqueueWriteBuffer(command_queue, Amobj, CL_TRUE, 0, 4 * 4 * sizeof(float),
A, 0, NULL, NULL);
ret = clEnqueueWriteBuffer(command_queue, Bmobj, CL_TRUE, 0, 4 * 4 * sizeof(float),
B, 0, NULL, NULL);
/* Create kernel program from source file*/
program = clCreateProgramWithSource(context, 1, (const char **)&source_str, (const
size_t *)&source_size, &ret);
ret = clBuildProgram(program, 1, &device_id, NULL, NULL, NULL);
/* Create data parallel OpenCL kernel */
kernel = clCreateKernel(program, "dataParallel", &ret);
/* Set OpenCL kernel arguments */
ret = clSetKernelArg(kernel, 0, sizeof(cl_mem), (void *)&Amobj);
ret = clSetKernelArg(kernel, 1, sizeof(cl_mem), (void *)&Bmobj);
ret = clSetKernelArg(kernel, 2, sizeof(cl_mem), (void *)&Cmobj);
size_t global_item_size = 4;
size_t local_item_size = 1;
/* Execute OpenCL kernel as data parallel */
ret = clEnqueueNDRangeKernel(command_queue, kernel, 1, NULL,
&global_item_size, &local_item_size, 0, NULL, NULL);
/* Transfer result to host */
ret = clEnqueueReadBuffer(command_queue, Cmobj, CL_TRUE, 0, 4 * 4 * sizeof(float),
C, 0, NULL, NULL);
//end of execution
finish = clock();
duration = (double)(finish - start) / CLOCKS_PER_SEC;
printf("n%f secondsn", duration);
/* Display Results */
printf("Calculation result:n");
for (i = 0; i < 4; i++) {
for (j = 0; j < 4; j++) {
printf("%7.2ft", C[i * 4 + j]);
}
printf("n");
}
/* Finalization */
ret = clFlush(command_queue);
ret = clFinish(command_queue);
ret = clReleaseKernel(kernel);
ret = clReleaseProgram(program);
ret = clReleaseMemObject(Amobj);
ret = clReleaseMemObject(Bmobj);
ret = clReleaseMemObject(Cmobj);
ret = clReleaseCommandQueue(command_queue);
ret = clReleaseContext(context);
free(source_str);
free(A);
free(B);
free(C);
system("pause");
return 0;
}
dataParallel.cl:
/*
 * Data-parallel kernel: each work-item processes one group of four
 * consecutive elements, applying +, -, * and / to elements 0..3 of the group.
 */
__kernel void dataParallel(__global float* A, __global float* B, __global float* C)
{
    /* Index of the first element of this work-item's group. */
    const int first = 4 * get_global_id(0);

    __global const float *lhs = A + first;
    __global const float *rhs = B + first;
    __global float *out = C + first;

    out[0] = lhs[0] + rhs[0];
    out[1] = lhs[1] - rhs[1];
    out[2] = lhs[2] * rhs[2];
    out[3] = lhs[3] / rhs[3];
}
2、任务并行(task parallel)
另一种方式是任务并行化:将功能函数内部彼此独立的语句拆分为不同的任务并行执行。如本文中的功能函数可以分解为“加减乘除”四个任务,对应生成四个核函数,让这四个核函数同时执行,如下图所示。
图4、任务并行方法图
以图4中的红色核函数为例,执行的是数组A和数组B中第一列的加法运算,此加法核函数随着时间推移,依次执行了A[0] + B[0]、A[4] + B[4]、A[8] + B[8]和A[12] + B[12]。
任务并行使用的OpenCL API函数是:clEnqueueTask()
以下是参考程序:
host.cpp:
// taskparallel.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include <string>
#include <CL/cl.h>
#include <time.h>
#define MAX_SOURCE_SIZE (0x100000)
int main()
{
cl_platform_id platform_id = NULL;
cl_device_id device_id = NULL;
cl_context context = NULL;
cl_command_queue command_queue = NULL;
cl_mem Amobj = NULL;
cl_mem Bmobj = NULL;
cl_mem Cmobj = NULL;
cl_program program = NULL;
cl_kernel kernel[4] = {NULL, NULL, NULL, NULL};
cl_uint ret_num_devices;
cl_uint ret_num_platforms;
cl_int ret;
int i,j;
float *A, *B, *C;
A = (float *) malloc(4*4*sizeof(float));
B = (float *) malloc(4*4*sizeof(float));
C = (float *) malloc(4*4*sizeof(float));
FILE *fp;
const char fileName[] = "./taskParallel.cl";
size_t source_size;
char *source_str;
//load kernel source file
fp = fopen(fileName, "rb");
if(!fp) {
fprintf(stderr, "Failed to load kerneln");
exit(1);
}
source_str = (char *)malloc(MAX_SOURCE_SIZE);
source_size = fread(source_str, 1, MAX_SOURCE_SIZE, fp);
fclose(fp);
//initialize input data
for(i=0; i<4; i++) {
for(j=0; j<4; j++) {
A[i*4+j] = i*4+j+1;
B[i*4+j] = j*4+i+1;
}
}
//print A
printf("nA initilization data: n");
for(i=0; i<4; i++) {
for(j=0; j<4; j++) {
printf("%.2ft", A[i*4+j]);
}
printf("n");
}
//print B
printf("nB initilization data: n");
for(i=0; i<4; i++) {
for(j=0; j<4; j++) {
printf("%.2ft", B[i*4+j]);
}
printf("n");
}
clock_t start, finish;
double duration;
printf("TaskParallel kernels start to executen");
start = clock();
//get platform/device information
ret = clGetPlatformIDs(1, &platform_id, &ret_num_platforms);
ret = clGetDeviceIDs(platform_id, CL_DEVICE_TYPE_DEFAULT,1,&device_id, &ret_num_devices);
//create opencl context
context = clCreateContext(NULL, 1,&device_id, NULL, NULL, &ret);
//create command queue
command_queue = clCreateCommandQueue(context, device_id, CL_QUEUE_OUT_OF_ORDER_EXEC_MODE_ENABLE, &ret);
//create buffer object
Amobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4*4*sizeof(float), NULL,&ret);
Bmobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4*4*sizeof(float), NULL,&ret);
Cmobj = clCreateBuffer(context, CL_MEM_READ_WRITE, 4*4*sizeof(float), NULL,&ret);
//copy input data to memory buffer
ret = clEnqueueWriteBuffer(command_queue, Amobj, CL_TRUE, 0, 4*4*sizeof(float), A, 0, NULL, NULL);
ret = clEnqueueWriteBuffer(command_queue, Bmobj, CL_TRUE, 0, 4*4*sizeof(float), B, 0, NULL, NULL);
//create kernel from source
program = clCreateProgramWithSource(context, 1, (const char **)&source_str, (const size_t *)&source_size, &ret);
ret = clBuildProgram(program, 1, &device_id, NULL, NULL, NULL);
//create task parallel
kernel[0] = clCreateKernel(program, "add_parallel", &ret);
kernel[1] = clCreateKernel(program, "sub_parallel", &ret);
kernel[2] = clCreateKernel(program, "mul_parallel", &ret);
kernel[3] = clCreateKernel(program, "div_parallel", &ret);
//set opencl kernel arguments
for (i=0; i<4; i++) {
ret = clSetKernelArg(kernel[i], 0, sizeof(cl_mem), (void *) &Amobj);
ret = clSetKernelArg(kernel[i], 1, sizeof(cl_mem), (void *) &Bmobj);
ret = clSetKernelArg(kernel[i], 2, sizeof(cl_mem), (void *) &Cmobj);
}
//execute opencl kernels
for(i=0; i<4; i++) {
ret = clEnqueueTask(command_queue, kernel[i], 0, NULL, NULL);
}
//copy result to host
ret = clEnqueueReadBuffer(command_queue, Cmobj, CL_TRUE, 0, 4*4*sizeof(float), C, 0, NULL, NULL);
//end of execution
finish = clock();
duration = (double)(finish - start) / CLOCKS_PER_SEC;
printf("n%f secondsn", duration);
//display result
printf("nC result: n");
for(i=0; i<4; i++) {
for(j=0; j<4; j++) {
printf("%.2ft", C[i*4+j]);
}
printf("n");
}
printf("n");
//free
ret = clFlush(command_queue);
ret = clFinish(command_queue);
ret = clReleaseKernel(kernel[0]);
ret = clReleaseKernel(kernel[1]);
ret = clReleaseKernel(kernel[2]);
ret = clReleaseKernel(kernel[3]);
ret = clReleaseProgram(program);
ret = clReleaseMemObject(Amobj);
ret = clReleaseMemObject(Bmobj);
ret = clReleaseMemObject(Cmobj);
ret = clReleaseCommandQueue(command_queue);
ret = clReleaseContext(context);
free(source_str);
free(A);
free(B);
free(C);
system("pause");
return 0;
}
taskParallel.cl:
/*
 * Addition task: writes C[k] = A[k] + B[k] for k = 0, 4, 8, 12
 * (every fourth element, starting at offset 0).
 */
__kernel void add_parallel(__global float *A, __global float *B, __global float *C)
{
    const int column = 0;

    for (int k = column; k < 16; k += 4) {
        C[k] = A[k] + B[k];
    }
}
/*
 * Subtraction task: writes C[k] = A[k] - B[k] for k = 1, 5, 9, 13
 * (every fourth element, starting at offset 1).
 */
__kernel void sub_parallel(__global float *A, __global float *B, __global float *C)
{
    const int column = 1;

    for (int k = column; k < 16; k += 4) {
        C[k] = A[k] - B[k];
    }
}
/*
 * Multiplication task: writes C[k] = A[k] * B[k] for k = 2, 6, 10, 14
 * (every fourth element, starting at offset 2).
 */
__kernel void mul_parallel(__global float *A, __global float *B, __global float *C)
{
    const int column = 2;

    for (int k = column; k < 16; k += 4) {
        C[k] = A[k] * B[k];
    }
}
/*
 * Division task: writes C[k] = A[k] / B[k] for k = 3, 7, 11, 15
 * (every fourth element, starting at offset 3).
 */
__kernel void div_parallel(__global float *A, __global float *B, __global float *C)
{
    const int column = 3;

    for (int k = column; k < 16; k += 4) {
        C[k] = A[k] / B[k];
    }
}
3、参考
例子及程序来自《The OpenCL Programming Book》。以上例子其实还可以进一步并行化:只要硬件有足够多的计算单元,完全可以把16个运算作为16个任务同时计算,即让加减乘除四个任务中原本按时间顺序执行的四次运算也同时进行。
- ASP.NET MVC路由扩展:路由映射
- 如何改善遗留的代码库
- ASP.NET的路由系统:根据路由规则生成URL
- ASP.NET Core 1.0中实现文件上传的两种方式(提交表单和采用AJAX)
- 通过3个Hello World应用来了解ASP.NET 5应用是如何运行的(1)
- 工业X.0将至 企业数字化转型该怎么做?
- 通过3个Hello World应用来了解ASP.NET 5应用是如何运行的(2)
- 通过3个Hello World应用来了解ASP.NET 5应用是如何运行的(3)
- 为什么说2018年互联网创业机会将变少
- ASP.NET MVC Controller激活系统详解:IoC的应用[上篇]
- ASP.NET Core的配置(1):读取配置信息
- 权限管理和备份实例
- “协变”、“逆变”与Delegate类型转换
- 如今的人工智能是不是真的已经很聪明了?
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Android加载Assets目录中Xml布局文件
- Android系统在shell中的df命令实现
- 面向对象版tab 栏切换案例
- 构造函数和原型
- 解决json中ensure_ascii=False的问题
- Django自定义列表 models字段显示方式
- 无人驾驶环境感知 | 01 车道线检测网络LanNet原理及实现
- 对象存储COS-数据处理能力升级!“组合拳”助力存储新时代!
- 基于python图像处理API的使用示例
- python 的topk算法实例
- android使用viewpager计算偏移量实现选项卡功能
- Android画板开发之撤销反撤销功能
- Android实现复制Assets文件到SD卡
- Android画板开发之添加背景和保存画板内容为图片
- 直播插件体系设计