The CluePoints platform supports clinical trial sponsors that conduct medical studies, with statistical analyses of the captured study data. The main use case of the platform is the detection of systematic errors or anomalies in clinical studies' data: Customers (typically data analysts at pharmaceutical, medical device, or clinical research organization companies) use the platform to upload sets of study data and configure data processing pipelines. By executing those pipelines, customers can derive so-called signals, which point to possible weaknesses (systematic errors or anomalies) in the capturing of the study data. Customers can visualize and further analyze signals, and finally assign actions to reduce identified weaknesses to mitigate the risk of faulty study results.
The design of the current solution is mainly influenced by the classical Data Warehouse approach of using some kind of “staging area” and (ETL-) batch processing to gather, harmonize and integrate the “raw” clinical study data from operational source systems and to finally load this “derived” data into a relational database. Statistical methods are then applied to the data in this relational database to derive the signals, and a commercial tool is used for the visualization of the derived data and signals. CluePoints is aware of some issues with this solution design. The number of relational databases is very high, as multiple databases are created per clinical study. Load balancing of the data access is difficult, as clinical studies produce noticeably varying amounts of data. And the ACID guarantees provided by the used RDBMS are actually not needed for the data processing.
Why are we involved?
INNOQ is currently supporting CluePoints with the refinement of the software architecture of the CluePoints platform. As a result of this work, we became aware of some similarities between the CluePoints platform and the Data Mesh approach, which has quite recently been proposed for the design and development of solutions for analytical data processing. According to Data Mesh, the CluePoints platform can be seen as a “Self-Serve Data Platform”, allowing data analysts to easily create their own “Data Products”: Services that consecutively process incoming study data and present the signals derived from each version of the study data. We are currently promoting this approach and are interested in customer projects embracing it. Together with CluePoints, we aligned on a PoC to get more experience in applying up-to-date technologies commonly used in data mesh projects. We believe that those technologies will be helpful in the design of an improved solution for the CluePoints platform.
To avoid misunderstandings, let us point out, that the CluePoints platform does not align with all Data Mesh principles. Data analysts do not share their data products. Therefore, interoperability of data products is not needed nor supported within the CluePoints platform. Also, the Data Mesh approach typically aims to improve analytical data processing in larger enterprises, while CluePoints is providing a service for data analysts working at different clinical facilities.
PoC Solution overview
In this PoC, we focused on the actual data processing of the CluePoints platform. Particularly, we excluded a major part of the CluePoints platform that allows data analysts at clinical facilities to configure their own data processing pipelines.
Scope and use case
To further limit the scope of the PoC, we decided to focus on one processing step: the addition of surrogate keys to a clinical study dataset. A clinical study dataset consists of patient profile data, data about doctor visits, and about their participation in the clinical study. It also contains personal data about patient diseases and their treatment. The generated surrogate keys allow the combination of the different datasets by patient-, visit-, or location-IDs. We covered some of the most common scenarios for the incoming study data. We handled the addition of new datasets to an existing study, as well as the addition of rows or columns to an existing dataset. We did this specifically to challenge the usefulness of the data versioning tool support, which we introduced in this PoC. Finally, we automated the execution of the data processing for an easy handover to the CluePoints development team.
Object storage: We used an object storage for the raw and derived study data. This fits well with the ever-growing need for data storage capacity by being cost-efficient and easily expandable. We suggested S3 compatibility, as this standard is supported by most cloud providers and is also easily available in on-premise environments. That way, switching a cloud provider could be easily accomplished.
Columnar file format: We applied a columnar file format, as data processing is done in batches on complete files of incoming study data. Row-by-row processing is not needed. Columnar file formats provide good performance for SQL-based data processing, which currently is very much the standard way of doing data processing at CluePoints.
Alternative open table format: Additionally, we employed an open table format for the storage and processing of study data. Open table formats provide better metadata handling over the underlying columnar file formats, which results in performance improvements and support for additional features like time traveling.
Distributed SQL query engine: We used a distributed SQL query engine to support SQL-based data processing. Spark SQL is quite commonly used for this purpose. Our position here is that Spark SQL should currently not be used in production without a more profound understanding of the underlying Spark processing. Therefore, we decided to leave Spark SQL out of the picture for this PoC.
Data Versioning: We introduced a data versioning tool to allow git-like operations on data. The branching functionality provides isolation and avoids the need to copy the study data.
Matadata tracking/data lineage: We used OpenLineage-based metadata tracking to observe and present data lineage dependencies between the raw and the derived datasets.
We used the following technology stack within the PoC:
|MinIO||S3-compatible object storage||https://min.io/|
|lakeFS||git-like data versioning||https://lakefs.io/|
|Cherrypy||Used for lakeFS actions (post-merge hooks) implementation||https://cherrypydocrework.readthedocs.io/index.html|
|Parquet||columnar file format||https://parquet.apache.org/|
|Iceberg||open table format||https://iceberg.apache.org/|
|Trino||distributed SQL query engine||https://trino.io/|
|Marquez||OpenLineage compatible metadata service||https://marquezproject.ai/|
|Pandas||Used for IO on CSV- and Parquet files||https://pandas.pydata.org|
|Jupyter Notebook||Used as development environment||https://jupyter.org/|
We used Jupyter Notebooks with python for the implementation of our solution and relied on the pandas framework for the processing of the CSV files. We deployed the solution locally using docker and docker-compose. We also automated the setup of the environment and the execution of the data processing.
The docker environment
The environment is set up with docker containers, which share a docker network. In the configuration file, we configured the following containers:
- MinIO and an initial MinIO setup
- lakeFS with an additional PostgresDB container and an initial setup
- A webserver for the lakeFS hook
- Marquez including the webUI, the marquezAPI, and another PostgresDB
- The Trino coordinator
- A Hive metastore with a MariaDB backend
The running containers then look like this:
After those containers are up and running, the different endpoints can be inspected, and the data ingestion can begin.
Initial data ingestion
We used some study test data provided by CluePoints and derived some versions of consecutive study datasets from them, resulting in the following directory structure:
The “ilms” folder contained the original data and the folders ilms1 to ilms3 held different versions of the original data. Each version differentiates from the original by having fewer datasets, columns, or rows. The different data files in each of those folders contain the raw study data for e.g. patients, visits, or locations each as one CSV file. An additional contents CSV file provides metadata about the fields and their data types used in each of the study data CSV files. To correctly parse the study data CSV files and load the data into pandas dataframes, we assigned the proper data types from the contents CSV. Using pandas again, we then saved these dataframes as parquet files into our lakeFS repository. We created a new lakeFS branch for each of our different versions of study data. lakeFS handles the actual storage in the underlying MiniO system for us. To confirm that it worked, we can look at the respective endpoints, where we can see that in MinIO are files with unknown content. Whereby in lakeFS, these files are mapped into a hierarchical filesystem, which we can browse:
To be able to use our query engine Trino for the SQL processing on the now existing parquet files, we had to add the schema information in the Hive metastore. Therefore, we mapped the data types from the contents.csv file to the respective SQL data types and generated create table statements for each of the parquet files in our lakeFS branch. With these statements successfully executed, we could use SQL to access the ingested raw study data:
The processing step
We handled the addition of the surrogate keys for the parquet files as one of possibly multiple data processing steps within this PoC.
We implemented the processing step in two ways. In the first approach, we used a python dictionary to remember already given id-key pairs. To add a surrogate key column to a given table, we first generated missing IDs, added them to the dictionary, and then matched all IDs with their original keys. With Hive tables, it is not possible to add a column to an existing table. Because of this, we recreated the table with an appended column for the surrogate keys. We didn’t implement this data processing step reliably: after the termination of the processing, the memory-based python dictionary and the corresponding mapping of the original to the surrogate keys is lost.
In the second approach, we relied only on SQL processing. Instead of the python dictionary, we used a table for the mapping between surrogate and original keys. We created this mapping table, generated and added missing IDs for each original key to that table, and finally created the new table with a left-join.
We implemented this second approach based on the Apache Iceberg open table format, but it can be done with Hive tables in the same way. With that, we successfully retrieved what CluePoints calls a “base” dataset: The study data in SQL tables with added surrogate keys for patients (CluePoints also adds surrogate keys for visits and locations). After the successful processing, we committed both the parquet files with the raw data and those with the base data to the lakeFS branch.
We repeated the processing step outlined above for all the prepared study data versions. Each time, we created a new empty branch and finally got every version of study data loaded and accessible for SQL processing in lakeFS. We could easily switch between the different versions (snapshots) of the study data just by choosing the corresponding branch. A better approach would have been to create branches based on the previous one and just load only the changed or added rows of data. We decided against this for the following reasons:
- CluePoints is currently working only on the full study data for each version.
- Deleted rows need to be handled separately (e.g. by adding a “delete”-flag or by providing additional files with the deleted rows).
- Branching from a non-empty branch only takes over the actual data. The schema information in the Hive metastore needs to be copied or recreated in a separate step.
We spent some time analyzing the third argument. lakeFS provides support to copy the schema information in Hive metastore from one branch to another. We implemented a lakeFS POST merge hook to automate this copying of metadata whenever a new branch gets created. But while this worked in general, we did not appreciate this approach. A failing hook execution could leave the dataset in an inconsistent state, which might not have been spotted easily. Only a datatype might have been changed.
We tried out the Apache Iceberg open table format because this format keeps table metadata in the file system, which should be copied automatically together with the data. But for now, the Trino connector for Iceberg tables still relies on the Hive metastore, making the additional copy of metadata still necessary.
The data processing is integrated with Marquez, a metadata service, which implements the OpenLineage standard for tracking metadata and schema information. For each dataset in a data processing pipeline (incoming, intermediate, or outgoing) the schema information and data processing step, which created it, is logged. This allows visualizing the dependencies between datasets and processing steps (data lineage). In detail, we used the metadata service to:
- list all created datasets
- show the metadata for a given dataset
- visualize the dependencies of a processed dataset, e.g. the step of creating artificial key tables from raw datasets
We have created a data lineage graph with four knots for each dataset: the ingestion of the dataset, the unprocessed raw dataset, the addition of surrogate keys as the processing step, and the final “base” dataset which the processing step created. The datasets, processing steps, and the data lineage graph can be then inspected in the Marquez UI:
Findings and outlook
The following findings of our PoC regarding both the current state as well as possible improvements have been most noteworthy.
Usefulness of lakeFS branching
lakeFS supports git-like operations on data files maintained in object storage. Within the PoC, we used lakeFS branches for the processing of the “raw” into the “derived” datasets. For each new version of “raw” data, we created a new lakeFS branch. This allowed us to easily perform some useful operations on “raw” datasets, like
- incrementally add new or changed data files to an existing “raw” dataset
- combine files from different versions of “raw” datasets to a new “raw” dataset
- manually create or change “raw” datasets for test purposes
- try out a new version of our data processing on an existing “raw” dataset
After this first processing step, a lakeFS branch contains what CluePoints calls a “snapshot”: The relevant derived study data for a specific point in time. All further data processing steps (like the application of statistical methods) can then be executed regarding a specific lakeFS branch. A failure in the data processing does not introduce any corruption in statistical results already created, as each lakeFS branch could easily be reset to a former state. Results of a complete successful execution could be retained by committing or even by merging them into an explicit lakeFS branch for the results. Switching between different snapshots becomes as easy as doing a checkout on a specific lakeFS branch. So, the use of the branching functionality for snapshots is already meeting some major requirements of the CluePoints platform. Additionally, some planned lakeFS improvements are quite promising:
merge behavior of lakeFS: Currently, lakeFS does not care about the actual content of an object. If we merge two branches that both update the same file, it is up to the user to resolve this conflict. Within the PoC, we used overwriting of the destination branch with the files from the source branch (source-wins-strategy).
lakeFS is currently working on better merging support. The main idea is to support improved merging of metadata files used in open table formats like Apache Iceberg or Delta Lake. This will allow merging changes on Delta or Iceberg tables as long as no underlying data file has been changed on both source and destination branches.
Git-lakeFS integration: The combination of lakeFS and git will allow aligning commits from data with commits from source code. This will enable us to retrace which source code version has generated which data. Furthermore, it will enable us to precisely re-execute former steps of the data processing (s. https://docs.lakefs.io/understand/roadmap.html#git-lakeFS-integration).
Nevertheless, some concerns about the usage of lakeFS exist and should be mentioned:
lakeFS controls underlying object storage: lakeFS controls how data is stored in the underlying object storage. lakeFS uses metadata files to keep track of data files, branches, and commits and also stores these in object storage. It is not intended to access data files managed by lakeFS directly through the underlying object storage. This should not be an actual problem, as lakeFS itself provides S3-compatible access to the explicit versions of the stored data files.
Postgres dependency: lakeFS currently stores the metadata not only in mutable files but also in a PostgreSQL DB to ensure strong consistency. lakeFS is planning to get rid of the strict PostgreSQL dependency, but production scenarios will further rely on some DB technology to ensure ACID guarantees (https://docs.lakefs.io/understand/roadmap.html#decouple-ref-store-from-postgresql-high-priority).
Deleting files introduces a spark dependency: Deleting files from lakeFS is questionable in general: If we want to keep track of the correct history of datasets, we cannot remove any file, that has been part of a former commit. Still, there are situations, where we actually want to remove files. For instance, if we mistakenly committed a large file and do not want to waste storage space.
To completely remove files from lakeFS and to free up storage space, the lakeFS garbage collection tool needs to be used. This tool is currently implemented as a Spark job and introduces the Spark dependency, which we liked to avoid within the PoC.
Use of standard metadata tracking
Metadata tracking enables us to visualize dependencies between datasets in a data processing pipeline. With metadata tracking, it is easy to see how the data entered the pipeline, who worked on it, and which processing was done on a dataset before. Overall, we get the visual representation of a data lineage graph by tracking each step of a data processing pipeline.
The evolution of data lineage tools is not clear yet. We decided to use the OpenLineage reference implementation, Marquez. But there are numerous alternatives available (e.g. Amundsen or atlan).
Implementing metadata tracking manually needs quite a lot of effort. Additionally, care should be taken to clearly separate the code for the metadata tracking from the code for the actual data processing. In this PoC, we manually implemented metadata tracking but didn’t put much effort in the separation of the tracking code. Data processing tools like Apache airflow, Keboola, or dbt have integrated support for OpenLineage-based metadata tracking and fully allow focusing on the implementation of the processing step. The metadata tracking is done under the hood and can just be activated as required.
The CluePoints platform already keeps track of metadata of the used datasets and enables visualizing their dependencies. Additionally, the CluePoints platform currently does not rely on a standard tool for the configuration and execution of the data processing pipelines. Thus, while quite helpful within the scope of our PoC, we didn’t recommend the usage of a standard tool for metadata tracking to CluePoints.
MinIO with Azure Blob Storage
We chose MinIO for a distributed, redundant, and easily scalable object storage. Moreover, MinIO is S3-compatible, which allows us to exchange this object storage solution with any S3-compatible service. The S3 compatibility also allows combining MinIO with lakeFS, which we used for versioning, branching, and snapshot creation. In this PoC we used MinIO only as a local single node, but for production scenarios, it would be necessary to use the distributed mode. The MinIO Azure Gateway allows to easily migrate the actual object storage to the Azure Blob Storage service, which by itself is not S3-compatible. This is limited to the Azure Blob Storage Gen V1 offering. The Azure Blob Storage Gen V2, which provides a file system view on the stored objects, is not supported. As we recommend the combination of lakeFS and MinIO, we are fine with Azure Blob Storage Gen V1, as lakeFS provides us with the hierarchical file system view.
Overall, MinIO seems to be a good candidate for the object storage solution of a refined CluePoints platform. It can be deployed both on-premises and in the Cloud, and it allows to be replaced easily with solutions from standard object storage providers like AWS or Azure.
Trino still relying on Hive metastore
Trino is a fast and distributed query engine, which supports running standard SQL queries against files in object storage containing the study data. Both the parquet file format and the Apache Iceberg open table format are supported by Trino. The Trino query engine allows a straightforward migration of the existing SQL-based data processing of the CluePoints platform. Trino uses the Hive metastore to maintain and use the relevant metadata for the SQL processing. This information includes present schemas, tables within each schema, columns within each table as well as additional information about each column. Furthermore, the locations of the files for each table, the file format as well as information about partitioning keys are maintained in the Hive metastore. For each SQL query, Trino looks up the metadata in the Hive metastore and uses this information to derive and execute the data access. Typically, a Hive metastore is configured as a separate relational DB. We used a MariaDB instance for that purpose within the PoC.
As mentioned earlier, while being appropriate for the actual SQL processing, the Hive metastore becomes a drawback when used with a data versioning tool like lakeFS. The metadata (stored in Hive metastore) and the actual data (in object storage) need to be kept in sync. When a lakeFS branch is created, existing data from the source branch is directly present in the new branch. But the corresponding schema information in the Hive metastore still only exists for the tables of the source branch. The metadata for the destination branch needs to be added (i.e. recreated for the data in the destination branch) in an additional step after each creation of a new branch. We used lakeFS post-merge hooks to automate this sync of the schema information in the Hive metastore. But the execution of merge hooks is not reliable and could lead to mismatches between data and schema information within a lakeFS branch.
We therefore recommended using Trino together with lakeFS only for the processing of full sets of “raw” data. Still, doing a load based on increments would very much fit the data version control provided by lakeFS. This leads us to our final finding.
No relevant advantages of Apache Iceberg open table format, yet
The Apache Iceberg open table format offers advantages like ACID guarantees, time traveling, and hidden partitioning. But the Trino connector for Iceberg tables currently still relies on metadata stored in a Hive metastore.
Apache Iceberg is already working on its own Trino connector to get rid of the Hive metastore dependency (s. https://github.com/trinodb/trino/issues/1324). A direct Trino connector for Iceberg tables should allow us to create branches with both data and metadata in one atomic operation. Improved lakeFS merge support for Iceberg metadata files (s. https://docs.lakefs.io/understand/roadmap.html#pluggable-diffmerge-operators) may further facilitate parallel work on different branches.
Thus, we hope that Apache Iceberg (or another open table format) will soon support a seamless combination of Trino-based SQL processing on versioned datasets maintained by lakeFS.