Solving the Siloed Data Problem
Using New File Notifications and Pandas to Efficiently Process Client Data
Many of our clients feel the pain of siloed data. The information needed for day-to-day operations exists within different data systems that are not accessible to each other. Telematics data exists in one system, maintenance data in another, fueling data somewhere else entirely, and none of these are linked. Fleet managers need these data sources integrated in a timely manner to fully understand what’s going on in their fleet. The Utilimarc Engineering team gathers data from all these different sources and efficiently loads it into a database for use by our analytics team.
Getting the Data to Utilimarc
Clients can choose how to share data that does not come from a telematics provider. Data shared with Utilimarc will be saved to a data lake to start the loading process into our analytics database.
- Utilimarc pulls data from an API
- Client shares data via Secure File Transfer Protocol
- Client shares data via email attachment
If a client cannot grant Utilimarc access to an API, the next best option is to share files via SFTP (Secure File Transfer Protocol). The client sets up a scheduled query on their own database(s) that exports the results to a file and uploads that file to Utilimarc’s SFTP site. Alternatively, a client can place a file on their own SFTP site, and we retrieve it from there.
A process on the server waits for new files, copies any new file to the data lake, and then deletes the file from the server so the file is not re-uploaded the next time the process runs.
If neither of the other two options is viable, data can also be shared via email attachment. This is the least preferred method because email limits file sizes, does not offer end-to-end encryption, and makes it difficult to verify that the data comes from a trustworthy source. To prevent malicious loads from entering the Utilimarc database, email attachments are only saved if the sender is part of a pre-configured list of “safe senders” and the attachment is a file that we recognize. Recognized attachments from known senders are saved to the data lake; otherwise, internal alerts are triggered.
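The attachment check described above could be sketched as follows. The sender list, the set of recognized extensions, and the function name are all hypothetical; the real lists live in configuration that is not shown here.

```python
from pathlib import PurePosixPath

# Hypothetical allow-lists, for illustration only.
SAFE_SENDERS = {"reports@client-a.example"}
RECOGNIZED_EXTENSIONS = {".csv", ".xlsx"}


def should_save_attachment(sender: str, filename: str) -> bool:
    """Return True only for a recognized file type from a known sender."""
    known_sender = sender.strip().lower() in SAFE_SENDERS
    known_type = PurePosixPath(filename).suffix.lower() in RECOGNIZED_EXTENSIONS
    return known_sender and known_type
```

Anything that returns `False` here would be routed to the internal alerting path rather than the data lake.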
Starting the Ingestion Process
Notifications are created when new client data is saved to the data lake. Ingestion into Utilimarc’s database is started once a notification is received on the notification queue. When the message is received from the queue, the `VisibilityTimeout` is set to 31 minutes to prevent a message from appearing in the queue again and possibly being re-processed before the first processing has finished.
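The attribute names above match Amazon SQS. Assuming the queue is SQS and is read with a boto3 client, receiving a notification might look like the sketch below. The function name, the queue URL, the batch size, and the wait time are placeholders, not values from our system.

```python
VISIBILITY_TIMEOUT_SECONDS = 31 * 60  # 31 minutes, as described above


def receive_notifications(sqs_client, queue_url: str) -> list:
    """Long-poll the queue and hide received messages for 31 minutes.

    `sqs_client` is assumed to be a boto3 SQS client, e.g. boto3.client("sqs").
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=1,  # assumed batch size
        WaitTimeSeconds=20,     # long polling; an assumed value
        VisibilityTimeout=VISIBILITY_TIMEOUT_SECONDS,
        AttributeNames=["ApproximateReceiveCount"],
    )
    # The "Messages" key is absent when nothing was received.
    return response.get("Messages", [])
```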
Each notification carries metadata. We pull the `ApproximateReceiveCount` from the metadata to control retries on failures. The `ApproximateReceiveCount` is a count of how many times a notification has been received. In our use case, this equals the number of attempts made to load a file into the database. A file can be retried up to ten times or until an unresolvable error is encountered. If the file has not been successfully processed by the time it runs out of retries, the message is sent to the dead letter queue. The dead letter queue only contains notifications of failed processes. If a major issue (e.g., a cloud provider outage) causes many file load errors, we can poll the dead letter queue once the underlying issue has been addressed to reprocess the files that should have loaded correctly. The message is deleted from the queue once its file has been successfully processed or the message has been sent to the dead letter queue.
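A minimal sketch of that retry rule is shown below. It assumes the message dictionary has the shape returned by an SQS receive call, and it treats the tenth receive as the final attempt, which is one reading of "up to ten times". The function name and the returned labels are illustrative.

```python
MAX_ATTEMPTS = 10  # the retry limit described above


def next_step(message: dict, succeeded: bool, unresolvable: bool = False) -> str:
    """Decide what to do with a message after one processing attempt."""
    # Attribute values arrive as strings, so convert before comparing.
    attempts = int(message["Attributes"]["ApproximateReceiveCount"])
    if succeeded:
        return "delete"
    if unresolvable or attempts >= MAX_ATTEMPTS:
        return "dead_letter"
    return "retry"
```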
The message within the notification is the path to the file in the data lake. The file path tells us which client the file belongs to and lets us determine which table the file updates and which steps are needed to complete the load. If the file path isn’t recognized, the file is ignored.
Preparing the Data to Be Loaded
The actions and table information needed to validate and load a file into a table are maintained in configuration files. These are separated at a client level. Each configuration file contains a list of regular expressions for the file names we’re expecting from that client, along with the information needed to load each file. At a minimum, this includes the table(s) it updates and its columns, primary keys, insert type, and a cron schedule of when we are expecting to receive the file. A file’s configuration can also contain special functions for opening the file, column renames, default column values, column value fixes, and small transformations.
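To make the shape of such a configuration concrete, here is a hypothetical entry expressed as a Python structure, together with a lookup by file name. Every key, pattern, table, and column name below is invented for illustration; the actual configuration format is not shown in this article.

```python
import re
from typing import Optional

# A hypothetical configuration for one client.
CLIENT_CONFIG = [
    {
        "pattern": r"^fuel_transactions_\d{8}\.csv$",
        "table": "fuel_transactions",
        "columns": ["transaction_id", "unit_number", "gallons", "transaction_date"],
        "primary_keys": ["transaction_id"],
        "insert_type": "upsert",
        "cron": "0 6 * * *",  # expected daily at 06:00
        "renames": {"Unit #": "unit_number"},
    },
]


def find_file_config(file_name: str, config: list) -> Optional[dict]:
    """Return the first entry whose pattern matches the file name, else None."""
    for entry in config:
        if re.search(entry["pattern"], file_name):
            return entry
    return None
```

A file name that matches no pattern returns `None`, which corresponds to an unrecognized file being ignored.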
The file is read into a pandas data frame (pandas is a Python package for data manipulation) to be prepared for the load to the database. The data frame needs to match the database table’s columns, column order, and data types. Columns in the configuration are compared to columns present in the file. If any of the columns defined in the config are missing from the file, processing is stopped immediately. Any columns that a client adds to the file that are not configured for the table are ignored. Columns that have a converter are added to the `converters` parameter, where the column name maps to the converter function. Any columns with a date data type are added to a list and passed as the `parse_dates` parameter. If the table already exists, `pg_table_def` is queried and the results are used to reorder the data frame’s columns to match the order in the database. Next, column values are checked against the data types in the database. Small column value fixes are performed, such as trimming varchars and rounding decimal types to match the data type in the table.
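The read step above can be sketched with `pandas.read_csv`. This is a simplified illustration that assumes a CSV file already held as text; the function name and its arguments are hypothetical, and `table_order` stands in for the column order that would come from querying `pg_table_def`.

```python
import io

import pandas as pd


def read_for_load(csv_text, columns, converters=None, date_columns=None, table_order=None):
    """Read CSV text into a data frame shaped like the target table."""
    # Read only the header first so a missing column stops processing early.
    header = pd.read_csv(io.StringIO(csv_text), nrows=0).columns
    missing = [c for c in columns if c not in header]
    if missing:
        raise ValueError(f"Missing configured columns: {missing}")

    frame = pd.read_csv(
        io.StringIO(csv_text),
        usecols=columns,                    # extra client columns are ignored
        converters=converters or {},        # column name -> converter function
        parse_dates=date_columns or False,  # columns parsed as dates
    )
    # Match the column order of the existing table when it is known.
    return frame[table_order or columns]
```

Type checks and value fixes such as trimming and rounding would follow this step and are left out of the sketch.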
Finally, the data frame is loaded to the database using awswrangler. Awswrangler is a Python package that loads pandas data frames to various AWS products. We use awswrangler’s `redshift.copy()` function to load the data frames to our Redshift database. Because awswrangler can perform upserts, appends, and overwrites when inserting into Redshift, the table is ready for analytics without worry of duplicated or missing data.
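A call to `redshift.copy()` might be wrapped as in the sketch below. The wrapper name and its arguments are hypothetical, and the staging path, IAM role, and connection are values you would supply; the keyword arguments passed to `wr.redshift.copy` are real parameters of that function.

```python
ALLOWED_MODES = {"append", "overwrite", "upsert"}


def load_to_redshift(frame, con, table, schema, staging_path, iam_role,
                     insert_type, primary_keys=None):
    """Load a data frame into Redshift with awswrangler's redshift.copy()."""
    if insert_type not in ALLOWED_MODES:
        raise ValueError(f"Unsupported insert type: {insert_type!r}")

    # Imported here so the sketch can be read and imported without AWS set up.
    import awswrangler as wr

    wr.redshift.copy(
        df=frame,
        path=staging_path,  # S3 prefix used to stage files for the COPY
        con=con,            # e.g. a connection from wr.redshift.connect(...)
        table=table,
        schema=schema,
        iam_role=iam_role,
        mode=insert_type,
        # Primary keys only matter for upserts.
        primary_keys=primary_keys if insert_type == "upsert" else None,
    )
```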
Threading to Keep Loads Moving
Processing time can take a significant hit when large files or large quantities of files are delivered. The final step of loading into the database can be very slow, even though the process itself is doing very little while it waits. File notifications get stuck in the queue waiting for the large loads to finish. This becomes especially problematic when one client’s ingestion is slowed down by another client’s load. This problem can be alleviated by threading the load to Redshift. When threading a process like this, we need to ensure that messages in the queue are handled properly after file processing and that we are not loading too many files at once.
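One common way to cap the number of simultaneous loads is a thread pool from the Python standard library. The sketch below uses `ThreadPoolExecutor` for brevity, which is a stand-in rather than the thread-per-message design described in the following sections; the limit of four and the function names are assumptions.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_LOADS = 4  # an assumed limit; tune to what the cluster tolerates


def load_all(paths, load_one):
    """Run load_one(path) for each path, at most MAX_CONCURRENT_LOADS at a time."""
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_LOADS) as pool:
        # map() preserves input order and re-raises worker exceptions.
        return list(pool.map(load_one, paths))
```

Because the load spends most of its time waiting on the database, threads are a reasonable fit here despite the interpreter lock.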
Before threading the load to the database, verify that the database’s workload management (WLM) configuration will allow the threading to work as intended. An example of an inefficient WLM configuration for our use case is one that sends all long-running queries to one of the database’s queues and the short ones to another. In this scenario, we would still have the load bottleneck even with threading implemented, since many of our loads are large queries. We enabled automatic WLM on our Redshift cluster, which can send a large query to one of multiple queues and so manages the workload better. In addition to enabling automatic WLM, we give the Redshift user running these loads the highest priority in the queue, since these loads are vital to our business.
Handling Messages Properly
The new object notification needs to be handled correctly based on the processing outcome. A successfully processed message should be deleted right away. After a failure that can be retried, the message’s visibility should be updated so it can be received again. After an unresolvable error or a failed last retry, the message should be sent to the dead letter queue and then deleted.
Each message pulled from the queue starts a new thread. A second thread is then started for each Record within the notification. A Record is a path in the data lake, and each message can contain multiple paths. This thread is created using an overridden thread class that allows errors in a child thread to be passed to the parent thread. We call this a Propagating Thread. This thread’s `join()` method is overridden, and the return value indicates whether an error was encountered during processing and, if so, what kind.
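A thread class along these lines could be written as follows. This is a sketch of the idea rather than our production class: the `UnresolvableError` exception and the attribute names are hypothetical, and the True / False / None return convention follows the description in the next paragraph.

```python
import threading


class UnresolvableError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""


class PropagatingThread(threading.Thread):
    """Thread whose join() reports how its work finished."""

    def __init__(self, target, *args):
        super().__init__()
        self._work = target
        self._work_args = args
        self.outcome = None  # True, False, or None; see join()
        self.error = None    # the exception raised by the work, if any

    def run(self):
        try:
            self._work(*self._work_args)
            self.outcome = True
        except UnresolvableError as exc:
            self.outcome, self.error = False, exc
        except Exception as exc:
            self.outcome, self.error = None, exc

    def join(self, timeout=None):
        """Wait for the thread, then return True, False, or None."""
        super().join(timeout)
        return self.outcome
```

The parent thread can read `error` after `join()` to report what went wrong.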
A notification’s record threads (the propagating threads) are joined with the message thread. Once all the threads finish, their return values are evaluated to determine how the message should be handled. A list of all True values indicates that each record was processed successfully. A False value in this list means that a known unresolvable error was encountered and the message should be sent to the dead letter queue immediately. Any None values indicate that an error not known to be unresolvable was encountered, and the file should be retried.
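The evaluation of those return values can be expressed as a small function. The function name and the action labels are illustrative, and giving False precedence over None is an assumption based on the word "immediately" above.

```python
def message_action(results):
    """Map record outcomes (True / False / None) to a message action."""
    if any(r is False for r in results):
        return "dead_letter"  # known unresolvable error: stop retrying
    if any(r is None for r in results):
        return "retry"        # make the message visible again
    return "delete"           # every record loaded successfully
```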
Some basic information about file processing is saved to a processing reporting table. This table contains the file’s name, the table it updated, how many rows in the table were updated, when processing started, when processing finished and some notes about what happened during processing. The processing reporting table is later used to trigger actions that should only happen after we receive a group of files from a client or can be used to trigger building analytics reports.
Notifying the Right People When Something Goes Wrong
Sometimes the data does not show up as expected: columns are missing, the file encoding changes, or column names change, to name a few. We want to notify the right people when this happens so data isn’t unexpectedly missing from a report. On any ingestion failure, the Engineering team is sent an email containing the file name and the error. This error notification is also sent to the client’s Customer Success Manager, who can alert the client to the failure and reach out if client action is needed to load the file.
In addition to knowing about load errors, we also want to know when we do not receive expected files. We use a monitoring service that sends alerts when a file has not checked into the service by a certain time. The time at which a file is expected is resolved using the cron in the file configuration. A cron expression is a schedule consisting of five fields. The fields specify the minute, hour, day of month, month, and day of the week. If the service hasn’t been pinged within a certain amount of time after the cron schedule, the file’s processing is considered “down”. The service then sends a message to a Slack channel and creates a GitLab incident so the missing file can be dealt with quickly.
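As a small illustration of the five cron fields, the sketch below splits an expression into named parts. The class and function names are hypothetical, and the sketch only labels the fields; it does not evaluate ranges, lists, or step values.

```python
from typing import NamedTuple


class CronFields(NamedTuple):
    minute: str
    hour: str
    day_of_month: str
    month: str
    day_of_week: str


def parse_cron(expression: str) -> CronFields:
    """Split a five-field cron expression into named fields."""
    parts = expression.split()
    if len(parts) != 5:
        raise ValueError(f"Expected 5 cron fields, got {len(parts)}")
    return CronFields(*parts)
```

For example, `parse_cron("30 6 * * 1-5")` describes a file expected at 06:30 on weekdays.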
Why It's Important
Processing files using new object notifications allows client data to be ready for analytics within minutes of the data being shared with Utilimarc. The load preparation done using pandas ensures the data is production-ready and prevents avoidable load failures. Clients can send us a large quantity of files without their load times being affected because file processing is threaded. Utilimarc analysts and data scientists know that their reports and models are built on quality underlying tables containing up-to-date information. Clients can then use these dashboards and models to inform day-to-day business decisions.