简单的MapReduce程序(Hadoop2.2.0)
本文是对Hadoop2.2.0版本的MapReduce进行详细讲解。请大家要注意版本,因为Hadoop的不同版本,源码可能是不同的。
1.获取源码
大家可以下载Hbase
Hbase: hbase-0.98.9-hadoop2-bin.tar.gz
在里面就包含了Hadoop2.2.0版本的jar文件和源码。
2.WordCount案例分析
在做详解之前,我们先来看一个例子,就是在一个文件中有一下的内容
hello hongten 1 hello hongten 2 hello hongten 3 hello hongten 4 hello hongten 5 ...... ......
文件中每一行包含一个hello,一个hongten,然后在每一行最后有一个数字,这个数字是递增的。
我们要统计这个文件里面的单词出现的次数(这个可以在网上找到很多相同的例子)
首先,我们要产生这个文件,大家可以使用以下的java代码生成这个文件
1 import java.io.BufferedWriter; 2 import java.io.File; 3 import java.io.FileWriter; 4 5 /** 6 * @author Hongten 7 * @created 11 Nov 2018 8 */ 9 public class GenerateWord { 10 11 public static void main(String[] args) throws Exception { 12 13 double num = 12000000; 14 15 StringBuilder sb = new StringBuilder(); 16 for(int i=1;i<num;i++){ 17 sb.append("hello").append(" ").append("hongten").append(" ").append(i).append("\n"); 18 } 19 20 File writename = new File("/root/word.txt"); 21 writename.createNewFile(); 22 BufferedWriter out = new BufferedWriter(new FileWriter(writename)); 23 out.write(sb.toString()); 24 out.flush(); 25 out.close(); 26 System.out.println("done."); 27 } 28 }
进入Linux系统,编译GenerateWord.java文件
javac GenerateWord.java
编译好了以后,会生成GenerateWord.class文件,然后执行
java GenerateWord
等待一段时间....就会生成这个文件了(大概252MB左右)。
接下来,我们来写统计单词的map,reduce,以及客户端的实现。
项目结构
这里总共有三个java文件
客户端
首先,我们需要定义Configuration和job,然后就是job的set操作,最后到job.waitForCompletion()方法,才触发了动作的提交。
这里可以理解为在客户端,包含了一个配置分布式运行的相关配置信息,最后提交动作。
1 package com.b510.hongten.hadoop; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Job; 8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 9 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 10 11 /** 12 * @author Hongten 13 * @created 11 Nov 2018 14 */ 15 public class WordCount { 16 17 public static void main(String[] args) throws Exception { 18 //读取配置文件 19 Configuration conf = new Configuration(); 20 //创建job 21 Job job = Job.getInstance(conf); 22 23 // Create a new Job 24 job.setJarByClass(WordCount.class); 25 26 // Specify various job-specific parameters 27 job.setJobName("wordcount"); 28 29 job.setMapperClass(MyMapper.class); 30 job.setMapOutputKeyClass(Text.class); 31 job.setMapOutputValueClass(IntWritable.class); 32 33 job.setReducerClass(MyReducer.class); 34 job.setOutputKeyClass(Text.class); 35 job.setOutputValueClass(IntWritable.class); 36 37 // job.setInputPath(new Path("/usr/input/wordcount")); 38 // job.setOutputPath(new Path("/usr/output/wordcount")); 39 40 FileInputFormat.addInputPath(job, new Path("/usr/input/wordcount1")); 41 42 Path output = new Path("/usr/output/wordcount"); 43 if (output.getFileSystem(conf).exists(output)) { 44 output.getFileSystem(conf).delete(output, true); 45 } 46 47 FileOutputFormat.setOutputPath(job, output); 48 49 // Submit the job, then poll for progress until the job is complete 50 job.waitForCompletion(true); 51 52 } 53 }
自定义的Mapper
1 package com.b510.hongten.hadoop; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.io.IntWritable; 7 import org.apache.hadoop.io.Text; 8 import org.apache.hadoop.mapreduce.Mapper; 9 10 /** 11 * @author Hongten 12 * @created 11 Nov 2018 13 */ 14 public class MyMapper extends Mapper<Object, Text, Text, IntWritable> { 15 16 private final static IntWritable one = new IntWritable(1); 17 private Text word = new Text(); 18 19 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 20 StringTokenizer itr = new StringTokenizer(value.toString()); 21 while (itr.hasMoreTokens()) { 22 word.set(itr.nextToken()); 23 context.write(word, one); 24 } 25 } 26 27 }
自定义的Reduce
1 package com.b510.hongten.hadoop; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.io.IntWritable; 6 import org.apache.hadoop.io.Text; 7 import org.apache.hadoop.mapreduce.Reducer; 8 9 /** 10 * @author Hongten 11 * @created 11 Nov 2018 12 */ 13 public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 14 15 private IntWritable result = new IntWritable(); 16 17 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 18 int sum = 0; 19 for (IntWritable val : values) { 20 sum += val.get(); 21 } 22 result.set(sum); 23 context.write(key, result); 24 } 25 26 }
运行并查看结果
cd /home/hadoop-2.5/bin/ --创建测试文件夹 ./hdfs dfs -mkdir -p /usr/input/wordcount1 --把测试文件放入测试文件夹 ./hdfs dfs -put /root/word.txt /usr/input/wordcount1 --运行测试 ./hadoop jar /root/wordcount.jar com.b510.hongten.hadoop.WordCount --下载hdfs上面的文件 ./hdfs dfs -get /usr/output/wordcount/* ~/ --查看文件最后5行 tail -n5 /root/part-r-00000
运行结果
从yarn客户端可以看到程序运行的时间长度
从11:47:46开始,到11:56:48结束,总共9min2s.(这是在我机器上面的虚拟机里面跑的结果,如果在真正的集群里面跑的话,应该要快很多)
数据条数:12000000-1条
3.客户端源码分析
当我们在客户端进行了分布式作业的配置后,最后执行
// Submit the job, then poll for progress until the job is complete job.waitForCompletion(true);
那么在waiteForCompletion()方法里面都做了些什么事情呢?
//我们传递的verbose=true public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { //提交动作 submit(); } //verbose=true if (verbose) { //监控并且打印job的相关信息 //在客户端执行分布式作业的时候,我们能够看到很多输出 //如果verbose=false,我们则看不到作业输出信息 monitorAndPrintJob(); } else { // get the completion poll interval from the client. int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf()); while (!isComplete()) { try { Thread.sleep(completionPollIntervalMillis); } catch (InterruptedException ie) { } } } //返回作业的状态 return isSuccessful(); }
这个方法里面最重要的就是submit()方法,提交分布式作业。所以,我们需要进入submit()方法。
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); //设置新的API,我使用的2.2.0的HadoopAPI,区别于之前的API setUseNewAPI(); //和集群做连接,集群里面做出相应,分配作业ID connect(); final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException { //提交作业 /* Internal method for submitting jobs to the system. The job submission process involves: 1. Checking the input and output specifications of the job. 2. Computing the InputSplits for the job. 3. Setup the requisite accounting information for the DistributedCache of the job, if necessary. 4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system. 5. Submitting the job to the JobTracker and optionally monitoring it's status. */ //在这个方法里面包含5件事情。 //1.检查输入和输出 //2.为每个job计算输入切片的数量 //3.4.提交资源文件 //5.提交作业,监控状态 //这里要注意的是,在2.x里面,已经没有JobTracker了。 //JobTracker is no longer used since M/R 2.x. //This is a dummy JobTracker class, which is used to be compatible with M/R 1.x applications. return submitter.submitJobInternal(Job.this, cluster); } }); state = JobState.RUNNING; LOG.info("The url to track the job: " + getTrackingURL()); }
所以我们需要进入submitter.submitJObInternal()方法去看看里面的实现。
//在这个方法里面包含5件事情。 //1.检查输入和输出 //2.为每个job计算输入切片的数量 //3.4.提交资源文件 //5.提交作业,监控状态 //这里要注意的是,在2.x里面,已经没有JobTracker了。 JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs checkSpecs(job); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration()); //configure the command line options correctly on the submitting dfs Configuration conf = job.getConfiguration(); InetAddress ip = InetAddress.getLocalHost(); if (ip != null) { submitHostAddress = ip.getHostAddress(); submitHostName = ip.getHostName(); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); //设置Job的ID job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try { conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName()); conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(SHUFFLE_KEY_LENGTH); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey = keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); //写切片信息,我们主要关系这个方法 :)) int maps = writeSplits(job, submitJobDir); conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" // to job file. String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. TokenCache.cleanUpTokenReferral(conf); if (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { // Add HDFS tracking ids ArrayList<String> trackingIds = new ArrayList<String>(); for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); } // Write job file to submit dir writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) // //到这里才真正提交job printTokens(jobId, job.getCredentials()); status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } }
在这里我们关心的是
int maps = writeSplits(job, submitJobDir);
进入writeSplites()方法
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { //可以从job里面获取configuration信息 JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { //调用新的切片方法,我们使用的2.x的hadoop,因此 //使用的是新的切片方法 maps = writeNewSplits(job, jobSubmitDir); } else { //旧的切片方法 maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
我们使用的版本是2.x,所以,我们使用writeNewSplites()方法。
@SuppressWarnings("unchecked") private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { //可以从job里面获取configuration信息 Configuration conf = job.getConfiguration(); //通过反射获取一个输入格式化 //这里面返回的是TextInputFormat,即默认值 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); // == 1 == //输入格式化进行切片计算 List<InputSplit> splits = input.getSplits(job); // == 2 == T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
我们看到‘== 1 ==’,这里是获取输入格式化,进入job.getInputFormatClass()方法
@SuppressWarnings("unchecked") public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException { //如果配置信息里面INPUT_FORMAT_CLASS_ATTR(mapreduce.job.inputformat.class)没有配置 //则返回TextInputFormat //如果有配置,则返回我们配置的信息 //意思是:默认值为TextInputFormat return (Class<? extends InputFormat<?,?>>) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); }
我们看到,系统默认的输入格式化为TextInputFormat。
我们看到‘== 2 ==’,这里从输入格式化里面进行切片计算。那么我们进入getSplites()方法
public List<InputSplit> getSplits(JobContext job) throws IOException { //minSize = Math.max(1, 1L)=1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // == A == //maxSize = Long.MAX_VALUE long maxSize = getMaxSplitSize(job); // == B == // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); //获取输入文件列表 List<FileStatus> files = listStatus(job); //遍历文件列表 for (FileStatus file: files) { //一个文件一个文件的处理 //然后计算文件的切片 Path path = file.getPath(); //文件大小 long length = file.getLen(); if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { //通过路径获取FileSystem FileSystem fs = path.getFileSystem(job.getConfiguration()); //获取文件所有块信息 blkLocations = fs.getFileBlockLocations(file, 0, length); } //判断文件是否可以切片 if (isSplitable(job, path)) { //可以切片 //获取文件块大小 long blockSize = file.getBlockSize(); //切片大小 splitSize = blockSize //默认情况下,切片大小等于块的大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize); // == C == long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //块的索引 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); // == D == //切片详细信息 splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts())); } } else { // not splitable //不可切片 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size()); return splits; }
我们看‘== A ==’, getFormatMinSplitSize()方法返回1,getMinSplitSize()方法返回1L。
protected long getFormatMinSplitSize() { return 1; } public static long getMinSplitSize(JobContext job) { //如果我们在配置文件中有配置SPLIT_MINSIZE(mapreduce.input.fileinputformat.split.minsize),则取配置文件里面的 //否则返回默认值1L //这里我们,没有配置,所以返回1L return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L); }
我们看‘== B ==’,getMaxSplitSize()方法返回Long.MAX_VALUE(我们没有进行对SPLIT_MAXSIZE进行配置)
public static long getMaxSplitSize(JobContext context) { //如果我们在配置文件中有配置SPLIT_MAXSIZE(mapreduce.input.fileinputformat.split.maxsize),则取配置文件里面的 //否则返回默认值Long.MAX_VALUE //这里我们,没有配置,所以返回Long.MAX_VALUE return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); }
我们看‘== C ==’,在我们没有进行配置的情况下,切片大小等于块大小。
//minSize=1 //maxSize=Long.MAX_VALUE protected long computeSplitSize(long blockSize, long minSize, long maxSize) { //Math.min(maxSize, blockSize) -> Math.min(Long.MAX_VALUE, blockSize) -> blockSize //Math.max(minSize, blockSize) -> Math.max(1, blockSize) -> blockSize return Math.max(minSize, Math.min(maxSize, blockSize)); }
我们看‘== D ==’,通过偏移量获取块的索引信息。
protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { //通过偏移量获取块的索引 for (int i = 0 ; i < blkLocations.length; i++) { // is the offset inside this block? if ((blkLocations[i].getOffset() <= offset) && (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ return i; } } BlockLocation last = blkLocations[blkLocations.length -1]; long fileLength = last.getOffset() + last.getLength() -1; throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")"); }
4.小结
用通俗的语言来描述上面的事情,可以用下面的图来说明:
系统默认的块大小为128MB,在我们没有进行其他配置的时候,块大小等于切片大小。
Type1:块大小为45MB,小于系统默认大小128MB,
切片信息:path, 0, 45, [3, 8, 10]
切片信息:文件的位置path, 偏移量0, 切片大小45, 块的位置信息[3, 8, 10]=该文件(块)存在HDFS文件系统的datanode3,datanode8,datanode10上面。
Type2:块大小为128MB,即等于系统默认大小128MB,不会分成两个快,和Type1一样。
Type3:块大小为414MB,即大于系统默认128MB,那么在我们上传该文件到HDFS的时候,系统就会把该文件分成很多块,每一块128MB,每一块128MB,直到分完为止,最后剩下30MB单独为一块。那么,每一个切片信息由文件位置path, 偏移量,切片大小, 块的位置信息构成。我们把这一串信息称为文件的切片清单。
当系统拿到了文件的切片清单了以后,那么就会把这些清单提交给分布式系统,再由分布式系统去处理各个切片。
5.Mapper详解
5.1.map输入
map从HDFS获取输入流,然后定位到切片的位置,除了第一个切片,其他切片都是从第二行开始读取数据进行处理。
在org.apache.hadoop.mapred.MapTask里面,包含了run()方法
//org.apache.hadoop.mapred.MapTask public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; if (isMapTask()) { // If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress. //我们在客户端可以设置reduce的个数 // job.setNumReduceTasks(10); //如果没有Reduce,只有map阶段, if (conf.getNumReduceTasks() == 0) { //那么就执行这行 mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%). //只要有Reduce阶段, mapPhase = getProgress().addPhase("map", 0.667f); //就要加入排序 sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical); boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } //是否使用新的API if (useNewApi) { //我们使用的是new mapper runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); }
我们进入到runNewMapper()方法,我们可以看到整个map的宏观动作
1.输入初始化
2.调用org.apache.hadoop.mapreduce.Mapper.run()方法
3.更新状态
4.关闭输入
5.关闭输出
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- RCE远程控制Windows服务器——以win10虚拟机为例
- 解决InnoDB: Table mysql/innodb_index_stats has length mismatch
- R语言中对文本数据进行主题模型topic modeling分析
- QT学习第3天:QSlider使用方法
- qt学习第2天:QRadioButtonTest+ButtonGroup单选后提示消息,QComBox
- Python使用矩阵分解法找到类似的音乐
- SpringBoot集成ELK实现日志收集实践
- python在Scikit-learn中用决策树和随机森林预测NBA获胜者
- R语言:用R语言填补缺失的数据
- R语言如何和何时使用glmnet岭回归
- r语言中对LASSO回归,Ridge岭回归和Elastic Net模型实现
- cmd里如何查看历史命令并执行
- akka-typed(10) - event-sourcing, CQRS实战
- 【每日一题】37. Sudoku Solver
- A quick introduction to innodb_ruby (2.对innodb_ruby的简单介绍)