org.knime.base.node.parallel
Class AbstractParallelNodeModel

java.lang.Object
  extended by org.knime.core.node.NodeModel
      extended by org.knime.base.node.parallel.AbstractParallelNodeModel
Direct Known Subclasses:
ThreadedNodeModel

public abstract class AbstractParallelNodeModel
extends NodeModel

This class is an extension of a normal NodeModel that offers parallel processing of DataTables. To use it, the executeByChunk( BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) method must be overridden. This method is called, as often as necessary, with a DataTable containing only a part of the input rows. A default value for the maximum chunk size (i.e. the number of rows in each chunked data table) is given in the constructor.
If the node has more than one input table, only the first input table is chunked; the remaining tables are passed to executeByChunk( BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) in their entirety.
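The chunking scheme described above can be sketched outside of KNIME with plain java.util.concurrent. The class below is a hypothetical, KNIME-free illustration only: the first input is split into chunks of at most chunkSize rows, each chunk is processed by a pool thread (analogous to executeByChunk), and the per-chunk results are collected in chunk order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for the chunked execution pattern; not KNIME API.
public final class ChunkedExecutor {

    public static List<String> execute(List<String> rows, int chunkSize)
            throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int start = 0; start < rows.size(); start += chunkSize) {
                final List<String> chunk = rows.subList(
                        start, Math.min(start + chunkSize, rows.size()));
                // each chunk is handed to the pool, like executeByChunk(...)
                futures.add(workers.submit(() -> {
                    List<String> out = new ArrayList<>();
                    for (String row : chunk) {
                        out.add(row.toUpperCase()); // per-row computation
                    }
                    return out;
                }));
            }
            List<String> result = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                result.addAll(f.get()); // collect results in chunk order
            }
            return result;
        } finally {
            workers.shutdown();
        }
    }
}
```

Because the futures are drained in submission order, the output row order is independent of thread scheduling, which mirrors the deterministic output of the KNIME implementation.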

Author:
Thorsten Meinl, University of Konstanz

Field Summary
protected  ThreadPool m_workers
          The execution service that is used.
 
Constructor Summary
AbstractParallelNodeModel(int nrDataIns, int nrDataOuts, int chunkSize, ThreadPool workers)
          Creates a new AbstractParallelNodeModel.
 
Method Summary
protected  BufferedDataTable[] execute(BufferedDataTable[] data, ExecutionContext exec)
          This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[],ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports.
protected abstract  void executeByChunk(BufferedDataTable inDataChunk, BufferedDataTable[] additionalData, RowAppender[] outputTables, ExecutionMonitor exec)
          This method is called as often as necessary by multiple threads.
 int getChunkSize()
          Returns the current chunk size.
protected abstract  DataTableSpec[] prepareExecute(DataTable[] data)
          This method is called before the first chunk is processed.
 void setChunkSize(int newValue)
          Sets the chunk size of the split data tables.
 
Methods inherited from class org.knime.core.node.NodeModel
addWarningListener, configure, configure, continueLoop, execute, executeModel, getInHiLiteHandler, getLoopEndNode, getLoopStartNode, getNrInPorts, getNrOutPorts, getOutHiLiteHandler, getWarningMessage, loadInternals, loadValidatedSettingsFrom, notifyViews, notifyWarningListeners, peekFlowVariableDouble, peekFlowVariableInt, peekFlowVariableString, peekScopeVariableDouble, peekScopeVariableInt, peekScopeVariableString, pushFlowVariableDouble, pushFlowVariableInt, pushFlowVariableString, pushScopeVariableDouble, pushScopeVariableInt, pushScopeVariableString, removeWarningListener, reset, saveInternals, saveSettingsTo, setInHiLiteHandler, setWarningMessage, stateChanged, validateSettings
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

m_workers

protected final ThreadPool m_workers
The execution service that is used.

Constructor Detail

AbstractParallelNodeModel

public AbstractParallelNodeModel(int nrDataIns,
                                 int nrDataOuts,
                                 int chunkSize,
                                 ThreadPool workers)
Creates a new AbstractParallelNodeModel.

Parameters:
nrDataIns - The number of DataTable elements expected as inputs.
nrDataOuts - The number of DataTable objects expected at the output.
chunkSize - the default number of rows in the DataTables that are passed to executeByChunk( BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor)
workers - a thread pool where threads for processing the chunks are taken from
Method Detail

prepareExecute

protected abstract DataTableSpec[] prepareExecute(DataTable[] data)
                                           throws Exception
This method is called before the first chunk is processed. The method must return the data table specification(s) for the result table(s), because the RowAppenders passed to executeByChunk( BufferedDataTable, BufferedDataTable[], RowAppender[], ExecutionMonitor) must be constructed accordingly.

Parameters:
data - the input data tables
Returns:
the table spec(s) of the result table(s) in the right order. Neither the returned array nor any of its elements may be null!
Throws:
Exception - if something goes wrong during preparation
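The contract above, that the output structure must be fixed before any chunk is processed, can be illustrated with a small, hypothetical stand-in (PrepareDemo and its "spec" of column names are not KNIME API): the preparation step derives the result spec from the input spec, and the containers that worker threads later append to are created from that spec up front.

```java
import java.util.Arrays;

// Hypothetical sketch of the prepareExecute() contract: derive the
// result "spec" (here just column names) from the input spec before
// any parallel processing starts.
public final class PrepareDemo {

    public static String[] prepareExecute(String[] inputColumns) {
        String[] outSpec = Arrays.copyOf(inputColumns, inputColumns.length + 1);
        outSpec[inputColumns.length] = "Result"; // appended output column
        return outSpec;
    }
}
```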

execute

protected final BufferedDataTable[] execute(BufferedDataTable[] data,
                                            ExecutionContext exec)
                                     throws Exception
This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[],ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports. Implement this function with your task in the derived model.

The input data is available in the given array argument inData and is ensured to be neither null nor contain null elements.

In order to create output data, you need to create objects of class BufferedDataTable. Use the execution context argument to create BufferedDataTable.

Overrides:
execute in class NodeModel
Parameters:
data - An array holding DataTable elements, one for each input.
exec - The execution monitor for this execute method. It provides the means to create new BufferedDataTables. Additionally, it should be asked frequently whether the execution should be interrupted; if so, it throws an exception, which may be caught and, after all data streams have been closed, re-thrown. Also, if you can tell the progress of your task, set it in this monitor.
Returns:
An array of non-null DataTable elements, one for each output: the result of this execution.
Throws:
Exception - If you must fail the execution. Try to provide a meaningful error message in the exception, as it will be displayed to the user. You are advised to check the canceled status frequently by invoking ExecutionMonitor#checkCanceled, which throws a CanceledExecutionException and aborts the execution.

executeByChunk

protected abstract void executeByChunk(BufferedDataTable inDataChunk,
                                       BufferedDataTable[] additionalData,
                                       RowAppender[] outputTables,
                                       ExecutionMonitor exec)
                                throws Exception
This method is called as often as necessary by multiple threads. The inDataChunk table will contain at most chunkSize rows from the first table in the array passed to execute(BufferedDataTable[], ExecutionContext); the additionalData tables are passed in their entirety.

Parameters:
inDataChunk - the chunked input data table
additionalData - the complete tables of additional data
outputTables - data containers for the output tables where the computed rows must be added
exec - an execution monitor which is actually a subprogress monitor
Throws:
Exception - if an exception occurs
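The role of the outputTables argument, one set of appenders per chunk, merged after all workers finish, can be sketched with a hypothetical, KNIME-free example (AppenderDemo is not KNIME API): each chunk writes into its own buffer, and the buffers are concatenated in chunk order once every worker has completed, so the output order is deterministic regardless of thread scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the per-chunk appender idea: every chunk gets
// its own output buffer (analogous to the RowAppender[] argument).
public final class AppenderDemo {

    public static List<Integer> run(List<Integer> rows, int chunkSize)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<List<Integer>> appenders = new ArrayList<>();
            List<Future<?>> pending = new ArrayList<>();
            for (int start = 0; start < rows.size(); start += chunkSize) {
                final List<Integer> chunk = rows.subList(
                        start, Math.min(start + chunkSize, rows.size()));
                final List<Integer> appender = new ArrayList<>();
                appenders.add(appender); // registered in chunk order
                pending.add(pool.submit(() -> {
                    for (int v : chunk) {
                        appender.add(v * v); // computed row for this chunk
                    }
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for every chunk to finish
            }
            List<Integer> out = new ArrayList<>();
            for (List<Integer> a : appenders) {
                out.addAll(a); // merge buffers in chunk order
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```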

setChunkSize

public void setChunkSize(int newValue)
Sets the chunk size of the split data tables.

Parameters:
newValue - the new chunk size, in number of rows

getChunkSize

public int getChunkSize()
Returns the current chunk size.

Returns:
the chunk size in number of rows


Copyright, 2003 - 2010. All rights reserved.
University of Konstanz, Germany.
Chair for Bioinformatics and Information Mining, Prof. Dr. Michael R. Berthold.
You may not modify, publish, transmit, transfer or sell, reproduce, create derivative works from, distribute, perform, display, or in any way exploit any of the content, in whole or in part, except as otherwise expressly permitted in writing by the copyright owner or as specified in the license file distributed with this product.