Saturday, January 1, 2011

Pentaho Data Integration and InfiniDB Series: Bulk Upload

Introduction

Calpont InfiniDB is one of the more popular column-oriented databases. If you are not familiar with the concept of column-oriented databases, I suggest visiting InfiniDB.org for a good overview. InfiniDB is available as an open source version as well as a paid-for enterprise edition. 

This is the first in a short series of articles on how to use Pentaho Data Integration (Kettle) with InfiniDB. A column-oriented database is one of the main building blocks of a BI solution. In the last few years Kettle has become one of the most popular open source ETL tools. Currently there is no dedicated step in Kettle that allows the direct export of data into InfiniDB, but this doesn't mean that it is difficult to achieve. This article will show the fairly easy process of setting up such a solution. The article assumes that you are familiar with Kettle, Linux and InfiniDB.

Imagine that we have to load a data warehouse on an hourly basis. Our data warehouse has 3 fact tables that we want to populate using the InfiniDB bulk loader.

Our Kettle job will look like this:

  1. Start Job step
  2. A standard transformation with the results exported to pipe-separated text files (the export happens within the transformation)
  3. Check if file exists
  4. Create InfiniDB Job file using the colxml utility
  5. Run bulk upload using the cpimport utility

I will not go into much detail about the 2nd step; for the purpose of this exercise I created only a fairly simple transformation.

Prepare Tables

First off, let's create a special database on InfiniDB called dwh_test with three tables:

mysql> CREATE DATABASE dwh_test;
Query OK, 1 row affected (0.00 sec)

mysql> CREATE TABLE dwh_test.fact_impression (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (2.82 sec)

mysql> CREATE TABLE dwh_test.fact_click (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (1.09 sec)

mysql> CREATE TABLE dwh_test.fact_advert_event (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (0.37 sec)

For the following examples, please find the files here:


  • InfiniDB bulk load job (using mainly Kettle steps): download
  • InfiniDB bulk load job (using shell): download
  • Sample transformation: download

Export to text files 
Imagine that our job is run on an hourly basis. The transformation exports three pipe-separated (|) text files into the /usr/local/Calpont/data/bulk/data/import/ directory, which is the InfiniDB default bulk import directory. InfiniDB accepts pipe-separated text files by default, but you are free to use other separators as well.
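As a quick sanity check, you can list the default import directory from the command line after the transformation has run; assuming all three fact tables had data for this run, you should see one .tbl file per table (file names follow the <tablename>.tbl convention used throughout this article):

ls -l /usr/local/Calpont/data/bulk/data/import/*.tbl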

For our ETL process, it is not always guaranteed that there is data for every run, which means there is a possibility that there is no output data at all. This is something we have to keep in mind. Hence, one thing we can do in our main transformation is to use a Switch/Case step to figure out if we have data in the stream (note: there are other ways to do this check as well):


  • We have some sort of input etc, which we will not discuss here.
  • After the Group By step we add a Switch/Case step and create a hop from the first one to the second one
  • Add a Dummy step and a Text file output step
  • Create a hop from the Switch/Case step to the Dummy step
  • Create a hop from the Switch/Case step to the Text file output step
  • Now double click on the Switch/Case step and fill it out:

  • Set the field name to switch to the count field (in theory it can be any field, as long as you know that it is definitely populated when data is available)
  • We add only one case, leaving the value empty and setting the target step to the Dummy step. Leaving the value empty means that the field has to be NULL, so if there are no records in the stream, the flow will be directed to the Dummy step.
  • The default target step is the Text file output step

  • Double click the Text file output:

  • Set the filename to /usr/local/Calpont/data/bulk/data/import/<tablename>. Replace <tablename> with the actual table name.
  • Set Extension to tbl
  • Click on the Content tab:

  • Set Separator to |
  • Leave the Enclosure field empty
  • Tick Enable the enclosure fix?
  • Untick Header
  • Set Format to Unix
  • Set Encoding to UTF-8. Note: InfiniDB only accepts UTF-8!

  • Fields tab: If your fields are not in the same order as the database columns, make sure you bring them into the right order now (a sample of the resulting output file is shown after this list).
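With these settings the generated file is plain pipe-separated text without a header row. Since our example fact tables have only a single count column, the fact_impression file produced for the sample data used later in this article (one row with count = 3) would simply look like this; a multi-column table would have its values separated by | characters:

cat /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl
3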

Find below a screenshot of the extremely simplified transformation:

Please find below two different solutions: the first one ("Using mainly Kettle steps") tries to solve most tasks in Kettle itself, whereas the second one makes a bit more use of shell scripting. It is up to you to decide which one suits your project better.


Using mainly Kettle steps

It is advisable to check whether the text output file of our transformation exists, otherwise InfiniDB will throw an error. Our transformation should only create a file if there is data available.

The easiest approach is to use the Check if file exists step. 
This has two advantages: 


  • You don't have to write a shell script and 
  • It is OS independent. 
The disadvantage is that your flow gets quite a bit longer if you are importing more than one file.

The example below is for one file upload process only. 
The general flow is as follows:

  • Check if file exists step
  • Execute a shell script step: Create InfiniDB job file
  • Execute a shell script step: Execute bulk upload

Check if file exists

Create a new job in Kettle and insert 

  1. a Start step
  2. a Transformation step and link it to your main transformation
  3. Add a Check if file exists step to the canvas 
  4. Create hops between the first three steps
  5. Add two Execute a shell script steps.
  6. Create a "Follow when result is true" hop from the Check if file exists step to the 1st Execute a shell script step
  7. Create a "Follow when result is true" hop from the first Execute a shell script step to the second one.
  8. Add a Delete file step. Create a hop from the 2nd Execute a shell script step to this one.
  9. Double click on the Check if file exists step:

  • Add the following file path: /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl

Find below a screenshot of the job:

Set up InfiniDB job files

Basically, the colxml utility creates the InfiniDB bulk job file in /usr/local/Calpont/data/bulk/job/ for you and accepts the database name ("dwh_test" in our example), a job number ("9993", which you can set to any convenient number) and the table name ("fact_impression") as command line arguments. For additional arguments please reference the InfiniDB Admin manual. If you use a separator other than the pipe, you have to specify it here as well.

Open the 1st Execute a shell script step; name it Setup Bulk Job

  • Double click on Setup Bulk Job and make sure Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/
  • Click the Script tab and insert the following line:

./colxml dwh_test -j 9993 -t fact_impression
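If you want to verify this step by hand, you can run the same command from /usr/local/Calpont/bin/ on the command line and then check that the job file has been created (assuming the default bulk directory):

./colxml dwh_test -j 9993 -t fact_impression
ls -l /usr/local/Calpont/data/bulk/job/Job_9993.xml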

Load Tables

Open the 2nd Execute a shell script step and name it Load Tables; create a hop from the Setup Bulk Job to this one (if you haven't done so already).

  • Double click on Load Tables and make sure Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/ 
  • Click the Script tab and insert the following line:

./cpimport -j 9993

cpimport is the InfiniDB bulk upload utility. It accepts the job number as a command line argument. For additional arguments please reference the InfiniDB Admin manual.

Delete text file

Now we also want to make sure that we clean up everything. 

  • Insert a Delete file step
  • Create a hop from the Load Tables step to this one.
  • Double click on the Delete file step
  • Insert into the File/Folder cell the following: /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl

Round up

Save the job and transformation, execute the job and check the InfiniDB table. In my case, everything happens on the command line on EC2:

[xxxx@ip-xxxx data-integration] nohup ./kitchen.sh -file="../my-files/testing/infinidb_bulk_upload_example/jb_infinidb_bulk_upload_example.kjb" -Level=Basic &
[xxxx@ip-xxxx data-integration] idbmysql
mysql> USE dwh_test;
Database changed
mysql> SELECT * FROM fact_impression;
+-------+
| count |
+-------+
|     3 |
+-------+
1 row in set (0.11 sec)

Note: To further improve this example, you should add a "Delete Files" step to the beginning of your job to delete any existing InfiniDB job files. 

Using the power of shell scripts

Set up InfiniDB job files

First off, create and test a standard shell script. You can create it in any convenient folder. Depending on your Linux distribution, the syntax may vary; the example below is for Red Hat Linux.

vi test.sh

Press i and insert the following lines:

#!/bin/sh

[ -f /usr/local/Calpont/data/bulk/data/import/fact_advert_event.tbl ] && echo "File exists" || echo "File does not exist"

Press ESC, then type :wq to write and close the file.

Make the file executable:
chmod a+x test.sh

Run the shell script:
./test.sh

If you get an error back, try to fix it; otherwise we are ready to go ahead.

Create a new job in Kettle and insert 

  1. a Start step
  2. a Transformation step and link it to your main transformation
  3. an Execute a shell script step; name it Setup Bulk Job
  4. Create a hop from the Start step to the Transformation step and another one from the Transformation step to the Execute a shell script step

Next up:

  • Double click on Setup Bulk Job and make sure Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/
  • Click the Script tab and insert the following lines:

We slightly change the script now and include a check for all our files: we create a separate InfiniDB job for each table import so that we can do some easy checking.

Basically, the colxml utility creates the bulk job file in /usr/local/Calpont/data/bulk/job/ for you and accepts the database name ("dwh_test" in our example), a job number ("9991", which you can set to any convenient number) and the table name ("fact_advert_event") as command line arguments. For additional arguments please reference the InfiniDB Admin manual.

Before we request the new job files, we also remove any existing ones (if there is no data, no text file will exist and hence no new job file would be created by our condition, but there might still be an old job file in the directory). 

#!/bin/sh
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9991.xml
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9992.xml
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9993.xml

[ -f /usr/local/Calpont/data/bulk/data/import/fact_advert_event.tbl ] && ./colxml dwh_test -j 9991 -t fact_advert_event
[ -f /usr/local/Calpont/data/bulk/data/import/fact_click.tbl ] && ./colxml dwh_test -j 9992 -t fact_click
[ -f /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl ] && ./colxml dwh_test -j 9993 -t fact_impression

Load Tables

Insert another Execute a shell script and name it Load Tables; create a hop from the Setup Bulk Job to this one.

  • Double click on Load Tables and make sure the Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/  
  • Click the Script tab and insert the following lines:

#!/bin/sh
# run cpimport only for the jobs whose job file was created in the Setup Bulk Job step
[ -f /usr/local/Calpont/data/bulk/job/Job_9991.xml ] && ./cpimport -j 9991
[ -f /usr/local/Calpont/data/bulk/job/Job_9992.xml ] && ./cpimport -j 9992
[ -f /usr/local/Calpont/data/bulk/job/Job_9993.xml ] && ./cpimport -j 9993

cpimport is the InfiniDB bulk load utility. It accepts the job number as a command line argument. For additional arguments please reference the InfiniDB Admin manual.

Delete text files

Now we also want to make sure that we clean up everything. 

  • Insert a Delete files step
  • Create a hop from the Load Tables step to this one.
  • Double click on the Delete files step
  • Insert into the File/Folder cell the following: /usr/local/Calpont/data/bulk/data/import
  • and into the Wildcard (RegEx) cell: ^.+\.tbl$
This will delete any files ending in .tbl from this directory.
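The same cleanup can also be done manually from the shell, should you ever need to clear out leftover import files by hand (a simple command-line equivalent of the Delete files step above):

rm -f /usr/local/Calpont/data/bulk/data/import/*.tbl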

Find below a screenshot of the job:

Round up

Save the job and transformation, execute the job and check the InfiniDB table. In my case, everything happens on the command line on EC2:

[xxxx@ip-xxxx data-integration] nohup ./kitchen.sh -file="../my-files/testing/infinidb_bulk_upload_example/jb_infinidb_bulk_upload_example_using_shell.kjb" -Level=Basic &
[xxxx@ip-xxxx data-integration] idbmysql
mysql> USE dwh_test;
Database changed
mysql> SELECT * FROM fact_impression;
+-------+
| count |
+-------+
|     3 |
+-------+
1 row in set (0.11 sec)

If things go wrong ... Error log is your friend

InfiniDB provides quite good error logging. You will find a short error description in the Kettle log (make sure you output the basic log data to a file).
Have a look at the following directories to find the detailed InfiniDB log files (a few commands for inspecting them follow after this list):

/usr/local/Calpont/data/bulk/log:

  • errors can be found in job_#.err
  • successful jobs will be logged in job_#.log

/usr/local/Calpont/data/bulk/data/import:

  • If the data of the import file does not match the table definition, then a file tablename.tbl.bad will be created. 
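
For example (assuming the default bulk directory and the job numbers used in this article):

# check the bulk load logs (errors in the .err files, successful jobs in the .log files)
ls -ltr /usr/local/Calpont/data/bulk/log/
# check for rejected rows, if any
ls -l /usr/local/Calpont/data/bulk/data/import/*.bad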

Files and references

Please find the example files here:
InfiniDB bulk load job (using mainly Kettle steps): download
InfiniDB bulk load job (using shell): download
Sample transformation: download

Some information for this article was taken from the Calpont InfiniDB manuals as well as from the Pentaho forums.

4 comments:

  1. Hello Diethard, I'm trying to download the files but get a 404 error. Do you still have those files?

    I have just installed InfiniDB and am beginning to connect to it through Kettle. Your post is my best reference to get started.

    Thanks a lot

    Replies
    1. OK, I managed to find these files. I have changed the links in the blog post now ... let me know if they work.

    2. Thank you so much, Diddy. Now they're working fine.

      Cheers