基于Spark /Tensorflow使用CNN处理NLP的尝试

时间:2022-05-03
本文章向大家介绍基于Spark /Tensorflow使用CNN处理NLP的尝试,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

01 前言

关于CNN如何和NLP结合,其实是被这篇文章(http://www.wildml.com/2015/11/understanding-convolutional-neural-networks-for-nlp/)指导入门的 。 我觉得使用CNN去处理一些NLP的分类问题,是非常不错的。

主要好处有:

1、CNN能自动抽取出一些高级特征,减少了特征工程的时间

2、使用WordEmbedding技术将词汇表达为向量后,可以很方便的将文本表示为类似图片的2D向量

3、神经网络表达能力强

缺点的话,就是目前我还没想到如何把一些非文本类的因子给融合进去。就是有时候我是希望能够做些特征工程,抽取出一些因子,然后加入到模型里面的。

02

数据预处理阶段

先简单解释下流程,首先是对所有文本先分词,我们采用Ansj分词工具,然后通过Spark 的Word2vec 来训练得到词向量。Zepplin是一个很好的工具,方便算法工程师做预处理,我们给力的运维同学还把tensorflow也集成进了zepplin,方便我们使用。

比如要做分词也很简答,

%spark import org.ansj.splitWord.analysis.NlpAnalysis import scala.collection.JavaConversions._ import scala.collection.mutable import org.apache.spark.sql.functions.udf //注册一个分词函数 spark.udf.register("parse",(co:String)=>{ val parseMethod Class.forName("org.ansj.splitWord.analysis.NlpAnalysis").getMethod("parse", classOf[String]) val tmp = parseMethod.invoke(null, co) val terms = tmp.getClass.getMethod("getTerms").invoke(tmp).asInstanceOf[java.util.List[Any]] terms.map(f => f.asInstanceOf[ {def getName: String}].getName).toArray }) //获取数据 spark.sql("select parse(content[1]) as keywords,id from table where size(content) > 2").write.options(Map("format"->"csv")).save("/tmp/words_anlysis") spark.read.csv("/tmp/words_anlysis").show(10)

这样就得到了所有分好词的文本。

接着使用word2vec来训练:

%spark val input = spark.read.csv("/tmp/words_anlysis").rdd.map(row=> row.getString(0)) val word2vec = new Word2Vec() val model = word2vec.fit(input) val result = model.getVectors.map(f=>s""" ${f._1} ${f._2.mkString(",")} """) sc.parallelize(result.toSeq,1).saveAsTextFile("/tmp/word_embedding")

值得注意的是,这些都是在zepplin完成的,你也可以写个spark程序来完成。

03

使用CNN卷积做分类

详细Tensorflow的代码我已经贴到gist上了: nlp-cnn.py(https://gist.github.com/allwefantasy/fc4b2b560759bec700a4a413bdfd5fa1)。我Python也才刚学没一会,写的时候也是不断的到google里去问,为了能够先run起来,我把训练数据全部载入到内存。最好还是应该采用部分预加载的方式,或者使用tensorflow queue的机制来喂数据,否则数据量大了,内存就不够用了。

关于CNN现在文章已经多的不要不要了,大家先参考其他文章学个大概,我这里主要介绍一些我在实际操作中遇到的一些问题。

首先定义输入输出:

VOCAB_SIZE = 100 NUM_CLASSES = 2 SEQUENCE_LENGTH = 59 input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, VOCAB_SIZE, 1], name="input_x") input_y = tf.placeholder(tf.float32, [None, NUM_CLASSES], name="input_y")

我们以输入为例,我们需要构建的一个数组应该是这样的:

[//某次迭代需要的文档数量 [ //某个文档的长度 SEQUENCE_LENGTH [ // 词向量的长度 VOCAB_SIZE [//输入通道,为1], .... ], .... ] ]

数组的第一层对应 None,也就是说不确定最外层数组的大小。第二层的大小对应SEQUENCE_LENGTH,也就文档的词长度,第三层对应词向量,也就是100,最后一层对应输入通道,图片是RGB 那么就是3通道,我们这里是1。 所以构建数据格式也是我一开始疑惑的一个地方,如何构建一个适合 CNN输入的数据格式。

第二个问题就是卷积网络的构建,我们分析下具体的代码,里面还要做一些计算:

def conv_layer(input, size_in, size_out, width=VOCAB_SIZE, name="conv"): with tf.name_scope(name): w = tf.Variable(tf.truncated_normal([VOCAB_WINDOW, width, size_in, size_out], stddev=0.1), name="W") //w 定义了filter窗口大小,一般而言,字段顺序都是先高后宽 //size_in是输入的通道数。size_out则是输出通道。后面我们会解释这个size_in,size_out b = tf.Variable(tf.constant(0.1, shape={size_out}, name="B")) //如果你熟悉系线性回归公式,wx+b 你就能清楚这个b的含义了。 conv = tf.nn.conv2d(input, w, strides=[1, 1, 1, 1], padding="VALID") act = tf.nn.relu(conv + b) tf.summary.histogram("weights", w) tf.summary.histogram("biases", b) tf.summary.histogram("activations", act) return tf.nn.max_pool(act, ksize=[1, VOCAB_WINDOW, 1, 1], strides=[1, 1, 1, 1], padding="VALID")

首先,VOCAB_WINDOW, width 定了了你的卷积框的大小。其次,size_in其实指的是我们前面说的文本矩阵的通道数,这里是1,如果是彩图那么是3,如果是灰度图是1。我们也可以通过不同的方式对同一端文本构建新的矩阵,那么就可以设置为多个通道。

size_out 则是你任意指定的,主要定义你想捕捉到多少个特征,一个特征对应一个卷积后的二维向量。形象上说,就是我扫描原来的那张二维向量多少次,我这里第一次卷积操作设置为64,第二个卷积操作设置为128,也就是我第一次扫描输入的图片64次,得到64个新的图,第二次又对新的图(这64个新图会被第二次扫描器看成一张新图),扫描128次,得到128个新图。

b 为啥是一个size_out大小的一维张量呢?我们说CNN会阐述共享,就是一次卷积操作,也就是把图片扫描一遍,会共用一组参数。

下面一段代码设计到了很多数字,这些数字都是计算出来的。

conv1 = conv_layer(input_x, 1, 64, VOCAB_SIZE, "conv1") conv_out = conv_layer(conv1, 64, 128, 1, "conv2") tf.add_to_collection('conv_c', conv_out) flattened = tf.reshape(conv_out, [-1, 51 * 128])

我们知道,初始输入是 59X100,这个是原始的文本矩阵大小。 经过了两次卷积,两次max-pool,那么最后把卷积输出的向量张开成一维的的大小后是51*128 的长度。那么这个值是怎么计算出来的。

卷积和池化的stride 都是1,然后第一次卷积的框大小是: 3X100。 因为padding采用了VALID, 计算卷积后的向量大小是通过如下公式完成的:

Ø 若padding=VALID,output_size = (input_size - filter_size + stride) / stride; Ø 若padding=SAME,output_size = (input_size + stride - 1) / stride;

所以第一次卷积后高变成了 (59-3+1)/1 = 57 ,宽变成了1。 接着再进行一次大小为 3X1的池化操作,按相同的公式计算,变成了 55X1。 接着再进行一次卷积,一次池化,变成了 51X1。 所以每个通道都是一个51维的向量。第二次卷积,我们一共有128个通道,也就是经过前面的运算,我们一共产出了 128个51X1的向量,然后把这些向量都拼接起来,长度就是 51 * 128了。

fc层就是一个全连接的网络了,没啥可说的。这里还有一个问题,有时候我们希望能够把最后产生的那51个128维的向量给提取出来,因为这些向量是CNN对某个内容做完分析后抽取出来的特征。

一开始我没啥思路,后面想想,其实tensorflow里conv不过是一个op,是op就代表是可以运行的,所以通过如下代码,就可以把每轮迭代的向量输出了:

[train_accuracy, s, train_conv_out] = sess.run([accurate, summ, conv_out], feed_dict={input_x: batdch_data, input_y: batch_y}) print(train_conv_out)

04

总结

因为现在还在Tunning,还没有得出实际的准确率。不过在实际操作中,通过组合使用spark + tensorflow, 然后使用zepplin 进行交互操作,整个过程还是相当让人愉悦的。