聊聊flink的RestClientConfiguration
时间:2022-06-18
本文章向大家介绍聊聊flink的RestClientConfiguration,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
序
本文主要研究一下flink的RestClientConfiguration
RestClientConfiguration
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
public final class RestClientConfiguration {
@Nullable
private final SSLHandlerFactory sslHandlerFactory;
private final long connectionTimeout;
private final long idlenessTimeout;
private final int maxContentLength;
private RestClientConfiguration(
@Nullable final SSLHandlerFactory sslHandlerFactory,
final long connectionTimeout,
final long idlenessTimeout,
final int maxContentLength) {
checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
this.sslHandlerFactory = sslHandlerFactory;
this.connectionTimeout = connectionTimeout;
this.idlenessTimeout = idlenessTimeout;
this.maxContentLength = maxContentLength;
}
/**
* Returns the {@link SSLEngine} that the REST client endpoint should use.
*
* @return SSLEngine that the REST client endpoint should use, or null if SSL was disabled
*/
@Nullable
public SSLHandlerFactory getSslHandlerFactory() {
return sslHandlerFactory;
}
/**
* {@see RestOptions#CONNECTION_TIMEOUT}.
*/
public long getConnectionTimeout() {
return connectionTimeout;
}
/**
* {@see RestOptions#IDLENESS_TIMEOUT}.
*/
public long getIdlenessTimeout() {
return idlenessTimeout;
}
/**
* Returns the max content length that the REST client endpoint could handle.
*
* @return max content length that the REST client endpoint could handle
*/
public int getMaxContentLength() {
return maxContentLength;
}
/**
* Creates and returns a new {@link RestClientConfiguration} from the given {@link Configuration}.
*
* @param config configuration from which the REST client endpoint configuration should be created from
* @return REST client endpoint configuration
* @throws ConfigurationException if SSL was configured incorrectly
*/
public static RestClientConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
Preconditions.checkNotNull(config);
final SSLHandlerFactory sslHandlerFactory;
if (SSLUtils.isRestSSLEnabled(config)) {
try {
sslHandlerFactory = SSLUtils.createRestClientSSLEngineFactory(config);
} catch (Exception e) {
throw new ConfigurationException("Failed to initialize SSLContext for the REST client", e);
}
} else {
sslHandlerFactory = null;
}
final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);
int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
return new RestClientConfiguration(sslHandlerFactory, connectionTimeout, idlenessTimeout, maxContentLength);
}
}
- RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength
- fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
- connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600
RestClient
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
public class RestClient implements AutoCloseableAsync {
private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
// used to open connections to a rest server endpoint
private final Executor executor;
private final Bootstrap bootstrap;
private final CompletableFuture<Void> terminationFuture;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
public RestClient(RestClientConfiguration configuration, Executor executor) {
Preconditions.checkNotNull(configuration);
this.executor = Preconditions.checkNotNull(executor);
this.terminationFuture = new CompletableFuture<>();
final SSLHandlerFactory sslHandlerFactory = configuration.getSslHandlerFactory();
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
try {
// SSL should be the first handler in the pipeline
if (sslHandlerFactory != null) {
socketChannel.pipeline().addLast("ssl", sslHandlerFactory.createNettySSLHandler());
}
socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
.addLast(new ChunkedWriteHandler()) // required for multipart-requests
.addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
.addLast(new ClientHandler());
} catch (Throwable t) {
t.printStackTrace();
ExceptionUtils.rethrow(t);
}
}
};
NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
bootstrap = new Bootstrap();
bootstrap
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(configuration.getConnectionTimeout()))
.group(group)
.channel(NioSocketChannel.class)
.handler(initializer);
LOG.info("Rest client endpoint started.");
}
@Override
public CompletableFuture<Void> closeAsync() {
return shutdownInternally(Time.seconds(10L));
}
public void shutdown(Time timeout) {
final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout);
try {
shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
LOG.info("Rest endpoint shutdown complete.");
} catch (Exception e) {
LOG.warn("Rest endpoint shutdown failed.", e);
}
}
private CompletableFuture<Void> shutdownInternally(Time timeout) {
if (isRunning.compareAndSet(true, false)) {
LOG.info("Shutting down rest endpoint.");
if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(finished -> {
if (finished.isSuccess()) {
terminationFuture.complete(null);
} else {
terminationFuture.completeExceptionally(finished.cause());
}
});
}
}
}
return terminationFuture;
}
//......
}
- RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()
小结
- RestClientConfiguration有四个属性,分别是sslHandlerFactory、connectionTimeout、idlenessTimeout、maxContentLength;fromConfiguration方法从Configuration中创建SSLHandlerFactory,其读取的是相关配置有security.ssl.rest.enabled,默认为false;security.ssl.protocol,默认为TLSv1.2;security.ssl.algorithms,默认为TLS_RSA_WITH_AES_128_CBC_SHA;security.ssl.rest.authentication-enabled,默认为false
- connectionTimeout读取的是rest.connection-timeout配置,默认是15000毫秒;idlenessTimeout读取的是rest.idleness-timeout配置,默认5分钟;maxContentLength读取的是rest.client.max-content-length配置,默认是104_857_600
- RestClient的构造器接收RestClientConfiguration及Executor两个参数,构造器里头创建了netty的Bootstrap,其中ChannelOption.CONNECT_TIMEOUT_MILLIS使用的是configuration.getConnectionTimeout();IdleStateHandler的readerIdleTime、writerIdleTime、allIdleTime使用的是configuration.getIdlenessTimeout();HttpObjectAggregator的maxContentLength使用的是configuration.getMaxContentLength();SSLHandlerFactory使用的是configuration.getSslHandlerFactory()
doc
- 在Apache Spark上跑Logistic Regression算法
- 第四章:Shiro的身份认证(Authentication)——深入浅出学Shiro细粒度权限开发框架
- 第五章:Shiro的授权(Authorization)——深入浅出学Shiro细粒度权限开发框架
- 第六章:Shiro的Realms——深入浅出学Shiro细粒度权限开发框架
- 第八章:Shiro和Spring的集成——深入浅出学Shiro细粒度权限开发框架
- 第九章:Shiro的Web——深入浅出学Shiro细粒度权限开发框架
- 第十章:Shiro的Cache——深入浅出学Shiro细粒度权限开发框架
- Appboy基于MongoDB的数据密集型实践
- 微信企业号登录授权Java实现获取员工userid根据userid换openid
- 微信支付-微信红包Java版本
- Universe入门
- 分享一款值得分享的写作工具
- 微信二次开发Java自定义菜单事件实现
- 微信OAuth授权获取用户OpenId-JAVA(个人经验)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- ANDROID BottomNavigationBar底部导航栏的实现示例
- Android实现时间倒计时功能
- Android开发基于Drawable实现圆角矩形的方法
- Android开发中滑动分页功能实例详解
- Android登录注册功能 数据库SQLite验证
- CMQ消费者报错,无法获取本机ip地址问题排查
- 腾讯云TKE-Metrics-Server案例: TKE中自建Metrics-Server问题
- (建议收藏)关于JS事件循环, 这一篇就够啦
- TensorFlow2 开发指南 | 02 回归问题之汽车燃油效率预测
- 腾讯云TKE-Ingress案例: TKE-Ingress与Nginx-Ingress共存
- 玩转Kotlin 彻底弄懂Lambda和高阶函数
- leetcode之仅仅反转字母
- 3分钟短文:Laravel的“南天门”,过滤掉七七八八的数据
- 【1024,Serverless】maimai_DX 查分器
- TRTC/MLVB/IM案例:SDK用户日志提取与管理的一种实现方案