使用外部 Sink
可以将表写入 TableSink,这是一个支持不同格式和文件系统的通用接口。批处理表只能写入 BatchTableSink
,而流表需要 StreamTableSink
。
目前,flink 仅提供 CsvTableSink
接口。
用法
在上面的示例中,替换:
DataSet<Row> result = tableEnv.toDataSet( table, Row.class );
result.print();
有:
TableSink sink = new CsvTableSink("/tmp/results", ",");
// write the result Table to the TableSink
table.writeToSink(sink);
// start the job
env.execute();
/tmp/results
是一个文件夹,因为 flink 做并行操作。因此,如果你有 4 个处理器,结果文件夹中可能会有 4 个文件。
另外,请注意我们明确地称之为 env.execute()
:这是启动 flink 工作所必需的,但在前面的例子中,print()
为我们做了。