利用PySpark对 Tweets 流数据进行情感分析实战

时间:2022-07-22
本文章向大家介绍利用PySpark对 Tweets 流数据进行情感分析实战,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。


磐创AI分享

作者 | LAKSHAY ARORA

编译 | VK

来源 | Analytics Vidhya

概述

  • 流数据是机器学习领域的一个新兴概念
  • 学习如何使用机器学习模型(如logistic回归)使用PySpark对流数据进行预测
  • 我们将介绍流数据和Spark流的基础知识,然后深入到实现部分

介绍

想象一下,每秒有超过8500条微博被发送,900多张照片被上传到Instagram上,超过4200个Skype电话被打,超过78000个谷歌搜索发生,超过200万封电子邮件被发送(根据互联网实时统计)。

我们正在以前所未有的速度和规模生成数据。在数据科学领域工作真是太好了!但是,随着大量数据的出现,同样面临着复杂的挑战。

主要是,我们如何收集这种规模的数据?我们如何确保我们的机器学习管道在数据生成和收集后继续产生结果?这些都是业界面临的重大挑战,也是为什么流式数据的概念在各组织中越来越受到重视的原因。

增加处理流式数据的能力将大大提高你当前的数据科学能力。这是业界急需的技能,如果你能掌握它,它将帮助你获得下一个数据科学的角色。

因此,在本文中,我们将了解什么是流数据,了解Spark流的基本原理,然后研究一个与行业相关的数据集,以使用Spark实现流数据。

目录

  1. 什么是流数据?
  2. Spark流基础
    1. 离散流
    2. 缓存
    3. 检查点
  3. 流数据中的共享变量
    1. 累加器变量
    2. 广播变量
  4. 利用PySpark对流数据进行情感分析

什么是流数据?

我们看到了上面的社交媒体数据——我们正在处理的数据令人难以置信。你能想象存储所有这些数据需要什么吗?这是一个复杂的过程!因此,在我们深入讨论本文的Spark方面之前,让我们花点时间了解流式数据到底是什么。

❝流数据没有离散的开始或结束。这些数据是每秒从数千个数据源生成的,需要尽快进行处理和分析。相当多的流数据需要实时处理,比如Google搜索结果。 ❞

我们知道,一些结论在事件发生后更具价值,它们往往会随着时间而失去价值。举个体育赛事的例子——我们希望看到即时分析、即时统计得出的结论,以便在那一刻真正享受比赛,对吧?

Spark流基础

❝Spark流是Spark API的扩展,它支持对实时数据流进行可伸缩和容错的流处理。 ❞

在跳到实现部分之前,让我们先了解Spark流的不同组件。

离散流

离散流或数据流代表一个连续的数据流。这里,数据流要么直接从任何源接收,要么在我们对原始数据做了一些处理之后接收。

构建流应用程序的第一步是定义我们从数据源收集数据的批处理时间。如果批处理时间为2秒,则数据将每2秒收集一次并存储在RDD中。而这些RDD的连续序列链是一个不可变的离散流,Spark可以将其作为一个分布式数据集使用。

想想一个典型的数据科学项目。在数据预处理阶段,我们需要对变量进行转换,包括将分类变量转换为数值变量、删除异常值等。Spark维护我们在任何数据上定义的所有转换的历史。因此,无论何时发生任何错误,它都可以追溯转换的路径并重新生成计算结果。

我们希望Spark应用程序运行24小时 x 7,并且无论何时出现任何故障,我们都希望它尽快恢复。但是,Spark在处理大规模数据时,出现任何错误时需要重新计算所有转换。你可以想象,这非常昂贵。

缓存

以下是应对这一挑战的一种方法。我们可以临时存储计算(缓存)的结果,以维护在数据上定义的转换的结果。这样,当出现任何错误时,我们不必一次又一次地重新计算这些转换。

数据流允许我们将流数据保存在内存中。当我们要计算同一数据上的多个操作时,这很有帮助。

检查点(Checkpointing)

当我们正确使用缓存时,它非常有用,但它需要大量内存。并不是每个人都有数百台拥有128GB内存的机器来缓存所有东西。

这就引入了检查点的概念。

❝检查点是保存转换数据帧结果的另一种技术。它将运行中的应用程序的状态不时地保存在任何可靠的存储器(如HDFS)上。但是,它比缓存速度慢,灵活性低。 ❞

当我们有流数据时,我们可以使用检查点。转换结果取决于以前的转换结果,需要保留才能使用它。我们还检查元数据信息,比如用于创建流数据的配置和一组DStream(离散流)操作的结果等等。

流数据中的共享变量

有时我们需要为Spark应用程序定义map、reduce或filter等函数,这些函数必须在多个集群上执行。此函数中使用的变量将复制到每个计算机(集群)。

在这里,每个集群有一个不同的执行器,我们需要一些东西,可以给我们这些变量之间的关系。

例如,假设我们的Spark应用程序运行在100个不同的集群上,捕获来自不同国家的人发布的Instagram图片。我们需要一个在他们的帖子中提到的特定标签的计数。

「现在,每个集群的执行器将计算该集群上存在的数据的结果。但是我们需要一些东西来帮助这些集群进行通信,这样我们就可以得到聚合的结果。在Spark中,我们有一些共享变量可以帮助我们克服这个问题」

累加器变量

用例,比如错误发生的次数、空白日志的次数、我们从某个特定国家收到请求的次数,所有这些都可以使用累加器来解决。

每个集群上的执行器将数据发送回驱动程序进程,以更新累加器变量的值。累加器仅适用于关联和交换的操作。例如,sum和maximum有效,而mean无效。

广播变量

当我们处理位置数据时,比如城市名称和邮政编码的映射,这些都是固定变量。现在,如果任何集群上的特定转换每次都需要此类数据,我们不需要向驱动程序发送请求,因为这太昂贵了。

相反,我们可以在每个集群上存储此数据的副本。这些类型的变量称为广播变量。

❝广播变量允许程序员在每台机器上缓存一个只读变量。通常,Spark会使用有效的广播算法自动分配广播变量,但如果我们有多个阶段需要相同数据的任务,我们也可以定义它们。 ❞

利用PySpark对流数据进行情感分析

是时候启动你最喜欢的IDE了!让我们在本节中进行写代码,并以实际的方式理解流数据。

在本节中,我们将使用真实的数据集。我们的目标是在推特上发现仇恨言论。为了简单起见,如果推特带有种族主义或性别歧视情绪,我们说它包含仇恨言论。

因此,任务是将种族主义或性别歧视的推文与其他推文进行分类。我们将使用Tweets和label的训练样本,其中label'1'表示Tweet是种族主义/性别歧视,label'0'表示其他。

为什么这个项目与流处理相关?因为社交媒体平台以评论和状态更新的形式接收海量流媒体数据。这个项目将帮助我们限制公开发布的内容。

你可以在这里更详细地查看问题陈述-练习问题:Twitter情感分析(https://datahack.analyticsvidhya.com/contest/practice-problem-twitter-sentiment-analysis/?utm_source=blog&utm_medium=streaming-data-pyspark-machine-learning-model)。我们开始吧!

设置项目工作流

  1. 「模型构建」:我们将建立一个逻辑回归模型管道来分类tweet是否包含仇恨言论。在这里,我们的重点不是建立一个非常精确的分类模型,而是查看如何使用任何模型并返回流数据的结果
  2. 「初始化Spark流上下文」:一旦构建了模型,我们就需要定义从中获取流数据的主机名和端口号
  3. 「流数据」:接下来,我们将从定义的端口添加netcat服务器的tweets,Spark API将在指定的持续时间后接收数据
  4. 「预测并返回结果」:一旦我们收到tweet文本,我们将数据传递到我们创建的机器学习管道中,并从模型返回预测的情绪

下面是我们工作流程的一个简洁说明:

建立Logistic回归模型的数据训练

我们在映射到标签的CSV文件中有关于Tweets的数据。我们将使用logistic回归模型来预测tweet是否包含仇恨言论。如果是,那么我们的模型将预测标签为1(否则为0)。

你可以在这里下载数据集和代码(https://github.com/lakshay-arora/PySpark/tree/master/spark_streaming)。

首先,我们需要定义CSV文件的模式,否则,Spark将把每列的数据类型视为字符串。我们读取数据并检查:

# 导入所需库
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# 初始化spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)
    
# 定义方案
my_schema = tp.StructType([
  tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])
    
  
# 读取数据集
my_data = spark.read.csv('twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

# 查看数据
my_data.show(5)

# 输出方案
my_data.printSchema()

定义机器学习管道

现在我们已经在Spark数据帧中有了数据,我们需要定义转换数据的不同阶段,然后使用它从我们的模型中获取预测的标签。

在第一阶段中,我们将使用RegexTokenizer 将Tweet文本转换为单词列表。然后,我们将从单词列表中删除停用词并创建单词向量。在最后阶段,我们将使用这些词向量建立一个逻辑回归模型,并得到预测情绪。

请记住,我们的重点不是建立一个非常精确的分类模型,而是看看如何在预测模型中获得流数据的结果。

# 定义阶段1:标记tweet文本 
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\W')
# 定义阶段2:删除停用字
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')
# 定义阶段3:创建大小为100的词向量
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
# 定义阶段4:逻辑回归模型
model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

设置我们的机器学习管道

让我们在Pipeline对象中添加stages变量,然后按顺序执行这些转换。将管道与训练数据集匹配,现在,每当我们有新的Tweet时,我们只需要将其传递到管道对象并转换数据以获得预测:

# 设置管道
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

#拟合模型
pipelineFit = pipeline.fit(my_data)

流数据和返回的结果

假设我们每秒收到数百条评论,我们希望通过阻止发布包含仇恨言论的评论的用户来保持平台的干净。所以,每当我们收到新的文本,我们就会把它传递到管道中,得到预测的情绪。

我们将定义一个函数 「get_prediction」,它将删除空白语句并创建一个数据框,其中每行包含一条推特。

因此,初始化Spark流上下文并定义3秒的批处理持续时间。这意味着我们将对每3秒收到的数据进行预测:

#定义一个函数来计算情感
def get_prediction(tweet_text):
 try:
    # 过滤得到长度大于0的tweets
  tweet_text = tweet_text.filter(lambda x: len(x) > 0)
    # 创建一个列名为“tweet”的数据框,每行将包含一条tweet
  rowRdd = tweet_text.map(lambda w: Row(tweet=w))
    # 创建spark数据框
  wordsDataFrame = spark.createDataFrame(rowRdd)
    # 利用管道对数据进行转换,得到预测的情绪
  pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
 except : 
  print('No data')
    
# 初始化流上下文
ssc = StreamingContext(sc, batchDuration= 3)

# 创建一个将连接到hostname:port的数据流,如localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# 用一个关键字“tweet_APP”分割tweet文本,这样我们就可以从一条tweet中识别出一组单词
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

# 获取收到的推文的预期情绪
words.foreachRDD(get_prediction)

#开始计算
ssc.start()             

# 等待结束
ssc.awaitTermination()  

在一个终端上运行程序并使用Netcat(一个实用工具,可用于将数据发送到定义的主机名和端口号)。可以使用以下命令启动TCP连接:

nc -lk port_number

最后,在第二个终端中键入文本,你将在另一个终端中实时获得预测:

视频演示地址:https://cdn.analyticsvidhya.com/wp-content/uploads/2019/12/final_twitter_sentiment.mp4?_=1

结尾

流数据在未来几年会增加的越来越多,所以你应该开始熟悉这个话题。记住,数据科学不仅仅是建立模型,还有一个完整的管道需要处理。

本文介绍了Spark流的基本原理以及如何在真实数据集上实现它。我鼓励你使用另一个数据集或收集实时数据并实现我们刚刚介绍的内容(你也可以尝试其他模型)。

原文链接:https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/