从Zookeeper 到 Elastic Job 的Simple Job使用(二)
elastic job demo
一、zookeeper要有
上一篇文章写过了。。
二、maven引入
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- 使用springframework自定义命名空间时引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
三、demo开发
package com.test.elasticjob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println(context.toString());
switch (context.getShardingItem()) {
case 0:
System.out.println("------------->>>>0");
break;
case 1:
System.out.println("------------->>>>1");
break;
case 2:
System.out.println("------------->>>>2");
break;
default:
System.out.println("------------->>>>default");
break;
}
}
public static void main(String[] args) {
LiteJobConfiguration jobConfiguration = new MyElasticJob().createJobConfiguration();
new JobScheduler(createRegistryCenter(), jobConfiguration).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("177.17.161.215:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
protected LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/5 * * * * ?", 4).shardingItemParameters("0=A,1=B,2=C,3=D").build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
}
运行ok
查看远端zookeeper,
比之前多了一个
命名空间。
但是,如果此时将代码改为
public void execute(ShardingContext context) {
System.out.println(context.toString());
switch (context.getShardingParameter()) {
case "A":
System.out.println("------------->>>>0");
break;
case "B":
System.out.println("------------->>>>1");
break;
case "C":
System.out.println("------------->>>>2");
break;
default:
System.out.println("------------->>>>default");
break;
}
}
此时会报空指针异常,为什呢?
按理说,我赋值的是shardingparameter,但是结果确实jobparameter,因为我一开始使用了jobparameter,然后改成shardingparameter
跟踪源码,发现一开始都没问题
但是到了
发现我的sharadingparameter 已经变为了 jobparameter了
因为代码是直接从zk上读了。
我们去zk上看下
果然是从zk上直接读的。 我们换个名字看下
发现又可以了。说明是zk记录了第一次的配置,后面就直接从zk上拿了。 所以,有一个配置
增加 后,我们再看一下
发现也没问题了。
再次看一下zk,发现有两个job了。
elastic job 实现原理
弹性分布式实现
- 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
- 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
- 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
- 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
- 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
- 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
- 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
注册中心数据结构
- 注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。
- 作业名称节点下又包含4个数据子节点,分别是config, instances, sharding, servers和leader。
config节点
作业配置信息,以JSON格式存储 刚刚,我们也看到了。
instances节点
作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。 当我们运行job时候,那么就会有。
如果下线那么就没有
sharding节点
作业分片信息,子节点是分片项序号,从零开始,至分片总数减一。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。节点详细信息说明:
servers节点
作业服务器信息,子节点是作业服务器的IP地址。可在IP地址节点写入DISABLED表示该服务器禁用。 在新的cloud native架构下,servers节点大幅弱化,仅包含控制服务器是否可以禁用这一功能。为了更加纯粹的实现job核心,servers功能未来可能删除,控制服务器是否禁用的能力应该下放至自动化部署系统。
目前就我跑得一台机器。
leader节点
作业服务器主节点信息,分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。
leader节点是内部使用的节点,如果对作业框架原理不感兴趣,可不关注此节点。
最后贴上亮神画的图。
- virtualenvwrapper + pyenv 打造多版本 Python 环境
- 【关关的刷题日记56】Leetcode 107 Binary Tree Level Order Traversal II
- 新手Python渗透工具入门
- ReentrantLock 与 AQS 源码分析
- python识别验证码遇到问题解决方法
- Angular开发实践(七): 跨平台操作DOM及渲染器Renderer2
- Leetcode-Easy 121. Best Time to Buy and Sell Stock
- MongoDB初识
- Python的md5和sha1加密
- LinkedHashSet 源码分析
- Day2下午解题报告
- python获取打开网站的状态码
- 【关关的刷题日记57】Leetcode 101. Symmetric Tree
- FreeBuf官网发布《简易Python Selenium爬虫实现歌曲免费下载》
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- R语言通过WinBUGS对MGARCH和MSV模型进行贝叶斯估计和比较
- Spring Boot中使用 Swagger2 自动构建API文档
- dotNET:怎样处理程序中的异常(理论篇)?
- xmake从入门到精通12:通过自定义脚本实现更灵活地配置
- ggplot2火山图展示RNAseq差异表达分析结果
- 如何根据class_code筛选转录本?
- JNI线程相关
- JNI函数加载
- CSS中的传统布局、多列布局、弹性伸缩布局及Emmet工具
- 数据分析可视化(四)|Pyecharts制作地图的几种方法评析
- tensorflow运行提示未编译使用SSE4.1,SSE4.2等问题的解决方法
- [手把手系列之二]实现多层神经网络
- 使用SystemVerilog简化FPGA中的接口
- 想用深度学习谱写自己的音乐吗?这篇指南来帮你!(附代码)
- 你不是说你会aop吗?