Writing Spark Apps and deploying to Databricks

Run Spark in local mode using your favorite IDE and unit test framework. Create test data and build unit tests using VS Code, Python / PySpark, and Azure.

Data Engineering & Spark

Data Engineering is a discipline in high demand today and, like Data Science, requires a broad set of skills to master. Professional Data Engineers need a background in software engineering, SQL, and cloud technologies (e.g. Azure and AWS), DevOps experience and, last but certainly not least, attention to quality: of code and, perhaps even more so, of data.

In this blog, we address a technology that has become well known and, it seems, pervasive: Databricks. It is quite common, and in fact expected, for a Data Scientist to build and develop solutions in an interactive notebook. The same is not true for Data Engineers. Spark code that procures and curates data of different types from various sources has to be robust and efficient.

Databricks can be an appealing tool for building such code. It’s easy. Just warm up a cluster, create a new notebook and off you go. But it is not recommended. How are you going to test your code? Where is the test data? How about version control of your test data? And are you really going to run a cluster all day long just so you can tap out your code? Perhaps you have a blank cheque from your IT department!

The answer is to set up a local development environment. Run Spark locally, in local mode. Use your favorite IDE and unit test framework. Create test data and build unit tests. We will show you how to do this using:

  • VS Code
  • Python / PySpark
  • Azure

Goals

Let’s just recap what we’d like to do. We want to develop a unit-tested Spark application in a local development environment, then deploy it into a Databricks environment in the cloud.

Project Structure

Your project should look something like the structure shown below.

Let’s start off by talking about the files shown in orange. process_data.py is where all of your data engineering code goes.

Say, for example, you want to flatten incoming JSON data. You’d then like to unit test this code and also run it from the command line. The import dependencies between these files would look like this:

process_data.py – Let’s start with the core of this file. You can see we’ve implemented a process_data() method. It does something fairly simple. Pass in a dataframe with a single column, Full, that holds the nested JSON records as an array of student structs. The method explodes that array and returns the records as a flattened dataframe, one row per student.
				
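from pyspark.sql import DataFrame
from pyspark.sql.functions import explode


def process_data(spark, source_df: DataFrame) -> DataFrame:
    # Explode the 'Full' array so that each student struct gets its own row
    df = source_df.withColumn('student', explode('Full')).drop('Full')
    # Promote the nested struct fields to top-level columns
    df = df.select(df.student.ID.alias('ID'),
                   df.student.LastName.alias('LastName'),
                   df.student.FirstName.alias('FirstName'),
                   df.student.age.alias('age'),
                   df.student.Major.alias('Major'),
                   df.student.StudentStatus.alias('Status'))
    return df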
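
To make the row-level effect of process_data() concrete, here is a minimal plain-Python sketch (deliberately not Spark code) of what the explode-and-select step does to one input record. The sample students and the flatten_students name are invented for illustration; only the field names come from the code above.

```python
def flatten_students(rows):
    """Plain-Python analogue of process_data(): one output row per student."""
    flat = []
    for row in rows:
        # explode('Full'): emit one row per element of the array
        # (like explode, a missing or empty array yields no rows)
        for student in row.get('Full') or []:
            # select(... .alias(...)): promote nested fields to top-level keys
            flat.append({
                'ID': student['ID'],
                'LastName': student['LastName'],
                'FirstName': student['FirstName'],
                'age': student['age'],
                'Major': student['Major'],
                'Status': student['StudentStatus'],  # renamed, as in the alias
            })
    return flat


# Hypothetical sample input: one record whose 'Full' column holds two students
sample_rows = [{
    'Full': [
        {'ID': 1, 'LastName': 'Doe', 'FirstName': 'Jane', 'age': 20,
         'Major': 'Physics', 'StudentStatus': 'Full-time'},
        {'ID': 2, 'LastName': 'Roe', 'FirstName': 'Rick', 'age': 22,
         'Major': 'History', 'StudentStatus': 'Part-time'},
    ]
}]

flattened = flatten_students(sample_rows)
```

One nested input record becomes two flat rows here, which is the shape the unit test later counts.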
				
			
This Python module should have a main part too, because we want to deploy this file to Databricks in the cloud and execute it as a command-line script. For that to work, the main part calls a process_data_wrapper method, passing in a real Spark context.
				
if __name__ == "__main__":
    # If running as main, the context will be Databricks, which provides a real spark object

    # Now process data!
    process_data_wrapper(spark)
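
Running the module as a command-line script means its parameters arrive as command-line arguments. The wrapper reads a tablename argument from a module-level parser named ap, and the sketch below shows one way such a parser could be defined with argparse. The --tablename flag spelling, the required setting, and the parse_tablename helper are assumptions, not taken from the sample project.

```python
import argparse

# Module-level parser, matching the `ap` name used by process_data_wrapper()
ap = argparse.ArgumentParser(
    description='Flatten a source table and merge it into the target table')
ap.add_argument('--tablename', required=True,
                help='name of the source table to process (assumed flag name)')


def parse_tablename(argv=None):
    """Return the tablename argument; argv=None means read sys.argv."""
    args = vars(ap.parse_args(argv))
    return args['tablename']


# Passing the arguments explicitly is handy in unit tests
table = parse_tablename(['--tablename', 'students'])
```

Under these assumptions, a local run would look something like: python spark_main.py --tablename students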
				
			

And for the final part, the “wrapper”.

				
def process_data_wrapper(spark):
    # log and ap are assumed to be a module-level logger and argparse parser

    log.info(f"+++ process_data_wrapper running in spark {spark.version}")
    # Read the name of the source table from the command line
    args = vars(ap.parse_args())
    p_tablename: str = args['tablename']
    log.info(f'+++ p_tablename={p_tablename}')

    # Load the parquet input, flatten it, and merge the result into the target table on ID
    source_table_path = get_path_to_table(spark, p_tablename, 'input')
    source_df = spark.read.format('parquet').load(source_table_path)
    processed_df = process_data(spark, source_df)
    target_path = get_path_to_table(spark, 'students', 'output')
    merge_into_target(spark, processed_df, target_path, ['ID'])
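
merge_into_target() is not listed in this post. As a hedged sketch, assuming the target is a Delta table that already exists at target_path and that the delta-spark package is installed, an upsert keyed on the given columns could look like this. The build_merge_condition helper and the target/source aliases are hypothetical names.

```python
def build_merge_condition(key_columns, target_alias='target', source_alias='source'):
    """Build the SQL join condition used to match source rows to target rows."""
    if not key_columns:
        raise ValueError('at least one key column is required')
    return ' AND '.join(
        f'{target_alias}.{col} = {source_alias}.{col}' for col in key_columns)


def merge_into_target(spark, source_df, target_path, key_columns):
    """Upsert source_df into the Delta table stored at target_path."""
    # Imported lazily so the condition builder works without delta-spark installed
    from delta.tables import DeltaTable

    condition = build_merge_condition(key_columns)
    target = DeltaTable.forPath(spark, target_path)
    (target.alias('target')
           .merge(source_df.alias('source'), condition)
           .whenMatchedUpdateAll()      # existing keys: overwrite with new values
           .whenNotMatchedInsertAll()   # new keys: insert as new rows
           .execute())


condition = build_merge_condition(['ID'])
```

Keeping the condition builder separate from the Delta call means the key-matching logic can be unit tested without a Spark session.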
				
			

spark_main.py – This is a driver that runs in your local development environment. It first creates a local Spark context and passes it to a method in process_data.py.

It’s really simple. Here’s an example:

				
from process_data import get_or_create_spark, process_data_wrapper

spark = get_or_create_spark('Local-Spark-App')
process_data_wrapper(spark)
				
			

By the way, we put a get_or_create_spark() method in process_data.py because we’ll be calling it from test code as well as the spark_main.py driver.

test_process_data.py – This is where you create unit tests. I like to use Python’s unittest framework. Each test will create the required test data.

Here is a sample unit test:

				
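import unittest

from create_test_tables_from_source_data import create_tables
from process_data import get_or_create_spark, process_data, get_path_to_table


class TestProcessData(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        # Assumed setup: one local Spark session shared by all tests (app name is arbitrary)
        cls.spark = get_or_create_spark('Unit-Test-Spark-App')

    def test_process_data(self):
        # Rebuild the test tables, then flatten the students input table
        create_tables(self.spark)
        source_df = self.spark.read.format('parquet').load('storage/input/students')
        new_df = process_data(self.spark, source_df)
        actual_count = new_df.count()
        expected_count = 28
        self.assertEqual(expected_count, actual_count, '--* DataFrame did not have expected number of rows')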
				
			

Test Data

Our unit tests need test data. In our example scenario we process JSON data, which is flattened into parquet and then merged into a Delta table. Here we write some simple driver code to create test data from JSON source files.
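
create_tables() is imported by the unit test but not listed in this post. A minimal sketch of such driver code might look like the following. It assumes a hypothetical source file at storage/source/students.json and writes to the storage/input/students location that the unit test reads from; the parameter names and the multiLine setting are assumptions as well.

```python
def create_tables(spark,
                  source_json='storage/source/students.json',
                  target_path='storage/input/students'):
    """Rebuild the parquet input table from a JSON source file."""
    # multiLine lets Spark read a JSON document that spans several lines
    source_df = spark.read.option('multiLine', True).json(source_json)
    # Overwrite so that every test run starts from the same known data
    source_df.write.mode('overwrite').parquet(target_path)
    return target_path
```

Because the function only calls methods on the spark object it is given, the same code works with the local session in tests and with the Databricks session in the cloud.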

Storage folder

I like to put all test-related data in a folder called storage. In this example, a couple of JSON files represent the source data, from which dataframes are generated. The folder will also contain a Delta table. Unit test code will manipulate and test the contents of the table.
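
The code in this post resolves table locations through get_path_to_table(), which is not listed here. A minimal sketch follows, assuming the storage folder is laid out as storage/<zone>/<table>, which is consistent with the storage/input/students path used in the unit test. The cloud_root parameter and the example URI are hypothetical; the real project may resolve cloud paths differently.

```python
LOCAL_STORAGE_ROOT = 'storage'  # test data folder inside the project
VALID_ZONES = ('input', 'output')


def get_path_to_table(spark, tablename, zone, cloud_root=None):
    """Return the location of a table in the given zone ('input' or 'output').

    spark is accepted to match the call sites in this post but is unused in
    this sketch. cloud_root (hypothetical) replaces the local storage folder
    when the code runs against cloud storage.
    """
    if zone not in VALID_ZONES:
        raise ValueError(f'zone must be one of {VALID_ZONES}, got {zone!r}')
    root = (cloud_root or LOCAL_STORAGE_ROOT).rstrip('/')
    return f'{root}/{zone}/{tablename}'


local_path = get_path_to_table(None, 'students', 'input')
cloud_path = get_path_to_table(
    None, 'students', 'output',
    cloud_root='wasbs://data@example.blob.core.windows.net/')  # made-up account
```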

Known problems

We know that on Windows a “Shutdown Hook” exception is thrown when the Spark context is stopped. This is related to an issue that prevents deletion of the temporary files that Spark creates. I configure my Spark context to locate its temporary folder in a tmp directory within my project. That way it’s easy to identify and delete from time to time.
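
As a hedged sketch of that configuration, the get_or_create_spark() helper mentioned earlier could build a local session along these lines. It assumes that spark.local.dir is the setting used to relocate Spark’s scratch files; the local_spark_settings helper and the project_root parameter are hypothetical names.

```python
import os


def local_spark_settings(project_root='.'):
    """Settings for a local session whose scratch files stay inside the project."""
    return {
        'spark.master': 'local[*]',  # run Spark in-process on all local cores
        'spark.local.dir': os.path.join(project_root, 'tmp'),  # scratch space
    }


def get_or_create_spark(app_name, project_root='.'):
    """Create (or reuse) a local SparkSession configured with the settings above."""
    # Imported lazily so the settings can be inspected without PySpark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in local_spark_settings(project_root).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


settings = local_spark_settings('my_project')
```

Adding the tmp folder to .gitignore keeps the leftover scratch files out of version control.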

Full Sample Project

The full sample project is written to work with blob storage accounts in Azure. However, it should be easy to modify to run in AWS too.

You can see it on my GitHub: template-spark-app

Prerequisites

My development environment used Java 1.8 and Python 3.8. The bin folder contains winutils.exe and hadoop.dll. On Windows, you need to set a HADOOP_HOME environment variable and include it in the Windows Path variable. It should point to this project folder.

This last bit is important if you’re running in Windows. You’ll run into errors otherwise.
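
A small check run before creating the Spark context can catch a missing Windows setup early. The sketch below is one possible way to do that, not part of the sample project: check_windows_prereqs is a hypothetical helper, and it accepts either HADOOP_HOME itself or its bin folder on the Path.

```python
import os
import sys


def check_windows_prereqs(env=None, platform_name=None):
    """Return a list of problems with the Windows Hadoop setup (empty if none)."""
    env = os.environ if env is None else env
    platform_name = sys.platform if platform_name is None else platform_name
    if platform_name != 'win32':
        return []  # winutils.exe and hadoop.dll are only needed on Windows

    hadoop_home = env.get('HADOOP_HOME')
    if not hadoop_home:
        return ['HADOOP_HOME is not set']

    problems = []
    bin_dir = os.path.join(hadoop_home, 'bin')
    for name in ('winutils.exe', 'hadoop.dll'):
        if not os.path.isfile(os.path.join(bin_dir, name)):
            problems.append(f'{name} not found in {bin_dir}')

    def norm(path):
        # Normalise separators and case so equivalent paths compare equal
        return os.path.normcase(os.path.normpath(path))

    path_entries = {norm(p) for p in env.get('PATH', '').split(os.pathsep) if p}
    if not {norm(hadoop_home), norm(bin_dir)} & path_entries:
        problems.append('neither HADOOP_HOME nor its bin folder is on PATH')
    return problems
```

Calling check_windows_prereqs() with no arguments inspects the current process environment; an empty list means the setup looks complete.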

Conclusion

In conclusion, mastering data engineering, especially with tools like Databricks and Spark, requires careful attention to development environments, testing, and deployment strategies. As you continue to explore the world of data engineering, consider leveraging the resources and expertise offered by ProCogia and similar industry leaders. With their guidance, you can navigate the complexities of data engineering with confidence, ensuring the scalability and reliability of your solutions.
