Wednesday, March 20, 2013

Partitioning data on clustered Pentaho Kettle ETL transformations


This is the second article on clustering ETL transformations with Pentaho Kettle (Pentaho Data Integration). It is highly recommended that you read the first article Creating a clustered transformation in Pentaho Kettle before continuing with this one. Make sure that the slave and master servers are running and the cluster schema is defined - as outlined in the first article.

Prerequisites:

  • Current version of PDI installed.
  • Download the sample transformations from here.

How to create a partitioning schema

Create a new transformation (or open an existing one). Click on the View tab on the left hand side and right click on Partition schemas. Choose New:
In our case we want to define a dynamic schema. Tick Dynamically create the schema definition and set the Number of partitions by slave server to 1:

How to assign the partition schema

Right click on the step that you want to assign the partition schema to and choose Partitioning.
You will be given the following options:
For our purposes we want to choose Remainder of division. In the next dialog choose the partitioning schema you created earlier on:
Next specify which field should be used for partitioning. In our case this is the city field:
That’s it. Now partitioning will be dynamically applied to this step.
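
To illustrate the idea behind Remainder of division, here is a small shell sketch (illustrative only, not Kettle's actual implementation): a checksum of the partitioning field value, taken modulo the number of partitions, decides which partition a row is routed to. The city values below are just made-up sample data.

#!/bin/bash
# sketch: route each city value to a partition by checksum modulo the number of partitions
NR_PARTITIONS=2
for CITY in "New York" "Boston" "Chicago" "New York"; do
    CHECKSUM=$(printf '%s' "$CITY" | cksum | cut -d' ' -f1)
    echo "$CITY -> partition $(( CHECKSUM % NR_PARTITIONS ))"
done
# the same city value always maps to the same partition number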

Why apply data partitioning to a distributed ETL transformation?

As we have 2 slave servers running (setup instructions can be found in the first article), the data will be dynamically partitioned into 2 sets based on the city field. So even if we do an aggregation on the slave servers, we will derive a clean output set on the master. To be more precise: if we don’t use partitioning in our transformation, each slave server would receive data in a round robin fashion (effectively at random), so each data set could contain records for New York, for example. Each slave creates an aggregate, and when we combine the data on the master we could end up with two aggregates for New York. This would then require an additional sort and aggregation step on the master to arrive at a final clean aggregate. To avoid this kind of scenario, it is best to define data partitioning, so that each slave server receives a “unique” set of data. Note that this is just one reason why you should apply partitioning.

No partitioning schema applied:
With partitioning schema applied:
Notice the difference between the two output datasets!

Also note the additional red icon [Dx1] in the above screenshot of the transformation. This indicates that a partitioning schema is applied to this particular step.
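
To make the difference concrete outside of Kettle, here is a small shell/awk sketch (illustrative only, the file name and values are made up): with a round-robin split both "slaves" produce a partial aggregate for New York, while with a city-based split each city is aggregated on exactly one slave.

#!/bin/bash
# sketch: a tiny city,value dataset
cat > /tmp/sales.csv <<'EOF'
New York,10
New York,20
Boston,5
New York,7
Boston,3
EOF

# round-robin split: odd lines go to "slave 1", even lines to "slave 2";
# both slaves report a partial aggregate for New York, which the master would still have to merge
awk -F, 'NR % 2 == 1 { sum[$1] += $2 } END { for (c in sum) print "slave1:", c, sum[c] }' /tmp/sales.csv
awk -F, 'NR % 2 == 0 { sum[$1] += $2 } END { for (c in sum) print "slave2:", c, sum[c] }' /tmp/sales.csv

# partitioned split: all rows for the same city go to the same "slave",
# so each city appears in exactly one slave's output and the master can simply append the results
awk -F, '$1 == "New York" { sum[$1] += $2 } END { for (c in sum) print "slave1:", c, sum[c] }' /tmp/sales.csv
awk -F, '$1 == "Boston"   { sum[$1] += $2 } END { for (c in sum) print "slave2:", c, sum[c] }' /tmp/sales.csv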

I hope this second article gave you a good overview of the Pentaho Kettle clustering and partitioning features, which are very useful when you are dealing with a lot of data. My special thanks go to Matt and Slawo for shedding some light on this very interesting functionality.

Creating a clustered transformation in Pentaho Kettle


Prerequisites:

  • Current version of PDI installed.
  • Download the sample transformations from here.

Navigate to the PDI root directory. Let’s start three local Carte instances for testing (make sure these ports are not in use beforehand):

sh carte.sh localhost 8077
sh carte.sh localhost 8078
sh carte.sh localhost 8079
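
As an optional sanity check (assuming the default cluster/cluster credentials), each Carte instance exposes a status page under /kettle/status/ that you can query from the command line:

#!/bin/bash
# check that each local Carte instance responds on its status page
for PORT in 8077 8078 8079; do
    if curl -s -f -u cluster:cluster "http://localhost:$PORT/kettle/status/" > /dev/null; then
        echo "Carte on port $PORT is up"
    else
        echo "Carte on port $PORT is NOT responding"
    fi
done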

In PDI Spoon create a new transformation.

Click on the View tab on the left hand side, right click on Slave server and choose New. Add the Carte servers we started earlier, one by one, and define one of them as the master. Note the default Carte user is cluster and the default password is cluster.
Next right click on Kettle cluster schemas and choose New.
Provide a Schema name and then click on Select slave servers. Mark all of them in the pop-up window and select OK.
Next we want to make sure that Kettle can connect to all of the carte servers. Right click on the cluster schema you just created and choose Monitor all slave servers:
For each of the servers Spoon will open a monitoring tab/window. Check the log in each monitoring window for error messages. 

Additional info: Dynamic clusters
If the slave servers are not all known upfront, or they can be added or removed at any time, Kettle also offers a dynamic cluster schema. A typical use case is running a cluster in the cloud. With this option you can also define several slave servers for failover purposes. Take a look at the details on the Pentaho Wiki.

If Kettle can connect to all of them without problems, proceed as follows:

How to define clustering for a step

Add a Text input step for example.
Right click on the Text input step and choose Clustering.
In the Cluster schema dialog choose the cluster schema you created earlier on:
Click OK.
Note that the Text input step has a clustering indicator now:
Note: Only the steps to which you assign the cluster schema in this way will run on the slave servers. All other steps will run on the master server.

Our input dataset:

Creating swimlanes

In this example we will be reading the CSV files directly from the slave servers. All the steps will be executed on the slaves (as indicated by the Cx2). 

To run the transformation on our local test environment, click the execute button and choose Execute clustered:

The last option Show transformations is not necessary for running the transformation, but helps to understand how Kettle creates individual transformations for your slave servers and master server in the background.

As we are testing this locally, the same input file will be read twice (we have two slave servers and one master server all running locally) and the results will be written to the same output file, hence we see the summary twice in that file:


Debugging: Observe the logs of the slave and master servers, as the main transformation log in Spoon (v4.4) doesn’t seem to provide error logs/messages for clustered execution. So always monitor the server logs while debugging!
Preview: If you perform preview on a step, a standard (non-clustered) transformation will be run.
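
One simple way to keep an eye on the server logs while debugging is to start the Carte instances with their console output redirected to files and then follow those files during the clustered run (a sketch, using the same ports as above):

# start each Carte instance with its console output captured in a log file
nohup sh carte.sh localhost 8077 > carte_8077.log 2>&1 &
nohup sh carte.sh localhost 8078 > carte_8078.log 2>&1 &
nohup sh carte.sh localhost 8079 > carte_8079.log 2>&1 &

# follow all three logs while the clustered transformation is running
tail -f carte_8077.log carte_8078.log carte_8079.log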


Summarizing all data on the master

Now we will change the transformation so that the last 3 steps run on the master (notice that these steps do not have a clustering indicator):
If we execute the transformation now, the result looks like this:
So as we expect, all the data from all the slaves is summarized on the master.

Importing data from the master

The input data will not always reside on the slave servers, so let’s explore a way to read the data in on the master:

Note that in this case only the Dummy step runs on the slave server.

Here is the output file:
So what happens is that the file is read on the master, the records are distributed to the Dummy steps running on the slave servers and then aggregated on the master again.

My special thanks go to Matt and Slawo for shedding some light on this very interesting functionality.

Thursday, March 7, 2013

Pentaho Kettle (PDI): Get Pan and Kitchen Exit Code

Various monitoring applications require the exit code/status of a process as an input.

A simple example (test1.sh):

#!/bin/bash
echo "Hi"
exit $?

Let’s run it:
$ ./test1.sh
Hi
Let’s check the exit status (of the last command), which can be accessed via $?:
$ echo $?
0
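
For contrast, a command that fails returns a non-zero status, for example:
$ false
$ echo $?
1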

Let’s take a look at how we can get the exit status from Pan and Kitchen:

For demonstration purposes we create a very simple dummy transformation which just outputs some data to the log:
Now create a shell file:
#!/bin/bash
/opt/pentaho/pdi/pdi-ce-4.4.0-stable/pan.sh -file='/home/dsteiner/Dropbox/pentaho/Examples/PDI/exit_code/tr_dummy.ktr' -Level=Basic > /home/dsteiner/Dropbox/pentaho/Examples/PDI/exit_code/err.log
echo $?

Note the echo $? in the last line, which will print the exit status. This is for demonstration purposes only; normally you would use exit $? instead.
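
As a sketch, a wrapper script that a scheduler or monitoring tool could call might capture the status, log a message on failure and pass the code on (same paths as above; adjust them to your environment):

#!/bin/bash
# run the transformation, capture the exit status and propagate it to the caller
/opt/pentaho/pdi/pdi-ce-4.4.0-stable/pan.sh -file='/home/dsteiner/Dropbox/pentaho/Examples/PDI/exit_code/tr_dummy.ktr' -Level=Basic > /home/dsteiner/Dropbox/pentaho/Examples/PDI/exit_code/err.log
STATUS=$?

if [ $STATUS -ne 0 ]; then
    echo "Transformation failed with exit code $STATUS" >&2
fi

exit $STATUS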

On Windows use instead:
echo %ERRORLEVEL%

Now let’s run the shell script:
The exit status tells us that the transformation was executed successfully.

Next we will introduce an error into the transformation. I just add a Formula step with an invalid formula:
We run the shell script again and this time we get a return code other than 0:
Any return code other than 0 indicates an error.

Please find below an overview of all the return codes (src1, src2):

  • Error code 0: The job ran without a problem
  • Error code 1: Errors occurred during processing
  • Error code 2: An unexpected error occurred during loading/running of the job or transformation: an error in the XML format, reading the file, problems with the repository connection, ...
  • Error code 3: Unable to connect to a database, open a file or other initialization error
  • Error code 7: The job/transformation couldn't be loaded from XML or the repository
  • Error code 8: Error loading job entries, steps or plugins (mostly an error loading one of the plugins): one of the plugins in the plugins/ folder is not written correctly or is incompatible. You should never see this anymore; if you do, it's an installation problem with Kettle.
  • Error code 9: Command line usage printing
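
As a rough sketch, a wrapper script could translate these codes into the messages above for a monitoring tool (the kitchen.sh call and the job file path are placeholders, adjust them to your setup; the same approach works with pan.sh for transformations):

#!/bin/bash
# run a job with Kitchen and map its exit code to a human-readable message
/opt/pentaho/pdi/pdi-ce-4.4.0-stable/kitchen.sh -file='/path/to/your_job.kjb' -Level=Basic
case $? in
    0) echo "The job ran without a problem" ;;
    1) echo "Errors occurred during processing" ;;
    2) echo "Unexpected error while loading or running the job/transformation" ;;
    3) echo "Unable to connect to a database, open a file or other initialization error" ;;
    7) echo "The job/transformation couldn't be loaded from XML or the repository" ;;
    8) echo "Error loading job entries, steps or plugins" ;;
    9) echo "Command line usage printing" ;;
    *) echo "Unknown exit code" ;;
esac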