Add the dependencies flink-java and flink-clients (as explained in the JVM environment setup example).
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount{

    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // input data
        // you can also use env.readTextFile(...) to get words
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap( new LineSplitter() )
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy( 0 )
                .aggregate( Aggregations.SUM, 1 );

        // emit result; in the DataSet API, print() also triggers job execution
        counts.print();
    }

    // static nested class, so it can live in the same file as WordCount
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>>{

        @Override
        public void flatMap( String value, Collector<Tuple2<String, Integer>> out ){
            // normalize and split the line into words
            String[] tokens = value.toLowerCase().split( "\\W+" );
            // emit the pairs
            for( String token : tokens ){
                if( token.length() > 0 ){
                    out.collect( new Tuple2<String, Integer>( token, 1 ) );
                }
            }
        }
    }
}
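To see what this pipeline computes without starting Flink, here is a minimal plain-Java sketch built on java.util.stream. It is not Flink code: the class LocalWordCount and its methods tokenize and countWords are hypothetical names, and the collector is only meant to mirror the flatMap / groupBy(0) / aggregate(SUM, 1) steps for a small in-memory input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {

    // Mirrors LineSplitter: lower-case the line, split on runs of
    // non-word characters and drop empty tokens.
    static List<String> tokenize(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(token -> !token.isEmpty())
                .collect(Collectors.toList());
    }

    // Mirrors flatMap(...).groupBy(0).aggregate(SUM, 1): every token
    // contributes a count of 1, and the counts are summed per word.
    // A TreeMap is used only to get a stable, sorted output order.
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> tokenize(line).stream())
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,");
        countWords(lines).forEach((word, count) ->
                System.out.println("(" + word + "," + count + ")"));
    }
}
```

On the four input lines this reports, among others, (to,4) and (the,3). Flink's print() should produce the same counts, though typically not in sorted order.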
If you use Java 8, you can replace .flatMap(new LineSplitter()) with a lambda expression:
DataSet<Tuple2<String, Integer>> counts = text
        // split up the lines in pairs (2-tuples) containing: (word,1)
        .flatMap( ( String value, Collector<Tuple2<String, Integer>> out ) -> {
            // normalize and split the line into words
            String[] tokens = value.toLowerCase().split( "\\W+" );
            // emit the pairs
            for( String token : tokens ){
                if( token.length() > 0 ){
                    out.collect( new Tuple2<>( token, 1 ) );
                }
            }
        } )
        // the Java compiler erases the lambda's generic output type, so it is
        // declared explicitly here
        // (needs: import org.apache.flink.api.common.typeinfo.TypeHint;)
        .returns( new TypeHint<Tuple2<String, Integer>>(){} )
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy( 0 )
        .aggregate( Aggregations.SUM, 1 );
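Both versions keep a token only if token.length() > 0. The reason is a detail of String.split: a line that starts with non-word characters yields a leading empty string, while trailing separators are silently dropped. The small SplitDemo class below is a hypothetical illustration that applies the same normalization without the filter:

```java
import java.util.Arrays;

public class SplitDemo {

    // Same normalization as LineSplitter and the lambda, but without the
    // length check, so empty tokens become visible.
    static String[] rawTokens(String line) {
        return line.toLowerCase().split("\\W+");
    }

    public static void main(String[] args) {
        // A leading run of separators produces an empty first token.
        System.out.println(Arrays.toString(rawTokens("--'Tis nobler"))); // prints [, tis, nobler]
        // Trailing separators do not produce empty tokens.
        System.out.println(Arrays.toString(rawTokens("troubles,--")));   // prints [troubles]
    }
}
```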
In the IDE: simply run the program from your IDE. Flink will then create a local execution environment inside that JVM.
From the Flink command line: to run the program using a standalone local environment, do the following:
- Make sure Flink is running;
- Create a jar file (mvn package);
- Use the flink command-line tool (in the bin folder of your Flink installation) to launch the program: flink run -c your.package.WordCount target/your-jar.jar
The -c option lets you specify the class to run. It is not necessary if the jar is executable, i.e. its manifest defines a main class.
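Whether -c is needed therefore depends on the jar's manifest. As a sketch, the hypothetical MainClassCheck class below uses only java.util.jar from the JDK to look for a Main-Class entry before you call flink run; it does not inspect anything Flink-specific.

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class MainClassCheck {

    // Returns the Main-Class entry of a manifest, or null if the jar has no
    // manifest or the manifest defines no main class.
    static String mainClassOf(Manifest manifest) {
        if (manifest == null) {
            return null;
        }
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    public static void main(String[] args) throws IOException {
        // args[0] is the path of the jar to inspect, e.g. target/your-jar.jar
        try (JarFile jar = new JarFile(args[0])) {
            String mainClass = mainClassOf(jar.getManifest());
            System.out.println(mainClass == null
                    ? "No Main-Class in manifest: pass the class with -c"
                    : "Main-Class is " + mainClass + ": -c can be omitted");
        }
    }
}
```

For example, java MainClassCheck target/your-jar.jar reports whether the jar built by mvn package already names its entry point.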