Building Reliable Data Pipelines: The Role of Unit Testing in Ensuring Code Quality and Operational Excellence


[Featured image: a data engineer monitoring successful unit test results across screens of Python and PySpark code in a modern tech workspace.]

Introduction

In the realm of software engineering, ensuring the reliability, efficiency, and maintainability of code is critical. Unit testing is one of the most effective practices to achieve these objectives, providing a systematic way to validate the smallest parts of a codebase.

Data engineering shares fundamental principles with software engineering: both disciplines involve structuring and processing data, maintaining code reliability, and optimizing performance. Engineers in both fields build systems that handle large-scale data operations, manage dependencies, and use version control for consistent code management. As in software engineering, unit testing is essential in data engineering to verify that individual components function correctly, catching potential issues before they affect the entire system. This overlap underscores why unit testing is crucial for data engineering pipelines as well as traditional software applications. 

This blog post delves into unit testing from a data engineering perspective, examining its purpose and the benefits it brings to developers and organizations alike. 

 

Understanding Unit Testing in Script Development

Unit testing is a methodology where individual units or components of code are tested in isolation to validate their functionality. These units typically correspond to specific functions, methods, or modules within a script. Unit tests are designed to verify that each unit of code behaves as expected under various conditions, independent of other parts of the system. 

 

Purpose of Unit Testing in Script Development

The primary purpose of unit testing in script development is to ensure the correctness and reliability of the codebase. By systematically testing individual units of code, developers can identify and address defects, errors, and regressions early in the development process, reducing the likelihood of critical issues slipping into production. Additionally, unit testing promotes code modularity, readability, and maintainability, fostering a culture of quality and continuous improvement within engineering teams.

 

Benefits of Unit Testing in Script Development

  • Early Bug Detection:  Unit tests help developers detect bugs and issues at an early stage, reducing the time and effort spent on debugging and leading to faster development cycles. 
  • Improved Code Quality:  Writing unit tests encourages best practices like modularization and separation of concerns, resulting in a more organized, extensible codebase that requires less maintenance over time. 
  • Enhanced Documentation:  Unit tests serve as executable documentation, clarifying the intended behaviour and usage of each code unit, which facilitates knowledge transfer and onboarding. 
  • Facilitates Continuous Integration (CI):  Integrating unit tests into the CI pipeline ensures that new code changes do not compromise system integrity, promoting rapid iteration and reliable software delivery. 

 

Developing Unit Tests for Scripts

When it comes to developing unit tests for scripts, several best practices and considerations come into play:

 

  • Identify Testable Units: Break down the script into smaller, testable units such as functions, methods, or modules. 

 

Scenario: In a Python script for a finance application, the script includes functions for calculating interest, validating user inputs, and generating financial reports. Each of these functions can be identified as separate testable units. 
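For illustration, here is a minimal sketch of what such testable units might look like; the function names and the simple-interest formula are assumptions for this example, not the client's actual code.

def calculate_interest(principal: float, rate: float, years: float) -> float:
    """Return simple interest earned over the given period (illustrative logic)."""
    if principal < 0:
        raise ValueError("principal must be non-negative")
    if rate < 0:
        raise ValueError("rate must be non-negative")
    return principal * rate * years

def validate_user_input(raw_amount: str) -> float:
    """Parse a user-supplied amount, rejecting non-numeric values."""
    try:
        return float(raw_amount)
    except ValueError as exc:
        raise ValueError(f"invalid amount: {raw_amount!r}") from exc

Each function has a single responsibility, so each can be exercised in isolation by its own set of tests.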

 

  • Write Test Cases: Design test cases to validate the behaviour of each unit under different scenarios and edge cases. 

 

Scenario: For the interest calculation function mentioned earlier, test cases could include scenarios where different interest rates are provided, negative principal amounts are provided, and zero interest rates are provided. 
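A minimal sketch of such test cases, written with Pytest against the hypothetical calculate_interest function shown above (the module name interest is assumed):

# test_interest.py
import pytest
from interest import calculate_interest  # assumed module name

def test_standard_rate():
    # 1,000 at a 5% annual rate for 2 years should earn 100 of simple interest.
    assert calculate_interest(1000.0, 0.05, 2) == pytest.approx(100.0)

def test_zero_rate_yields_zero_interest():
    assert calculate_interest(1000.0, 0.0, 5) == 0.0

def test_negative_principal_is_rejected():
    with pytest.raises(ValueError):
        calculate_interest(-500.0, 0.05, 1)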

 

  • Use Test Frameworks: Leverage testing frameworks such as Pytest (Python) or Mocha (JavaScript) to automate the execution of unit tests and generate reports.

 

Scenario: In a Python project, the developers utilize the Pytest framework to automate unit test execution. They write test cases using Pytest’s concise syntax and rely on plain assert statements, which Pytest enriches with detailed failure reports.
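As an example of that syntax, the interest-rate scenarios above can be expressed as a single parametrised test; the values here are illustrative only.

import pytest
from interest import calculate_interest  # assumed module name

@pytest.mark.parametrize(
    "principal, rate, years, expected",
    [
        (1000.0, 0.05, 1, 50.0),   # standard rate
        (1000.0, 0.10, 2, 200.0),  # higher rate over two years
        (500.0, 0.00, 3, 0.0),     # zero interest rate
    ],
)
def test_calculate_interest(principal, rate, years, expected):
    assert calculate_interest(principal, rate, years) == pytest.approx(expected)

Running pytest from the project root discovers these tests automatically and produces a pass/fail report.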

 

  • Mock External Dependencies: Use mocking libraries or techniques to simulate external dependencies such as databases, APIs, or file systems, ensuring that unit tests remain isolated and deterministic. 

 

Scenario: In a Python script that interacts with an external API to fetch weather data, the developers use the MagicMock class from Python’s built-in unittest.mock module to simulate API responses. This ensures that unit tests for the script’s functions remain isolated and do not depend on the actual API.
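A minimal sketch of this pattern, assuming a hypothetical weather_client module that imports requests and exposes a get_temperature function reading "temperature_c" from the API's JSON body:

from unittest.mock import MagicMock, patch
import weather_client  # hypothetical module wrapping the external weather API

def test_get_temperature_uses_mocked_api_response():
    # Build a fake HTTP response so the test never calls the real service.
    fake_response = MagicMock()
    fake_response.status_code = 200
    fake_response.json.return_value = {"temperature_c": 21.5}

    with patch("weather_client.requests.get", return_value=fake_response):
        temperature = weather_client.get_temperature("Vancouver")

    assert temperature == 21.5

Because the HTTP call is patched, the test is deterministic and passes regardless of the real API's availability.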

 

  • Automate Testing: Integrate unit tests into the CI/CD pipeline to automate testing and ensure consistent validation of code changes. 

 

Scenario: In a CI/CD pipeline for an external web application, unit tests written with Pytest are integrated into the pipeline. Whenever a developer makes changes to the Python scripts and pushes them to the repository, the CI/CD pipeline automatically triggers the execution of unit tests to validate the changes before deployment. 

 

A Sample Unit Testing Use Case ProCogia Implemented for a Client

 

Introduction 

ProCogia’s client had a requirement to build a new report that allows internal stakeholders, including senior executives, to monitor and analyse KPIs from their Logistics Management systems. The report provides them with valuable insights into their current operations, market trends, and more, and supports their decision-making process.

 

Challenge 

ProCogia needed to design and deploy unit test cases for the pipelines built for the report. To help explain, below is the SQL logic implemented by the PySpark pipeline for which we needed to design a unit test case.

				
SELECT
    Action AS EventName,
    OrderNumber AS Freight_Order_Number,
    WorkOrderNumber AS Work_Order_Number,
    CASE WHEN OrderType = 'FREIGHT' THEN 'Freight' ELSE 'Ship Assist' END AS Order_Type,
    Work_Order_Type,
    Tug,
    Barge_Ship,
    Start_Time,
    End_Time,
    From_Location_Short_Name,
    From_Location_Name,
    To_Location_Name,
    To_Location_Short_Name,
    Customer_Name,
    Task_Type,
    Load_Status,
    Cargo_Name,
    Cargo_Quantity
FROM [transportation].[vw_FACT_EVENTS] FACT
LEFT JOIN [transportation].[vw_DIM_LOCATION] Tolocation
    ON FACT.ToLocationGuid = Tolocation.LOCATIONGUID
LEFT JOIN [transportation].[vw_DIM_LOCATION] Fromlocation
    ON FACT.FromLocationGuid = Fromlocation.LOCATIONGUID
WHERE marked_as_deleted = 0
  AND action NOT IN (
      'River Bar Pilots',
      'Extra Crew - Per Occasion',
      'Hourly Billing Event',
      'Short Notice Cancel'
  )
  AND action NOT LIKE '%start%'
				
			

Approach

 

Develop a PySpark script: 

Below is a snippet of the PySpark script that was developed for the above scenario. The script’s main method, ‘process_events’, orchestrates calls to the subsequent methods that process the data.

				
from pyspark.sql.functions import col
from delta.tables import DeltaTable

# Helper functions (filter_for_valid_events, add_computed_columns, add_servicegroup_column,
# add_tow_type), the MARKED_AS_DELETED_COLUMN constant and the logger are defined elsewhere in the script.

def process_events(spark, events_path: str, location_path: str, workorderlinks_path: str, tsg_path: str, target_path: str):

    # Load the source tables from Delta.
    events_df = spark.read.format('delta').load(events_path)
    location_df = spark.read.format('delta').load(location_path)
    workorderlinks_df = spark.read.format('delta').load(workorderlinks_path)

    # Drop soft-deleted events when the flag column is present.
    active_events_df = events_df.filter(~col(MARKED_AS_DELETED_COLUMN)) if MARKED_AS_DELETED_COLUMN in events_df.columns else events_df
    tsg_df = spark.read.csv(tsg_path, header="true", inferSchema="true")

    # Apply the business logic step by step.
    filtered_df = filter_for_valid_events(active_events_df, location_df, workorderlinks_df)
    computed_df = add_computed_columns(filtered_df)
    servicegroup_df = add_servicegroup_column(computed_df, tsg_df)
    report_df = add_tow_type(servicegroup_df)

    # Now apply/persist these updates to the delta table.
    num_rows = report_df.count()
    if not DeltaTable.isDeltaTable(spark, target_path):
        log.warning('+++ Creating new table with %s rows @ %s', num_rows, target_path)
        report_df.write.format('delta').mode('overwrite').save(target_path)
    else:
        log.warning('+++ Overwriting table with %s rows @ %s', num_rows, target_path)
        report_df.write.format('delta').mode('overwrite').save(target_path)
				
			

Develop a unit test case for the PySpark script:

The unit test case below checks whether the ‘process_events’ method behaves as anticipated. Individual parts of the code are tested to validate that each of the associated methods performs correctly, helping to avoid unexpected errors in the future.

				
def test_process_events(self):
    '''
    This test exercises the creation of the gold table. It also checks the correctness of some column names.
    '''
    # Resolve paths to the dummy sample tables used as test fixtures.
    events_path = get_path_to_table(self.spark, DUMMY_BASE_URL, 'EVENTS_sample_ActionNotIn', 'silver', 'transportation')
    location_path = get_path_to_table(self.spark, DUMMY_BASE_URL, 'LOCATION', 'silver', 'transportation')
    workorderlinks_path = get_path_to_table(self.spark, DUMMY_BASE_URL, 'WORKORDERLINKS', 'silver', 'transportation')
    servicegroups_path = os.path.join(get_path_to_table(self.spark, DUMMY_BASE_URL, 'TUGSERVICEGROUPS', 'bronze', 'transportation'), 'tugservicegroups.csv')
    target_path = get_path_to_table(self.spark, DUMMY_BASE_URL, 'VALIDEVENTS', 'gold', 'transportation')

    # Run the pipeline end to end against the sample tables.
    process_events(self.spark, events_path, location_path, workorderlinks_path, servicegroups_path, target_path)

    # The sample input is expected to yield exactly one valid event.
    validevents_df = self.spark.read.format('delta').load(target_path)
    num_rows = validevents_df.count()
    expected_num_rows = 1
    self.assertEqual(expected_num_rows, num_rows, f'***: expected {expected_num_rows} rows but got {num_rows}')

    # Verify that the key derived columns are present in the gold table.
    columns = validevents_df.columns
    expected_col_name_1 = 'Work_Order_Number'
    expected_col_name_2 = 'To_Location_Name'
    expected_col_name_3 = 'Tug_Service_Group'
    expected_col_name_4 = 'Link_Number'

    self.assertTrue(expected_col_name_1 in columns, f'--* column name {expected_col_name_1} not found in dataframe')
    self.assertTrue(expected_col_name_2 in columns, f'--* column name {expected_col_name_2} not found in dataframe')
    self.assertTrue(expected_col_name_3 in columns, f'--* column name {expected_col_name_3} not found in dataframe')
    self.assertTrue(expected_col_name_4 in columns, f'--* column name {expected_col_name_4} not found in dataframe')
				
			

The above unit test case builds a dummy version of the report view requested by the client and checks whether the column names produced by the logic appear as expected in the report.

 

Conclusion

Unit testing is a cornerstone of modern software engineering, enabling developers to build robust, reliable, and maintainable scripts. By validating individual units of code in isolation, developers can detect and address defects early, enhance code quality, and foster a culture of continuous improvement. Embracing unit testing as a core practice empowers engineering teams to deliver high-quality software that meets the evolving needs of users and stakeholders, driving success and innovation in today’s fast-paced digital landscape.

At ProCogia, we specialize in building scalable, high-quality data solutions, ensuring reliability through best practices like unit testing. If you’re looking to strengthen your data pipelines and implement robust testing strategies, get in touch with us today. Let’s collaborate to optimize your development process and elevate your engineering outcomes.
