WordCount - Table API
This example is the same as WordCount, but uses the Table API. See WordCount for details about execution and results.
Maven
To use the Table API, add flink-table as a Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
Code
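// Package names below match Flink 1.1.x; later releases relocated the Table API classes.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.table.BatchTableEnvironment;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.api.table.TableEnvironment;
import org.apache.flink.util.Collector;

public class WordCountTable{
    public static void main( String[] args ) throws Exception{
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment( env );

        // get input data
        DataSource<String> source = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles"
        );

        // split the sentences into lowercase words, dropping empty tokens
        FlatMapOperator<String, String> dataset = source
            .flatMap( ( String value, Collector<String> out ) -> {
                for( String token : value.toLowerCase().split( "\\W+" ) ){
                    if( token.length() > 0 ){
                        out.collect( token );
                    }
                }
            } )
            // with lambdas, we need to tell Flink what type to expect
            .returns( String.class );

        // register the dataset as a table named "words" with a single column "word"
        tableEnv.registerDataSet( "words", dataset, "word" );

        // word count using an SQL query
        Table results = tableEnv.sql( "select word, count(*) from words group by word" );

        // convert the result table back to a DataSet of rows and print it
        tableEnv.toDataSet( results, Row.class ).print();
    }
}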
Note: For Java versions earlier than 8, replace the lambda with an anonymous class:
// requires: import org.apache.flink.api.common.functions.FlatMapFunction;
FlatMapOperator<String, String> dataset = source.flatMap( new FlatMapFunction<String, String>(){
    @Override
    public void flatMap( String value, Collector<String> out ) throws Exception{
        // same tokenization as the lambda version
        for( String token : value.toLowerCase().split( "\\W+" ) ){
            if( token.length() > 0 ){
                out.collect( token );
            }
        }
    }
} );
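To check what the tokenizer and the SQL query compute without starting a Flink job, the same logic can be sketched with plain Java collections. This is only an illustration: the class WordCountPlain and its methods tokenize and countWords are hypothetical names and are not part of Flink. The sketch reuses the split( "\\W+" ) tokenization from the example and groups the tokens the way "group by word" with count(*) does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper class (not part of Flink): mirrors the tokenizing and
// counting logic of the example using only the Java standard library.
class WordCountPlain {

    // Same tokenization as the flatMap step: lowercase the line, split on
    // runs of non-word characters, and drop empty tokens.
    static List<String> tokenize(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(token -> token.length() > 0)
                .collect(Collectors.toList());
    }

    // In-memory counterpart of "select word, count(*) from words group by word".
    // A TreeMap is used so the result is sorted by word.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> tokenize(line).stream())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles");
        // Print one "word,count" pair per line.
        countWords(lines).forEach((word, count) -> System.out.println(word + "," + count));
    }
}
```

For the four input sentences, "to" is counted 4 times, "the" 3 times, and "be" twice. The sketch sorts its output by word only because it collects into a TreeMap; the Flink job gives no such ordering guarantee for the printed rows.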