Sunday, December 12, 2010

Be careful with running multiple step copies in Pentaho Kettle

Be careful with running multiple step copies in Pentaho Kettle

In this review we have a look at running multiple copies of one step in Kettle (Pentaho Data Integration). If your computer has more than one core, Kettle can use the power by running multiple copies of a given step in parallel. Each copy of one step uses one core. This is an extremely powerful feature, but it should be used with care. 
It is important to consider that you cannot just thoughtlessly apply x amounts of copies to any given step. You always have to keep in mind what this step actually does.

I prepared here an example which will demonstrate you how you can run into troubles if you don't pay attention. Note: This is not a Kettle error, but a user error. 

You can download the sample transformation here.

Scenario 1: This simple example uses a data grid input step. The data is denormalised and then joined to an additional data set and finally we create a summary.

Scenario 2: We use exactly the same process again, only now we increase the amount of copies for the denormaliser step to 3.You can change the amount of copies to run in parallel by right clicking on the step and choosing "Change number of copies to start ...". All this does is use the definition of your step and run multiple copies of this step in parallel (highlighted by x3 on the top left corner of the step). 

Screenshot of our transformation:

Output of the denormaliser step scenario 1:

Output of the denormaliser step scenario 2 (running 3 copies of the denormaliser step):

Output of the Group by step scenario 1:

Output of the Group by step scenario 2:

As you can see, there is a huge difference in the total amount of additional revenue in scenario 2:
The aggregation of our original sales records works fine, but as you can see in the preview the additional sales figures are 3 times as much as they should be, which is due to the 3 copies of the "Row denormaliser 2". This is down to the fact that we forgot to aggregate the output of this step.

So why did this happen? How does it work when you run multiple copies of a step?

Basically Kettle distributes rows in a round-robin fashion to each copy of the step, so in our example (running 3 copies of one step) the first row will go to the first step, the 2nd row to the 2nd step, the 3rd row to the 3rd step, the 4th row to the 1st step and so on. 

What is the correct approach to create this transformation using multiple copies?

After the denormalise step, we add a group by step to summarize the data by date:
Now the output of the Group by step looks fine:

Note: If you placed the Join step directly after the denormaliser step (set to multiple copies), Kettle would show a warning message, indicating that you have to summarize your data before the join. Sometimes you will have some additional steps between the step that your run in multiple copies and a join, hence no warning message is displayed. 

After applying multiple copies to one step I strongly suggest that you make use of the preview function to analyze how your data set looks.

You can download the transformation here.


  1. Good to know! Was just about to start utilizing this feature, so it came at a good time =p

  2. Diethard
    Multiple step copies is an interesting topic indeed.

    For some reason your google docs links don't work, I think you might need to change the Share settings to make the file accessible.

    I was wondering if you really need multiple CPU cores to take advantage of parallelism here. I am thinking that even on a single core processor machine, if a step has built-in latency, e.g. a database lookup or something, then perhaps there will be idle time which allows another copy of a step some CPU time. I haven't tested this but I suspect that a multiple step copies approach may still be beneficial in some scenarios, even if a machine has a single core.

    What do you think?

  3. Thanks a lot Tony for pointing this out. The files should be available now.
    Each copy that you define uses one core of your processor as far as I know. You can see this by looking a graphical representation of the load of each processor (your OS should have a GUI for this).

  4. I have 10000 records coming to my stream and they hit a "HTTP Client" step to get data from an API (the result of which I split into individual values with a "JSON Input" step).

    Anyway, I have set the "number of copies to start" on the HTTP Client step to 10.

    Does this mean that the 10000 incoming records will be divided amongst 10 parallel instances of the HTTP Client's get requests?

    I am trying to performance enhance and pinpoint if the slowpoint is my ETL or the API being called. :)