基于PySpark的流媒体用户流失预测

时间:2022-07-24
本文章向大家介绍基于PySpark的流媒体用户流失预测,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

磐创AI分享

作者 | Isak Kabir

编译 | VK

来源 | Towards Data Science

对于音乐流媒体业务来说,确定可能流失的用户(即有可能从付费降级到取消服务的用户)是关键。

如果一家音乐流媒体企业提前准确地识别出这些用户,他们就可以为他们提供折扣或其他类似的激励措施,从而拯救公司数百万的收入。

众所周知,获得一个新客户比留住一个现有客户要昂贵得多。这是因为回头客很可能会在贵公司的产品和服务上多花67%。

1.1工程概况

我们要确定可能取消其帐户并离开服务的用户。我们在这个项目中的目标是帮助一个虚构的企业(类似于Spotify和Pandora),通过建立和训练一个二进制分类器,该分类器能够根据用户过去的活动和与服务的交互获得的模式,准确识别取消音乐流服务的用户。

  • 定义客户流失变量:1—在观察期内取消订阅的用户,0—始终保留服务的用户

由于数据集的大小,该项目是通过利用apache spark分布式集群计算框架,我们使用Spark的Python API,即PySpark来实现的。

1.2加载数据

# 导入库

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, concat, count, lit, avg, lag, first, last, when
from pyspark.sql.functions import min as Fmin, max as Fmax, sum as Fsum, round as Fround
from pyspark.sql.types import IntegerType, DateType, TimestampType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator,

# 创建Spark会话
spark = SparkSession 
.builder 
.appName(‘CustomerChurn’) 
.getOrCreate()

# 检查Spark配置
spark.sparkContext.getConf().getAll()

path = "mini_sparkify_event_data.json"
df = spark.read.json(path)

2.理解数据

数据集包含2018年10月1日至2018年12月1日期间记录的用户活动日志。整个数据集由大约2600万行/日志组成,而子集包含286500行。

完整的数据集收集22277个不同用户的日志,而子集仅涵盖225个用户的活动。子集数据集包含58300个免费用户和228000个付费用户。两个数据集都有18列,如下所示。

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

每个用户都属于特定的日志。数据集中的七列表示静态用户级信息:

「artist:」 用户正在收听的艺术家「userId」: 用户标识符;「sessionId:」 标识用户在一段时间内的唯一ID。多个用户可以使用相同的sessionId标记会话「firstName」: 用户的名字「lastName」: 用户的姓「gender」: 用户的性别;2类(M和F)「location」: 用户的位置「userAgent」: 用户用于访问流媒体服务的代理;有57个不同类别「registration」: 用户的注册时间戳「level」 (non-static): 订阅级别;两类(免费和付费)「page:」 生成此事件时用户正在访问的页面。下面一节将详细介绍不同类型的页面

「page」列包含用户在应用程序中访问过的所有页面的日志。

>>> df.select('page').distinct().show(10)
+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
+--------------------

根据所执行的分析,仍然属于同一会话的两个连续日志之间的最长时间似乎是一个小时。

# 浏览auth列
df.groupby('auth').count().show()
+----------+------+
|      auth| count|
+----------+------+
|Logged Out|  8249|
| Cancelled|    52|
|     Guest|    97|
| Logged In|278102|
+----------+------+

我们还可以看到,用户相当活跃,其中排名前一位的歌曲总数已经达到8000首左右。下面的图表表明,流失的用户通常来自加州和新泽西州,大部分付费用户都离开了音乐应用程序,而取消订阅的男性多于女性。

加利福尼亚州和纽约州的人口往往更为密集,因此可能会有更高的流失率和更高的整体参与度。从下面的图中很容易看出,所提供的稀疏数据集是一个不平衡的数据集,因为与174个用户相比,流失用户的比例仅略高于20%(52)。

3.特征工程

首先,我们必须将原始数据集(每个日志一行)转换为具有用户级信息或统计信息的数据集(每个用户一行)。我们通过执行几个映射(例如获取用户性别、观察期的长度等)和聚合步骤来实现这一点。

3.1转换

对于在10月1日之后注册的少数用户,注册时间与实际的日志时间戳和活动类型不一致。因此,我们必须通过在page列中找到Submit Registration日志来识别延迟注册。

这一步并不简单,因为这样的日志事件没有映射到任何userId,因此必须从sessionId信息中提取这些事件。

对于少数注册晚的用户,观察开始时间被设置为第一个日志的时间戳,而对于所有其他用户,则使用默认的10月1日。

# 延迟页面列
windowsession = Window.partitionBy('sessionId').orderBy('ts')
df = df.withColumn("lagged_page", lag(df.page).over(windowsession))

windowuser = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# 据此推断出用户注册后的开始日期
df = df.withColumn("beforefirstlog", first(col('lagged_page')).over(windowuser))
df = df.withColumn("firstlogtime", first(col('ts')).over(windowuser))
df = df.withColumn("obsstart", 
                   when(df.beforefirstlog == "Submit Registration", df.firstlogtime).otherwise(obs_start_default))

# 对于每一个日志得到观察开始的时间
df = df.withColumn("timefromstart", col('ts')-col("obsstart"))
# 以及观察结束前的时间
df = df.withColumn("timebeforeend", col('obsend')-col('ts'))

与上述情况类似,也有用户在默认观察期结束前取消了服务,即所谓的流失用户。对于每个这样的用户,各自观察期的结束被设置为他/她最后一个日志条目的时间戳,而对于所有其他用户,默认为12月1日。

3.2特征工程

新创建的用户级数据集包括以下列:

「lastlevel」:用户最后的订阅级别,转换为二进制格式(1-付费,0-免费)

「gender」:性别,转换成二进制格式(1-女性,0-男性)

「obsstart」「obsend」:用户特定观察期的开始和结束时间

「endstate」:用户在观察期内的最后一次交互

「nact」:观察期内用户的交互总数

「nsongs, ntbup, ntbdown, nfriend, nplaylist, ndgrade, nupgrade, nhome, nadvert, nhelp, nsettings, nerror」:播放的歌曲数,点赞的个数,取消赞的个数,添加好友个数,添加到播放列表中的歌曲个数,降级的级数,升级的级数,主页访问次数,播放的广告数,帮助页面访问数,设置访问数,错误数

「nact_recent」「nact_oldest」:用户在观察窗口的最后k天和前k天的活动

「nsongs_recent」「nsongs_oldest」:分别在观察窗口的最后k天和前k天播放的歌曲

# 按用户标识聚合
df_user = df.groupby(‘userId’)
.agg(
 # 用户级特征
 first(when(col(‘lastlevel’) == ‘paid’, 1).otherwise(0)).
alias(‘lastlevel’),
 first(when(col(‘gender’) == “F”, 1).otherwise(0)).alias(‘gender’),
 first(col(‘obsstart’)).alias(‘obsstart’),
 first(col(‘obsend’)).alias(‘obsend’),
 first(col(‘endstate’)).alias(‘endstate’),
 
 # 聚合活动统计
 count(col(‘page’)).alias(‘nact’),
Fsum(when(col(‘page’) == “NextSong”, 1).otherwise(0)).alias(“nsongs”),
 Fsum(when(col(‘page’) == “Thumbs Up”, 1).otherwise(0)).alias(“ntbup”),
 Fsum(when(col(‘page’) == “Thumbs Down”, 1).otherwise(0)).alias(“ntbdown”),
 Fsum(when(col(‘page’) == “Add Friend”, 1).otherwise(0)).alias(“nfriend”),
 Fsum(when(col(‘page’) == “Add to Playlist”, 1).otherwise(0)).alias(“nplaylist”), 
 Fsum(when(col(‘page’) == “Submit Downgrade”, 1).otherwise(0)).alias(“ndgrade”),
 Fsum(when(col(‘page’) == “Submit Upgrade”, 1).otherwise(0)).alias(“nugrade”),
 Fsum(when(col(‘page’) == “Home”, 1).otherwise(0)).alias(“nhome”),
 Fsum(when(col(‘page’) == “Roll Advert”, 1).otherwise(0)).alias(“nadvert”),
 Fsum(when(col(‘page’) == “Help”, 1).otherwise(0)).alias(“nhelp”),
 Fsum(when(col(‘page’) == “Settings”, 1).otherwise(0)).alias(“nsettings”),
 Fsum(when(col(‘page’) == “Error”, 1).otherwise(0)).alias(“nerror”),
 
 # 不同时期的活动统计
 Fsum(when(col(‘timebeforeend’) < trend_est, 1).otherwise(0)).alias(“nact_recent”),
 Fsum(when(col(‘timefromstart’) < trend_est, 1).otherwise(0)).alias(“nact_oldest”),
 Fsum(when((col(‘page’) == “NextSong”) & (col(‘timebeforeend’) < trend_est), 1).otherwise(0)).alias(“nsongs_recent”),
 Fsum(when((col(‘page’) == “NextSong”) & (col(‘timefromstart’) < trend_est), 1).otherwise(0)).alias(“nsongs_oldest”) )

聚合活动统计

4.探索性数据分析

在完成特征工程步骤之后,我们分析了构建的特征之间的相关性。

# 我们切换到pandas数据帧
df_user_pd = df_user.toPandas()

# 计算数值特征之间的相关性
cormat = df_user_pd[['nact_perh','nsongs_perh', 'nhome_perh', 'ntbup_perh','ntbdown_perh', 'nfriend_perh','nplaylist_perh', 
'nadvert_perh', 'nerror_perh', 'upgradedowngrade', 'songratio', 'positiveratio','negativeratio', 
'updownratio', 'trend_act', 'trend_songs', 'avgsessionitems',  'avgsessionlength','avgsongs']].corr()

# 绘图相关性
plt.rcParams['figure.figsize'] = (10,10)
plt.subplots_adjust(left=0.20, right=0.9, top=0.95, bottom=0.15)
sns.heatmap(cormat, cmap = "YlGnBu", square = True, vmin = -1, vmax = 1);
plt.title('Feature correlations');
plt.savefig('correlations.png')

上面的热图描述了变量nact_perh和nsongs_perh之间的高度相关性。这是意料之中的,因为听歌曲显然是迄今为止最常见的用户活动。

出于同样的原因,「trend_act」「trend_songs」之间有很高的相关性。在这两种情况下,我们决定简单地从所有进一步的分析中删除,只保留测量最重要的交互作用的变量。

为了进一步降低数据中的多重共线性,我们还决定在模型中不使用nhome_perh和nplaylist_perh。此外,「avgsessionlength」与每个会话中的「avgsessionitems」相关,因此也可以忽略它。

4.1与流失用户的关系

从下面所示的可视化中,我们得出了以下观察结果:

  • 平均来说,用户每小时播放更多的歌曲;
  • 流失用户每小时都会有更多的取消点赞(thumbs down)行为,平均来看,他们不得不看更多的广告;
  • 对于流失用户来说,歌曲和积极互动相对于总活动的比率通常较低
  • 流失用户平均每个会话的交互次数更少
  • 免费用户流失率更高
  • 男性用户的流失率略高

基于此分析,尚未删除任何特征。

5.建模与评估

我们首先使用交叉验证的网格搜索来测试几个参数组合的性能,所有这些都是从较小的稀疏用户活动数据集中获得的用户级数据。基于交叉验证中获得的性能结果(用AUC和F1分数衡量),我们确定了性能最好的模型实例,并在整个训练集中对它们进行了再训练。

5.1网格搜索法

Logistic回归
  • maxIter(最大迭代次数,默认值=100):[10,30]
  • regParam(正则化参数,默认值=0.0):[0.0,0.1]
  • elasticNetParam(混合参数-0表示L2惩罚,1表示L1惩罚,默认值=0.0):[0.0,0.5]
随机森林分类器
  • maxDepth(最大树深度,默认值=5):[4,5,6,7]
  • 树个数(树个数,默认值=20):[20,40]
梯度增强树GB分类器
  • maxDepth(最大树深度,默认值=5):[4,5]
  • maxIter(最大迭代次数,默认值=20):[20,100]

在定义的网格搜索对象中,每个参数组合的性能默认由4次交叉验证中获得的平均AUC分数(ROC下的面积)来衡量。下文第4.4节简要解释了AUC。

numeric_columns = [‘nsongs_perh’, ‘ntbup_perh’,’ntbdown_perh’, ‘nfriend_perh’, 
‘nadvert_perh’, ‘nerror_perh’, ‘upgradedowngrade’, ‘songratio’, ‘positiveratio’,’negativeratio’, 
‘updownratio’, ‘trend_songs’, ‘avgsessionitems’,’avgsongs’]

# 用VectorAssembler组合多个数值特征
numeric_assembler = VectorAssembler(inputCols = numeric_columns, outputCol = “numericvectorized”)

# 标准化数字特征
scaler = StandardScaler(inputCol = “numericvectorized”, outputCol = “numericscaled”, withStd = True, withMean = True)

# 添加两个二进制特征
binary_columns = [‘lastlevel’, ‘gender’]
total_assembler = VectorAssembler(inputCols = binary_columns + [“numericscaled”], outputCol = “features”)

# 使用三个不同的分类器定义三个不同的管道,所有这些都带有默认参数
# 逻辑回归
lr = LogisticRegression()
pipeline_lr = Pipeline(stages = [numeric_assembler, scaler, total_assembler, lr])

# 随机森林分类器
rf = RandomForestClassifier()
pipeline_rf = Pipeline(stages = [numeric_assembler, scaler, total_assembler, rf])

# 梯度增强树分类器
gb = GBTClassifier()
pipeline_gb = Pipeline(stages = [numeric_assembler, scaler, total_assembler, gb])

5.2性能指标

F1分数是解决此问题的首选性能指标。输入的用户级数据集不平衡。音乐流媒体服务的目标是识别出大多数可能流失的用户(目标是高召回率),但同时又不想无缘无故地给予太多折扣(以高精度为目标)——这可以帮助音乐流媒体业务避免经济损失。

class F1score(Evaluator):
def __init__(self, predictionCol = “prediction”, labelCol=”label”):
 self.predictionCol = predictionCol
 self.labelCol = labelCol
def _evaluate(self, dataset):
 
 # 计算F1分数
 tp = dataset.where((dataset.label == 1) & (dataset.prediction == 1)).count()
 fp = dataset.where((dataset.label == 0) & (dataset.prediction == 1)).count()
 tn = dataset.where((dataset.label == 0) & (dataset.prediction == 0)).count()
 fn = dataset.where((dataset.label == 1) & (dataset.prediction == 0)).count()
 
 # 加epsilon以防止被零除
 precision = tp / (tp + fp + 0.00001)
 recall = tp / (tp + fn + 0.00001)
 
 f1 = 2 * precision * recall / (precision + recall + 0.00001)
 
 return f1
def isLargerBetter(self):
 return True

表现最好的模型AUC得分为0.981,F1得分为0.855。

如上图所示,识别流失用户的最重要特征是错误率,它衡量每小时向用户显示的错误页面数量。用户遇到的错误越多,他/她对服务不满意的可能性就越大。

第二个和第三个最重要的特征「ntbdown_perh」「nadvert_perh」也有类似的情况,它们分别衡量每小时的取消点赞次数和每小时看到的广告数量。

最有趣的特征是「trend_songs」变量,它测量用户的歌曲收听活动趋势,作为第四个最重要的特征。

6.结论和改进

梯度增强树分类器的F1分数(精确度和召回率)为0.855,可以根据过去的用户活动和与音乐流服务的交互来识别流失的用户,这有助于企业避免严重的经济损失。

一些改进是在完全稀疏的数据集上对模型执行全面的网格搜索。利用到目前为止被忽略的歌曲级特征,例如,根据在指定观察期内听过的不同歌曲/艺术家计算用户的收听多样性等。构建新特征,例如歌曲收听会话的平均长度、跳过或部分收听歌曲的比率等。

Github链接:https://github.com/isakkabir/isakkabir-Customer-Churn-Prediction-Music-Streaming/blob/master/CustomerChurn_cluster.ipynb

Github链接:https://github.com/isakkabir/isakkabir-Customer-Churn-Prediction-Music-Streaming/blob/master/CustomerChurn_cluster.ipynb

原文链接:https://towardsdatascience.com/customer-churn-prediction-within-music-streaming-using-pyspark-a96edd4beae8