使用 JAVA 的 Spark DataFrames
DataFrame 是组织到命名列中的分布式数据集合。它在概念上等同于关系数据库中的表。DataFrame 可以从多种来源构建,例如:结构化数据文件,Hive 中的表,外部数据库或现有 RDD。
将 Oracle RDBMS 表读入 spark 数据帧::
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("packageName.className")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
Map<String, String> options = new HashMap();
options.put("driver", "oracle.jdbc.driver.OracleDriver");
options.put("url", "jdbc:oracle:thin:username/password@host:port:orcl"); //oracle url to connect
options.put("dbtable", "DbName.tableName");
DataFrame df=sqlcontext.load("jdbc", options);
df.show(); //this will print content into tablular format
如果需要,我们还可以将此数据帧转换回 rdd:
JavaRDD<Row> rdd=df.javaRDD();
从文件创建数据框:
public class LoadSaveTextFile {
//static schema class
public static class Schema implements Serializable {
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getMachId() {
return machId;
}
public void setMachId(String machId) {
this.machId = machId;
}
public String getSensorType() {
return sensorType;
}
public void setSensorType(String sensorType) {
this.sensorType = sensorType;
}
//instance variables
private String timestamp;
private String machId;
private String sensorType;
}
public static void main(String[] args) throws ClassNotFoundException {
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer");
sparkConf.registerKryoClasses(new Class<?>[]{
Class.forName("org.apache.hadoop.io.Text"),
Class.forName("oracle.table.join.LoadSaveTextFile")
});
JavaSparkContext sparkContext=new JavaSparkContext(sparkConf);
SQLContext sqlcontext= new SQLContext(sparkContext);
//we have a file which ";" separated
String filePath=args[0];
JavaRDD<Schema> schemaRdd = sparkContext.textFile(filePath).map(
new Function<String, Schema>() {
public Schema call(String line) throws Exception {
String[] tokens=line.split(";");
Schema schema = new Schema();
schema.setMachId(tokens[0]);
schema.setSensorType(tokens[1]);
schema.setTimestamp(tokens[2]);
return schema;
}
});
DataFrame df = sqlcontext.createDataFrame(schemaRdd, Schema.class);
df.show();
}
}
现在我们有来自 oracle 的数据框以及文件。同样,我们也可以从蜂巢中读取一个表格。在数据框架中,我们可以像在 rdbms 中一样获取任何列。就像获取列或最大值的最小值一样。可以计算列的平均值/平均值。其他一些功能,如 select,filter,agg,groupBy 也可用。