Avatar of Rainer Jaspert
Avatar of Alexander Kniesz

The CluePoints platform supports sponsors of clinical trials by providing statistical analyses of the captured study data. The main use case of the platform is the detection of systematic errors or anomalies in clinical study data. Customers (typically data analysts at pharmaceutical companies, medical device companies, or clinical research organizations) use the platform to upload sets of study data and configure data processing pipelines. By executing those pipelines, customers derive so-called signals, which point to possible weaknesses (systematic errors or anomalies) in the capturing of the study data. Customers can visualize and further analyze signals, and finally assign actions that address the identified weaknesses to mitigate the risk of faulty study results.

Current solution

The design of the current solution is mainly influenced by the classical Data Warehouse approach: a “staging area” and ETL batch processing are used to gather, harmonize, and integrate the “raw” clinical study data from operational source systems and to finally load this “derived” data into a relational database. Statistical methods are then applied to the data in this relational database to derive the signals, and a commercial tool is used for the visualization of the derived data and signals. CluePoints is aware of some issues with this solution design. The number of relational databases is very high, as multiple databases are created per clinical study. Load balancing of the data access is difficult, as the amount of data varies considerably between clinical studies. And the ACID guarantees provided by the RDBMS in use are not actually needed for the data processing.

Why are we involved?

INNOQ is currently supporting CluePoints with the refinement of the software architecture of the CluePoints platform. As a result of this work, we became aware of some similarities between the CluePoints platform and the Data Mesh approach, which has quite recently been proposed for the design and development of solutions for analytical data processing. According to Data Mesh, the CluePoints platform can be seen as a “Self-Serve Data Platform”, allowing data analysts to easily create their own “Data Products”: Services that consecutively process incoming study data and present the signals derived from each version of the study data. We are currently promoting this approach and are interested in customer projects embracing it. Together with CluePoints, we aligned on a PoC to get more experience in applying up-to-date technologies commonly used in data mesh projects. We believe that those technologies will be helpful in the design of an improved solution for the CluePoints platform.

To avoid misunderstandings, let us point out that the CluePoints platform does not align with all Data Mesh principles. Data analysts do not share their data products. Therefore, interoperability of data products is neither needed nor supported within the CluePoints platform. Also, the Data Mesh approach typically aims to improve analytical data processing in larger enterprises, while CluePoints provides a service for data analysts working at different clinical facilities.

PoC Solution overview

In this PoC, we focused on the actual data processing of the CluePoints platform. In particular, we excluded a major part of the CluePoints platform: the part that allows data analysts at clinical facilities to configure their own data processing pipelines.

Scope and use case

To further limit the scope of the PoC, we decided to focus on one processing step: the addition of surrogate keys to a clinical study dataset. A clinical study dataset consists of patient profile data and data about the patients' doctor visits and their participation in the clinical study. It also contains personal data about patient diseases and their treatment. The generated surrogate keys allow the different datasets to be combined by patient, visit, or location IDs. We covered some of the most common scenarios for the incoming study data. We handled the addition of new datasets to an existing study, as well as the addition of rows or columns to an existing dataset. We did this specifically to challenge the usefulness of the data versioning tool support that we introduced in this PoC. Finally, we automated the execution of the data processing for an easy handover to the CluePoints development team.

Used technologies

We used the following technology stack within the PoC:

| Technology | Short description | Link |
| --- | --- | --- |
| MinIO | S3-compatible object storage | https://min.io/ |
| lakeFS | Git-like data versioning | https://lakefs.io/ |
| CherryPy | Used for the implementation of lakeFS actions (post-merge hooks) | https://cherrypydocrework.readthedocs.io/index.html |
| Parquet | Columnar file format | https://parquet.apache.org/ |
| Iceberg | Open table format | https://iceberg.apache.org/ |
| Trino | Distributed SQL query engine | https://trino.io/ |
| Marquez | OpenLineage-compatible metadata service | https://marquezproject.ai/ |
| Pandas | Used for IO on CSV and Parquet files | https://pandas.pydata.org |
| Jupyter Notebook | Used as development environment | https://jupyter.org/ |

Implemented solution

We used Jupyter Notebooks with Python for the implementation of our solution and relied on the pandas library for the processing of the CSV files. We deployed the solution locally using Docker and Docker Compose. We also automated the setup of the environment and the execution of the data processing.

The docker environment

The environment is set up with docker containers, which share a docker network. In the configuration file, we configured the following containers:

The running containers then look like this:

After those containers are up and running, the different endpoints can be inspected, and the data ingestion can begin.

Initial data ingestion

We used some study test data provided by CluePoints and derived some versions of consecutive study datasets from them, resulting in the following directory structure:

The “ilms” folder contained the original data, and the folders ilms1 to ilms3 held different versions of the original data. Each version differs from the original by having fewer datasets, columns, or rows. Each of those folders contains the raw study data for, e.g., patients, visits, or locations, with one CSV file per dataset. An additional contents CSV file provides metadata about the fields and their data types used in each of the study data CSV files. To correctly parse the study data CSV files and load the data into pandas dataframes, we assigned the proper data types from the contents CSV. Using pandas again, we then saved these dataframes as Parquet files into our lakeFS repository. We created a new lakeFS branch for each of our different versions of study data. lakeFS handles the actual storage in the underlying MinIO system for us. To confirm that it worked, we can look at the respective endpoints. In MinIO, we only see files with unknown content, whereas in lakeFS these files are mapped into a hierarchical file system, which we can browse:

Left: MinIO Homescreen, right: LakeFS Homescreen
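The typed parsing of the study data CSV files described above can be sketched roughly as follows. This is a minimal sketch and not the code of the PoC: the layout of the contents CSV (columns `dataset`, `field`, `type`), the type names in `PANDAS_DTYPES`, and the function names are assumptions made for illustration. Writing Parquet files with pandas additionally requires a Parquet engine such as pyarrow.

```python
import io

import pandas as pd

# Hypothetical type names of the contents CSV mapped to pandas dtypes.
PANDAS_DTYPES = {"text": "string", "integer": "Int64", "decimal": "float64"}


def load_typed_csv(data_csv, contents_csv, dataset):
    """Read one study data CSV using the data types listed in the contents CSV."""
    contents = pd.read_csv(contents_csv)
    fields = contents[contents["dataset"] == dataset]
    dtypes, date_columns = {}, []
    for field, type_name in zip(fields["field"], fields["type"]):
        if type_name == "date":
            date_columns.append(field)  # dates are parsed, not cast
        else:
            dtypes[field] = PANDAS_DTYPES[type_name]
    return pd.read_csv(data_csv, dtype=dtypes, parse_dates=date_columns)


def save_as_parquet(frame, path):
    """Write the dataframe as a Parquet file (needs pyarrow or fastparquet)."""
    frame.to_parquet(path, index=False)


# Tiny in-memory example instead of the real study files.
contents = io.StringIO(
    "dataset,field,type\n"
    "patients,patient_id,text\n"
    "patients,age,integer\n"
    "patients,enrolled,date\n"
)
patients_csv = io.StringIO(
    "patient_id,age,enrolled\n"
    "001,34,2021-03-01\n"
    "002,,2021-04-15\n"
)
patients = load_typed_csv(patients_csv, contents, "patients")
```

Without the explicit data types, pandas would read the identifier `001` as the integer 1 and the incomplete `age` column as floating-point numbers, which is why the metadata from the contents file matters here.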

To be able to use our query engine Trino for the SQL processing of the Parquet files now in place, we had to add the schema information to the Hive metastore. To do so, we mapped the data types from the contents.csv file to the respective SQL data types and generated CREATE TABLE statements for each of the Parquet files in our lakeFS branch. Once these statements had been executed successfully, we could use SQL to access the ingested raw study data.
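Generating such a statement from the metadata could look roughly like the following. It is a sketch under assumptions: the type names in `SQL_TYPES`, the catalog and schema names, and the storage location are hypothetical, and the right SQL type for each column depends on how the Parquet files were written. `format` and `external_location` are table properties of Trino's Hive connector.

```python
# Hypothetical type names of the contents CSV mapped to Trino SQL types.
SQL_TYPES = {"text": "VARCHAR", "integer": "BIGINT", "decimal": "DOUBLE", "date": "DATE"}


def create_table_statement(table, fields, location):
    """Build a CREATE TABLE statement for an external Parquet table.

    fields is a list of (column name, contents type name) pairs.
    """
    columns = ",\n".join(
        f'    "{name}" {SQL_TYPES[type_name]}' for name, type_name in fields
    )
    return (
        f"CREATE TABLE {table} (\n{columns}\n)\n"
        f"WITH (format = 'PARQUET', external_location = '{location}')"
    )


# Hypothetical catalog, schema, repository, and branch names.
statement = create_table_statement(
    "hive.ilms1.patients",
    [("patient_id", "text"), ("age", "integer")],
    "s3a://example-repo/ilms1/raw/patients/",
)
print(statement)
```

Because the location contains the lakeFS branch name, the table definition is tied to one branch, which is the root of the synchronization issue between data and metadata discussed in the findings.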

The processing step

We handled the addition of the surrogate keys for the Parquet files as one of possibly multiple data processing steps within this PoC. We implemented the processing step in two ways. In the first approach, we used a Python dictionary to remember the pairs of original and surrogate keys that had already been assigned. To add a surrogate key column to a given table, we first generated surrogate keys for all original keys not yet in the dictionary, added them to the dictionary, and then matched every original key in the table with its surrogate key. With Hive tables, it is not possible to add a column to an existing table. Because of this, we recreated the table with an appended column for the surrogate keys. This implementation of the processing step is not reliable: once the processing terminates, the in-memory Python dictionary, and with it the mapping of the original to the surrogate keys, is lost.
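The idea of this first approach can be illustrated with a small sketch. The class and function names as well as the column names are hypothetical, and rows are represented as plain dictionaries instead of table rows to keep the example self-contained.

```python
from itertools import count


class SurrogateKeyGenerator:
    """Remembers which surrogate key was assigned to which original key."""

    def __init__(self):
        self._mapping = {}          # original key -> surrogate key
        self._next_key = count(1)   # surrogate keys are consecutive integers

    def key_for(self, original_key):
        # Generate a surrogate key only for original keys not seen before.
        if original_key not in self._mapping:
            self._mapping[original_key] = next(self._next_key)
        return self._mapping[original_key]


def add_surrogate_keys(rows, id_column, key_column, generator):
    """Return copies of the rows with an appended surrogate key column."""
    return [
        {**row, key_column: generator.key_for(row[id_column])} for row in rows
    ]


generator = SurrogateKeyGenerator()
version_1 = add_surrogate_keys(
    [{"patient_id": "P-17"}, {"patient_id": "P-03"}],
    "patient_id", "patient_sk", generator,
)
# A later version of the data reuses the key of the known patient P-03.
version_2 = add_surrogate_keys(
    [{"patient_id": "P-03"}, {"patient_id": "P-99"}],
    "patient_id", "patient_sk", generator,
)
```

The mapping lives only in the `generator` object, so it disappears with the process unless it is persisted somewhere, which is the weakness described above.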
In the second approach, we relied only on SQL processing. Instead of the python dictionary, we used a table for the mapping between surrogate and original keys. We created this mapping table, generated and added missing IDs for each original key to that table, and finally created the new table with a left-join.
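The three steps of this SQL-based approach can be sketched as follows. To keep the example runnable without a Trino cluster, it uses Python's built-in `sqlite3` module as a stand-in for the query engine, so the SQL dialect differs in details from what Trino accepts. The table and column names (`raw_patients`, `patient_keys`, `base_patients`, `patient_sk`) are hypothetical.

```python
import sqlite3


def add_surrogate_keys(conn):
    """Extend the key mapping table and rebuild the base table from the raw table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS patient_keys ("
        "patient_sk INTEGER PRIMARY KEY, patient_id TEXT UNIQUE)"
    )
    # New surrogate keys continue after the highest key assigned so far.
    (offset,) = conn.execute(
        "SELECT COALESCE(MAX(patient_sk), 0) FROM patient_keys"
    ).fetchone()
    # Add a surrogate key for every original key missing in the mapping table.
    conn.execute(
        """
        INSERT INTO patient_keys (patient_sk, patient_id)
        SELECT ? + ROW_NUMBER() OVER (ORDER BY missing.patient_id),
               missing.patient_id
        FROM (
            SELECT DISTINCT r.patient_id
            FROM raw_patients AS r
            LEFT JOIN patient_keys AS k ON k.patient_id = r.patient_id
            WHERE k.patient_id IS NULL
        ) AS missing
        """,
        (offset,),
    )
    # Recreate the base table: raw data plus the surrogate key column.
    conn.execute("DROP TABLE IF EXISTS base_patients")
    conn.execute(
        """
        CREATE TABLE base_patients AS
        SELECT r.*, k.patient_sk
        FROM raw_patients AS r
        LEFT JOIN patient_keys AS k ON k.patient_id = r.patient_id
        """
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_patients (patient_id TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO raw_patients VALUES (?, ?)", [("P-17", 34), ("P-03", 51)]
)
add_surrogate_keys(conn)
first_keys = dict(conn.execute("SELECT patient_id, patient_sk FROM patient_keys"))

# A later version of the raw data: P-17 is gone, P-99 is new.
conn.execute("DELETE FROM raw_patients")
conn.executemany(
    "INSERT INTO raw_patients VALUES (?, ?)", [("P-03", 51), ("P-99", 28)]
)
add_surrogate_keys(conn)
base_keys = dict(conn.execute("SELECT patient_id, patient_sk FROM base_patients"))
```

In contrast to the dictionary, the mapping table is part of the stored data, so keys that were assigned once stay stable across runs and across versions of the raw data.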
We implemented this second approach based on the Apache Iceberg open table format, but it can be done with Hive tables in the same way. With that, we successfully retrieved what CluePoints calls a “base” dataset: The study data in SQL tables with added surrogate keys for patients (CluePoints also adds surrogate keys for visits and locations). After the successful processing, we committed both the parquet files with the raw data and those with the base data to the lakeFS branch.

Left: Dataset before, right: Dataset after

We repeated the processing step outlined above for all the prepared study data versions. Each time, we created a new empty branch, so that in the end every version of the study data was loaded into lakeFS and accessible for SQL processing. We could easily switch between the different versions (snapshots) of the study data just by choosing the corresponding branch. A better approach would have been to create each branch from the previous one and to load only the changed or added rows of data. We decided against this for the following reasons:

We spent some time analyzing the third argument. lakeFS provides support for copying the schema information in the Hive metastore from one branch to another. We implemented a lakeFS post-merge hook to automate this copying of metadata whenever a new branch gets created. While this worked in general, we were not satisfied with this approach. A failing hook execution could leave the dataset in an inconsistent state, which might not be spotted easily, for example when only a data type has changed.

We tried out the Apache Iceberg open table format because this format keeps table metadata in the file system, which should be copied automatically together with the data. But for now, the Trino connector for Iceberg tables still relies on the Hive metastore, making the additional copy of metadata still necessary.

Metadata tracking

The data processing is integrated with Marquez, a metadata service that implements the OpenLineage standard for tracking metadata and schema information. For each dataset in a data processing pipeline (incoming, intermediate, or outgoing), the schema information and the data processing step that created it are logged. This allows visualizing the dependencies between datasets and processing steps (data lineage). In detail, we used the metadata service to:

We created a data lineage graph with four nodes for each dataset: the ingestion of the dataset, the unprocessed raw dataset, the addition of surrogate keys as the processing step, and the final “base” dataset that the processing step created. The datasets, processing steps, and the data lineage graph can then be inspected in the Marquez UI:

Left: Datasets and processing Homescreen, Right: Data Lineage Graph
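The four nodes of such a lineage graph result from two OpenLineage run events, one per processing step. The following sketch builds these events as plain dictionaries. The namespace, job names, dataset names, and the producer URI are hypothetical, and the schema URL is only an example value that should be checked against the OpenLineage spec version in use.

```python
import json
import uuid
from datetime import datetime, timezone

# Example value; check it against the OpenLineage spec version in use.
SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
PRODUCER = "https://example.com/cluepoints-poc"  # hypothetical producer URI


def run_event(event_type, run_id, job_name, inputs, outputs, namespace="poc"):
    """Build an OpenLineage run event as a plain dictionary."""
    return {
        "eventType": event_type,  # e.g. "START" or "COMPLETE"
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": namespace, "name": name} for name in inputs],
        "outputs": [{"namespace": namespace, "name": name} for name in outputs],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


# Ingestion job -> raw dataset -> surrogate key job -> base dataset
ingestion = run_event(
    "COMPLETE", str(uuid.uuid4()), "ingest_patients", [], ["raw.patients"]
)
processing = run_event(
    "COMPLETE", str(uuid.uuid4()), "add_surrogate_keys",
    ["raw.patients"], ["base.patients"],
)
payloads = [json.dumps(event) for event in (ingestion, processing)]
```

Marquez accepts such events via HTTP POST on its lineage endpoint (`/api/v1/lineage`). Since the raw dataset is the output of the first event and the input of the second, the service can connect both steps into one graph.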

Findings and outlook

The following findings of our PoC regarding both the current state as well as possible improvements have been most noteworthy.

Usefulness of lakeFS branching

lakeFS supports git-like operations on data files maintained in object storage. Within the PoC, we used lakeFS branches for the processing of the “raw” into the “derived” datasets. For each new version of “raw” data, we created a new lakeFS branch. This allowed us to easily perform some useful operations on “raw” datasets, like

After this first processing step, a lakeFS branch contains what CluePoints calls a “snapshot”: the relevant derived study data for a specific point in time. All further data processing steps (like the application of statistical methods) can then be executed against a specific lakeFS branch. A failure in the data processing does not corrupt statistical results already created, as each lakeFS branch can easily be reset to a former state. The results of a completely successful execution can be retained by committing them or even by merging them into an explicit lakeFS branch for the results. Switching between different snapshots becomes as easy as doing a checkout on a specific lakeFS branch. So, the use of the branching functionality for snapshots already meets some major requirements of the CluePoints platform. Additionally, some planned lakeFS improvements are quite promising:

Nevertheless, some concerns about the usage of lakeFS exist and should be mentioned:

Use of standard metadata tracking

Metadata tracking enables us to visualize dependencies between datasets in a data processing pipeline. With metadata tracking, it is easy to see how the data entered the pipeline, who worked on it, and which processing was done on a dataset before. Overall, by tracking each step of a data processing pipeline, we get the visual representation of a data lineage graph. It is not yet clear how data lineage tools will evolve. We decided to use the OpenLineage reference implementation, Marquez. But there are numerous alternatives available (e.g. Amundsen or Atlan).
Implementing metadata tracking manually takes quite a lot of effort. Additionally, care should be taken to clearly separate the code for the metadata tracking from the code for the actual data processing. In this PoC, we implemented metadata tracking manually but didn’t put much effort into the separation of the tracking code. Data processing tools like Apache Airflow, Keboola, or dbt have integrated support for OpenLineage-based metadata tracking and allow developers to focus fully on the implementation of the processing step. The metadata tracking is done under the hood and can simply be activated as required.

The CluePoints platform already keeps track of metadata of the used datasets and enables visualizing their dependencies. Additionally, the CluePoints platform currently does not rely on a standard tool for the configuration and execution of the data processing pipelines. Thus, while quite helpful within the scope of our PoC, we didn’t recommend the usage of a standard tool for metadata tracking to CluePoints.

MinIO with Azure Blob Storage

We chose MinIO as a distributed, redundant, and easily scalable object storage. Moreover, MinIO is S3-compatible, which allows us to exchange this object storage solution with any S3-compatible service. The S3 compatibility also allows combining MinIO with lakeFS, which we used for versioning, branching, and snapshot creation. In this PoC, we used MinIO only as a local single node, but for production scenarios, it would be necessary to use the distributed mode. The MinIO Azure Gateway makes it easy to migrate the actual object storage to the Azure Blob Storage service, which by itself is not S3-compatible. This is limited to the Azure Blob Storage Gen V1 offering. The Azure Blob Storage Gen V2, which provides a file system view on the stored objects, is not supported. As we recommend the combination of lakeFS and MinIO, we are fine with Azure Blob Storage Gen V1, as lakeFS provides us with the hierarchical file system view.

Overall, MinIO seems to be a good candidate for the object storage solution of a refined CluePoints platform. It can be deployed both on-premises and in the cloud, and it can easily be replaced by solutions from standard object storage providers like AWS or Azure.

Trino still relying on Hive metastore

Trino is a fast and distributed query engine, which supports running standard SQL queries against files in object storage containing the study data. Both the parquet file format and the Apache Iceberg open table format are supported by Trino. The Trino query engine allows a straightforward migration of the existing SQL-based data processing of the CluePoints platform. Trino uses the Hive metastore to maintain and use the relevant metadata for the SQL processing. This information includes present schemas, tables within each schema, columns within each table as well as additional information about each column. Furthermore, the locations of the files for each table, the file format as well as information about partitioning keys are maintained in the Hive metastore. For each SQL query, Trino looks up the metadata in the Hive metastore and uses this information to derive and execute the data access. Typically, a Hive metastore is configured as a separate relational DB. We used a MariaDB instance for that purpose within the PoC.

As mentioned earlier, while being appropriate for the actual SQL processing, the Hive metastore becomes a drawback when used with a data versioning tool like lakeFS. The metadata (stored in Hive metastore) and the actual data (in object storage) need to be kept in sync. When a lakeFS branch is created, existing data from the source branch is directly present in the new branch. But the corresponding schema information in the Hive metastore still only exists for the tables of the source branch. The metadata for the destination branch needs to be added (i.e. recreated for the data in the destination branch) in an additional step after each creation of a new branch. We used lakeFS post-merge hooks to automate this sync of the schema information in the Hive metastore. But the execution of merge hooks is not reliable and could lead to mismatches between data and schema information within a lakeFS branch.

We therefore recommended using Trino together with lakeFS only for the processing of full sets of “raw” data. Still, incremental loads would fit the data version control provided by lakeFS very well. This leads us to our final finding.

No relevant advantages of Apache Iceberg open table format, yet

The Apache Iceberg open table format offers advantages like ACID guarantees, time travel, and hidden partitioning. But the Trino connector for Iceberg tables currently still relies on metadata stored in a Hive metastore.
Apache Iceberg is already working on its own Trino connector to get rid of the Hive metastore dependency (see https://github.com/trinodb/trino/issues/1324). A direct Trino connector for Iceberg tables should allow us to create branches with both data and metadata in one atomic operation. Improved lakeFS merge support for Iceberg metadata files (see https://docs.lakefs.io/understand/roadmap.html#pluggable-diffmerge-operators) may further facilitate parallel work on different branches.

Thus, we hope that Apache Iceberg (or another open table format) will soon support a seamless combination of Trino-based SQL processing on versioned datasets maintained by lakeFS.

Conclusion

Data-driven application development, paired with cloud-based deployment and powerful, freely available AI tools, has turned (and is still turning) almost everything upside down in the world of analytical data processing. New technologies and tools promise substantial improvements by making it possible to break up formerly centralized solution designs.

The Data Mesh principles, with the main idea of a “decentralized data ownership delivering data as a product”, provide guidance on how to get along in this challenging environment. Still, due to competing technologies, missing tool maturity, and the lack of established standards, it is not easy to choose the appropriate technologies and tools.

CluePoints provided us with the interesting use case of processing clinical study data, which we used to become more experienced with some of these new technologies and tools while working on a real-world task. We believe that the combination of MinIO-based object storage and lakeFS-based data versioning could provide a promising solution design. Relying on Trino as an SQL engine for both reading and writing of the clinical data is possible. But for now, it introduces an unpleasant Hive metastore dependency.
Using data versioning, we would like to have both the clinical data and the metadata (SQL schema information) kept in the versioned object storage. With Apache Iceberg working on a specific Trino connector (without the Hive metastore dependency), we expect to close the gap and actually provide a valuable tech stack for the given task. The configuration and execution of data processing pipelines is a core functionality of the current CluePoints platform. Relying on a standard tool like dbt for this purpose generally seems to be a valid approach, but hasn’t been covered within the PoC.
