Training and test data are increasingly hosted in cloud storage systems. In this section, we are going to learn how to ingest data through Apache Spark from an object store such as Amazon S3 (https://aws.amazon.com/s3/) or an S3-compatible one such as Minio (https://www.minio.io/). The Amazon Simple Storage Service (more popularly known as Amazon S3) is an object storage service that is part of the AWS cloud offering. While S3 is available in the public cloud, Minio is a high-performance distributed object storage server, compatible with the S3 protocol and standards, that has been designed for large-scale private cloud infrastructures.
We need to add the Spark core and Spark SQL dependencies to the Scala project, along with the following:
groupId: com.amazonaws
artifactId: aws-java-sdk-core
version: 1.11.234
groupId: com.amazonaws
artifactId: aws-java-sdk-s3
version: 1.11.234
groupId: org.apache.hadoop
artifactId: hadoop-aws
version: 3.1.1
These are the AWS Java SDK core and S3 libraries, plus the Apache Hadoop module for AWS integration.
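For readers who build with sbt, the three coordinates listed above could be declared as in the following sketch. Using sbt is an assumption, since the text does not name a build tool. The Spark core and Spark SQL entries are left out because their version is not stated here.

```scala
// build.sbt fragment (sketch): only the three artifacts listed above.
// The Spark core and Spark SQL dependencies for your Spark version go alongside these.
libraryDependencies ++= Seq(
  "com.amazonaws"     % "aws-java-sdk-core" % "1.11.234",
  "com.amazonaws"     % "aws-java-sdk-s3"   % "1.11.234",
  "org.apache.hadoop" % "hadoop-aws"        % "3.1.1"
)
```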
For this example, we need a bucket that already exists on S3 or Minio. For readers not familiar with S3 object storage, a bucket is similar to a file system directory, where users can store objects (data and the metadata that describes it). We then need to upload to that bucket the file that Spark will read. The file used for this example is available for download from the MonitorWare website (http://www.monitorware.com/en/logsamples/apache.php). It contains HTTP request log entries in ASCII format. For the purpose of this example, we assume that the bucket is named dl4j-bucket and the uploaded file is named access_log. The first thing to do in our Spark program is to create a SparkSession, as follows:
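import org.apache.spark.sql.SparkSession

// master holds the Spark master URL (for example, "local[*]" for a local run)
val sparkSession = SparkSession
  .builder
  .master(master)
  .appName("Spark Minio Example")
  .getOrCreate()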
To reduce noise in the output, let's set the Spark log level to WARN, as follows:
sparkSession.sparkContext.setLogLevel("WARN")
Now that the SparkSession has been created, we need to set up the S3 or Minio endpoint and the credentials for Spark to access it, plus some other properties, as follows:
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://<host>:<port>")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "access_key")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secret")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
The properties set for this minimal configuration have the following meanings:
- fs.s3a.endpoint: The S3 or Minio endpoint.
- fs.s3a.access.key: The AWS or Minio access key ID.
- fs.s3a.secret.key: The AWS or Minio secret key.
- fs.s3a.path.style.access: Enables S3 path style access while disabling the default virtual hosting behavior.
- fs.s3a.connection.ssl.enabled: Specifies if SSL is enabled at the endpoint. Possible values are true and false.
- fs.s3a.impl: The file system implementation class used for the s3a:// scheme, here S3AFileSystem.
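The six properties described above can also be gathered in one place before they are applied. The following is a minimal sketch of that idea, not part of the original example: the object S3aSettings and its methods minimal and applyTo are hypothetical names introduced for illustration. The logic is plain Scala, so it can be tried without a running Spark session.

```scala
object S3aSettings {
  // Hypothetical helper: returns the six properties of the minimal
  // configuration as key/value pairs.
  def minimal(endpoint: String,
              accessKey: String,
              secretKey: String,
              sslEnabled: Boolean): Map[String, String] =
    Map(
      "fs.s3a.endpoint"               -> endpoint,
      "fs.s3a.access.key"             -> accessKey,
      "fs.s3a.secret.key"             -> secretKey,
      "fs.s3a.path.style.access"      -> "true",
      "fs.s3a.connection.ssl.enabled" -> sslEnabled.toString,
      "fs.s3a.impl"                   -> "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )

  // Passes every pair to a (key, value) setter supplied by the caller.
  def applyTo(settings: Map[String, String])(set: (String, String) => Unit): Unit =
    settings.foreach { case (key, value) => set(key, value) }
}

// Possible use with the session created earlier:
//   val conf = sparkSession.sparkContext.hadoopConfiguration
//   val settings = S3aSettings.minimal("http://<host>:<port>", "access_key", "secret", sslEnabled = false)
//   S3aSettings.applyTo(settings)((key, value) => conf.set(key, value))
```

Keeping the settings in a Map makes it easier to log or compare configurations, at the cost of one extra level of indirection compared with the six direct calls.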
We are now ready to read the access_log file (or any other file) from an S3 or Minio bucket and load its content into an RDD, as follows:
val logDataRdd = sparkSession.sparkContext.textFile("s3a://dl4j-bucket/access_log")
println(s"RDD size is ${logDataRdd.count()}")
It is also possible to convert the RDD into a DataFrame and show the content on the output, as follows:
import sparkSession.implicits._
val logDataDf = logDataRdd.toDF
logDataDf.show(10, false)
This prints the first 10 lines of the log in a single column named value, without truncating long entries.
Once data has been loaded from objects stored in S3 or Minio buckets, any operation available in Spark for RDDs and Datasets can be used.
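As an illustration of such an operation, the sketch below extracts the HTTP status code from a log line and counts the entries per status. It assumes that the entries follow the Apache Common Log Format, which the text does not state explicitly. The object name AccessLogStatus, its methods, the regular expression, and the request shown in the comment are all hypothetical. The logic is plain Scala so that it can be tried without a cluster, and the trailing comment shows how it might be applied to logDataRdd.

```scala
object AccessLogStatus {
  // A quoted request followed by a three-digit status code,
  // for example: "GET /index.html HTTP/1.1" 200 1043
  private val RequestThenStatus = "\"[^\"]*\" (\\d{3})\\b".r

  // Returns the status code of a log line, or None if the line does not match.
  def statusOf(line: String): Option[Int] =
    RequestThenStatus.findFirstMatchIn(line).map(_.group(1).toInt)

  // Counts lines per status code; lines that cannot be parsed are skipped.
  def countByStatus(lines: Iterable[String]): Map[Int, Int] =
    lines
      .flatMap(line => statusOf(line).toList)
      .groupBy(identity)
      .map { case (code, hits) => code -> hits.size }
}

// With Spark, the same function could be applied to the RDD loaded earlier:
//   logDataRdd.flatMap(line => AccessLogStatus.statusOf(line)).countByValue()
```

Returning an Option keeps malformed lines from failing the whole job, which is usually preferable when working with raw server logs.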