使用 PySpark 從 S3 消耗資料
你可以使用兩種方法來使用 AWS S3 儲存桶中的資料。
- 使用 sc.textFile(或 sc.wholeTextFiles)API:此 api 也可用於 HDFS 和本地檔案系統。
aws_config = {} # set your aws credential here
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_config['aws.secret.access.key'])
s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
data_rdd = sc.wholeTextFiles(s3_keys)
- 使用自定義 API 讀取它(說一個 boto 下載器):
def download_data_from_custom_api(key):
# implement this function as per your understanding (if you're new, use [boto][1] api)
# don't worry about multi-threading as each worker will have single thread executing your job
return ''
s3_keys = ['s3n/{bucket}/{key1}', 's3n/{bucket}/{key2}']
# numSlices is the number of partitions. You'll have to set it according to your cluster configuration and performance requirement
key_rdd = sc.parallelize(s3_keys, numSlices=16)
data_rdd = key_rdd.map(lambda key: (key, download_data_from_custom_api(key))
我建議使用方法 2,因為在使用方法 1 時,驅動程式會下載所有資料,而工作人員只需處理它。這有以下缺點:
- 隨著資料大小的增加,記憶體不足。
- 你的工作人員將閒置,直到資料下載完畢