In this example, I aimed to load a CSV file from a URL, clean and process the data using Apache Spark, and run a few SQL queries to analyze it. Here's what I did step by step:
I started by setting up a SparkSession, the entry point for Spark's DataFrame and SQL APIs.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("CSV from URL to DataFrame")
  .master("local[*]") // Running locally; for a cluster, adjust this line
  .getOrCreate()
Next, I defined the URL pointing to the CSV file I wanted to load. In this case, I used a sample CSV file with monthly air travel figures for 1958, 1959, and 1960.
import java.net.URL
val urlfile = new URL("https://people.sc.fsu.edu/~jburkardt/data/csv/airtravel.csv")
To fetch the CSV file's content, I used the IOUtils class from Apache Commons IO, which reads the resource at the given URL into a string.
import org.apache.commons.io.IOUtils
val csvContent = IOUtils.toString(urlfile, "UTF-8")
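If you would rather not pull in Commons IO at all, the same download can be done with the Scala standard library; this is just an alternative sketch, not what the rest of the example uses.
// Alternative: read the URL with scala.io.Source instead of IOUtils
val source = scala.io.Source.fromURL(urlfile, "UTF-8")
val csvContent = try source.mkString finally source.close()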
Once I had the CSV content, I saved it to a temporary file on the local filesystem with java.nio.file.Files, so that Spark could read it from a regular file path.
import java.nio.file.{Files, Paths}
val tempFilePath = "/tmp/airtravel.csv"
Files.write(Paths.get(tempFilePath), csvContent.getBytes("UTF-8"))
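Hard-coding /tmp assumes a Unix-like filesystem; if portability matters, a minimal alternative is to let the JVM pick the temporary location (the names below are just illustrative).
// Alternative: let the JVM choose a platform-appropriate temp file
val tempFile = Files.createTempFile("airtravel", ".csv")
Files.write(tempFile, csvContent.getBytes("UTF-8"))
val tempFilePath = tempFile.toString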
With the CSV file saved locally, I read it into a Spark DataFrame. I specified options like header (since the CSV has headers) and inferSchema (to automatically detect the data types of columns).
val testcsv = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(tempFilePath)
I noticed that the column names might have unwanted spaces or quotes, so I cleaned them by trimming spaces and removing any quotes.
val cleanedData = testcsv
  .toDF(testcsv.columns.map(c => c.trim.replace("\"", "")): _*)
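If the toDF varargs call looks dense, the same cleanup can be written as a fold over the column names; this is an equivalent sketch rather than what I actually ran.
// Equivalent: rename columns one by one with withColumnRenamed
val cleanedDataAlt = testcsv.columns.foldLeft(testcsv) { (df, name) =>
  df.withColumnRenamed(name, name.trim.replace("\"", ""))
}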
To verify that the data looked correct, I displayed the first few rows and printed the schema.
cleanedData.show()
cleanedData.printSchema()
Since the columns for the years 1958, 1959, and 1960 contained numeric data but were stored as strings, I cast these columns to double for numerical analysis.
val cleanedDataWithColumns = cleanedData
  .withColumn("1958", cleanedData("1958").cast("double"))
  .withColumn("1959", cleanedData("1959").cast("double"))
  .withColumn("1960", cleanedData("1960").cast("double"))
I created a temporary view of the cleaned data so that I could run SQL queries on it.
cleanedDataWithColumns.createOrReplaceTempView("airtravel_data")
I wrote several SQL queries to analyze the data. First, I queried for the month with the highest travel in 1958.
val maxTravels1958 = spark.sql("""
  SELECT Month, `1958`
  FROM airtravel_data
  ORDER BY `1958` DESC
  LIMIT 1
""")
maxTravels1958.show()
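The same question can be answered with the DataFrame API directly, without going through the temporary view; here is a sketch of that variant.
import org.apache.spark.sql.functions.desc
// Month with the highest 1958 value, DataFrame-API style
val maxTravels1958Df = cleanedDataWithColumns
  .select("Month", "1958")
  .orderBy(desc("1958"))
  .limit(1)
maxTravels1958Df.show()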
Then, I ran another query to compute the average travel for each year (1958, 1959, 1960).
val averageTravelPerYear = spark.sql("""
  SELECT
    AVG(`1958`) AS avg_1958,
    AVG(`1959`) AS avg_1959,
    AVG(`1960`) AS avg_1960
  FROM airtravel_data
""")
averageTravelPerYear.show()
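For reference, the equivalent aggregation in the DataFrame API would look roughly like this.
import org.apache.spark.sql.functions.avg
// Yearly averages computed without SQL
val averageTravelPerYearDf = cleanedDataWithColumns.agg(
  avg("1958").as("avg_1958"),
  avg("1959").as("avg_1959"),
  avg("1960").as("avg_1960")
)
averageTravelPerYearDf.show()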
Finally, I filtered records where the travel in 1958 was greater than 400.
val travelsAbove400 = spark.sql("""
  SELECT Month, `1958`
  FROM airtravel_data
  WHERE `1958` > 400
""")
travelsAbove400.show()
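And, for completeness, the same filter expressed with the DataFrame API (a sketch, not part of the original run).
// Months where the 1958 figure exceeds 400
val travelsAbove400Df = cleanedDataWithColumns
  .filter(cleanedDataWithColumns("1958") > 400)
  .select("Month", "1958")
travelsAbove400Df.show()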
After finishing the analysis, I stopped the SparkSession to free up resources.
spark.stop()