Creating an input node in Java

An input node is used to receive a message into a message flow, typically from a source that is not supported by the built-in input nodes.

Before you begin

About this task

A Java user-defined node is distributed as a .jar file.

Creating a Java project

About this task

Before you can create Java nodes in the IBM® Integration Toolkit, you must create a Java project:

Procedure

  1. Click File > New > Project, select Java Project then click Next.
  2. Give the project a name, then click Next.
  3. On the Java Settings pane, select the Libraries tab, and click Add External JARs.
  4. Select install_dir\server\classes\jplugin2.jar, where install_dir is the home directory of your IBM Integration Bus installation.
  5. Follow the prompts on the other tabs to define any other build settings.
  6. Click Finish.

What to do next

You can now develop the source for your Java node in this project.

Declaring the input node class

About this task

Every class that implements MbInputNodeInterface and is contained in the integration node LIL path is registered with the integration node as an input node. When you implement MbInputNodeInterface, you must also implement a run method for this class. The run method represents the start of the message flow, contains the data that formulates the message, and propagates it down the flow. The integration node calls the run method when threads become available in accordance with your specified threading model.

The class name must end with the word "Node". For example, if the name is BasicInput in the IBM Integration Toolkit, the class name must be BasicInputNode.

For example, to declare the input node class:

package com.ibm.jplugins;

import com.ibm.broker.plugin.*;

public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
...

Follow these steps to complete this action in the IBM Integration Toolkit:

Procedure

  1. Click File > New > Other, select Class, then click Next.
  2. Set the package and class name fields to appropriate values.
  3. Delete the text in the Superclass text field and click the Browse button.
  4. Select MbInputNode.
  5. Click the Add button next to Interfaces text field, and select MbInputNodeInterface.
  6. Click Finish.

Defining the node constructor

About this task

When the node is instantiated, the constructor of the node class is called. This class is where you create the terminals of the node, and initialize the default values for the attributes.

An input node has a number of output terminals associated with it, but does not typically have any input terminals. Use the createOutputTerminal method to add output terminals to a node when the node is instantiated. For example, to create a node with three output terminals:

public BasicInputNode() throws MbException
{
	createOutputTerminal ("out");
	createOutputTerminal ("failure");
	createOutputTerminal ("catch");
	setAttribute ("firstParserClassName","myParser");
	attributeVariable  = "none"; 
}

Receiving external data into a buffer

About this task

An input node can receive data from any type of external source, such as a file system, a queue, or a database, in the same way as all other Java programs, if the output from the node is in the correct format.

Provide an input buffer (or bit stream) to contain input data, and associate it with a message object. Create a message from a byte array by using the createMessage method of the MbInputNode class, and then generate a valid message assembly from this message. For example, to read the input data from a file:

Procedure

  1. Create an input stream to read from the file:
    FileInputStream inputStream = new FileInputStream("myfile.msg");
  2. Create a byte array the size of the input file:
    byte[] buffer = new byte[inputStream.available()];
  3. Read from the file into the byte array:
    inputStream.read(buffer);
  4. Close the input stream:
    inputStream.close();
  5. Create a message to put on the queue:
    MbMessage msg = createMessage(buffer);
    
  6. Create a message assembly to hold this message:
    msg.finalizeMessage(MbMessage.FINALIZE_VALIDATE);
    MbMessageAssembly newAssembly =
         new MbMessageAssembly(assembly, msg);
    

Propagating the message

About this task

After creating a message assembly, you can propagate it to one of the output terminals that are defined on the node.

For example, to propagate the message assembly to the terminal named out:
MbOutputTerminal out = getOutputTerminal("out");
out.propagate(newAssembly);
To delete the message:
msg.clearMessage();

To clear the memory that is allocated for the message tree, call the clearMessage() function within the finally block of try/catch.

Controlling threading and transactionality

About this task

The integration node infrastructure handles transaction issues such as controlling the commit of a WebSphere® MQ or database unit of work when message processing has completed. However, resources modified from within a user-defined node are not necessarily under the transactional control of the integration node.

Each message flow thread is allocated from a pool of threads maintained for each message flow, and starts in the run method.

The user-defined node uses return values to indicate whether a transaction is successful, to control whether transactions are committed or rolled back, and to control when the thread is returned to the pool. The integration node infrastructure catches all unhandled exceptions, and rolls back the transaction.

You determine the behavior of transactions and threads by using the appropriate return value:

MbInputNode.SUCCESS_CONTINUE
The transaction is committed and the integration node calls the run method again by using the same thread.
MbInputNode.SUCCESS_RETURN
The transaction is committed and the thread is returned to the thread pool, assuming that it is not the only thread for this message flow.
MbInputNode.FAILURE_CONTINUE
The transaction is rolled back and the integration node calls the run method again by using the same thread.
MbInputNode.FAILURE_RETURN
The transaction is rolled back and the thread is returned to the thread pool, assuming that it is not the only thread for this message flow.
MbInputNode.TIMEOUT
The run method must not block indefinitely while waiting for input data to arrive. While the flow is blocked by user code, you cannot shut down or reconfigure the integration node. The run method must yield control to the integration node periodically by returning from the run method. If input data is not received after a certain period (for example, 5 seconds), the method must return with the TIMEOUT return code. Assuming that the integration node does not need to reconfigure or shut down, the run method of the input node gets called again immediately.
To create multithreaded message flows, you call the dispatchThread method after a message is created, but before the message is propagated to an output terminal. This action ensures that only one thread is waiting for data while other threads are processing the message. New threads are obtained from the thread pool up to the maximum limit specified by the Additional Instances property of the message flow. For example:
public int run( MbMessageAssembly assembly ) throws MbException
{
  byte[] data = getDataWithTimeout();  // user supplied method
                                       // returns null if timeout
  if( data == null )
    return TIMEOUT;

  MbMessage msg = createMessage( data );
  msg.finalizeMessage( MbMessage.FINALIZE_VALIDATE );
  MbMessageAssembly newAssembly =
       new MbMessageAssembly( assembly, msg );

  dispatchThread();

  getOutputTerminal( "out" ).propagate( newAssembly );

  return SUCCESS_RETURN;
}

Declaring the node name

About this task

You must declare the name of the node for use and identification by the IBM Integration Toolkit. All node names must end with the characters "Node". Declare the name by using the following method:


public static String getNodeName()
{
   return "BasicInputNode";
}

If this method is not declared, the Java API framework creates a default node name by using the following rules:
  • The class name is appended to the package name.
  • The periods are removed, and the first letter of each part of the package and class name is capitalized.
For example, by default, the following class is assigned the node name "ComIbmPluginsamplesBasicInputNode":
package com.ibm.pluginsamples;
public class BasicInputNode extends MbInputNode implements MbInputNodeInterface
{
   ...

Declaring attributes

About this task

Declare node attributes by using the same method that you use for Java bean properties. You are responsible for writing get and set methods for the attributes; the API framework infers the attribute names by using the Java bean introspection rules. For example, if you declare the following two methods:


private String attributeVariable;

public String getFirstAttribute()
{
  return attributeVariable;
}

public void setFirstAttribute(String value)
{
  attributeVariable = value;
}

The integration node infers that this node has an attribute called firstAttribute. This name is derived from the names of the get or set methods, not from the variable names of any internal class members. Attributes can be exposed only as strings, so convert numeric types to and from strings in the get or set methods. For example, the following method defines an attribute called timeInSeconds:

int seconds;

public String getTimeInSeconds()
{
  return Integer.toString(seconds);
}

public void setTimeInSeconds(String value)
{
  seconds = Integer.parseInt(value);
}

Implementing the node functionality

About this task

As already described, the run method is called by the integration node to create the input message. This method must provide all the processing function for the input node.

Overriding default message parser attributes (optional)

About this task

An input node implementation normally determines which message parser initially parses an input message. For example, the built-in MQInput node dictates that an MQMD parser is required to parse the MQMD header. A user-defined input node can select an appropriate header or message parser, and the mode in which the parsing is controlled, by using the following default attributes that are included, which you can override:

rootParserClassName
Defines the name of the root parser that parses message formats supported by the user-defined input node. It defaults to GenericRoot, a supplied root parser that causes the integration node to allocate and chain parsers together. It is unlikely that a node would have to modify this attribute value.
firstParserClassName
Defines the name of the first parser, in what might be a chain of parsers that are responsible for parsing the bit stream. It defaults to XML.
messageDomainProperty
An optional attribute that defines the name of the message parser required to parse the input message. The supported values are the same as the values that are supported by the MQInput node.
messageSetProperty
An optional attribute that defines the message set identifier, or the message set name, in the Message Set field, only if the MRM parser was specified by the messageDomainProperty attribute.
messageTypeProperty
An optional attribute that defines the identifier of the message in the MessageType field, only if the MRM parser was specified by the messageDomainProperty attribute.
messageFormatProperty
An optional attribute that defines the format of the message in the Message Format field, only if the MRM parser was specified by the messageDomainProperty attribute.

Deleting an instance of the node

About this task

An instance of the node is deleted when either:
  • You shut down the integration node.
  • You remove the node or the message flow that contains the node, and redeploy the configuration.
When the node is deleted, it can perform cleanup operations, such as closing sockets, if it implements the optional onDelete method. This method, if present, is called by the integration node just before the node is deleted.

Implement the onDelete method as follows:

public void onDelete()
{
  // perform node cleanup if necessary
}