Monday, January 31, 2011

Pentaho Data Integration: Remote execution with Carte

Tutorial Details

  • Software: PDI/Kettle 4.1 (download here), installed on your PC and on a server
  • Knowledge: Intermediate (To follow this tutorial you should have good knowledge of the software and hence not every single step will be described)
Carte is an often overlooked small web server that comes with Pentaho Data Integration/Kettle. It allows remote execution of transformations and jobs. It even allows you to create static and dynamic clusters, so that you can easily run your power-hungry transformations or jobs on multiple servers. In this session you will get a brief introduction on how to work with Carte.
Now let's get started: SSH to the server that Kettle is running on (this assumes you have already installed Kettle there).

Encrypt password

Carte requires a user name and password. It's good practice to encrypt this password. Thankfully Kettle already comes with an encryption utility.
In the PDI/data-integration/ directory run:

sh encr.sh -carte yourpassword

OBF:1mpsdfsg323fssmmww3352gsdf7

Open pwd/kettle.pwd and paste the encrypted password after "cluster: ":

vi ./pwd/kettle.pwd


# Please note that the default password (cluster) is obfuscated using the Encr script provided in this release
# Passwords can also be entered in plain text as before
#
cluster: OBF:1mpsdfsg323fssmmww3352gsdf7

Please note that "cluster" is the default user name.

Start carte.sh

Make sure first that the port you will use is available and open.
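
If you are not sure, a quick check on the server could look like the following (assuming port 8181 and a fairly standard Linux setup; adjust to your environment):

# nothing should be listening on the port yet
netstat -tln | grep 8181
# and the firewall should allow traffic on it (example for iptables-based setups)
iptables -L -n | grep 8181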

In the simplest form you start carte with just one slave that resides on the same instance: 
nohup sh carte.sh localhost 8181 > carte.err.log &

After this, press CTRL+C to get your command prompt back.

To see if it started:
tail -f carte.err.log
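
You can also quickly verify that the Carte process itself is up, and note its PID in case you want to stop it later with kill:

# the [c] keeps the grep command itself out of the result list
ps -ef | grep -i [c]arte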

Although outside the scope of this session, I will give you a brief idea of how to set up a cluster: if you want to run a cluster, you have to create a configuration XML file. Examples can be found in the pwd directory. Open one of these XML files, amend it to your needs and then issue the following command:
sh carte.sh ./pwd/carte-config-8181.xml >> ./pwd/err.log

Check if the server is running

Issue the following commands:

[root@ip-11-111-11-111 data-integration]# ifconfig
eth0      Link encap:Ethernet  HWaddr ...
     inet addr:11.111.11.111  Bcast:

[... details omitted ...]


[root@ip-11-111-11-111 data-integration]# wget http://cluster:yourpassword@11.111.11.111:8181

--2011-01-31 13:53:02--  http://cluster:*password*@11.111.11.111:8181/
Connecting to 11.111.11.111:8181... connected.
HTTP request sent, awaiting response... 401 Unauthorized
Reusing existing connection to 11.111.11.111:8181.
HTTP request sent, awaiting response... 200 OK
Length: 158 [text/html]
Saving to: `index.html'

100%[======================================>] 158         --.-K/s   in 0s

2011-01-31 13:53:02 (9.57 MB/s) - `index.html' saved [158/158]

If you get output like the one above, the web server answered the request, hence Carte is up and running.

With the wget command you have to pass on the user name, the password, the IP address and the port number, all as part of the URL above.

Or you can install lynx:

[root@ip-11-111-11-111 data-integration]# yum install lynx
[root@ip-11-111-11-111 data-integration]# lynx http://cluster:yourpassword@11.111.11.111:8181

Lynx will ask you for the user name and password and then show a simple text representation of the website: nothing more than a nearly empty status page.

Kettle slave server

Slave server menu

   Show status


Commands: Use arrow keys to move, '?' for help, 'q' to quit, '<-' to go back.
  Arrow keys: Up and Down to move.  Right to follow a link; Left to go back.
 H)elp O)ptions P)rint G)o M)ain screen Q)uit /=search [delete]=history list


You can also just type the URL in your local web browser:
http://ec2-11-111-11-111.XXXX.compute.amazonaws.com:8181

You will be asked for user name and password and then you should see an extremely basic page.
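
Carte also exposes a few servlets under /kettle/ that are handy for scripting and monitoring; the status page, for example, can be fetched with the same credentials as before (to the best of my knowledge this path is stable across the 4.x releases):

wget -qO- http://cluster:yourpassword@11.111.11.111:8181/kettle/status/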

Define slave server in Kettle


  1. Open Kettle, open a transformation or job
  2. Click on the View panel
  3. Right click on Slave server and select New.


Specify all the details and click OK. In the tree view, right click on the slave server you just set up and choose Monitor. Kettle will now display the running transformations and jobs in a new tab:

Your transformations can only use the slave server if you specify it in the Execute a transformation dialog.

For jobs you have to specify the remote slave server in each job entry dialog. 

If you want to set up a cluster schema, define the slaves first, then right click on Kettle cluster schemas. Define a Schema Name and the other details, then click on Select slave servers. Specify the servers that you want to work with and define one as the master. A full description of this process is outside the scope of this article. For further info, the "Pentaho Kettle Solutions" book will give you a detailed overview.

For me a convenient way to debug a remote execution is to open a terminal window, ssh to the remote server and tail -f carte.err.log. You can follow the error log in Spoon as well, but you'll have to refresh it manually all the time.
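
In practice this routine boils down to something like the following (host and installation path are just placeholders for your own values):

ssh youruser@yourserver
cd /path/to/data-integration
tail -f carte.err.log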

Wednesday, January 26, 2011

Kettle Transformation Logging and Change Data Capture (New)

Updated version including the great feedback from Matt Casters and Jos (both authors of the fantastic "Pentaho Kettle Solutions" book)


Tutorial Details

  • Software: PDI/Kettle 4.1 (download here), MySQL Server (download here)
  • Knowledge: Intermediate (To follow this tutorial you should have good knowledge of the software and hence not every single step will be described)
  • Tutorial files can be downloaded here

CDC

In this session we will look at the extensive Pentaho Data Integration/Kettle 4.1 logging features. Now how come we also talk about Change Data Capture (CDC)? This is because Kettle incorporates logging of the start and end date for CDC in the logging architecture.


If you are not familiar with CDC, here is a brief summary: CDC allows you to capture all the data from the source system that has changed since the last time you ran the ETL process. Wikipedia gives a more detailed overview in case you are interested. In the easiest case you have some sort of timestamp or datetime column that you can use for this purpose. The following example will be based on a simple sales table that has a date column which we can use for CDC:



Note that this dataset actually holds quite old data, something that we have to keep in mind. What happens if new data gets inserted later on? More on this after the basics.


Open your favourite SQL client (and start your MySQL server if it is not running yet) and issue the following SQL statements:


USE test;

DROP TABLE IF EXISTS `sales`;

CREATE TABLE `sales` (
`date` DATETIME,
`product_type` VARCHAR(45),
`sales` INT(255)
);

INSERT INTO `sales` VALUES
('2010-01-20 00:00:00','Shoes',234),
('2010-01-20 00:00:00','Cheese',456),
('2010-01-21 00:00:00','Shoes',256),
('2010-01-21 00:00:00','Cheese',156),
('2010-01-22 00:00:00','Shoes',535),
('2010-01-23 00:00:00','Cheese',433);

SELECT * FROM `sales`;


Now the way to get the timeframes for querying the data is to use the Get System Info step:


  1. Open Kettle and create a new transformation
  2. Drag and drop a Get System Info step on the canvas. You can find it in the Input folder.
  3. Double click on it and populate the names column in the grid with start_date and end_date.
  4. For the type choose start date range (Transformation) and end date range (Transformation) respectively

Click on Preview rows and you will see that Kettle displays a rather old datetime for the start date (1899/12/31 23:00:00) and the current datetime for the end date. This makes perfect sense, as on the first run of your transformation you want to get all the raw data from your source system. In case you don't want this, I'll show you a trick later on how to change this.
Now this is an easy approach to CDC. You can feed the start and end date from the Get System Info step to a Table Input step, for example, and use them in the WHERE clause of your SQL query:
SELECT 
date 
, product_type 
, sales 
FROM sales 
WHERE 
date>=? AND 
date<? 
;

The question marks will be replaced on execution by the start and end date (but make sure they are defined in this order in the Get System Info step).
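
Just to make the mechanics clear: on the very first run the query that effectively hits the database looks roughly like the statement below (the end date is whatever the current datetime happens to be at execution time; the one shown here is only a placeholder, and the MySQL credentials are assumed). You can run it manually to sanity-check your WHERE clause:

mysql -u root -p test -e "SELECT date, product_type, sales FROM sales WHERE date >= '1899-12-31 23:00:00' AND date < '2011-01-26 12:00:00';"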

Let's add some more steps to our transformation:

Make sure that you enable Replace variables in script? and choose the Get System Info step for Insert data from step. The hop between the Get System Info step and the Table Input step now also displays an info icon. 

Setting up Logging
Kettle provides various levels of logging, transformation logging being the one at the highest level. We will only look at transformation logging here.


  1. Press CTRL+T and the Transformation properties dialog will be displayed. 
  2. Click on the Logging tab and then highlight Transformation on the left hand side. 
  3. Our log will be placed in a database table, hence provide the info for the data connection and table name.
  4. We only want to keep the log records for the last 30 days, hence we set Log record timeout (in days) to 30.
  5. It’s best to keep all the logging fields (less maintenance)
  6. Provide step names for some of the fields, for example:

  • LINES_INPUT: specify the input step that best represents the amount of imported data.
  • LINES_OUTPUT: specify the output step that best represents the amount of exported data.

  7. Press the SQL button and Kettle will automatically generate the DDL (the CREATE TABLE statement) for you. Just press Execute and your logging table will be created in the specified database. Nice and easy!


So now we are all set and can run our transformation to see what's happening. Click the Play button to execute the transformation. If your transformation executed successfully, close it and open it again, then click on the Execution History tab at the bottom and you will see the logging information (Kettle automatically reads the data from the table we just created).



Now pay attention to the Date Range Start and Date Range End fields: they hold the dates from our Get System Info step. Now execute the transformation again and see what happens (click the Refresh button):



  • You can clearly see that Kettle now uses the CDC end datetime of the last transformation execution (the execution with Batch ID 0 in the screenshot above) as the start date of the current one.
  • The CDC end date is set to the start date of the current transformation execution.
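
If you want to double-check these two dates outside of Spoon, you can query the logging table directly. Assuming you named the table transformation_log (the field names below are the Kettle defaults; adjust the MySQL credentials to your setup):

mysql -u root -p test -e "SELECT ID_BATCH, STARTDATE, ENDDATE, STATUS FROM transformation_log ORDER BY ID_BATCH;"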

A safe approach to CDC

The CDC approach mentioned above is actually quite dangerous if your source data table doesn't get filled consistently. A safer approach is to use the maximum date of your source data as the CDC end date. Kettle provides this functionality out of the box: in the Transformation properties you can find a Dates tab. Click on it and provide the following details of your source table:


  1. Maxdate Connection: the database connection
  2. Maxdate table: the table name of your source table
  3. Maxdate field: the datetime field in your source table that you use for CDC
  4. Maxdate offset (seconds): if there are some inconsistencies in your source data, you can set a negative offset here. For example, -120 means that 120 seconds will be deducted from the maximum date used as the CDC end date (see the example after this list).
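
To illustrate what the offset does (this is my own paraphrase, not the exact query Kettle issues), using the date column as Maxdate field together with an offset of -120 seconds on our sales table boils down to:

mysql -u root -p test -e "SELECT MAX(date) - INTERVAL 120 SECOND AS cdc_end_date FROM sales;"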


As you can see from the screenshot above, I referenced our main input table again. So whatever the maximum date in this table is, Kettle will now use it for CDC. Open your MySQL client and delete all the records from our transformation log table, then start the transformation. As you can see from the Execution History, Kettle now uses the maximum date of our dataset.



What more is there to say? Well, if you are unhappy with the first timeframe that Kettle chooses, just run the transformation once without any raw data input step, open a SQL client and update the end date with a date of your liking (this end date will be the start date of the next execution). The next time you run your transformation, this datetime will be used as the start of your timeframe.
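
For example, assuming again that the logging table is called transformation_log (pick whatever date suits you and adjust credentials and table name to your setup), the manual tweak could look like this:

mysql -u root -p test -e "UPDATE transformation_log SET ENDDATE = '2010-01-21 00:00:00' ORDER BY ID_BATCH DESC LIMIT 1;"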

Saturday, January 1, 2011

Pentaho Data Integration and InfiniDB Series: Bulk Upload

Introduction

Calpont InfiniDB is one of the more popular column-oriented databases. If you are not familiar with the concept of column-oriented databases, I suggest visiting InfiniDB.org for a good overview. InfiniDB is available as an open source version as well as in a paid-for enterprise edition.

This is the first in a short series of articles on how to use Pentaho Data Integration (Kettle) with InfiniDB. A column-oriented database is one of the main building blocks of a BI solution. In the last few years Kettle has become one of the most popular open source ETL tools. Currently there is no dedicated step in Kettle that allows the direct export of data into InfiniDB, but this doesn't mean that it is difficult to achieve. This article will show the fairly easy process of setting up such a solution. The article assumes that you are familiar with Kettle, Linux and InfiniDB.

Imagine that we have to load a data warehouse on an hourly basis. Our data warehouse has 3 fact tables that we want to populate using the InfiniDB bulk loader.

Our Kettle job will look like this:

  1. Start Job step
  2. A standard transformation with the results exported to pipe-separated text files (the export happens within the transformation)
  3. Check if file exists
  4. Create InfiniDB Job file using the colxml utility
  5. Run bulk upload using the cpimport utility

I will not go into much detail about the 2nd step. For the purpose of this exercise I only created quite a simple transformation.

Prepare Tables

First off, let's create a special database on InfiniDB called dwh_test with three tables:

mysql> CREATE DATABASE dwh_test;
Query OK, 1 row affected (0.00 sec)

mysql> CREATE TABLE dwh_test.fact_impression (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (2.82 sec)

mysql> CREATE TABLE dwh_test.fact_click (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (1.09 sec)

mysql> CREATE TABLE dwh_test.fact_advert_event (`count` INT(255)) ENGINE=infinidb;
Query OK, 0 rows affected (0.37 sec)

For the following examples, please find the files here:


  • InfiniDB bulk load job (using mainly Kettle steps): download
  • InfiniDB bulk load job (using shell): download
  • Sample transformation: download

Export to text files 
Imagine that our job runs on an hourly schedule. The transformation exports three pipe-separated (|) text files into the /usr/local/Calpont/data/bulk/data/import/ directory, which is the default InfiniDB bulk import directory. InfiniDB accepts pipe-separated text files by default, but you are free to use other separators as well.
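
Before wiring up the rest of the job, it is worth checking on the server that the transformation really wrote what you expect, for example:

ls -l /usr/local/Calpont/data/bulk/data/import/*.tbl
head -n 3 /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl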

For our ETL process it is not always guaranteed that there is data for every run, which means there is a possibility that there is no output data at all. This is something we have to keep in mind. Hence, one thing we can do in our main transformation is to use a Switch/Case step to figure out whether we have data in the stream (note: there are other ways to do this check as well):


  • We have some sort of input etc., which we will not discuss here.
  • After the Group By step we add a Switch/Case step and create a hop from the first one to the second one
  • Add a Dummy step and a Text file output step
  • Create a hop from the Switch/Case step to the Dummy step
  • Create a hop from the Switch/Case step to the Text file output step
  • Now double click on the Switch/Case step and fill it out:

  • Set Field name to switch to the count field (in theory it can be any field, as long as you know that it is definitely populated when data is available)
  • We add only one case, leaving the value empty and setting the target step to the Dummy step. Leaving the value empty means that the field has to be NULL. So in case there are no records in the stream, the stream will be directed to the Dummy step.
  • The default target step is the Text file output step

  • Double click the Text file output:

  • Set the filename to /usr/local/Calpont/data/bulk/data/import/<tablename>. Replace <tablename> with the actual table name.
  • Set Extension to tbl
  • Click on the Content tab:

  • Set Separator to |
  • Leave the Enclosure field empty
  • Tick Enable the enclosure fix?
  • Untick Header
  • Set Format to Unix
  • Set Encoding to UTF-8. Note: InfiniDB only accepts UTF-8!

  • Fields tab: if your fields are not in the same order as the database columns, make sure you bring them into the right order now.

Find below a screenshot of the extremely simplified transformation:
Please find below two different solutions: The first one ("Using mainly Kettle steps") tries to solve most tasks in Kettle itself, whereas the second one makes a bit more use of shell script. It's down to you then to decide which one suits your project better.


Using mainly Kettle steps

It is advisable to check whether the text output file of our transformation exists or not, otherwise InfiniDB will throw an error. Our transformation should only create a file if there is data available.

The easiest approach is to use the Check if file exists step. 
This has two advantages: 


  • You don't have to write a shell script and 
  • It is OS independent. 
The disadvantage is that your flow gets quite a bit longer if you are importing more than one file.

The example below is for one file upload process only. 
The general flow is as follows:

  • Check if file exists step
  • Execute a shell script step: Create InfiniDB job file
  • Execute a shell script step: Execute bulk upload

Check if file exists

Create a new job in Kettle and insert 

  1. a Start step
  2. a Transformation step and link it to your main transformation
  3. Add a Check if file exists step to the canvas 
  4. Create hops between the first three steps
  5. Add two Execute a shell script steps.
  6. Create a "Follow when result is true" hop from the Check if file exists step to the 1st Execute a shell script step
  7. Create a "Follow when result is true" hop from the first Execute a shell script step to the second one.
  8. Add a Delete file step. Create a hub from the 2nd Execute a shell script step to this one.
  9. Double click on the Check if file exists step:

  • Add following file path: /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl

Find below a screenshot of the job:

Set up InfiniDB job files

Basically, the colxml utility creates the InfiniDB bulk job file in /usr/local/Calpont/data/bulk/job/ for you and accepts the database name ("dwh_test" in our example), a job number ("9993", which you can set to any convenient number) and the table name ("fact_impression") as command line arguments. For additional arguments please refer to the InfiniDB Admin manual. If you use a separator other than the pipe, you have to mention it here as well.

Open the 1st Execute a shell script step and name it Setup Bulk Job.

  • Double click on Setup Bulk Job and make sure Insert script is ticked.
  • Set the Working directory to /usr/local/Calpont/bin/
  • Click the Script tab and insert the following line:

./colxml dwh_test -j 9993 -t fact_impression
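
If you want to verify that colxml did its job before moving on, the generated job file should now show up in the job directory mentioned above:

ls -l /usr/local/Calpont/data/bulk/job/Job_9993.xml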

Load Tables

Open the 2nd Execute a shell script step and name it Load Tables; create a hop from the Setup Bulk Job to this one (if you haven't done so already).

  • Double click on Load Tables and make sure Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/ 
  • Click the Script tab and insert the following line:

./cpimport -j 9993

cpimport is the InfiniDB bulk upload utility. It accepts the job number as a command line argument. For additional arguments please refer to the InfiniDB Admin manual.

Delete text file

Now we also want to make sure that we clean up everything. 

  • Insert a Delete file step
  • Create a hop from the Load Tables step to this one.
  • Double click on the Delete file step
  • Insert into the File/Folder cell the following: /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl

Round up

Save the job and transformation, execute the job and check the InfiniDB table. In my case, everything happens on the command line on EC2:

[xxxx@ip-xxxx data-integration] nohup ./kitchen.sh -file="../my-files/testing/infinidb_bulk_upload_example/jb_infinidb_bulk_upload_example.kjb" -Level=Basic &
[xxxx@ip-xxxx data-integration] idbmysql
mysql> USE dwh_test;
Database changed
mysql> SELECT * FROM fact_impression;
+-------+
| count |
+-------+
|     3 |
+-------+
1 row in set (0.11 sec)

Note: To further improve this example, you should add a "Delete Files" step to the beginning of your job to delete any existing InfiniDB job files. 

Using the power of shell scripts

Set up InfiniDB job files

First off, create and test a standard shell file. You can create it in any convenient folder. Depending on your Linux distribution, the syntax may vary; the example below is for Red Hat Linux.

vi test.sh

Press i and insert the following lines:

#!/bin/sh

[ -f /usr/local/Calpont/data/bulk/data/import/fact_advert_event.tbl ] && echo "File exists" || echo "File does not exist"

Press ESC, then type :wq to write and close the file.

Make the file executable:
chmod a+x test.sh

Run the shell script:
./test.sh

If you get an error back, try to fix it, otherwise we are ready to go ahead.

Create a new job in Kettle and insert 

  1. a Start step
  2. a Transformation step and link it to your main transformation
  3. an Execute a shell script step; name it Setup Bulk Job
  4. Create a hop from the Start step to the Transformation step and another one from the Transformation step to the Execute a shell script step

Next up:

  • Double click on Setup Bulk Job and make sure Insert script is ticked.
  • Set the Working directory to /usr/local/Calpont/bin/
  • Click the Script tab and insert the script shown below.

We slightly change the script now and include a check for all our files: we create a separate InfiniDB job for each table import so that we can do some easy checking.

Basically, the colxml utility creates the bulk job file in /usr/local/Calpont/data/bulk/job/ for you and accepts the database name ("dwh_test" in our example), a job number ("9991", which you can set to any convenient number) and the table name ("fact_advert_event") as command line arguments. For additional arguments please refer to the InfiniDB Admin manual.

Before we request the new job files, we also remove all existing ones (in case there is no data, no text file will exist, hence no new job file would be created by our condition, but there might still be an old job file left in the directory).

#!/bin/sh
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9991.xml
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9992.xml
rm -rf  /usr/local/Calpont/data/bulk/job/Job_9993.xml

[ -f /usr/local/Calpont/data/bulk/data/import/fact_advert_event.tbl ] && ./colxml dwh_test -j 9991 -t fact_advert_event
[ -f /usr/local/Calpont/data/bulk/data/import/fact_click.tbl ] && ./colxml dwh_test -j 9992 -t fact_click
[ -f /usr/local/Calpont/data/bulk/data/import/fact_impression.tbl ] && ./colxml dwh_test -j 9993 -t fact_impression

Load Tables

Insert another Execute a shell script and name it Load Tables; create a hop from the Setup Bulk Job to this one.

  • Double click on Load Tables and make sure the Insert script is ticked. 
  • Set the Working directory to /usr/local/Calpont/bin/  
  • Click the Script tab and insert the following lines:

#!/bin/sh
# run cpimport only for those tables for which the previous step created a job file
[ -f /usr/local/Calpont/data/bulk/job/Job_9991.xml ] && ./cpimport -j 9991
[ -f /usr/local/Calpont/data/bulk/job/Job_9992.xml ] && ./cpimport -j 9992
[ -f /usr/local/Calpont/data/bulk/job/Job_9993.xml ] && ./cpimport -j 9993

cpimport is the InfiniDB bulk load utility. It accepts the job number as a command line argument. For additional arguments please refer to the InfiniDB Admin manual.

Delete text files

Now we also want to make sure that we clean up everything. 

  • Insert a Delete files step
  • Create a hop from the Load Tables step to this one.
  • Double click on the Delete files step
  • Insert into the File/Folder cell the following: /usr/local/Calpont/data/bulk/data/import
  • and into the Wildcard (RegEx) cell: ^.+\.tbl$
This will delete any files ending in .tbl from this directory.

Find below a screenshot of the job:

Round up

Save the job and transformation, execute the job and check the InfiniDB table. In my case, everything happens on the command line on EC2:

[xxxx@ip-xxxx data-integration] nohup ./kitchen.sh -file="../my-files/testing/infinidb_bulk_upload_example/jb_infinidb_bulk_upload_example_using_shell.kjb" -Level=Basic &
[xxxx@ip-xxxx data-integration] idbmysql
mysql> USE dwh_test;
Database changed
mysql> SELECT * FROM fact_impression;
+-------+
| count |
+-------+
|     3 |
+-------+
1 row in set (0.11 sec)

If things go wrong ... Error log is your friend

InfiniDB provides quite good error logging. You will find a short error description in the Kettle log (make sure you output the basic log data to a file).
Have a look at the following directories to find the detailed InfiniDB log files:

/usr/local/Calpont/data/bulk/log:

  • errors can be found in job_#.err
  • successful jobs will be logged in job_#.log

/usr/local/Calpont/data/bulk/data/import:

  • If the data of the import file does not match the table definition, then a file tablename.tbl.bad will be created. 
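
A quick way to eyeball all of this from the shell after a run that went wrong (paths as above):

# most recent bulk load logs first
ls -lt /usr/local/Calpont/data/bulk/log/ | head
# rejected rows end up in .bad files next to the import data
ls -l /usr/local/Calpont/data/bulk/data/import/*.bad 2>/dev/null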

Files and references

Please find the example files here:
InfiniDB bulk load job (using mainly Kettle steps): download
InfiniDB bulk load job (using shell): download
Sample transformation: download

Some information for this article was taken from the Calpont InfiniDB manuals as well as from the Pentaho forums.