WordCount
Maven
Add the dependencies flink-java and flink-clients (as explained in the JVM environment setup example).
The code
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new LineSplitter())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .aggregate(Aggregations.SUM, 1);

        // emit result
        counts.print();
    }
}
LineSplitter.java:
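import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs, skipping the empty token that split() produces
        // when a line starts with a non-word character
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}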
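To see why LineSplitter checks token.length() > 0, the following plain-Java sketch (no Flink required) applies the same lowercase-and-split step to a single line. The class name TokenizeSketch and its helper methods are made up for this illustration and are not part of Flink or of the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TokenizeSketch {

    // Same normalization as LineSplitter: lowercase, then split on runs of non-word characters.
    static String[] rawTokens(String line) {
        return line.toLowerCase().split("\\W+");
    }

    // Same filtering as LineSplitter: keep only the non-empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : rawTokens(line)) {
            if (token.length() > 0) {
                words.add(token);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String line = "'Tis nobler in the mind";
        // The leading apostrophe matches \W+, so split() returns an empty first token.
        System.out.println(Arrays.toString(rawTokens(line))); // [, tis, nobler, in, the, mind]
        // After filtering, only real words remain.
        System.out.println(tokenize(line));                   // [tis, nobler, in, the, mind]
    }
}
```

Trailing separators (as in "be,--") do not cause the same problem, because String.split drops trailing empty strings.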
If you use Java 8, you can replace .flatMap(new LineSplitter()) with a lambda expression:
DataSet<Tuple2<String, Integer>> counts = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    // note: because of type erasure, some Flink versions/compilers cannot infer
    // the Tuple2 output type of this lambda; a type hint via .returns(...)
    // right after flatMap may then be required
    .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
        // normalize and split the line into words
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    })
    // group by the tuple field "0" and sum up tuple field "1"
    .groupBy(0)
    .aggregate(Aggregations.SUM, 1);
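The swap works because the function passed to flatMap is an interface with a single abstract method, so a class instance and a lambda are interchangeable. The sketch below shows this in plain Java. SimpleFlatMap, Splitter, LAMBDA and apply are hypothetical stand-ins written for this illustration (with java.util.function.Consumer playing the role of the collector); they are not Flink's FlatMapFunction or Collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LambdaSketch {

    // Hypothetical single-method interface, standing in for a flat-map function type.
    interface SimpleFlatMap<T, O> {
        void flatMap(T value, Consumer<O> out);
    }

    // Applies the function to one input and gathers everything it emits.
    static <T, O> List<O> apply(SimpleFlatMap<T, O> fn, T value) {
        List<O> collected = new ArrayList<>();
        fn.flatMap(value, collected::add);
        return collected;
    }

    // Variant 1: a named class implementing the interface.
    static class Splitter implements SimpleFlatMap<String, String> {
        @Override
        public void flatMap(String value, Consumer<String> out) {
            for (String token : value.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    out.accept(token);
                }
            }
        }
    }

    // Variant 2: the same logic written as a lambda.
    static final SimpleFlatMap<String, String> LAMBDA = (value, out) -> {
        for (String token : value.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                out.accept(token);
            }
        }
    };

    public static void main(String[] args) {
        String line = "Or to take arms against a sea of troubles,";
        // Both variants emit the same words for the same input.
        System.out.println(apply(new Splitter(), line).equals(apply(LAMBDA, line)));
    }
}
```

A named class is easier to reuse and unit-test in isolation, while the lambda keeps short logic next to the pipeline that uses it.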
Execution
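From the IDE: simply hit run in your IDE. Flink will create an environment inside the JVM.
From the flink command line: to run the program using a standalone local environment, do the following:
- ensure flink is running (flink/bin/start-local.sh);
- create a jar file (mvn package);
- use the flink command-line tool (in the bin folder of your flink installation) to launch the program: flink run -c your.package.WordCount target/your-jar.jar
The -c option lets you specify the class to run. It is not necessary if the jar is executable, i.e. its manifest defines a main class.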
Result
(a,1)
(against,1)
(and,1)
(arms,1)
(arrows,1)
(be,2)
(fortune,1)
(in,1)
(is,1)
(mind,1)
(nobler,1)
(not,1)
(of,2)
(or,2)
(outrageous,1)
(question,1)
(sea,1)
(slings,1)
(suffer,1)
(take,1)
(that,1)
(the,3)
(tis,1)
(to,4)
(troubles,1)
(whether,1)
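As a sanity check, the counts listed above can be reproduced without Flink. The sketch below is a plain-Java equivalent of the split, group and sum steps; the class PlainWordCount and its members are made up for this illustration. It uses a TreeMap so the words come out alphabetically, as in the listing above; this sketch does not assume anything about the order in which Flink itself prints the tuples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PlainWordCount {

    // The same four input lines as in the WordCount example.
    static final List<String> TEXT = Arrays.asList(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
    );

    // Counts words across all lines; the TreeMap keeps the words sorted alphabetically.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    // Equivalent of emitting (token, 1) and summing the 1s per word.
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> entry : count(TEXT).entrySet()) {
            // Mimic the (word,count) tuple format shown in the result listing.
            System.out.println("(" + entry.getKey() + "," + entry.getValue() + ")");
        }
    }
}
```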