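1 Preface

Spark is hugely popular right now, and more and more people are getting into big data.
After digging through a lot of material, I finally got this single-machine Spark HelloWorld running.
This HelloWorld is not the usual kind: rather than just printing "HelloWorld", it is a word-count program that counts how many times each word occurs in a file and sorts the result by count.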

The same functionality is implemented three ways: in native Scala, in traditional Java, and in Java 8.

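A single-machine Spark program and one running on a cluster differ only in their runtime environment; the development workflow is the same.
Later posts will cover how to set up a cluster.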
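
Since the runtime environment is the only difference, one way to keep the same program usable in both settings is to avoid hard-coding the master URL. The sketch below is only an illustration under my own assumptions: the class `MasterUrl` and its `resolve` method are hypothetical names, not part of Spark. It takes the master URL from the first command-line argument and falls back to "local", the value used throughout this post.

```java
/** Hypothetical helper (not a Spark class): chooses the master URL at startup. */
final class MasterUrl {
    /** Fallback when no master is given: run Spark inside the current JVM. */
    static final String DEFAULT_MASTER = "local";

    /** Returns args[0] if it is present and non-blank, otherwise the local default. */
    static String resolve(String[] args) {
        if (args != null && args.length > 0 && args[0] != null
                && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        return DEFAULT_MASTER;
    }

    public static void main(String[] args) {
        // In a Spark program the result would be passed to SparkConf.setMaster(...).
        System.out.println("master = " + resolve(args));
    }
}
```

With this in place, the same jar could be started with no arguments on a laptop, or with a URL such as spark://host:7077 (a placeholder host) once a cluster exists.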

This series will also be updated on CSDN and Jianshu (简书) whenever I have spare time.

Corrections are welcome.

2 Environment setup

My environment is as follows:

C:\Users\hylexus>java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
C:\Users\hylexus>scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

Eclipse for Scala:

Scala IDE build of Eclipse SDK
Build id: 4.4.1-vfinal-2016-05-04T11:16:00Z-Typesafe

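Both the Scala and the Java versions use Maven to manage dependencies. For how to create a Scala project with Maven, see my other post: http://blog.csdn.net/hylexus/article/details/52602774

Note: spark-core_2.11 pulls in an alarming number of jar dependencies, so be patient while they download ^_^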

2.1 Scala version

The relevant part of pom.xml is as follows:

<properties>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.5</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- Test -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs2</groupId>
        <artifactId>specs2-junit_2.11</artifactId>
        <version>2.4.16</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

2.2 Java version

The pom.xml contents are as follows:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <compilerArguments>
                    <!-- <extdirs>src/main/webapp/plugins/ueditor/jsp/lib</extdirs> -->
                </compilerArguments>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <!-- If you have classpath issue like NoDefClassError,... -->
                <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
    </plugins>
</build>

3 Code

3.1 Scala, low-key version

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("WordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    // read the file as an RDD of lines
    val lines = sc.textFile(fileName, 1)
    // split each line into words; splitting on a single space is only for illustration
    val words = lines.flatMap(line => line.split(" "))
    // word ==> (word, 1)
    val pairs = words.map(word => (word, 1))
    // (word, 1) ==> (word, count): both arguments are partial counts for the same word
    val result = pairs.reduceByKey((count, acc) => count + acc)
    // sort by count, descending
    val sorted = result.sortBy(e => e._2, false, 1)
    // e._1 is the word, e._2 its count; the message reads "【word】 appeared N times"
    sorted.foreach(e => println("【" + e._1 + "】出现了" + e._2 + "次"))
    sc.stop()
  }
}
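
As the comment in the code says, splitting on a single space is only for illustration. With that approach, consecutive spaces and empty lines produce empty strings, which then get counted as a "word". A slightly more careful tokenizer might look like the sketch below. It is plain Java with no Spark dependency so it can be tried on its own, and the class name `Tokenizer` is my own invention, not something from the Spark API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a line on runs of whitespace and drops empty tokens. */
final class Tokenizer {
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        // "\\s+" treats any run of spaces or tabs as one separator
        for (String token : line.trim().split("\\s+")) {
            // an empty or all-blank line still yields one empty token, so filter it out
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("  val   conf = new\tSparkConf()  "));
    }
}
```

In the Java versions of the program, such a helper could presumably be plugged in as `lines.flatMap(line -> Tokenizer.tokenize(line).iterator())`.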

3.2 Scala, showy version

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setAppName("rank test").setMaster("local")
    val sc = new SparkContext(conf)
    val fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties"
    sc.textFile(fileName, 1) // lines
      .flatMap(_.split(" ")) // all words
      .map(word => (word, 1)) // (word, 1) pairs
      .reduceByKey(_ + _) // (word, count)
      .map(e => (e._2, e._1)) // swap to (count, word) so the count becomes the key
      .sortByKey(false, 1) // sort by count, descending
      .map(e => (e._2, e._1)) // swap back to (word, count)
      // the message reads "【word】 appeared N times"
      .foreach(e => println("【" + e._1 + "】出现了" + e._2 + "次"))
    sc.stop()
  }
}

3.3 Java, traditional version

This code is painful to read...
Anonymous inner classes everywhere...
Luckily, Java 8 lambdas come to the rescue.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCounter")//
                .setMaster("local");
        String fileName = "E:/workspace/eclipse/scala/spark-base/src/main/scala/log4j.properties";
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);
        // line ==> words
        JavaRDD<String> words = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 1L;

                    // Spark 1.x declared this method as returning Iterable;
                    // since Spark 2.0 it returns Iterator
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });
        // word ==> (word, 1)
        JavaPairRDD<String, Integer> pairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word)
                            throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });
        // (word, 1) ==> (word, count)
        JavaPairRDD<String, Integer> result = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer e, Integer acc)
                            throws Exception {
                        return e + acc;
                    }
                }, 1);
        // convert to a plain JavaRDD of tuples so that sortBy(...) can be used
        result.map(
                new Function<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(
                            Tuple2<String, Integer> v1) throws Exception {
                        return new Tuple2<>(v1._1, v1._2);
                    }
                })//
                // sort by count, descending
                .sortBy(new Function<Tuple2<String, Integer>, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Tuple2<String, Integer> v1)
                            throws Exception {
                        return v1._2;
                    }
                }, false, 1)//
                .foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> e)
                            throws Exception {
                        // the message reads "【word】 appeared N times"
                        System.out.println("【" + e._1 + "】出现了" + e._2 + "次");
                    }
                });
        sc.close();
    }
}

3.4 Java, lambda version

With Java 8 lambdas the code is quite clean ^_^

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountByJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCounter")//
                .setMaster("local");
        String fileName = "src/main/java/hylexus/spark/test1/WordCountByJava8.java";
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile(fileName, 1);
        lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // line ==> words
                .mapToPair(word -> new Tuple2<>(word, 1)) // word ==> (word, 1)
                .reduceByKey((e, acc) -> e + acc, 1) // (word, 1) ==> (word, count)
                .map(e -> new Tuple2<>(e._1, e._2)) // plain JavaRDD of tuples, so sortBy(...) can be used
                .sortBy(e -> e._2, false, 1) // sort by count, descending
                .foreach(e -> {
                    // the message reads "【word】 appeared N times"
                    System.out.println("【" + e._1 + "】出现了" + e._2 + "次");
                });
        sc.close();
    }
}
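
The reason the lambda version can be so much shorter is that each of Spark's Java function interfaces has a single abstract method, so an anonymous inner class and a lambda are two spellings of the same thing. The sketch below shows this for the summing step. It uses the JDK's `java.util.function.BinaryOperator` as a stand-in for Spark's `Function2` so that it runs without Spark on the classpath, and the class name `LambdaVsAnonymous` is just a placeholder of mine.

```java
import java.util.function.BinaryOperator;

/** Illustration only: the same "add two counts" function written both ways. */
final class LambdaVsAnonymous {
    // Pre-Java 8 style: an anonymous inner class implementing the single method
    static final BinaryOperator<Integer> SUM_ANONYMOUS = new BinaryOperator<Integer>() {
        @Override
        public Integer apply(Integer e, Integer acc) {
            return e + acc;
        }
    };

    // Java 8 style: the compiler infers the interface method from the target type
    static final BinaryOperator<Integer> SUM_LAMBDA = (e, acc) -> e + acc;

    public static void main(String[] args) {
        // both calls compute the same sum
        System.out.println(SUM_ANONYMOUS.apply(2, 3));
        System.out.println(SUM_LAMBDA.apply(2, 3));
    }
}
```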

4 Results

Each output line has the form 【word】出现了N次, that is, "【word】 appeared N times".

//...............
//...............
【->】出现了6次
【+】出现了5次
【import】出现了5次
【new】出现了4次
【=】出现了4次
//...............
//...............
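
To sanity-check output like this on a small input, it can help to compute the same counts without Spark. The following is a rough plain-Java sketch of what the pipeline does (split on a space, count per word, sort by count descending). The class `LocalWordCount` is hypothetical, and the alphabetical tie-break is my own addition to make the order deterministic; the Spark programs above do not specify an order among words with equal counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-JVM reference for the word count, without Spark. */
final class LocalWordCount {
    static List<Map.Entry<String, Integer>> count(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {      // like flatMap + split(" ")
                counts.merge(word, 1, Integer::sum);   // like mapToPair + reduceByKey
            }
        }
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue()); // count, descending
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey()); // tie-break (assumption)
        });
        return sorted;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> e : count(Arrays.asList("a b a", "b a c"))) {
            System.out.println("【" + e.getKey() + "】出现了" + e.getValue() + "次");
        }
    }
}
```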