Hadoop data ingestion is the beginning of your data pipeline in a data lake. It means taking data from various silo databases and files and putting it into Hadoop. Sounds arduous? For many companies it does turn out to be an intricate task, which is why they often take more than a year to ingest all their data into a Hadoop data lake. Why does that happen? Because Hadoop is open source, there are many different ways to ingest data into it, and every developer can choose her or his favorite tool or language for the job. Developers tend to pick a tool or technology for its performance, but this makes governance very complicated.
The Hadoop Distributed File System (HDFS)
Hadoop uses a distributed file system that is optimized for reading and writing of large files. When writing to HDFS, data are “sliced” and replicated across the servers in a Hadoop cluster. The slicing process creates many small sub-units (blocks) of the larger file and transparently writes them to the cluster nodes. The various slices can be processed in parallel (at the same time) enabling faster computation. The user does not see the file slices but interacts with the whole file. When transferring files out of HDFS, the slices are assembled and written as one file on the host file system.
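As a quick illustration of the slicing, you can ask HDFS how a file you just wrote was split into blocks and where the replicas live. A minimal sketch, assuming a hypothetical file path /lake/raw/events.log:

```bash
# Copy a large local file into HDFS; it is transparently split into blocks
# (128 MB by default) and each block is replicated (3 copies by default).
hdfs dfs -put /tmp/events.log /lake/raw/events.log

# Show how the file was sliced: block IDs, block sizes, and the datanodes
# holding each replica.
hdfs fsck /lake/raw/events.log -files -blocks -locations
```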
Can Hadoop Data Ingestion be Made Simpler and Faster?
Definitely. For that, Hadoop architects need to start thinking about data ingestion from management's point of view too. By adopting these best practices, you can import a variety of data within a week or two. Moreover, the quicker we ingest data, the faster we can analyze it and glean insights. Please note that I am proposing only one methodology here: one that is robust, widely available, and performs well. The idea is to use these techniques so we can ingest all the data within a few weeks, not months or years. Now, let's have a look at how we import different objects:
1. File Ingestion
File ingestion is straightforward. The optimal way is to first load the files onto a landing server and then use the Hadoop CLI to ingest them into Hadoop or the data lake. For loading files onto the landing server from a variety of sources, there is ample technology available; keep using what you already have, and just use the Hadoop CLI to load the data into Hadoop, Azure Data Lake, S3, or GCS (Google Cloud Storage).
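For example, once the files have arrived on the landing server, the Hadoop CLI is all you need. A minimal sketch, assuming a hypothetical landing directory /data/landing/sales and a target HDFS path /lake/raw/sales:

```bash
# Create the target directory in HDFS (idempotent).
hdfs dfs -mkdir -p /lake/raw/sales/2024-05-01

# Copy today's files from the landing server's local disk into HDFS.
hdfs dfs -put /data/landing/sales/*.csv /lake/raw/sales/2024-05-01/

# Verify that the files landed and check their sizes.
hdfs dfs -ls -h /lake/raw/sales/2024-05-01/
```

The equivalent for Azure Data Lake, S3, or GCS is the respective cloud CLI, or the same hadoop fs commands pointed at the cloud file system if it is configured.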
2. Database Ingestion
Now, this is a big deal. I have seen companies using Sqoop (in a variety of ways), NiFi, and other tools to load their databases into Hadoop. So here is my simple guide.
2.1 Database Dump
When a table is small, say less than a million rows, you can afford to load a full database dump on a daily or hourly basis. Do not create change data capture for smaller tables; it would create more problems in Hadoop. For tables with 100 million+ records, use multiple Sqoop mapper threads (the -m option) to load them into Hadoop in parallel.
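For illustration, here is a minimal Sqoop sketch for a parallel dump of a large table, assuming a hypothetical orders table and JDBC connection details (pick a well-distributed split column and tune the mapper count to your database):

```bash
sqoop import \
  --connect jdbc:mysql://dbhost:3306/sales \
  --username etl_user \
  --password-file /user/etl/.db_password \
  --query 'SELECT * FROM orders WHERE $CONDITIONS' \
  --split-by order_id \
  -m 8 \
  --target-dir /lake/raw/sales/orders
```

The -m 8 option runs eight parallel mappers, each importing one slice of the order_id range; for small tables keep -m 1 (see the dos and don'ts below).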
2.2 Change Data Capture
Do Change Data Capture (CDC) only for tables that are large (at least 10 million+ rows). For CDC you can either use a trigger on the source table (I know DBAs don't like that) or use a log-based tool. These tools are proprietary to each database: GoldenGate for Oracle, SQL Server CDC, and so on. Once you ingest the change records into Hadoop, you need to write Hive queries to merge them into the main tables. You can also use the OvalEdge time machine to process these transactions.
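As an illustration of the merge step, here is a minimal HiveQL sketch run from the shell. It assumes a hypothetical ACID-enabled managed table sales.orders (with columns order_id, amount, status, modified_at) and a staging table sales.orders_cdc that holds the latest change records plus an op_type column; note that Hive's MERGE statement requires transactional tables.

```bash
hive -e "
MERGE INTO sales.orders AS t
USING sales.orders_cdc AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op_type = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET amount = s.amount, status = s.status, modified_at = s.modified_at
WHEN NOT MATCHED AND s.op_type <> 'D' THEN INSERT VALUES (s.order_id, s.amount, s.status, s.modified_at);
"
```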
Change Data Capture (CDC) techniques are used to identify changes. CDC can be the basis for synchronizing another system with the same incremental changes, or for storing an audit trail of changes. The audit trail may subsequently be used for other purposes, e.g. to update a data warehouse or to run analyses across the changes to identify patterns. In this blog post, I cover four common methods to perform CDC: Date_Modified, Diff, Triggers, and Log-Based Change Data Capture, along with some of the challenges of each method.
2.2.1 Date_Modified
Many transactional applications keep track of metadata in every row including who created and/or most-recently modified the row, as well as when the row was created and last modified. The approach to CDC in such an environment is to keep track of when changes are extracted, and in a subsequent run filter on the DATE_MODIFIED column to only retrieve rows that were modified since the most recent time data was extracted. This approach has a few challenges that may or may not be a concern, depending on the application:
- Data deletes are a challenge because there is no DATE_MODIFIED for a deleted row (unless deletes are logical and simply update a flag in the row to indicate that it was deleted). The extreme case of a delete is a table truncate, which is uncommon in transactional applications but does occur sometimes.
- DATE_MODIFIED must be available on all tables and must be reliably set. Database triggers may be a good way to set the values but these may introduce overhead on the transactional application.
- Extracting the changes uses a lot of resources, since filtering on DATE_MODIFIED means scanning the table. Of course, DATE_MODIFIED may be indexed to lower the impact of the select statement, at the cost of storing (and continuously updating) the additional index.
Using DATE_MODIFIED for CDC works well for traditional data warehouse applications that are populated using Extract, Transform and Load (ETL) jobs, when the source tables don’t process deletes.
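As an illustration, Sqoop has built-in support for this pattern via incremental imports. A minimal sketch, assuming a hypothetical ORDERS table with a DATE_MODIFIED column and the timestamp recorded at the previous extraction:

```bash
sqoop import \
  --connect jdbc:oracle:thin:@//dbhost:1521/ORCL \
  --username etl_user \
  --password-file /user/etl/.db_password \
  --table ORDERS \
  --incremental lastmodified \
  --check-column DATE_MODIFIED \
  --last-value '2024-05-01 00:00:00' \
  --merge-key ORDER_ID \
  --target-dir /lake/raw/sales/orders
```

Sqoop prints the new high-water mark at the end of the run; record it (or use a Sqoop saved job, which stores it for you) and pass it as --last-value next time. Note that, as described above, deletes on the source will never show up in this feed.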
2.2.2 Diff
The diff method for change data capture compares the current state of the data with the previous state to identify what changed. Challenges with this approach include:
- Performing the diff requires a lot of resources to compute the differences between the two states, and resource consumption grows at least linearly with data volume.
- CDC cannot be performed in real time because the diff realistically takes too many resources to run all the time.
Compared to the DATE_MODIFIED method, the diff method does not have a problem with deleted rows. The diff method works well for low data volumes.
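As an illustration, the diff can be expressed as a full outer join between the previous and current snapshots. A minimal HiveQL sketch run from the shell, assuming hypothetical snapshot tables sales.orders_snap_prev and sales.orders_snap_curr keyed on order_id (the compared columns are illustrative):

```bash
hive -e "
SELECT
  COALESCE(c.order_id, p.order_id) AS order_id,
  CASE
    WHEN p.order_id IS NULL THEN 'I'   -- only in the new snapshot: insert
    WHEN c.order_id IS NULL THEN 'D'   -- only in the old snapshot: delete
    ELSE 'U'                           -- in both, but a compared column differs: update
  END AS op_type
FROM sales.orders_snap_curr c
FULL OUTER JOIN sales.orders_snap_prev p
  ON c.order_id = p.order_id
WHERE p.order_id IS NULL
   OR c.order_id IS NULL
   OR c.amount <> p.amount
   OR c.status <> p.status;
"
```

The full outer join is exactly why this method scales poorly: both snapshots must be read and shuffled in full on every run.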
2.2.3 Triggers
Database triggers can be used to perform CDC into shadow tables. The shadow tables may store the entire row, to keep track of every single column change, or store only the primary key and the operation type (insert, update, or delete). The use of database triggers to perform CDC also has a few challenges:
- Firing the trigger, and storing the row changes in a shadow table, introduces overhead. In an extreme case, CDC may introduce 100% overhead on the transaction, i.e. instead of 0.1 seconds it may take 0.2 seconds to complete a transaction.
- The lower-overhead alternative of storing only the primary key requires a join back to the source table to retrieve the changed row, which (1) increases the load of retrieving the changes, and (2) loses intermediate changes if multiple changes took place on the same row.
- Should the source application perform a truncate, chances are the trigger won't fire and the changes will not be recorded. Also, if table definitions change, the triggers and shadow tables may have to be modified, recreated, and/or recompiled, which introduces extra overhead to manage and maintain the database.
CDC using database triggers lowers the overhead to extract the changes but increases the overhead to record the changes.
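For illustration, here is a minimal sketch of the lower-overhead variant that records only the primary key and the operation type, written as hypothetical Oracle PL/SQL run from the shell (table, column, and connection names are assumptions, and the syntax differs for other databases):

```bash
sqlplus -s app_user/"$ORACLE_PWD"@//dbhost:1521/ORCL <<'SQL'
-- Shadow table: one row per change, primary key plus operation type.
CREATE TABLE orders_cdc (
  order_id    NUMBER,
  op_type     CHAR(1),                       -- I = insert, U = update, D = delete
  changed_at  TIMESTAMP DEFAULT SYSTIMESTAMP
);

-- Row-level trigger that fires after every change on the source table.
CREATE OR REPLACE TRIGGER orders_cdc_trg
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW
BEGIN
  IF INSERTING THEN
    INSERT INTO orders_cdc (order_id, op_type) VALUES (:NEW.order_id, 'I');
  ELSIF UPDATING THEN
    INSERT INTO orders_cdc (order_id, op_type) VALUES (:NEW.order_id, 'U');
  ELSE
    INSERT INTO orders_cdc (order_id, op_type) VALUES (:OLD.order_id, 'D');
  END IF;
END;
/
SQL
```

Every transaction on orders now also writes to orders_cdc, which is precisely the recording overhead discussed above.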
2.2.4 Log-Based Change Data Capture
Transactional databases store all changes in a transaction log in order to recover the committed state of the database should the database crash for whatever reason. Log-based CDC takes advantage of this aspect of the transactional database to read the changes from the log. The challenges with log-based CDC are:
- Interpreting the changes in the transaction log is difficult because there are no documented standards on how the changes are stored (i.e. transaction logs from different database vendors are completely different), and there are many scenarios that must all be considered and tested (e.g. consider clustered databases, rollbacks and savepoints, many different ways to perform inserts, updates and deletes, etc.).
- Database vendors may not provide an interface (documented or otherwise) to the transaction logs, and even if one exists, it may be relatively slow and/or resource intensive.
- Most databases are optimized to log only internal identifiers for recovering row changes, which is insufficient to perform CDC and record the changes on a different system. Supplemental logging of primary key columns is required to retrieve the context of the updates (see the sketch at the end of this section). Introducing supplemental logging will increase the volume of data written to the transaction logs, but generally only by a small percentage, and there is usually very little, if any, measurable performance impact on the transactional application.
The biggest benefits of log-based CDC include:
- Minimal impact: Log-based CDC has less impact on the database because it reads from the logs without touching the transactions themselves. In contrast, trigger-based CDC creates triggers on the tables that require change data capture, and firing these triggers slows down transactions.
- Fast performance: Reading the logs directly on the file system allows highly efficient change data capture and supports large volumes of data.
- More flexibility: Log-based capture supports more data operations, such as truncates, and also enables DDL capture.
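As a small illustration of the supplemental logging mentioned above, here is a hypothetical Oracle sketch run from the shell (connection details are assumptions, and other databases have their own equivalents, such as enabling CDC per table in SQL Server):

```bash
# Ask Oracle to also write primary key columns to the redo log, so a
# log-based CDC tool can identify which rows changed.
sqlplus -s sys/"$ORACLE_SYS_PWD"@//dbhost:1521/ORCL as sysdba <<'SQL'
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
-- Verify the setting.
SELECT supplemental_log_data_pk FROM v$database;
SQL
```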
2.3 Streaming Ingestion
Data appearing on various IoT devices or in log files can be ingested into Hadoop using open source NiFi. I know there are multiple technologies (Flume, StreamSets, etc.), but NiFi is the best bet. Now that we know the technology, we also need to know what we should do and what we should not.
The Dos and Don’ts of Hadoop Data Ingestion
- Do not create CDC for smaller tables; this would create more problems at a later stage.
- When you do CDC, merge into the main tables no more than hourly. If you want to do it every minute or so, you are doing something wrong. Keep it on a daily basis, or hourly at most.
- Use the Sqoop -m 1 option for smaller tables.
- Use the --query option all the time; do not use the --table option.
- Directly load data into a managed table. Do not use external tables. Governing external tables is hard.
- Do not import a BLOB (Binary Large Object) or a CLOB (Character Large Object) field using Sqoop. If you need to do that, write some custom logic or use OvalEdge.
- Import into a Hive staging table where all the columns are of String type, then use an additional transformation to convert these strings into the appropriate date/timestamp/double types (see the sketch after this list), or use OvalEdge to load with a single click.
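Putting several of these points together, here is a minimal sketch, assuming a hypothetical customers table, an all-string Hive staging table, and a typed managed target table (all names and types are illustrative):

```bash
# Small table: a single mapper (-m 1), an explicit query, and a load
# straight into a Hive staging table whose columns are all strings.
sqoop import \
  --connect jdbc:mysql://dbhost:3306/crm \
  --username etl_user \
  --password-file /user/etl/.db_password \
  --query 'SELECT customer_id, name, signup_date, balance FROM customers WHERE $CONDITIONS' \
  -m 1 \
  --target-dir /lake/staging/crm/customers \
  --hive-import \
  --hive-table customers_staging_str \
  --map-column-hive customer_id=STRING,signup_date=STRING,balance=STRING

# Convert the strings into proper types while loading the managed table.
hive -e "
INSERT OVERWRITE TABLE crm.customers
SELECT
  CAST(customer_id AS BIGINT),
  name,
  CAST(signup_date AS TIMESTAMP),
  CAST(balance AS DOUBLE)
FROM customers_staging_str;
"
```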