
CSV to Parquet Formatter: Deep Dive

We created the CSV to Parquet Formatter App to give folks an easy way to convert individual text files with comma-separated values to Parquet format. It is available to install and use for free from our Nominode App Store. In this post, we will provide details about the code in the App and discuss some of the design choices that we made. If you are interested in developing your own Nominode App, check out our step-by-step tutorial.

Define a Clear and Narrow Scope

The first step when creating an App is to decide exactly what the App will and won’t do and how folks will interact with the App. What an App can do is really only limited by your imagination, so establishing firm objectives and clearly documenting limitations is important. For this App, the overall goal was to keep the code simple and still provide value by reducing the functionality to solving a specific, common problem. Along those lines, the first major decision was that we would limit it to only performing 1-to-1 conversions. Each CSV file is transformed into a separate Parquet file. The second decision was that all of the source CSV files have to exist in an S3 bucket and that all of the destination Parquet files would also be placed in an S3 bucket. Finally, we decided that the App needed to have a way to track which files it has already processed.

We are in the midst of creating a paid version of the App that reads and combines data from multiple CSV files based on patterns and uses it to create new or update existing Parquet files. Additional paid versions will also support source and destination locations that are not S3 buckets.

Leverage Existing Code

With the desired functionality well defined, we began a search for existing Python packages that we could use to implement it. For the goal of reading files from and writing files to S3 buckets, we decided to use the boto3 package. In order to read comma-separated value data and convert it to Parquet formatted data, we selected the pandas package. We elected to support a remote MySQL database for processed file tracking and to leverage the sqlalchemy package to populate and update it. We also used Python standard library modules for things like temporary storage, file path handling and date stamping. This is the complete list of imports in the App code, standard library first and third-party packages second:

    import io
    import logging
    import os
    import random
    import time
    from datetime import datetime
    from pathlib import Path, PurePosixPath
    from tempfile import TemporaryDirectory

    import boto3
    import pandas as pd
    from sqlalchemy import create_engine, text

Use Nominode Connections

A Connection on a Nominode is a way to securely store credentials, token strings and other secrets. For this App, we knew that it would need credentials to access the S3 bucket containing the source CSV files, the S3 bucket where the destination Parquet files would be created and the remote database used for process tracking. Step-by-step instructions for Adding a Connection in your NND App are available. Luckily, Connection types for S3 credentials and remote database credentials were already predefined, so all that we had to do was reference them when we defined the User Interface in the App code. This is the code in the App that uses the S3 Connection information and boto3 to create an object for interacting with the S3 bucket containing the source files:

    source_s3_client = boto3.client(
        "s3",
        aws_access_key_id=source_s3_bucket_connection.get("access_key_id"),
        aws_secret_access_key=source_s3_bucket_connection.get("secret_access_key"),
    )


And this is the code that uses the database Connection information and the create_engine function from sqlalchemy to create an object for interacting with the remote database table:


    sql_engine = create_engine(
        "{engine}://{user}:{pwd}@{host}:{port}/{db}".format(
            engine=tracking_db.get("db_engine"),
            user=tracking_db.get("username"),
            pwd=tracking_db.get("password"),
            port=tracking_db.get("port"),
            host=tracking_db.get("hostname"),
            db=tracking_db.get("database"),
        )
    )
    sql_connection = sql_engine.connect()
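
One caveat with assembling the connection string by hand is that a password containing characters such as "@" or "/" makes the URL ambiguous. The following is a minimal sketch of one way to guard against that using only the standard library. The build_db_url helper and the sample values are hypothetical and are not part of the App code:

```python
from urllib.parse import quote_plus


def build_db_url(engine, user, pwd, host, port, db):
    """Build a database URL, percent-encoding the user name and password."""
    # quote_plus escapes characters such as "@", ":" and "/" that would
    # otherwise be read as URL delimiters.
    return "{engine}://{user}:{pwd}@{host}:{port}/{db}".format(
        engine=engine,
        user=quote_plus(user),
        pwd=quote_plus(pwd),
        host=host,
        port=port,
        db=db,
    )


# Hypothetical values, for illustration only.
url = build_db_url(
    "mysql+pymysql", "tracker", "p@ss/word", "db.example.com", 3306, "tracking"
)
print(url)
```

The resulting string can be handed to create_engine in the same way as the formatted string above.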

Define a User Interface

Each of the parameter options that we wanted to present to users of the App needed to be described in its code. Parameters can be grouped together to improve the visual layout of their presentation. We decided to use five parameter groups to keep parameters that apply to the same area together and slightly separated from other parameters. An App can present several different actions for users to execute, and each action has to have its own set of parameters defined. Our App only presents a single action, defined by a function called reformat. This is what the definition of the action and its parameters looks like:

    @engine.action(
        display_name="Transform Format",
        as_kwargs=False,
    )
    @engine.parameter_group(tracking_options)
    @engine.parameter_group(destination_options)
    @engine.parameter_group(destination_s3)
    @engine.parameter_group(source_options)
    @engine.parameter_group(source_s3)
    def reformat(params):

 

When listing parameter groups, it is important to remember that they are rendered in the reverse of the order in which they are listed: the group listed last appears first. So, from the code above, the source_s3 parameter group will be displayed at the top of the Task page and the tracking_options parameter group will be at the bottom. Each of the parameter groups referenced was defined elsewhere in the code. For example, this is how the source_s3 parameter group was defined:

    source_s3 = ParameterGroup(
        Parameter(
            name="source_s3_bucket_connection",
            display_name="Source S3 Bucket Connection",
            type=AWSS3BucketConnection,
            required=True,
        ),
        Parameter(
            name="source_s3_folder_path",
            display_name="Source S3 Folder Path",
            type=String(),
            required=False,
            default="parent_folder/source_folder",
        ),
        name="source_s3",
        display_name="Source Location",
    )
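
The reversed rendering order of the parameter groups is consistent with how Python applies stacked decorators: the one closest to the function runs first. The toy parameter_group decorator below is a hypothetical stand-in, not the Nominode SDK, and whether the SDK stores groups this way is an assumption. It only records the order in which the decorators are applied:

```python
def parameter_group(name):
    """Toy stand-in for engine.parameter_group; records application order."""
    def decorator(func):
        # Append this group to whatever earlier decorators already recorded.
        func.groups = getattr(func, "groups", []) + [name]
        return func
    return decorator


@parameter_group("tracking_options")
@parameter_group("destination_options")
@parameter_group("destination_s3")
@parameter_group("source_options")
@parameter_group("source_s3")
def reformat(params):
    return params


# The decorator nearest the function was applied first.
print(reformat.groups)
```

The first entry printed is source_s3 and the last is tracking_options, which matches the top-to-bottom order described for the Task page.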

Give Users Options

Even though this was a free App, we wanted to support some flexibility and choices in its design. Specifically, we decided to offer the option to only include a subset of columns from the CSV file, the option to reprocess all of the files previously processed, and the ability to choose what compression, if any, to apply to the internal Parquet data. This is how those parameter options were defined:

    source_options = ParameterGroup(
        Parameter(
            name="columns",
            display_name="Columns to Include",
            type=Text(),
            required=False,
        ),
        Parameter(
            name="reprocess",
            display_name="Reprocess Files",
            type=Boolean(),
            default=False,
            required=True,
        ),
        name="source_options",
        display_name="Source Options",
    )

    destination_options = ParameterGroup(
        Parameter(
            name="parquet_compression",
            display_name="Parquet Compression",
            type=Enum(choices=["snappy", "gzip", "brotli", "none"]),
            default="gzip",
            required=True,
        ),
        name="destination_options",
        display_name="Destination Options",
    )

Track and Report Progress

We felt it was important for this App, and any App potentially processing a large amount of data, to regularly report its progress back to the Nominode so that anyone monitoring the Task can tell how far along it is. To do that, inside the code loop where we are processing files, we added the following lines to update the progress percentage indicator and the execution log:

    prog += 1
    progress = "%.2f" % ((prog / total) * 100)
    engine.update_progress(progress=progress)
    logger.info(f"Reading file: {key}...")
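
The percentage calculation above can be isolated into a small helper, sketched here under the assumption that an empty file list should be reported as fully complete. The format_progress name and the zero-total behavior are hypothetical and not taken from the App code:

```python
def format_progress(done, total):
    """Return the completed share as a percentage string with two decimals."""
    if total <= 0:
        # Nothing to process: report 100% rather than divide by zero.
        return "100.00"
    return "%.2f" % ((done / total) * 100)


print(format_progress(1, 3))
print(format_progress(3, 3))
print(format_progress(0, 0))
```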

As mentioned above, we also wanted to use a remote database for tracking files that we have successfully converted to ensure that they are not reprocessed on subsequent Task runs, unless the user selected the option to do that. We added a set of helper functions that use the sql_connection object to execute SQL queries against a database table:

    def initialize_tracking_database(): ...
    def update_stale_processing(): ...
    def check_tracking_state(source_key): ...
    def insert_tracker(source_key, source_hash): ...
    def update_tracker(source_key, source_status, error=None): ...

For example, this is the code of the helper function that checks if a file has already been processed:

    def check_tracking_state(source_key):
        # Bound parameters keep the key and bucket values out of the SQL string.
        data = {
            "source_bucket": source_bucket,
            "source_key": source_key,
        }
        check_sql = f"""
            SELECT source_status
            FROM {tracking_table}
            WHERE source_key = :source_key
            AND source_bucket = :source_bucket
        """
        # Passing the parameters as one dict works on SQLAlchemy 1.x and 2.x.
        results = sql_connection.execute(text(check_sql), data).fetchall()
        if results:
            return results[0][0]
        else:
            return []
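
To try the lookup pattern without a remote database, the sketch below runs the same kind of parameterized query against an in-memory SQLite table from the standard library. The file_tracking table name, its columns and the "Complete" status value are hypothetical, and this version returns None for untracked files, unlike the App's helper:

```python
import sqlite3

# In-memory SQLite stands in for the remote tracking database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE file_tracking ("
    "source_bucket TEXT, source_key TEXT, source_status TEXT)"
)
conn.execute(
    "INSERT INTO file_tracking VALUES (?, ?, ?)",
    ("my-bucket", "data/a.csv", "Complete"),
)


def check_tracking_state(source_bucket, source_key):
    """Return the stored status for a file, or None if it is not tracked."""
    row = conn.execute(
        "SELECT source_status FROM file_tracking "
        "WHERE source_key = :source_key AND source_bucket = :source_bucket",
        {"source_key": source_key, "source_bucket": source_bucket},
    ).fetchone()
    return row[0] if row else None


print(check_tracking_state("my-bucket", "data/a.csv"))
print(check_tracking_state("my-bucket", "data/b.csv"))
```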

Use Pagination with S3

When reading a large number of files from an S3 bucket, we were aware that a single list request returns at most 1,000 objects. To loop through each set or “page” of files, we used this code:

    paginator = source_s3_client.get_paginator("list_objects")
    paginate_params = {"Bucket": source_bucket}
    if source_path:
        paginate_params["Prefix"] = source_path
    page_iterator = paginator.paginate(**paginate_params)
    for page in page_iterator:
        contents = page.get("Contents", [])
        for file in contents:
            key = file["Key"]
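
The nested loop can also be wrapped in a generator so that the rest of the code sees a flat stream of keys. This is a minimal sketch that runs against hand-built pages shaped like the S3 response rather than a real bucket. The iter_file_keys name is hypothetical, and skipping keys that end in "/" (zero-byte folder placeholders) is an assumption, not something the App is known to do:

```python
def iter_file_keys(pages):
    """Yield object keys from list_objects-style pages, skipping folder markers."""
    for page in pages:
        # A page with no matching objects has no "Contents" entry.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                # Zero-byte "folder" placeholder objects are not data files.
                continue
            yield key


# Hand-built pages, for illustration only.
fake_pages = [
    {"Contents": [{"Key": "source_folder/"}, {"Key": "source_folder/a.csv"}]},
    {"Contents": [{"Key": "source_folder/b.csv"}]},
    {},
]
print(list(iter_file_keys(fake_pages)))
```

With a real client, the page_iterator from the code above would be passed in place of fake_pages.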

Read the Source File Data

We used the read_csv function from the pandas library to build an in-memory dataframe object from the contents of each file that is being processed. This is the code that did that and properly handled reading only a subset of the columns in the data, if that option was specified:

    fs = source_s3_client.get_object(Bucket=source_bucket, Key=key)
    table_params = {"filepath_or_buffer": io.BytesIO(fs["Body"].read())}
    columns = source_options.get("columns")
    if columns:
        use_columns = [x.strip(" ") for x in columns.split(",")]
        if all(x.isdigit() for x in use_columns):
            table_params["usecols"] = list(map(int, use_columns))
        else:
            table_params["usecols"] = lambda c: c in use_columns
    df = pd.read_csv(**table_params)
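
The column handling distinguishes between a list of positions and a list of names. The sketch below isolates that parsing step so it can be tried without pandas or S3. The parse_columns helper is hypothetical, and dropping empty entries is an added assumption:

```python
def parse_columns(spec):
    """Turn a comma-separated column spec into positions or names."""
    names = [x.strip() for x in spec.split(",") if x.strip()]
    if names and all(x.isdigit() for x in names):
        # All entries are digits: treat them as zero-based column positions.
        return [int(x) for x in names]
    # Otherwise treat them as column names.
    return names


print(parse_columns("0, 2, 5"))
print(parse_columns("id, name ,created_at"))
```

One design point worth noting: the App passes a callable to usecols for names, which makes pandas silently ignore requested columns that are missing from a file. Passing the list of names directly would instead raise an error when a column is missing.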

Write the Destination File Data

To convert the in-memory dataframe object to Parquet format, we used this code to prepare a temporary local file:

    with TemporaryDirectory(dir=Path(file)) as tempdir:
        # Drop the source folder prefix (and the "/" after it) from the key.
        path_len = len(source_path)
        path_len += 1 if path_len else 0
        name_piece = key[path_len:]
        # Strip a trailing .csv or .txt extension.
        if name_piece.endswith((".csv", ".txt")):
            name_piece = name_piece[:-4]
        # Add a timestamp so repeated runs produce distinct file names.
        timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
        parq_local_file = f"{name_piece}.{timestamp}.parquet"
        parq_local = str(Path(tempdir) / parq_local_file)
        os.makedirs(os.path.dirname(parq_local), exist_ok=True)
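
To make the naming rule concrete, here is a minimal sketch of the same logic as a standalone function with a fixed timestamp. The parquet_file_name helper and the sample key are hypothetical:

```python
from datetime import datetime


def parquet_file_name(key, source_path, now):
    """Derive the local Parquet file name from an S3 key."""
    # Drop the source folder prefix and the "/" that follows it.
    prefix_len = len(source_path) + 1 if source_path else 0
    name_piece = key[prefix_len:]
    # Strip a trailing .csv or .txt extension, if present.
    if name_piece.endswith((".csv", ".txt")):
        name_piece = name_piece[:-4]
    return f"{name_piece}.{now.strftime('%Y%m%dT%H%M%S')}.parquet"


print(parquet_file_name(
    "parent_folder/source_folder/2021/sales.csv",
    "parent_folder/source_folder",
    datetime(2021, 3, 4, 5, 6, 7),
))
```

The printed name keeps the 2021/ subfolder from the key, which is why the App code creates any missing parent directories before writing the local file.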

We used the to_parquet function from the pandas library to write the dataframe object out to the local Parquet formatted file, including the compression, if that option was specified:

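    # Store every column as a string in the Parquet output.
    converted_df = df.astype("str")
    parq_params = {"path": parq_local}
    comp = destination_options.get("parquet_compression")
    # pandas expects None, not the string "none", to disable compression.
    parq_params["compression"] = None if comp == "none" else comp
    converted_df.to_parquet(**parq_params)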

We used the upload_fileobj function from the boto3 library to upload that file to the destination S3 bucket:

    dest_key = "/".join(filter(None, [destination_path, parq_local_file]))
    with open(parq_local, "rb") as data:
        destination_s3_client.upload_fileobj(data, destination_bucket, dest_key)

Handle Exceptions

Many portions of the App code were contained within try and except blocks to ensure that any errors encountered were handled gracefully. Each except block wrote the exception to the Task execution log and updated the file tracking database to reflect what happened. With that handling in place, the code to write the local file to the destination S3 bucket looked like this:

    try:
        with open(parq_local, "rb") as data:
            destination_s3_client.upload_fileobj(
                data, destination_bucket, dest_key)
    except Exception as e:
        reason = "Destination Write Error"
        logger.exception(e)
        update_tracker(source_key=key, source_status=reason, error=e)
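
The same log-and-record pattern can be factored into a reusable wrapper so that one failed file does not stop the rest of the run. This is a sketch only. The run_step helper, the fake_tracker function and the simulated failure are hypothetical and are not taken from the App code:

```python
import logging

logger = logging.getLogger("csv_to_parquet_sketch")


def run_step(step, key, reason, update_tracker):
    """Run one processing step; on failure, log it and record the reason."""
    try:
        step()
        return True
    except Exception as e:
        logger.exception(e)
        update_tracker(source_key=key, source_status=reason, error=e)
        return False


# Hypothetical tracker that records calls in a list instead of a database.
calls = []


def fake_tracker(source_key, source_status, error=None):
    calls.append((source_key, source_status, str(error)))


def failing_upload():
    raise IOError("bucket unreachable")


ok = run_step(failing_upload, "data/a.csv", "Destination Write Error", fake_tracker)
print(ok, calls)
```

A caller can check the returned flag and move on to the next file when a step fails.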

We will continue to write articles like this that take a deeper look at the code and design decisions behind some of the Apps that we have created. But, if there are any particular Apps that you would like for us to dive into or any particular App design and development topics you would like to see us discuss, please let us know in the comment section below.
