spark教程(二)-python基础编程
hadoop 是 java 开发的,原生支持 java;spark 是 scala 开发的,原生支持 scala;
spark 还支持 java、python、R,本文只介绍 python
spark 1.x 和 spark 2.x 用法略有不同,spark 1.x 的用法大部分也适用于 spark 2.x
Pyspark
python + spark,简单来说,想用 python 操作 spark,就必须用 pyspark 模块
RDD
spark 最重要的一个概念叫 RDD,Resilient Distributed Dataset,弹性分布式数据集 【本文只做简单了解,后面会专门写一篇博客详细介绍】
RDD 可以从 hadoop 获取数据,也可以从其他地方获取数据,也可以从一种 RDD 转换成 另一种 RDD;
RDD 支持两种类型的操作:transformations 和 actions
transformations:转换,就是从一种 RDD 转换成 另一种 符合 要求 的 RDD,类似于 map
actions:行动,执行计算,类似于 reduce,返回结果
值得注意的是,transformations 是惰性计算,也就是说 transformations 并不立即执行,只有 actions 时,才会执行 transformations
这使得 spark 高效,以 map-reduce 为例,如果我们 map 并不是目的,reduce 才是目的,那么 map 之后 就计算的话,输出存在哪里呢?
如果存在文件,一浪费时间,二浪费地方,如果我们 reduce 时才执行 map,直接把 map 的庞大输出存入内存即可,甚至 流式 传给 reduce,非常高效。
DataFrame
dataframe 类似于 pandas 中的 dataframe,也就是 表格数据
这是 spark2.x 中新增的数据格式,由 SparkSession 直接读取,不管文件是什么类型,txt也好,csv也罢,输出格式都是 dataframe
而 SparkContext 不管读什么文件,输出格式都是 RDD
>>> spark.read.text('README.md') DataFrame[value: string] # dataframe 的属性 ['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattr__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collectAsArrow', '_jcols', '_jdf', '_jmap', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema',
'_sort_cols', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView',
'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'drop_duplicates', 'dropna',
'dtypes', 'exceptAll', 'explain', 'fillna', 'filter', 'first', 'foreach', 'foreachPartition', 'freqItems', 'groupBy', 'groupby', 'head', 'hint', 'intersect', 'intersectAll', 'isLocal',
'isStreaming', 'is_cached', 'join', 'limit', 'localCheckpoint', 'na', 'orderBy', 'persist', 'printSchema', 'randomSplit', 'rdd', 'registerTempTable', 'repartition', 'repartitionByRange',
'replace', 'rollup', 'sample', 'sampleBy', 'schema', 'select', 'selectExpr', 'show', 'sort', 'sortWithinPartitions', 'sql_ctx', 'stat', 'storageLevel', 'subtract', 'summary', 'take', 'toDF',
'toJSON', 'toLocalIterator', 'toPandas', 'union', 'unionAll', 'unionByName', 'unpersist', 'where', 'withColumn', 'withColumnRenamed', 'withWatermark', 'write', 'writeStream']
datafram 的操作不同于 RDD,类似于 pandas
SparkSession and SparkContext
SparkSession 是 spark2.x 引入的新概念,SparkSession 为用户提供统一的切入点,字面理解是创建会话,或者连接 spark
在 spark1.x 中,SparkContext 是 spark 的主要切入点,由于 RDD 作为主要的 API,我们通过 SparkContext 来创建和操作 RDD,
这个问题在于:
1. 不同的应用中,需要使用不同的 context,在 Streaming 中需要使用 StreamingContext,在 sql 中需要使用 sqlContext,在 hive 中需要使用 hiveContext,比较麻烦
2. 随着 DataSet 和 DataFrame API 逐渐成为标准 API,需要为他们创建接入点,即 SparkSession
SparkSession 实际上封装了 SparkContext,另外也封装了 SparkConf、sqlContext,随着版本增加,可能更多,
所以我们尽量使用 SparkSession ,如果发现有些 API 不在 SparkSession 中,也可以通过 SparkSession 拿到 SparkContext 和其他 Context 等
在 shell 操作中,原生创建了 SparkSession,故无需再创建,创建了也不会起作用
在 shell 中,SparkContext 叫 sc,SparkSession 叫 spark
通过 spark 拿到 sc
>>> dir(spark) ['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__r educe_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_from_pandas_with_arrow',
'_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped',
'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version'] spark.sparkContext # 即 sc
简单操作
>>> sc.textFile('README.md').count() >>> spark.read.text('README.md').count()
shell 操作
spark 支持 shell 操作,支持各种语言的 shell,包括 scala shell、python shell、R shell、SQL shell 等
启动 python shell 模式的命令解析
[root@hadoop10 spark]# bin/pyspark --help Usage: ./bin/pyspark [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, # 设定 master,即在哪里运行 spark, # mesos://host:port一般不用;yarn需要把spark部署到yarn上 k8s://https://host:port, or local (Default: local[*]). # local 本地模式,local 表示单线程,local[num]表示num个进程, # local[*]表示服务器cpu是几核就是几个进程 --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). # 要执行的 class 类名 --name NAME A name of your application. --jars JARS Comma-separated list of jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include # 逗号隔开的 maven 列表,给 当前会话 添加依赖 on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place # 逗号隔开的 文件列表,替代 PYTHONPATH 的作用,也就是说如果不设置 PYTHONPATH,就需要这个参数,才能导入 文件中的模块 on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. File paths of these files in executors can be accessed via SparkFiles.get(fileName). --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --proxy-user NAME User to impersonate when submitting the application. This argument does not work with --principal / --keytab. --help, -h Show this help message and exit. --verbose, -v Print additional debug output. --version, Print the version of current Spark. Cluster deploy mode only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1). Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure. --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. Spark standalone and YARN only: --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) YARN-only: --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. --principal PRINCIPAL Principal to be used to login to KDC, while running on secure HDFS. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
进入 shell 模式
[root@hadoop10 spark]# bin/pyspark Python 2.7.12 (default, Oct 2 2019, 19:43:15) [GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2 Type "help", "copyright", "credits" or "license" for more information. 19/10/09 18:10:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Python version 2.7.12 (default, Oct 2 2019 19:43:15) SparkSession available as 'spark'. # 自带 spark
shell 模式可以通过 http://192.168.10.10:4040 查看任务
shell 操作语法与 脚本 相同,示例如下
>>> distFile = sc.textFile('README.md') >>> distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b) 3847 >>> distFile.count() 105
Python 编程基本语法
1. 首先创建 SparkSession
1.1 在 spark1.x 中是创建 SparkContext
class SparkContext(__builtin__.object): def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<cl ass 'pyspark.profiler.BasicProfiler'>) '''Create a new SparkContext. At least the master and app name should be set, | either through the named parameters here or through C{conf}. | | :param master: Cluster URL to connect to | (e.g. mesos://host:port, spark://host:port, local[4]). local 表示本地运行,4 表示使用4个 cpu核 | :param appName: A name for your job, to display on the cluster web UI. | :param sparkHome: Location where Spark is installed on cluster nodes. | :param pyFiles: Collection of .zip or .py files to send to the cluster | and add to PYTHONPATH. These can be paths on the local file | system or HDFS, HTTP, HTTPS, or FTP URLs. | :param environment: A dictionary of environment variables to set on | worker nodes. | :param batchSize: The number of Python objects represented as a single | Java object. Set 1 to disable batching, 0 to automatically choose | the batch size based on object sizes, or -1 to use an unlimited | batch size | :param serializer: The serializer for RDDs. | :param conf: A L{SparkConf} object setting Spark properties. | :param gateway: Use an existing gateway and JVM, otherwise a new JVM | will be instantiated. | :param jsc: The JavaSparkContext instance (optional). | :param profiler_cls: A class of custom Profiler used to do profiling | (default is pyspark.profiler.BasicProfiler). | | | >>> from pyspark.context import SparkContext | >>> sc = SparkContext('local', 'test') | | >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL | Traceback (most recent call last): | ... | ValueError:...'''
示例如下
from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName('myapp1').setMaster('local[4]') # 设定 appname 和 master sc = SparkContext(conf=conf) ## 或者直接这样 sc = SparkContext("spark://hadoop10:7077")
1.2 在 spark2.x 中创建 SparkSession
class SparkSession(__builtin__.object): def __init__(self, sparkContext, jsparkSession=None): ''' Creates a new SparkSession. | | >>> from datetime import datetime | >>> spark = SparkSession(sc) | >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1, | ... b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1), | ... time=datetime(2014, 8, 1, 14, 1, 5))]) | >>> df = allTypes.toDF() | >>> df.createOrReplaceTempView("allTypes") | >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a ' | ... 'from allTypes where b and i > 0').collect() | [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2, dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)] | >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect() | [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]'''
示例如下
from pyspark.sql import SparkSession conf = SparkConf().setAppName('myapp1').setMaster('local[4]') # 设定 appname 和 master sess = SparkSession.builder.config(conf=conf) ## 或者这样 from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName('myapp1').setMaster('local[4]') # 设定 appname 和 master sc = SparkContext(conf=conf) sess = SparkSession(sc)
2. 然后创建 RDD
spark 是以 RDD 概念为中心运行的,RDD 是一个容错的、可以被并行操作的元素集合。
创建 RDD 有两种方式:
1. 在驱动程序中并行化一个已经存在的集合 【内存中的数据】
2. 从外部存储系统引入数据,生成 RDD 【外部存储介质中的数据,注意 spark 本身没有存储功能】
// 这个存储系统可以是一个共享文件系统,如 hdfs、hbase
2.1 并行化数据集合
并行化集合是通过在一个 迭代器或者集合 上 调用 SparkContext 的 parallelize 方法生成的 【内存中的数据】
data = range(10000) distData = sc.parallelize(data) distData.reduce(lambda a, b: a+b)
为了创建一个能并行操作的分布式数据集,所有元素都将被拷贝;
然后我们可以调用 reduce 进行叠加,累计求和
并行化集合时一个重要的参数是将数据集切分的数量。一个切片对应一个 spark 任务,我们可指定切片数量
distData2 = sc.parallelize(data, 100) # 切 100 分
2.2 外部数据集
由于 spark 本身没有存储功能,一般是从 本地文件、hadoop 等获取外部数据集
2.2.1 本地文件
textFile(name, minPartitions=None, use_unicode=True)[source] Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)
minPartitions 指定分区数,在 hdfs 中,文件本身就是 以 block 存储的,此时这个 分区数 不能小于 block 数
示例代码
distFile = sc.textFile('README.md') distFile = sc.textFile('xx.csv') distFile = sc.textFile('logs') # 支持文件夹 # textFile("/my/directory/*.txt") # 支持通配符 # textFile("/my/directory/*.gz") # 支持压缩文件 type(sc.textFile('README.md')) # <class 'pyspark.rdd.RDD'> distFile.map(lambda x: int(x[-1])).reduce(lambda a, b: a + b) distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b) # map 的输入是每一行,map 的作用是求每一行的 len # reduce 的输入是两个数字,reduce 的作用是求这两个数的和, # 也就是把 所有行的 len 逐次求当前元素(当前累计 len 的和)与下一元素的和
读取文件后自动生成 RDD;
各种读取方式都支持 多种 文件格式,如 文件夹,通配符、压缩文件
2.2.2 批量读取本地文件
distFile = sc.wholeTextFiles('files') # 读取 files 文件夹下所有文件内容,返回 (filename, filecontent) 键值对
输入必须是 路径
3. 操作 RDD
RDD 的操作有两种方式:转换 和 行动,而且 转换 是 惰性的
可以根据 是否有返回 判断是哪个操作,行动 有返回值,转换无返回值
详见官网 RDD
3.1 RDD 缓存
我们可以把 RDD 缓存到 内存中, 这其实就是 行动 操作
distFile = sc.textFile('README.md') m = distFile.map(lambda x: len(x)) # map 是 转换 操作,并不立即执行 m.cache() # 把 map 的输出缓存到内存中,其实 cache 就是 执行 操作
或者 m.persist()
3.2 转换 操作
惰性,无返回值
map(func[, preservesPartitioning=False]):把一个序列中的元素逐个送入 map,经 func 处理后,返回一个新的 序列
rdd = sc.parallelize([2, 3, 4]) rdd.map(lambda x: x + 1).collect() # [3, 4, 5]
filter(func):类似 map,func 是个过滤函数
rdd = sc.parallelize([2, 3, 4]) rdd.map(lambda x: x > 3).collect() # [False, False, True]
flatMap(func[, preservesPartitioning=False]):也类似 map,只是 它会把 每次经过 func 处理的结果进行 合并,输入和输出的 list 长度可能不同
rdd = sc.parallelize([2, 3, 4]) rdd.flatMap(lambda x: range(1, x)).collect() # [1, 1, 2, 1, 2, 3] # range(1, 2): 1 # range(1, 3): 1, 2 # range(1, 4): 1, 2, 3 ### vs map rdd.map(lambda x: range(1, x)).collect() # [[1], [1, 2], [1, 2, 3]]
mapPartitions(func [, preservesPartitioning=False]) :map的一个变种,map 是把序列的单个元素送入 func ,而 mapPartitions 是把 序列分区后 每个 分区 整体送入 func
rdd = sc.parallelize([1,2,3,4,5], 3) # 分 3 个区 def f(iterator): yield sum(iterator) # 必须是生成器,即 yield,不能 return rdd.mapPartitions(f).collect() # [1, 5, 9]
mapPartitionsWithIndex(func [, preservesPartitioning=False]) :func 有两个参数,分片的序号 和 迭代器,返回 分片序号,也必须是 迭代器
rdd = sc.parallelize(range(15), 13) # 分 13 个区 def f(splitIndex, iterator): yield splitIndex rdd.mapPartitionsWithIndex(f).collect() # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
cartesian(otherDataset):利用两个 序列 生成 笛卡尔內积 的数据集
x = sc.parallelize([1,2,3]) y = sc.parallelize([4,5]) x.cartesian(y).collect() # [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
以下方法只适用 key-value 数据
reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]):针对 k-v 对的处理方法,把 key 相同的 value 进行 reduce,然后重新组成 key-reduce 对
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) def f(x, y): return x + y rdd.reduceByKey(f).collect() # [('a', 2), ('b', 1)]
sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]):根据 key 进行排序,默认升序,numPartitions 代表分区数,keyfunc 是处理 key 的,在 排序过程中对 key 进行处理
tmp = [('a', 4), ('b', 3), ('c', 2), ('D', 1)] sc.parallelize(tmp).sortByKey(True, 1).collect() # 升序[('D', 1), ('a', 4), ('b', 3), ('c', 2)] 1代表分区数 sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect() # 升序[('a', 4), ('b', 3), ('c', 2), ('D', 1)] D跑到后面了 sc.parallelize(tmp).sortByKey(False, 2, keyfunc=lambda k:k.lower()).collect()# 降序[('D', 1), ('c', 2), ('b', 3), ('a', 4)]
keyfunc 只在 排序过程中起作用,在输出时 keyfunc 不起作用
join(otherDataset [, numPartitions=None]):将 两个 k-v RDD 中 共有的 key 的 value 交叉组合
x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) x.join(y).collect() # [('a', (1, 2)), ('a', (1, 3))]
3.3 行动 操作
有返回值
collect:返回 RDD 中的数据
count:返回 RDD 中元素个数
first:返回 RDD 中第一个元素
max. min.sum:不解释
take(n):返回 RDD 中 前 n 个元素
takeOrdered(n [, key=None]):对 RDD 先进行排序,然后取排序后的 前 n 个数据,key 表示先经过 keyfunc 处理后再进行排序,最终返回的还是原数据
sc.parallelize([9,7,3,2,6,4]).takeOrdered(3) # [2, 3, 4] sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x: -x) # [9, 7, 6] ## 过程如下 # 9, 7, 3, 2, 6, 4 ## 原数据 # -9, -7, -3, -2, -6, -4 ## 经过 keyfunc 处理后的数据 # -9, -7, -6, -4, -3, -2 ## 对处理后的数据升序排序 # -9, -7, -6 ## 取前3个 # 9, 7, 6 ## 对应到原数据
也就是说,keyfunc 只在排序时起作用,在输出时不起作用
foreach(func):运行 func 函数 并行处理 RDD 的所有元素
sc.parallelize([1, 2, 3, 4, 5]).foreach(print) # 并行打印,不按顺序输出 # 1 # 2 # 4 # 5 # 3
reduce(func):把 RDD 中前两个元素送入 func,得到一个 value,把这个 value 和 下一个元素 送入 func,直至最后一个元素
sc.parallelize([1,2,3,4,5]).reduce(lambda x, y: x + y) # 15 求和
fold:与 reduce 类似,fold 是有一个 基数,然后 把每个元素 和 基数 送入 func,然后替换该基数,循环,直到最后一个元素
x = sc.parallelize([1,2,3]) neutral_zero_value = 0 # 0 for sum, 1 for multiplication y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj) # computes cumulative sum print(x.collect()) # [1,2,3] print(y) # 6
aggregate:对每个分区进行聚合,然后聚合每个分区的聚合结果,详见我的博客 aggregate
countByValue:统计相同元素的个数
sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items() # [(1, 2), (2, 4), (3, 3), (5, 1)] # 输入 k-v 不按 value 统计,按 k-v 统计 sc.parallelize([('a', 1), ('b', 1)]).countByValue().items() # [(('a', 1), 1), (('b', 1), 1)]
saveAsTextFile(path [, compressionCodecClass=None]):把 RDD 存储到文件系统中
counts.saveAsTextFile('/usr/lib/spark/out')
输入必须是 路径,且该路径不能事先存在
以下方法只适用 key-value 数据
countByKey:统计相同 key 的个数,返回 key-count
sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey() # defaultdict(<type 'int'>, {'a': 2, 'b': 1}) dictdata= sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey() dictdata.items() # [('a', 2), ('b', 1)]
篇幅太大,到这先告一段落。
Python 脚本
如何运行 python 脚本?如何 在 python 中 调用 spark?,这两个问题答案相同。
首先需要配置 /etc/profile
# python can call pyspark directly export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
python 的搜索路径 ,加上 spark 中 python 和 pyspark,以及 py4j-0.10.4-src.zip,他的作用是 负责 python 和 java 之间的 转换。
python 脚本 test1.py
from __future__ import print_function from pyspark import * import os print(os.environ['SPARK_HOME']) print(os.environ['HADOOP_HOME']) if __name__ == '__main__': sc = SparkContext("spark://hadoop10:7077") rdd = sc.parallelize("hello Pyspark world".split(' ')) counts = rdd.map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a + b) counts.saveAsTextFile('/usr/lib/spark/out') counts.foreach(print) sc.stop()
命令行执行
bin/spark-submit test1.py
或者 之间运行 py 文件
python test1.py
脚本模式 通过 http://192.168.10.10:8080/ 查看任务
先写到这吧,太长了,其他内容新写博客吧
参考资料:
https://www.cnblogs.com/yangzhang-home/p/6056133.html 快速入门
https://blog.csdn.net/kl28978113/article/details/80361452 较全教程
http://spark.apache.org/docs/latest/ spark 2.4.4 官网
http://spark.apache.org/docs/latest/api/python/index.html spark 2.4.4 python API
https://www.cnblogs.com/Vito2008/p/5216324.html
https://blog.csdn.net/proplume/article/details/79798289
https://blog.csdn.net/qq_21383435/article/details/77371142 spark学习-SparkSQL--12-SparkSession与SparkContext
https://www.iteblog.com/archives/1396.html#aggregate RDD 操作 API
https://www.cnblogs.com/yxpblog/p/5269314.html RDD 操作 API
原文地址:https://www.cnblogs.com/yanshw/p/11620204.html
- 绝对定位下的盒模型
- 运行shell脚本时报错"[[ : not found"解决方法
- 关于表联结方法(二) (r4笔记第23天)
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(一)设计一套好的RESTful API
- XSS分析及预防
- 关于ORA-01779问题的分析和解决 (r4笔记第22天)
- 想看爱奇艺VIP视频?一个python脚本帮你搞定
- Spring+SpringMVC+MyBatis+easyUI整合进阶篇(十四)Redis缓存正确的使用姿势
- 关于shell中的pl/sql脚本错误排查与分析(r4笔记第21天)
- 关于BFC不会被浮动元素遮盖的一些解释
- MyBatis + MySQL返回插入成功后的主键id
- struts2+spring+hibernate整合步骤(1)
- 微信公众号问题:{"errcode":40125,"errmsg":"invalid appsecret, view more at http://t.cn/LOEdzVq, hints: [
- reflow和repaint(摘录自张鑫旭的翻译)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 10大高性能开发宝石,我要消灭一半程序员!
- 面试官:你说你会RabbitMQ,那聊聊它的交换机(Exchange)吧
- Kubeadm 1.9 HA 高可用集群本地离线镜像部署【已验证】
- kubernetes(k8s)集群安装calico
- kubernetes(k8s) Prometheus+grafana监控告警安装部署
- 基于OpencvCV的情绪检测
- 设计模式 之 抽象工厂模式
- Angular应用里HTTP请求的错误处理
- 使用npm安装TypeScript
- TypeScript的interface关键字
- TypeScript的class关键字
- TypeScript里一些特殊的类型
- TypeScript的类型断言,有点像ABAP的强制类型转换
- 什么是TypeScript的字符串索引签名
- [初探] proxy 的优势与使用场景