NiFi.java 源码解读
时间:2022-07-24
本文章向大家介绍NiFi.java 源码解读,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
在RunNiFi.java源码解读中有提到,最终RunNiFi进程在主程序中启动了新的进程NiFi,并循环监听NIFI进程的状态,直到NIFI进程不在运行,RunNiFi主程序才结束。
以下便是NIFI进程的入口类,从main方法开始即可,关键地方有注释。(自己跟着源码逻辑读更好)
package org.apache.nifi;
public class NiFi {
private static final Logger LOGGER = LoggerFactory.getLogger(NiFi.class);
private static final String KEY_FILE_FLAG = "-K";
private final NiFiServer nifiServer;
private final BootstrapListener bootstrapListener;
// RunNiFi进程的Socket监听端口 进程间通信
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
//标记 是否关闭
private volatile boolean shutdown = false;
// nifi.properties
public NiFi(final NiFiProperties properties)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
this(properties, ClassLoader.getSystemClassLoader());
}
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
// 整个Java进程只能有一个krb5.conf,所以在启动期间全局设置它,这样处理器和Kerberos身份验证代码就不必设置它
final File kerberosConfigFile = properties.getKerberosConfigurationFile();
if (kerberosConfigFile != null) {
final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
LOGGER.info("Setting java.security.krb5.conf to {}", new Object[]{kerberosConfigFilePath});
System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
}
setDefaultUncaughtExceptionHandler();
// 注册 shutdown hook
addShutdownHook();
//RunNiFi 启动NIFI时设置 RunNIFI进程 的Socket的监听端口 NIFI进程将本进程的Socket监听端口和pid传 给RunNIFI ,RunNIFI便可以传达指令
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
if (bootstrapPort != null) {
try {
final int port = Integer.parseInt(bootstrapPort);
if (port < 1 || port > 65535) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
bootstrapListener = new BootstrapListener(this, port);
bootstrapListener.start();
} catch (final NumberFormatException nfe) {
throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
}
} else {
LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
bootstrapListener = null;
}
//删除web工作目录——如果应用程序没有成功启动,web应用程序目录可能处于无效状态。
//当这种情况发生时,jetty将不会尝试重新将war解压缩到目录中。
//通过删除工作目录,我们可以确信它将尝试在每次应用程序启动时提取war。
// nifi.web.jetty.working.directory= 默认值:./work/jetty
File webWorkingDir = properties.getWebWorkingDirectory();
FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
FileUtils.deleteFile(webWorkingDir, LOGGER, 3);
//确定我们运行的机器是否有时间问题。
detectTimingIssues();
// 重定向JUL日志事件
initLogging();
// 这里可以看另一篇讲解 【NIFI nar包加载机制源码解读】
// nifi.nar.library.directory=./lib 获取lib bundle
final Bundle systemBundle = SystemBundle.create(properties);
// 期间过滤了非nar包的文件 解压nar到 /work/nar/framework /work/nar/extending /work/docs/components
// 解压doc file
// 读取 各个JarFile META-INF/services/org.apache.nifi.processor.Processor META-INF/services/org.apache.nifi.reporting.ReportingTask META-INF/services/org.apache.nifi.controller.ControllerService
// 返回的extensionMapping有三个 Map 分别存了Processor ControllerService ReportingTask
final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);
// 获取 extensions classloaders (单例)
NarClassLoaders narClassLoaders = NarClassLoaders.getInstance();
//初始化 为所有的nar包创建唯一 的类加载器
//
narClassLoaders.init(rootClassLoader,
properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
// load the framework classloader
final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
if (frameworkClassLoader == null) {
throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
}
//此集合是有序的 首先是framework 其次其余nar包按 依赖,被依赖在先 ,Bundle中有此nar包各种信息以及nar的类加载器
final Set<Bundle> narBundles = narClassLoaders.getBundles();
// frameworkClassLoader类加载器加载framework bundle(nifi-framework-nar)
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
// 顾名思义 其中启用 了Jetty
Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);
final long startTime = System.nanoTime();
nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
nifiServer.setExtensionMapping(extensionMapping);
nifiServer.setBundles(systemBundle, narBundles);
if (shutdown) {
LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
//余下的启动就交给JettyServer了
nifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.sendStartedStatus(true);
}
final long duration = System.nanoTime() - startTime;
LOGGER.info("Controller initialization took " + duration + " nanoseconds "
+ "(" + (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS) + " seconds).");
}
}
protected void setDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOGGER.error("An Unknown Error Occurred in Thread {}: {}", t, e.toString());
LOGGER.error("", e);
}
});
}
protected void addShutdownHook() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
// shutdown the jetty server
shutdownHook();
}
}));
}
protected void initLogging() {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}
private static ClassLoader createBootstrapClassLoader() {
//获取lib文件夹中的文件列表
final List<URL> urls = new ArrayList<>();
try {
Files.list(Paths.get("lib/bootstrap")).forEach(p -> {
try {
urls.add(p.toUri().toURL());
} catch (final MalformedURLException mef) {
LOGGER.warn("Unable to load " + p.getFileName() + " due to " + mef, mef);
}
});
} catch (IOException ioe) {
LOGGER.warn("Unable to access lib/bootstrap to create bootstrap classloader", ioe);
}
//创建bootstrap classloader
return new URLClassLoader(urls.toArray(new URL[0]), Thread.currentThread().getContextClassLoader());
}
protected void shutdownHook() {
try {
shutdown();
} catch (final Throwable t) {
LOGGER.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
}
}
protected void shutdown() {
this.shutdown = true;
LOGGER.info("Initiating shutdown of Jetty web server...");
if (nifiServer != null) {
nifiServer.stop();
}
if (bootstrapListener != null) {
bootstrapListener.stop();
}
LOGGER.info("Jetty web server shutdown completed (nicely or otherwise).");
}
/**
* 确定我们运行的机器是否有时间问题。
*/
private void detectTimingIssues() {
final int minRequiredOccurrences = 25;
final int maxOccurrencesOutOfRange = 15;
final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
t.setDaemon(true);
t.setName("Detect Timing Issues");
return t;
}
});
final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
final AtomicInteger occurrences = new AtomicInteger(0);
final Runnable command = new Runnable() {
@Override
public void run() {
final long curMillis = System.currentTimeMillis();
final long difference = curMillis - lastTriggerMillis.get();
final long millisOff = Math.abs(difference - 2000L);
occurrences.incrementAndGet();
if (millisOff > 500L) {
occurrencesOutOfRange.incrementAndGet();
}
lastTriggerMillis.set(curMillis);
}
};
final ScheduledFuture<?> future = service.scheduleWithFixedDelay(command, 2000L, 2000L, TimeUnit.MILLISECONDS);
final TimerTask timerTask = new TimerTask() {
@Override
public void run() {
future.cancel(true);
service.shutdownNow();
if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
LOGGER.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
+ "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
}
}
};
final Timer timer = new Timer(true);
timer.schedule(timerTask, 60000L);
}
/**
* 应用程序的主要入口点。
*/
public static void main(String[] args) {
LOGGER.info("Launching NiFi...");
try {
NiFiProperties properties = convertArgumentsToValidatedNiFiProperties(args);
new NiFi(properties);
} catch (final Throwable t) {
LOGGER.error("Failure to launch NiFi due to " + t, t);
}
}
protected static NiFiProperties convertArgumentsToValidatedNiFiProperties(String[] args) {
final ClassLoader bootstrap = createBootstrapClassLoader();
NiFiProperties properties = initializeProperties(args, bootstrap);
properties.validate();
return properties;
}
private static NiFiProperties initializeProperties(final String[] args, final ClassLoader boostrapLoader) {
// Try to get key
// If key doesn't exist, instantiate without
// Load properties
// If properties are protected and key missing, throw RuntimeException
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final String key;
try {
key = loadFormattedKey(args);
// The key might be empty or null when it is passed to the loader
} catch (IllegalArgumentException e) {
final String msg = "The bootstrap process did not provide a valid key";
throw new IllegalArgumentException(msg, e);
}
Thread.currentThread().setContextClassLoader(boostrapLoader);
try {
final Class<?> propsLoaderClass = Class.forName("org.apache.nifi.properties.NiFiPropertiesLoader", true, boostrapLoader);
final Method withKeyMethod = propsLoaderClass.getMethod("withKey", String.class);
final Object loaderInstance = withKeyMethod.invoke(null, key);
final Method getMethod = propsLoaderClass.getMethod("get");
final NiFiProperties properties = (NiFiProperties) getMethod.invoke(loaderInstance);
LOGGER.info("Loaded {} properties", properties.size());
return properties;
} catch (InvocationTargetException wrappedException) {
final String msg = "There was an issue decrypting protected properties";
throw new IllegalArgumentException(msg, wrappedException.getCause() == null ? wrappedException : wrappedException.getCause());
} catch (final IllegalAccessException | NoSuchMethodException | ClassNotFoundException reex) {
final String msg = "Unable to access properties loader in the expected manner - apparent classpath or build issue";
throw new IllegalArgumentException(msg, reex);
} catch (final RuntimeException e) {
final String msg = "There was an issue decrypting protected properties";
throw new IllegalArgumentException(msg, e);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
private static String loadFormattedKey(String[] args) {
String key = null;
List<String> parsedArgs = parseArgs(args);
// Check if args contain protection key
if (parsedArgs.contains(KEY_FILE_FLAG)) {
key = getKeyFromKeyFileAndPrune(parsedArgs);
// Format the key (check hex validity and remove spaces)
key = formatHexKey(key);
}
if (null == key) {
return "";
} else if (!isHexKeyValid(key)) {
throw new IllegalArgumentException("The key was not provided in valid hex format and of the correct length");
} else {
return key;
}
}
private static String getKeyFromKeyFileAndPrune(List<String> parsedArgs) {
String key = null;
LOGGER.debug("The bootstrap process provided the " + KEY_FILE_FLAG + " flag");
int i = parsedArgs.indexOf(KEY_FILE_FLAG);
if (parsedArgs.size() <= i + 1) {
LOGGER.error("The bootstrap process passed the {} flag without a filename", KEY_FILE_FLAG);
throw new IllegalArgumentException("The bootstrap process provided the " + KEY_FILE_FLAG + " flag but no key");
}
try {
String passwordfile_path = parsedArgs.get(i + 1);
// Slurp in the contents of the file:
byte[] encoded = Files.readAllBytes(Paths.get(passwordfile_path));
key = new String(encoded,StandardCharsets.UTF_8);
if (0 == key.length())
throw new IllegalArgumentException("Key in keyfile " + passwordfile_path + " yielded an empty key");
LOGGER.info("Now overwriting file in "+passwordfile_path);
// Overwrite the contents of the file (to avoid littering file system
// unlinked with key material):
File password_file = new File(passwordfile_path);
FileWriter overwriter = new FileWriter(password_file,false);
// Construe a random pad:
Random r = new Random();
StringBuffer sb = new StringBuffer();
// Note on correctness: this pad is longer, but equally sufficient.
while(sb.length() < encoded.length){
sb.append(Integer.toHexString(r.nextInt()));
}
String pad = sb.toString();
LOGGER.info("Overwriting key material with pad: "+pad);
overwriter.write(pad);
overwriter.close();
LOGGER.info("Removing/unlinking file: "+passwordfile_path);
password_file.delete();
} catch (IOException e) {
LOGGER.error("Caught IOException while retrieving the "+KEY_FILE_FLAG+"-passed keyfile; aborting: "+e.toString());
System.exit(1);
}
LOGGER.info("Read property protection key from key file provided by bootstrap process");
return key;
}
private static List<String> parseArgs(String[] args) {
List<String> parsedArgs = new ArrayList<>(Arrays.asList(args));
for (int i = 0; i < parsedArgs.size(); i++) {
if (parsedArgs.get(i).startsWith(KEY_FILE_FLAG + " ")) {
String[] split = parsedArgs.get(i).split(" ", 2);
parsedArgs.set(i, split[0]);
parsedArgs.add(i + 1, split[1]);
break;
}
}
return parsedArgs;
}
private static String formatHexKey(String input) {
if (input == null || input.trim().isEmpty()) {
return "";
}
return input.replaceAll("[^0-9a-fA-F]", "").toLowerCase();
}
private static boolean isHexKeyValid(String key) {
if (key == null || key.trim().isEmpty()) {
return false;
}
// Key length is in "nibbles" (i.e. one hex char = 4 bits)
return Arrays.asList(128, 196, 256).contains(key.length() * 4) && key.matches("^[0-9a-fA-F]*$");
}
}
- scala中常用但其他语言不常见的符号含义 - 心灵空谷幽兰 - 博客园
- 每天一个Linux命令:telnet
- c++基础 explicit
- 每天一个Linux命令:tail
- Hive高级优化
- OpenStack中的RESTful API是如何实现的?
- servlet乱码问题总结
- Servlet学习之web服务器Tomcat 详解
- 结合源码彻底讲解Aggregate vs treeAggregate
- Nginx+uWSGI部署Django网站的详细步骤,脱坑必备,值得收藏!
- 友元类
- 大数据最佳实践 | HBase客户端
- 模板类的友元
- Qt学习笔记 TableWidget使用说明和增删改操作的实现
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 谈谈 INotifyPropertyChanged 的实现
- C 语言小知识
- 使用代码配置 NHibernate
- [Introduction]Go特殊的引用类型:值传递/指针传递/引用传递
- PythonforResearch | 0_语法基础
- iOS 系统中的视图动画
- 在 Android 的 /data 目录下添加虚拟内存
- 玩转安卓模拟器命令行
- 如何利用NLog输出结构化日志,并在Kibana优雅分析日志?
- Android 应用保存状态
- 2020-8-9日报:修复zip在某些X64机器上的运行崩溃问题
- 专题一:预处理数据(使用sklearn-preprocessing)
- 「Docker」使用 Docker run 覆盖 ENTRYPOINT
- 尝试在 Mono 3.0 下运行 ASP.NET MVC 4
- CentOS7使用yum安装nginx报错:获取 GPG 密钥失败:[Errno 14] curl#60 - "Peer's Certificate has expired."