spark教程(二)-python基础编程

时间:2019-10-11
本文章向大家介绍spark教程(二)-python基础编程,主要包括spark教程(二)-python基础编程使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

hadoop 是 java 开发的,原生支持 java;spark 是 scala 开发的,原生支持 scala;

spark 还支持 java、python、R,本文只介绍 python

spark 1.x 和 spark 2.x 用法略有不同,spark 1.x 的用法大部分也适用于 spark 2.x 

Pyspark

python + spark,简单来说,想用 python 操作 spark,就必须用 pyspark 模块

RDD

spark 最重要的一个概念叫 RDD,Resilient Distributed Dataset,弹性分布式数据集    【本文只做简单了解,后面会专门写一篇博客详细介绍】

RDD 可以从 hadoop 获取数据,也可以从其他地方获取数据,也可以从一种 RDD 转换成 另一种 RDD;

RDD 支持两种类型的操作:transformations 和 actions

transformations:转换,就是从一种 RDD 转换成 另一种 符合 要求 的 RDD,类似于 map

actions:行动,执行计算,类似于 reduce,返回结果

值得注意的是,transformations 是惰性计算,也就是说 transformations 并不立即执行,只有 actions 时,才会执行 transformations

这使得 spark 高效,以 map-reduce 为例,如果我们 map 并不是目的,reduce 才是目的,那么 map 之后 就计算的话,输出存在哪里呢?

如果存在文件,一浪费时间,二浪费地方,如果我们 reduce 时才执行 map,直接把 map 的庞大输出存入内存即可,甚至 流式 传给 reduce,非常高效。

DataFrame

dataframe 类似于 pandas 中的 dataframe,也就是 表格数据

这是 spark2.x 中新增的数据格式,由 SparkSession 直接读取,不管文件是什么类型,txt也好,csv也罢,输出格式都是 dataframe

而 SparkContext 不管读什么文件,输出格式都是 RDD

>>> spark.read.text('README.md')
DataFrame[value: string]

# dataframe 的属性
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattr__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_
ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_collectAsArrow', '_jcols', '_jdf', '_jmap', '_jseq', '_lazy_rdd', '_repr_html_', '_sc', '_schema', 
'_sort_cols', '_support_repr_html', 'agg', 'alias', 'approxQuantile', 'cache', 'checkpoint', 'coalesce', 'colRegex', 'collect', 'columns', 'corr', 'count', 'cov', 'createGlobalTempView',
'createOrReplaceGlobalTempView', 'createOrReplaceTempView', 'createTempView', 'crossJoin', 'crosstab', 'cube', 'describe', 'distinct', 'drop', 'dropDuplicates', 'drop_duplicates', 'dropna',
'dtypes', 'exceptAll', 'explain', 'fillna', 'filter', 'first', 'foreach', 'foreachPartition', 'freqItems', 'groupBy', 'groupby', 'head', 'hint', 'intersect', 'intersectAll', 'isLocal',
'isStreaming', 'is_cached', 'join', 'limit', 'localCheckpoint', 'na', 'orderBy', 'persist', 'printSchema', 'randomSplit', 'rdd', 'registerTempTable', 'repartition', 'repartitionByRange',
'replace', 'rollup', 'sample', 'sampleBy', 'schema', 'select', 'selectExpr', 'show', 'sort', 'sortWithinPartitions', 'sql_ctx', 'stat', 'storageLevel', 'subtract', 'summary', 'take', 'toDF',
'toJSON', 'toLocalIterator', 'toPandas', 'union', 'unionAll', 'unionByName', 'unpersist', 'where', 'withColumn', 'withColumnRenamed', 'withWatermark', 'write', 'writeStream']

datafram 的操作不同于 RDD,类似于 pandas

SparkSession and SparkContext

SparkSession 是 spark2.x 引入的新概念,SparkSession 为用户提供统一的切入点,字面理解是创建会话,或者连接 spark

在 spark1.x 中,SparkContext 是 spark 的主要切入点,由于 RDD 作为主要的 API,我们通过 SparkContext 来创建和操作 RDD,

这个问题在于:

1. 不同的应用中,需要使用不同的 context,在 Streaming 中需要使用 StreamingContext,在 sql 中需要使用 sqlContext,在 hive 中需要使用 hiveContext,比较麻烦

2. 随着 DataSet 和 DataFrame API 逐渐成为标准 API,需要为他们创建接入点,即 SparkSession

SparkSession 实际上封装了 SparkContext,另外也封装了 SparkConf、sqlContext,随着版本增加,可能更多,

所以我们尽量使用 SparkSession ,如果发现有些 API 不在 SparkSession 中,也可以通过 SparkSession 拿到 SparkContext 和其他 Context 等

在 shell 操作中,原生创建了 SparkSession,故无需再创建,创建了也不会起作用

在 shell 中,SparkContext 叫 sc,SparkSession 叫 spark

通过 spark 拿到 sc

>>> dir(spark)
['Builder', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__r
educe_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_convert_from_pandas', '_createFromLocal', '_createFromRDD', '_create_from_pandas_with_arrow', 
'_create_shell_session', '_get_numpy_record_dtype', '_inferSchema', '_inferSchemaFromList', '_instantiatedSession', '_jsc', '_jsparkSession', '_jvm', '_jwrapped', '_repr_html_', '_sc', '_wrapped',
'builder', 'catalog', 'conf', 'createDataFrame', 'newSession', 'range', 'read', 'readStream', 'sparkContext', 'sql', 'stop', 'streams', 'table', 'udf', 'version'] spark.sparkContext # 即 sc

简单操作

>>> sc.textFile('README.md').count()
>>> spark.read.text('README.md').count()

shell 操作

spark 支持 shell 操作,支持各种语言的 shell,包括 scala shell、python shell、R shell、SQL shell 等

启动 python shell 模式的命令解析

[root@hadoop10 spark]# bin/pyspark --help
Usage: ./bin/pyspark [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,               # 设定 master,即在哪里运行 spark,
                                                                                        # mesos://host:port一般不用;yarn需要把spark部署到yarn上
                              k8s://https://host:port, or local (Default: local[*]).    # local 本地模式,local 表示单线程,local[num]表示num个进程,
                                                                                        # local[*]表示服务器cpu是几核就是几个进程
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).    # 要执行的 class 类名
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of jars to include on the driver
                              and executor classpaths.
  --packages                  Comma-separated list of maven coordinates of jars to include  # 逗号隔开的 maven 列表,给 当前会话 添加依赖
                              on the driver and executor classpaths. Will search the local
                              maven repo, then maven central and any additional remote
                              repositories given by --repositories. The format for the
                              coordinates should be groupId:artifactId:version.
  --exclude-packages          Comma-separated list of groupId:artifactId, to exclude while
                              resolving the dependencies provided in --packages to avoid
                              dependency conflicts.
  --repositories              Comma-separated list of additional remote repositories to
                              search for the maven coordinates given with --packages.
  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place  # 逗号隔开的 文件列表,替代 PYTHONPATH 的作用,也就是说如果不设置 PYTHONPATH,就需要这个参数,才能导入 文件中的模块
                              on the PYTHONPATH for Python apps.
  --files FILES               Comma-separated list of files to be placed in the working
                              directory of each executor. File paths of these files
                              in executors can be accessed via SparkFiles.get(fileName).

  --conf PROP=VALUE           Arbitrary Spark configuration property.
  --properties-file FILE      Path to a file from which to load extra properties. If not
                              specified, this will look for conf/spark-defaults.conf.

  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-java-options       Extra Java options to pass to the driver.
  --driver-library-path       Extra library path entries to pass to the driver.
  --driver-class-path         Extra class path entries to pass to the driver. Note that
                              jars added with --jars are automatically included in the
                              classpath.

  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).

  --proxy-user NAME           User to impersonate when submitting the application.
                              This argument does not work with --principal / --keytab.

  --help, -h                  Show this help message and exit.
  --verbose, -v               Print additional debug output.
  --version,                  Print the version of current Spark.

 Cluster deploy mode only:
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).

 Spark standalone or Mesos with cluster deploy mode only:
  --supervise                 If given, restarts the driver on failure.
  --kill SUBMISSION_ID        If given, kills the driver specified.
  --status SUBMISSION_ID      If given, requests the status of the driver specified.

 Spark standalone and Mesos only:
  --total-executor-cores NUM  Total cores for all executors.

 Spark standalone and YARN only:
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

 YARN-only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --num-executors NUM         Number of executors to launch (Default: 2).
                              If dynamic allocation is enabled, the initial number of
                              executors will be at least NUM.
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
                              secure HDFS.
  --keytab KEYTAB             The full path to the file that contains the keytab for the
                              principal specified above. This keytab will be copied to
                              the node running the Application Master via the Secure
                              Distributed Cache, for renewing the login tickets and the
                              delegation tokens periodically.

进入 shell 模式

[root@hadoop10 spark]# bin/pyspark 
Python 2.7.12 (default, Oct  2 2019, 19:43:15) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
19/10/09 18:10:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 2.7.12 (default, Oct  2 2019 19:43:15)
SparkSession available as 'spark'.  # 自带 spark

shell 模式可以通过 http://192.168.10.10:4040 查看任务

shell 操作语法与 脚本 相同,示例如下

>>> distFile = sc.textFile('README.md')
>>> distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b)
3847                                                                            
>>> distFile.count()
105

Python 编程基本语法

1. 首先创建 SparkSession

1.1 在 spark1.x 中是创建 SparkContext

class SparkContext(__builtin__.object):
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<cl
    ass 'pyspark.profiler.BasicProfiler'>)
    '''Create a new SparkContext. At least the master and app name should be set,
     |      either through the named parameters here or through C{conf}.
     |
     |      :param master: Cluster URL to connect to
     |             (e.g. mesos://host:port, spark://host:port, local[4]).   local 表示本地运行,4 表示使用4个 cpu核
     |      :param appName: A name for your job, to display on the cluster web UI.
     |      :param sparkHome: Location where Spark is installed on cluster nodes.
     |      :param pyFiles: Collection of .zip or .py files to send to the cluster
     |             and add to PYTHONPATH.  These can be paths on the local file
     |             system or HDFS, HTTP, HTTPS, or FTP URLs.
     |      :param environment: A dictionary of environment variables to set on
     |             worker nodes.
     |      :param batchSize: The number of Python objects represented as a single
     |             Java object. Set 1 to disable batching, 0 to automatically choose
     |             the batch size based on object sizes, or -1 to use an unlimited
     |             batch size
     |      :param serializer: The serializer for RDDs.
     |      :param conf: A L{SparkConf} object setting Spark properties.
     |      :param gateway: Use an existing gateway and JVM, otherwise a new JVM
     |             will be instantiated.
     |      :param jsc: The JavaSparkContext instance (optional).
     |      :param profiler_cls: A class of custom Profiler used to do profiling
     |             (default is pyspark.profiler.BasicProfiler).
     |
     |
     |      >>> from pyspark.context import SparkContext
     |      >>> sc = SparkContext('local', 'test')
     |
     |      >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
     |      Traceback (most recent call last):
     |          ...
     |      ValueError:...'''

示例如下

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # 设定 appname 和 master
sc = SparkContext(conf=conf)

## 或者直接这样
sc = SparkContext("spark://hadoop10:7077")

1.2 在 spark2.x 中创建 SparkSession

class SparkSession(__builtin__.object):
     def __init__(self, sparkContext, jsparkSession=None):
     ''' Creates a new SparkSession.
     |
     |      >>> from datetime import datetime
     |      >>> spark = SparkSession(sc)
     |      >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
     |      ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
     |      ...     time=datetime(2014, 8, 1, 14, 1, 5))])
     |      >>> df = allTypes.toDF()
     |      >>> df.createOrReplaceTempView("allTypes")
     |      >>> spark.sql('select i+1, d+1, not b, list[1], dict["s"], time, row.a '
     |      ...            'from allTypes where b and i > 0').collect()
     |      [Row((i + CAST(1 AS BIGINT))=2, (d + CAST(1 AS DOUBLE))=2.0, (NOT b)=False, list[1]=2,             dict[s]=0, time=datetime.datetime(2014, 8, 1, 14, 1, 5), a=1)]
     |      >>> df.rdd.map(lambda x: (x.i, x.s, x.d, x.l, x.b, x.time, x.row.a, x.list)).collect()
     |      [(1, u'string', 1.0, 1, True, datetime.datetime(2014, 8, 1, 14, 1, 5), 1, [1, 2, 3])]'''

示例如下

from pyspark.sql import SparkSession

conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # 设定 appname 和 master
sess = SparkSession.builder.config(conf=conf)

## 或者这样
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('myapp1').setMaster('local[4]')   # 设定 appname 和 master
sc = SparkContext(conf=conf)
sess = SparkSession(sc)

2. 然后创建 RDD

spark 是以 RDD 概念为中心运行的,RDD 是一个容错的、可以被并行操作的元素集合。

创建 RDD 有两种方式:

1. 在驱动程序中并行化一个已经存在的集合    【内存中的数据】

2. 从外部存储系统引入数据,生成 RDD      【外部存储介质中的数据,注意 spark 本身没有存储功能】

  // 这个存储系统可以是一个共享文件系统,如 hdfs、hbase

2.1 并行化数据集合

并行化集合是通过在一个 迭代器或者集合 上 调用 SparkContext 的 parallelize 方法生成的    【内存中的数据】

data = range(10000)
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a+b)

为了创建一个能并行操作的分布式数据集,所有元素都将被拷贝;

然后我们可以调用 reduce 进行叠加,累计求和

并行化集合时一个重要的参数是将数据集切分的数量。一个切片对应一个 spark 任务,我们可指定切片数量

distData2 = sc.parallelize(data, 100)   # 切 100 分

2.2 外部数据集

由于 spark 本身没有存储功能,一般是从 本地文件、hadoop 等获取外部数据集

2.2.1 本地文件

textFile(name, minPartitions=None, use_unicode=True)[source]
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoding as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

minPartitions 指定分区数,在 hdfs 中,文件本身就是 以 block 存储的,此时这个 分区数 不能小于 block 数

示例代码

distFile = sc.textFile('README.md')
distFile = sc.textFile('xx.csv')
distFile = sc.textFile('logs')      # 支持文件夹
# textFile("/my/directory/*.txt")   # 支持通配符
# textFile("/my/directory/*.gz")    # 支持压缩文件

type(sc.textFile('README.md'))      # <class 'pyspark.rdd.RDD'>


distFile.map(lambda x: int(x[-1])).reduce(lambda a, b: a + b)
distFile.map(lambda x: len(x)).reduce(lambda a, b: a + b)   # map 的输入是每一行,map 的作用是求每一行的 len
                                                            # reduce 的输入是两个数字,reduce 的作用是求这两个数的和,
                                                            # 也就是把 所有行的 len 逐次求当前元素(当前累计 len 的和)与下一元素的和

读取文件后自动生成 RDD;

各种读取方式都支持 多种 文件格式,如 文件夹,通配符、压缩文件

2.2.2 批量读取本地文件

distFile = sc.wholeTextFiles('files')   # 读取 files 文件夹下所有文件内容,返回 (filename, filecontent) 键值对

 输入必须是 路径

3. 操作 RDD

RDD 的操作有两种方式:转换 和 行动,而且 转换 是 惰性的

可以根据 是否有返回 判断是哪个操作,行动 有返回值,转换无返回值

详见官网 RDD

3.1 RDD 缓存

我们可以把 RDD 缓存到 内存中, 这其实就是 行动 操作

distFile = sc.textFile('README.md')
m = distFile.map(lambda x: len(x))      # map 是 转换 操作,并不立即执行
m.cache()       # 把 map 的输出缓存到内存中,其实 cache 就是 执行 操作

或者 m.persist() 

3.2  转换 操作

惰性,无返回值

map(func[, preservesPartitioning=False]):把一个序列中的元素逐个送入 map,经 func 处理后,返回一个新的 序列

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x + 1).collect()          # [3, 4, 5]

filter(func):类似 map,func 是个过滤函数

rdd = sc.parallelize([2, 3, 4])
rdd.map(lambda x: x > 3).collect()          # [False, False, True]

flatMap(func[, preservesPartitioning=False]):也类似 map,只是 它会把 每次经过 func 处理的结果进行 合并,输入和输出的 list 长度可能不同

rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()            # [1, 1, 2, 1, 2, 3]
# range(1, 2): 1
# range(1, 3): 1, 2
# range(1, 4): 1, 2, 3

### vs map
rdd.map(lambda x: range(1, x)).collect()                # [[1], [1, 2], [1, 2, 3]]

mapPartitions(func [, preservesPartitioning=False]) :map的一个变种,map 是把序列的单个元素送入 func ,而 mapPartitions 是把 序列分区后 每个 分区 整体送入 func

rdd = sc.parallelize([1,2,3,4,5], 3)    # 分 3 个区
def f(iterator): yield sum(iterator)    # 必须是生成器,即 yield,不能 return
rdd.mapPartitions(f).collect()          # [1, 5, 9]

mapPartitionsWithIndex(func [, preservesPartitioning=False]) :func 有两个参数,分片的序号 和 迭代器,返回 分片序号,也必须是 迭代器

rdd = sc.parallelize(range(15), 13)    # 分 13 个区
def f(splitIndex, iterator): yield splitIndex
rdd.mapPartitionsWithIndex(f).collect() # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]

cartesian(otherDataset):利用两个 序列 生成 笛卡尔內积 的数据集

x = sc.parallelize([1,2,3])
y = sc.parallelize([4,5])
x.cartesian(y).collect()        # [(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]

以下方法只适用 key-value 数据

reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]):针对 k-v 对的处理方法,把 key 相同的 value 进行 reduce,然后重新组成 key-reduce 对

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
def f(x, y): return x + y
rdd.reduceByKey(f).collect()        # [('a', 2), ('b', 1)]

sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]):根据 key 进行排序,默认升序,numPartitions 代表分区数,keyfunc 是处理 key 的,在 排序过程中对 key 进行处理

tmp = [('a', 4), ('b', 3), ('c', 2), ('D', 1)]
sc.parallelize(tmp).sortByKey(True, 1).collect()    # 升序[('D', 1), ('a', 4), ('b', 3), ('c', 2)] 1代表分区数
sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect()   # 升序[('a', 4), ('b', 3), ('c', 2), ('D', 1)] D跑到后面了

sc.parallelize(tmp).sortByKey(False, 2, keyfunc=lambda k:k.lower()).collect()# 降序[('D', 1), ('c', 2), ('b', 3), ('a', 4)]

keyfunc 只在 排序过程中起作用,在输出时 keyfunc 不起作用

join(otherDataset [, numPartitions=None]):将 两个 k-v RDD 中 共有的 key 的 value 交叉组合

x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
x.join(y).collect()     # [('a', (1, 2)), ('a', (1, 3))]

3.3 行动 操作

有返回值

collect:返回 RDD 中的数据

count:返回 RDD 中元素个数

first:返回 RDD 中第一个元素

max. min.sum:不解释

take(n):返回 RDD 中 前 n 个元素

takeOrdered(n [, key=None]):对 RDD 先进行排序,然后取排序后的 前 n 个数据,key 表示先经过 keyfunc 处理后再进行排序,最终返回的还是原数据

sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)    # [2, 3, 4]
sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x: -x)   # [9, 7, 6]
## 过程如下
#  9,  7,  3,  2,  6,  4  ## 原数据
# -9, -7, -3, -2, -6, -4  ## 经过 keyfunc 处理后的数据
# -9, -7, -6, -4, -3, -2  ## 对处理后的数据升序排序
# -9, -7, -6              ## 取前3个
#  9,  7,  6              ## 对应到原数据

也就是说,keyfunc 只在排序时起作用,在输出时不起作用

foreach(func):运行 func 函数 并行处理 RDD 的所有元素

sc.parallelize([1, 2, 3, 4, 5]).foreach(print)  # 并行打印,不按顺序输出
# 1
# 2
# 4
# 5
# 3

reduce(func):把 RDD 中前两个元素送入 func,得到一个 value,把这个 value 和 下一个元素 送入 func,直至最后一个元素

sc.parallelize([1,2,3,4,5]).reduce(lambda x, y: x + y)  # 15 求和

fold:与 reduce 类似,fold 是有一个 基数,然后 把每个元素 和 基数 送入 func,然后替换该基数,循环,直到最后一个元素

x = sc.parallelize([1,2,3])
neutral_zero_value = 0  # 0 for sum, 1 for multiplication
y = x.fold(neutral_zero_value, lambda obj, accumulated: accumulated + obj) # computes cumulative sum
print(x.collect())  # [1,2,3]
print(y)            # 6

aggregate:对每个分区进行聚合,然后聚合每个分区的聚合结果,详见我的博客 aggregate

countByValue:统计相同元素的个数

sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()    # [(1, 2), (2, 4), (3, 3), (5, 1)]

# 输入 k-v 不按 value 统计,按 k-v 统计
sc.parallelize([('a', 1), ('b', 1)]).countByValue().items()     # [(('a', 1), 1), (('b', 1), 1)]

saveAsTextFile(path [, compressionCodecClass=None]):把 RDD 存储到文件系统中

counts.saveAsTextFile('/usr/lib/spark/out')

输入必须是 路径,且该路径不能事先存在

以下方法只适用 key-value 数据

countByKey:统计相同 key 的个数,返回 key-count 

sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()   # defaultdict(<type 'int'>, {'a': 2, 'b': 1})

dictdata= sc.parallelize([("a",1), ("b",1), ("a", 3)]).countByKey()
dictdata.items()        # [('a', 2), ('b', 1)]

篇幅太大,到这先告一段落。 

Python 脚本

如何运行 python 脚本?如何 在 python 中 调用 spark?,这两个问题答案相同。

首先需要配置 /etc/profile

# python can call pyspark directly
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH

python 的搜索路径 ,加上 spark 中 python 和 pyspark,以及 py4j-0.10.4-src.zip,他的作用是 负责 python 和 java 之间的 转换。

python 脚本 test1.py

from __future__ import print_function
from pyspark import *
import os
print(os.environ['SPARK_HOME'])
print(os.environ['HADOOP_HOME'])
if __name__ == '__main__':
    sc = SparkContext("spark://hadoop10:7077")
    rdd = sc.parallelize("hello Pyspark world".split(' '))
    counts = rdd.map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.saveAsTextFile('/usr/lib/spark/out')
    counts.foreach(print)

    sc.stop()

命令行执行

bin/spark-submit test1.py

或者 之间运行 py 文件

python test1.py

脚本模式 通过 http://192.168.10.10:8080/ 查看任务

先写到这吧,太长了,其他内容新写博客吧

参考资料:

https://www.cnblogs.com/yangzhang-home/p/6056133.html  快速入门

https://blog.csdn.net/kl28978113/article/details/80361452  较全教程

http://spark.apache.org/docs/latest/    spark 2.4.4 官网

http://spark.apache.org/docs/latest/api/python/index.html    spark 2.4.4 python API

https://www.cnblogs.com/Vito2008/p/5216324.html

https://blog.csdn.net/proplume/article/details/79798289

https://blog.csdn.net/qq_21383435/article/details/77371142  spark学习-SparkSQL--12-SparkSession与SparkContext

https://www.iteblog.com/archives/1396.html#aggregate  RDD 操作 API

https://www.cnblogs.com/yxpblog/p/5269314.html    RDD 操作 API

原文地址:https://www.cnblogs.com/yanshw/p/11620204.html