A tool/framework to detect the extent of changes in data entities between time periods

Suman Kumar Gangopadhyay
Published in Analytics Vidhya · 8 min read · Oct 2, 2021


Data Observability — a must-have capability today

Today, organisations around the world leverage multiple tools/frameworks to trace data as it flows through the various pipelines in their data landscape. A variety of tools/frameworks exist to track and report pipeline performance, to alert on imminent SLA breaches, and to show the lineage of a data product across an entire distributed architecture. Some of these tools/frameworks capture the lineage of a data product across heterogeneous storage systems (where the intermediate data resides) and/or compute systems (where the transformations are executed). They give the user a high-level view of data movement through the various parts of the pipeline within the overall architecture.

Tools/frameworks available for data observability

The aim of this blog is to present a tool/framework which can be used to track the changes in data entities over chosen time periods across their lineage of transformations.

The user of this tool/framework has the freedom to choose the time periods, the data entities and the lineage of transformations that need to be tracked. The user can track multiple entities and can also define the various levels of aggregation at which the lineage is tracked.

Once configured with these user-specific choices, this tool/framework presents the information as a directed graph that is read from right to left, with the root node at the extreme left. The root node represents the final form of the data entity in its lineage. The nodes to the right of the root node represent the various transformations applied to the data entity; these transformations are laid out in the graph in a breadth-first manner. The rightmost nodes in the graph represent the sources where the entity's journey in its lineage began. The edges connecting the nodes display the percentage changes in the values of the transformations applied to the data entity.

By tracking the extent of changes from the source to the destination of the entity, the user can pinpoint the transformation that may have introduced errors or data quality issues into the pipeline.

This framework expects the user to provide the following information as configuration:

  • The database tables and columns holding the transformations the user wants to track
  • The lineage of the entity and the related transformations, coded in a JSON file (a one-time activity)
  • The dates between which the user wants to compare the entity and the transformations applied to it
  • The various levels of aggregation that are of interest to the user

Where to get the required data

This blog uses the data (COVID-19 Activity.csv & COVID-19 Cases.csv) available from

Technology stack in use

The primary programming languages used are Python and SQL (this tool/framework can work with PostgreSQL or Snowflake). The user interface is built using the Flask framework. No Docker image of this tool/framework is available at the time of writing; however, one should be available soon.

Setting things up

Initial config entries for the application to start

This is a text file with key-value pairs under specific headings, following the conventions of the Python configparser module. The main fields are:

  • The database connection details, captured as the host name (server), database name (dbname) and schema.
  • The filter field (currently redundant and slated for removal in a future enhancement) and the levels field, which holds the various levels of granularity at which the user wants to aggregate the data.
  • The hierarchy_table_name field, which holds the name of the database table containing the data for the various levels; the drop-downs on the main landing page of the application are populated from this table.
  • The query fields, which hold the SQL that lists the data entity, the attributes used in the transformations that produce it, and the tables storing them. Each entity defined in the config file should have a corresponding query.
  • The date_column_name field, which is the name of the date column in the database.
  • The entity_list field, which holds the list of data entities the user wants to track. Care must be taken while defining entity_list and the queries: the order of the entities must match the order of the entity SQLs for the lineage to be displayed correctly per entity. The framework presents the user with an intermediate page to navigate to the entity of his/her choice.

The entries under the app-home section refer to the location from which the application is run (this will be replaced once the application is made available as a Docker image in a future enhancement) and the location of the credentials byte-code file with which the application connects to the database.
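A minimal sketch of what such a config file could look like; every section name, key and value below is an illustrative assumption rather than the exact set of keys the tool ships with:

```ini
[database]
server = my-postgres-host.example.com
dbname = covid_analytics
schema = public
# redundant field, slated for removal in a future enhancement
filter = none
# aggregation levels that populate the drop-downs on the landing page
levels = country,province,county
hierarchy_table_name = level_hierarchy
date_column_name = report_date
entity_list = total_positive_cases,total_deaths
# one SQL per entity, in the same order as entity_list
total_positive_cases_query = SELECT ... FROM covid_summary ...
total_deaths_query = SELECT ... FROM covid_deaths ...

[app-home]
run_location = /opt/data-drift-tracker
credentials_file = /opt/data-drift-tracker/creds.bin
```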

Edges & Vertices

Next comes the initial information about the edges, which lays out the journey of the data entity from the source to the destination via the transformations. The user needs to provide this information in the form of a JSON file.

edge json — contains column names of the data entities being tracked

The user will have to copy this JSON into another JSON, which forms the lookup used for comparing the two dates between which the changes in the data entity are being tracked. A similar setup needs to be done for all the entities listed in the initial configuration. The initial edge values can be anything; the tool/framework populates them at runtime. The edge description is for documentation purposes only.
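A rough illustration of how such an edge JSON might be structured; the key names, table names and column names below are hypothetical placeholders, not the exact schema the tool expects:

```json
{
  "edges": [
    {
      "start_vertex": "covid_activity.people_positive_cases_count",
      "end_vertex": "covid_summary.total_positive_cases",
      "edge_value": 0,
      "edge_description": "daily positives rolled up into the running total"
    },
    {
      "start_vertex": "covid_summary.total_positive_cases",
      "end_vertex": "covid_report.cases_reported",
      "edge_value": 0,
      "edge_description": "summary figure pushed to the reporting layer"
    }
  ]
}
```

The lookup copy would presumably carry the same structure, with the values for the older date filled in at runtime so that the two dates can be compared.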

At runtime, the application also creates JSONs of the vertices (the database table columns that store the source, transformations and destination of the data entities mentioned in the initial configuration) based on the configurations provided by the user, and verifies that the vertices are correctly referenced in the edges.

Verification failures are logged to the console, and in that case the output graph is not generated.

Vertex json
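Reusing the hypothetical column names from the edge sketch above, the generated vertex JSON might look roughly like this; the verification step then amounts to checking that every vertex appears as an endpoint of at least one edge:

```json
{
  "vertices": [
    "covid_activity.people_positive_cases_count",
    "covid_summary.total_positive_cases",
    "covid_report.cases_reported"
  ]
}
```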

Core processing logic

The core of the application is a breadth-first search of a directed graph. The destination of the data entity is the root node of the graph, and all the transformations and sources leading to the destination are non-root nodes, traversed using breadth-first search. Based on the edges and vertices derived from the initial configurations provided by the user, the generated graph is stored in memory as an adjacency list, as sketched below. The Flask application connects to the database, queries the hierarchy table to retrieve the data for the levels, and presents the user with the interface shown in the next screenshot.
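A rough, illustrative sketch of that in-memory representation and its traversal (the column names and structure below are hypothetical, not the tool's actual code):

```python
from collections import deque

# Hypothetical adjacency list: each key is a column in the lineage and maps to
# the upstream columns (transformations/sources) that feed it. The destination
# column of the entity is the root of the directed graph.
adjacency = {
    "covid_report.cases_reported": ["covid_summary.total_positive_cases"],
    "covid_summary.total_positive_cases": ["covid_activity.people_positive_cases_count"],
    "covid_activity.people_positive_cases_count": [],
}

def bfs_edges(adjacency, root):
    """Walk the lineage breadth-first from the destination (root) towards the
    sources, yielding (downstream, upstream) pairs in visit order."""
    visited = {root}
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for upstream in adjacency.get(node, []):
            yield node, upstream
            if upstream not in visited:
                visited.add(upstream)
                queue.append(upstream)

for downstream, upstream in bfs_edges(adjacency, "covid_report.cases_reported"):
    print(f"{upstream} -> {downstream}")
```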

Landing page of the application where the user can choose the aggregation levels

Selecting the future and past dates is mandatory, and hence a default value is always populated; all other levels are optional and can be skipped. On clicking the Get Lineage button, the application retrieves the information from the database based on the dates and the other chosen levels. Similar information is retrieved for all the entities defined in the initial configuration. The user is then presented with an intermediate page where he/she can choose which entity to investigate in detail. The intermediate page also reminds the user of the selections made on the landing page.

Intermediate page

The differences in the values of the source, transformations and destination are computed as percentages and stored as the labels of the edges. The final image is rendered using the dagre-d3 library. The final page also reminds the user of the selections made on the landing page.
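As a minimal sketch of that label computation (the handling of zero values here is an assumption; the tool may treat such cases differently):

```python
def pct_change(old_value, new_value):
    """Percentage difference between the values on the two chosen dates,
    used as the edge label. A zero old value is handled naively here."""
    if old_value == 0:
        return float("inf") if new_value else 0.0
    return (new_value - old_value) / old_value * 100.0

# e.g. 1,200 cases on the past date vs 1,500 on the future date -> +25.0%
print(f"{pct_change(1200, 1500):+.1f}%")
```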

Final rendered image of the directed graph displaying the extent of data changes along the edges

Pseudocode

The application starts by connecting to the PostgreSQL database and building the vertex JSON based on the configurations provided by the user. It then fetches the data for the various levels of aggregation mentioned in the configurations. Once the user submits the choice of dates and aggregation levels, the application updates the edge and lookup JSONs with the values for the current and old dates respectively. Once these JSONs are updated, the adjacency list is updated with the corresponding values based on the percentage difference between the old and the current values. Completion of these operations triggers the breadth-first search and prepares the final list of edges, with the percentage differences as the edge labels. The list of edges is a Python list of tuples; each tuple contains the source vertex, the destination vertex and the percentage difference in the values of the source and destination vertices. This list is then passed to the HTML page and displayed using dagre-d3. The application can connect to a PostgreSQL or Snowflake database based on the value passed to the BuildJsons class in app.py.
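A minimal sketch of that last hand-off only, assuming a hypothetical route, template name and hard-coded edge tuples purely for illustration:

```python
from flask import Flask, render_template

app = Flask(__name__)

# Edge tuples as described above: (source vertex, destination vertex, label).
# The values are hard-coded placeholders; the real application derives the
# labels from the percentage differences between the two chosen dates.
EDGES = [
    ("covid_activity.people_positive_cases_count",
     "covid_summary.total_positive_cases", "+25.0%"),
    ("covid_summary.total_positive_cases",
     "covid_report.cases_reported", "+24.8%"),
]

@app.route("/lineage/<entity>")
def lineage(entity):
    # The template (name assumed) loops over the tuples and feeds them to
    # dagre-d3 in the browser, which draws the directed graph.
    return render_template("lineage.html", entity=entity, edges=EDGES)

if __name__ == "__main__":
    app.run(debug=True)
```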

Conclusion

This tool/framework aims to provide data observability at the finest level of granularity chosen by the user. Its main advantage is its ability to lay out the data differences based on the user's choices. Once the initial configurations are set, the user does not need any knowledge of the code running the data pipeline: even without that knowledge, he/she can pinpoint potential issues in the data (at the source, the destination or the intermediate transformations) by looking at the rendered graph and the percentages along the edges. The effort to adopt this application is low, and it delivers great value in terms of observability of the drift in data entities over time.
