[Spark] Reading from a newer version of Elasticsearch
Date: 2021-08-19
The job fails with the following exception stack trace:
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: SortAggregate(key=[template_id#166], functions=[last(template_content#167, false), last(highlight_index_template#169, false), last(template_pattern#170, false)], output=[template_id#166, template_content#281, highlight_index#283, template_pattern#285]) +- *(2) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(template_id#166, 200) +- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481]) +- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170] +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS 
highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174] +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, 
true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 
6 more fields), obj#158: org.apache.spark.sql.Row +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19, AS template_content#27, AS highlight_index#36, AS highlight_index_template#46, AS template_pattern#57, AS @@oId#69, AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96, AS @@extractedVars#111] +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta... 
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.aggregate.SortAggregateExec.doExecute(SortAggregateExec.scala:75) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3037) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3035) at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:620) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:107) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:26) at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:18) at com.eoi.jax.core.SparkJobDAGBuilder.buildResultsOfSink(SparkJobDAGBuilder.java:284) ... 
13 more Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange hashpartitioning(template_id#166, 200) +- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481]) +- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170] +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174] +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, 
StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 6 more fields), obj#158: org.apache.spark.sql.Row +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19, AS template_content#27, AS highlight_index#36, AS highlight_index_template#46, AS template_pattern#57, AS @@oId#69, AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96, AS @@extractedVars#111] +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta... 
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391) at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:77) at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:75) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 
57 more Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159) at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:224) at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:79) at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:78) at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:48) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) 
at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 76 more
Cause:
In older Elasticsearch versions, a data node carries the single role `data`. Newer versions split this into additional roles such as `data_hot` and `data_cold`. Older versions of elasticsearch-hadoop identify data nodes with an exact match, `"data".equals(node)`, so none of the newer role names match and the connector concludes there are no HTTP-enabled data nodes, which produces the `EsHadoopIllegalArgumentException` at the bottom of the trace. Newer connector versions use `node.contains("data")` instead, which matches the new role names.
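The difference between the two checks can be illustrated with a small sketch. The role names below are illustrative; the actual connector logic lives in `org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded`, shown in the stack trace above.

```python
# Roles as reported by a node in a newer Elasticsearch cluster
# (illustrative values -- the point is that none is exactly "data").
new_node_roles = ["data_hot", "data_cold", "ingest", "master"]

def is_data_node_old_connector(roles):
    # Older elasticsearch-hadoop: exact string match against "data".
    return any(role == "data" for role in roles)

def is_data_node_new_connector(roles):
    # Newer elasticsearch-hadoop: substring match on "data".
    return any("data" in role for role in roles)

# The old check rejects every node, leading to
# "No data nodes with HTTP-enabled available"; the new check accepts them.
print(is_data_node_old_connector(new_node_roles))  # False
print(is_data_node_new_connector(new_node_roles))  # True
```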
Solution:
Set `es.nodes.data.only` to `false` in the Spark–Elasticsearch configuration, so the connector no longer restricts itself to nodes whose role is exactly `data`.
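A minimal PySpark read sketch with the fix applied. The node address, index name, and scroll size are taken from the trace above but are placeholders for your own cluster; only `es.nodes.data.only` is the actual fix.

```python
# elasticsearch-hadoop reader options; substitute your own cluster details.
es_options = {
    "es.nodes": "http://192.168.101.65:9200",
    "es.resource.read": "jiankong.data_eoi_2021_04/",
    "es.scroll.size": "5000",
    # The fix: do not filter nodes down to those with the exact "data" role,
    # which the older connector's check gets wrong against newer ES clusters.
    "es.nodes.data.only": "false",
}

# With a live SparkSession this would be used as:
# df = (spark.read.format("org.elasticsearch.spark.sql")
#            .options(**es_options)
#            .load())
print(es_options["es.nodes.data.only"])  # false
```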
Original post: https://www.cnblogs.com/zhouwenyang/p/14637026.html