[Spark] Reading from a newer-version Elasticsearch

Date: 2021-08-19

This post describes an exception that occurs when a Spark job reads from a newer-version Elasticsearch cluster through elasticsearch-hadoop, explains the cause, and gives the fix.

The following exception stack trace is thrown:

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
SortAggregate(key=[template_id#166], functions=[last(template_content#167, false), last(highlight_index_template#169, false), last(template_pattern#170, false)], output=[template_id#166, template_content#281, highlight_index#283, template_pattern#285])
+- *(2) Sort [template_id#166 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(template_id#166, 200)
      +- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481])
         +- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0
            +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170]
                  +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174]
                           +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row
                              +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 6 more fields), obj#158: org.apache.spark.sql.Row
                                 +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19,  AS template_content#27,  AS highlight_index#36,  AS highlight_index_template#46,  AS template_pattern#57,  AS @@oId#69,  AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96,  AS @@extractedVars#111]
                                    +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec.doExecute(SortAggregateExec.scala:75)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3037)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3035)
    at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:620)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:107)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
    at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:26)
    at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:18)
    at com.eoi.jax.core.SparkJobDAGBuilder.buildResultsOfSink(SparkJobDAGBuilder.java:284)
    ... 13 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(template_id#166, 200)
+- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481])
   +- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0
      +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170]
            +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174]
                     +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row
                        +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 6 more fields), obj#158: org.apache.spark.sql.Row
                           +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19,  AS template_content#27,  AS highlight_index#36,  AS highlight_index_template#46,  AS template_pattern#57,  AS @@oId#69,  AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96,  AS @@extractedVars#111]
                              +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:77)
    at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:75)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 57 more
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available
    at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:224)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:79)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:78)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:48)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 76 more

Cause:

In older Elasticsearch versions a node's data role is simply "data", while newer versions add tiered roles such as data_hot and data_cold. Older versions of elasticsearch-hadoop decide whether a node is a data node with an exact match, "data".equals(node), whereas newer versions use a substring check, node.contains("data"). An old connector talking to a newer cluster therefore recognizes no data nodes at all, and InitializationUtils.filterNonDataNodesIfNeeded fails with "No data nodes with HTTP-enabled available".
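The difference can be illustrated with a small Scala sketch. This is illustrative logic only, not the actual elasticsearch-hadoop source, and the role list is an assumed example of what a node in a newer cluster might report:

    // Roles reported by a hypothetical node in a newer Elasticsearch cluster.
    val roles = Seq("master", "ingest", "data_hot", "data_cold")

    // Old connector logic: exact match, so a node that only has tiered data
    // roles is not recognized as a data node.
    val oldStyleIsDataNode = roles.exists(role => "data".equals(role))   // false

    // Newer connector logic: substring match, so data_hot / data_cold qualify.
    val newStyleIsDataNode = roles.exists(role => role.contains("data")) // true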

Solution:

Set the Spark-Elasticsearch connector option es.nodes.data.only to false. This stops elasticsearch-hadoop from restricting its requests to data-only nodes, so the failing node filter is skipped and the read works against the newer cluster.
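For example, when the read is configured through the Spark SQL data source API, the option can be passed alongside the other connector settings. This is a minimal sketch, not the project's actual job code; the host and index names are taken from the log above purely as placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("es-read-example").getOrCreate()

    // es.nodes.data.only=false tells elasticsearch-hadoop not to restrict its
    // requests to nodes whose role is exactly "data", avoiding the
    // "No data nodes with HTTP-enabled available" error on newer clusters.
    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "http://192.168.101.65:9200")
      .option("es.nodes.data.only", "false")
      .option("es.read.metadata", "true")
      .load("jiankong.data_eoi_2021_04")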

Original post: https://www.cnblogs.com/zhouwenyang/p/14637026.html