As an IT consultancy, over time we have worked on a number of projects with various requirements on pipelines and ETLs. Of course, data pipelines are key to the success of any organization that needs insights from data. Data needs to be ingested, cleaned, processed and made available for the right people (think authentication, authorization). Data drives the decision making for any organization, and it is a key competitive advantage for any organization looking into the future.
In this article, we aim to describe one of the solutions we delivered using AWS, with the respective workflow and concepts, that you might find handy when considering your next pipeline.
The problem statement that drove this solution required us to deliver an architecture that was scalable and redundant, kept track of the processed units and could be run on-demand (for new or previously failed data). We should also take into account high velocity, data snapshots during transformations, and traceability.
There are currently many options available in terms of solutions and vendors, as well as frameworks such as Airflow, prefect, Luigi, AWS SageMaker, and Azure ML. Pretty much any vendor can give you a built solution. However, when assessing your options you’ll need to also consider the total cost of ownership (TCO) together with the know-how and skill-sets required to maintain and support such a solution.
You might actually be tempted to create your own custom solution, as, when starting from scratch, it is easier to focus on the high level features and the interesting challenges a technical team may want to cover. When that happens however, we often ignore the depth of the underlying structure needed to support a 24/7 system working in production and the integration with other systems. Reinventing something is often needless, although it will certainly depend on the use cases and requirements at hand.
Without getting into non-technical topics, it is usually recommended to create a proof of concept (PoC), a prototype, and/or a minimum viable product (MVP), which you will then evolve into the final solution. During these activities you’ll need to do your own homework and see what’s available, what’s possible, and what’s needed.
In summary, there are many ways and mechanisms in which an ETL pipeline can be done. Here we will focus on data flows, giving an example of a pipeline we have delivered and which currently processes dozens of news articles per second, from multiple news sources, continuously; 24 x 7. Needless to say, even though in this particular solution the data comes in the form of news articles, conceptually any kind of data can be processed and consumed in the same way, such as images or videos, not only text, articles or documents (perhaps OCR PDFs is something common).
The tech stack we chose was ElasticSearch as a NoSQL database and Python for all processing (including AWS Lambdas) using AWS on the infra side.
The concepts of the solution described here should be the main take-aways. Evidently, the database, programming language and infrastructure provider do not affect in any way the workflows or design of the end result.
Our team was very familiar with AWS, AWS Lambda and ElasticSearch. Because of this, no other middleware or platforms were necessary (e.g. cluster for Airflow, prefect, Docker, K8s, distributed computing – and all the support and TCO that entails).
The ingestion Pipeline
The previously mentioned news articles that need to be ingested are “PUT” in an AWS S3 bucket in their raw format (XML). An AWS lambda function gets triggered when a new file is received in that bucket (PUT operation), which reads the XML raw article and converts it into JSON format.
Internally, this lambda is using a pydantic model to validate and verify the data and expected format, allowing for optional fields, but making sure the mandatory fields and format types are present. If the processing of an article fails, this XML will be copied into another AWS S3 bucket named “errors”, to analyze and review at a later stage.
After the first lambda converts the XML into JSON, it “PUT”s the JSON as a file in another AWS S3 bucket. The reason for this intermediate step, and consequently additional storage, is:
- To have traceability between the converted and original article, where this can act as a cache for the longer processing that the conversion takes, and
- To be able to reproduce or re-run conversions as logic changes. This can provide a diff between what has been used and the original resource.
Finally, there is a second lambda whose purpose is to be triggered once the JSON file is “PUT” into the JSON AWS S3 bucket, reads the file and writes it to the data store of choice (in this case, ElasticSearch).
Since we are working with millions of news articles with changing metadata fields, and the use cases include full text search capabilities, it makes perfect sense to use ElasticSearch as the NoSQL data store solution.
This ElasticSearch cluster has indices with naming conventions using date timestamps to group news articles.
Enriching with metadata
The ingestion of data is only the first step to bring value to data.
In this project, once the articles are stored in the database (ElasticSearch), a job will run everyday to process the newly received articles, adding metadata to them and storing them in another ElasticSearch cluster.
The reasoning behind this approach is that there will be a raw data source (ElasticSearch) and another one for processed articles with additional information written by processes (in this case, Machine Learning models). This will once again allow traceability and comparison, caching, and re-running if necessary (with or without having to ingest or reprocess different things received by the raw articles).
The job will identify the newly ingested articles by identifying the last one processed in the enriched ElasticSearch cluster, and will proceed to load the article, send it to a machine learning model, and attach the results as new fields in the document, before storing it again in the same ElasticSearch cluster, but in another “enriched” index.
This concludes the enriching pipeline, which takes the news article and creates an enriched version with additional metadata which is also stored in ElasticSearch for other applications and users to consume.
Recipe of this solution
- AWS S3 bucket (x2)
- AWS Lambdas (x2)
- ElasticSearch cluster (x2)
- Job to process documents
There are a few scenarios and specifics which we did not go over. This is an overview and as such, some things are simplified.
To give you a few examples of things that you might need to consider:
- Failure scenarios (perhaps another S3 bucket for failed articles, or another database to store articles by id to know which ones have failed, among other options)
- Using ephemeral Kubernetes pods instead of lambdas
- Scalability when using the different solutions
Building the pipeline was an interesting activity, providing a solution outside the usual ETL frameworks and platforms. In a way, much simpler and focused on the needs, instead of going through a larger vendor-based or open-source solution that would increase cost and complexity.
This pipeline has proven to be very successful, reliable, and relatively straightforward to debug when problems in the components were identified – Lambda logic or ElasticSearch/S3 issues. Very clean and clear architecture, considerably reducing the TCO because it was pretty much serverless (with the exception of ElasticSearch, which was already in place anyway and required for the application). As a point of proof, the solution has been running successfully over the last two years!
The temptation to use the latest tools, external vendors or make a larger system than needed is always present, and certainly needs to be evaluated and considered with each project. However, to ensure an optimal approach the focus should remain on the defined objective(s), whilst considering the TCO.