ExampleOperator example

The ExampleOperator operator is derived from APT_Operator and uses no arguments.

The ExampleOperator operator changes the contents of its two record fields and then writes each record to a single output data set.

Figure 1. Interface schema for the ExampleOperator operator
The following table contains the code for the ExampleOperator. Comments follow the table.
Table 1. APT_Operator Derivation With No Arguments
Comment Code
1
#include <apt_framework/orchestrate.h>
2
class ExampleOperator : public APT_Operator
{
4
APT_DECLARE_RTTI(ExampleOperator);
5
APT_DECLARE_PERSISTENT(ExampleOperator);
public:
ExampleOperator();
9

11
protected:
	virtual APT_Status describeOperator();
	virtual APT_Status runLocally();
	virtual APT_Status initializeFromArgs_(const APT_PropertyList &args,
		APT_Operator::InitializeContext context);
};
12
#define ARGS_DESC "{}" 
13
APT_DEFINE_OSH_NAME(ExampleOperator, exampleOp, ARGS_DESC); 
14
15
APT_IMPLEMENT_RTTI_ONEBASE(ExampleOperator, APT_Operator); 
APT_IMPLEMENT_PERSISTENT(ExampleOperator);
16
ExampleOperator::ExampleOperator()
{}
18
APT_Status ExampleOperator::initializeFromArgs_(const APT_PropertyList &args,
	APT_Operator::InitializeContext context)
{ 
	return APT_StatusOk; 
}
22
void ExampleOperator::serialize(APT_Archive& archive, APT_UInt8)
{}
24
APT_Status ExampleOperator::describeOperator()
{
	setKind(APT_Operator::eParallel);
	setInputDataSets(1);
	setOutputDataSets(1);
	setInputInterfaceSchema("record (iField:int32; sField:string)", 0);
	setOutputInterfaceSchema("record (iField:int32; sField:string)",0);
	return APT_StatusOk;
}
33
APT_Status ExampleOperator::runLocally()
{
	APT_InputCursor inCur;
	APT_OutputCursor outCur;
	setupInputCursor(&inCur, 0); 
	setupOutputCursor(&outCur, 0); 
	APT_InputAccessorToInt32 iFieldInAcc("iField", &inCur); 
	APT_InputAccessorToString sFieldInAcc("sField", &inCur); 
	APT_OutputAccessorToInt32 iFieldOutAcc("iField", &outCur);
	APT_OutputAccessorToString sFieldOutAcc("sField", &outCur);
	while (inCur.getRecord())
	{
		cout << "*sFieldInAcc =" << *sFieldInAcc << endl;
		cout << "*iFieldInAcc =" << *iFieldInAcc << endl;
		*sFieldOutAcc = "XXXXX" + *sFieldInAcc;
		*iFieldOutAcc = *iFieldInAcc + 100;
		cout << "*sFieldOutAcc =" << *sFieldOutAcc << endl;
		cout << "*iFieldOutAcc =" << *iFieldOutAcc << endl;
		outCur.putRecord();
	}

	return APT_StatusOk;
}
Comments
1
Include the orchestrate.h header file.
2
All operators are derived, directly or indirectly, from APT_Operator.
4
Use the required macro, APT_DECLARE_RTTI, to declare runtime type information for the operator.
5
Use the required macro, APT_DECLARE_PERSISTENT, to declare object persistence for operator objects transmitted to the processing nodes.
9-11
You must override the virtual function initializeFromArgs_() and the two pure virtual functions, describeOperator() and runLocally(). This example declares all three overrides.
12
See the header file, install_directory/Server/PXEngine/include/apt_util/argvcheck.h, for documentation on the ARGS_DESC string.
13
Use APT_DEFINE_OSH_NAME to connect the class name to the name used to invoke the operator from osh, and pass your argument description string to DataStage.
14-15
APT_IMPLEMENT_RTTI_ONEBASE and APT_IMPLEMENT_PERSISTENT are required macros that implement runtime type information and persistent object processing.
16
ExampleOperator::ExampleOperator() defines a default constructor for the operator. All operators must have a public default constructor, even if the constructor is empty.
18
Use the override of initializeFromArgs_() to transfer information from the arguments to the class instance. Because there are no arguments for this example, initializeFromArgs_() simply returns APT_StatusOk.
22
The function serialize() defines complex persistence in a derived class. The serialization operators operator||, operator<<, and operator>> are declared and defined by macros in terms of APT_Persistent::serialize(). There are no data variables in ExampleOperator; therefore, serialize() is empty. When there are data variables, as in the HelloWorldOp example, each variable has its own line. For example: archive || member_variable.
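The idea of a single serialize() function that serves both storing and loading can be sketched outside the framework. The following is a minimal, self-contained illustration only: SketchArchive, loaderFor(), and SketchState are hypothetical names invented for this sketch and are not part of the APT_Archive or APT_Persistent API, whose actual behavior is defined by the framework headers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical archive (NOT the APT_Archive API). It either stores values
// into an internal buffer or loads them back, depending on its mode.
class SketchArchive {
public:
    // Creates an empty archive that stores values.
    SketchArchive() : loading_(false), pos_(0) {}

    // Creates an archive that loads the values stored in another archive.
    static SketchArchive loaderFor(const SketchArchive& stored) {
        SketchArchive a;
        a.buffer_ = stored.buffer_;
        a.loading_ = true;
        return a;
    }

    // Bidirectional transfer of an integer: write when storing, read when loading.
    SketchArchive& operator||(std::int32_t& value) {
        if (loading_) {
            value = static_cast<std::int32_t>(std::stol(next()));
        } else {
            buffer_.push_back(std::to_string(value));
        }
        return *this;
    }

    // Bidirectional transfer of a string.
    SketchArchive& operator||(std::string& value) {
        if (loading_) {
            value = next();
        } else {
            buffer_.push_back(value);
        }
        return *this;
    }

private:
    // Returns the next stored value; reading past the end is a logic error.
    const std::string& next() {
        assert(pos_ < buffer_.size());
        return buffer_[pos_++];
    }

    std::vector<std::string> buffer_;
    bool loading_;
    std::size_t pos_;
};

// Hypothetical operator state with two data members.
struct SketchState {
    std::int32_t count = 0;
    std::string text;

    // One line per member; the same code runs for both store and load.
    void serialize(SketchArchive& archive) {
        archive || count;
        archive || text;
    }
};
```

In this sketch, calling serialize() on a storing archive and then on a loader built from it reproduces the member values in a second object, which is the property that lets one function describe both directions.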
24
Use the override of describeOperator() to describe the configuration information for the operator. This example implementation specifies that:
  • The operator is run in parallel. Parallel mode is the default; therefore it is not necessary to explicitly set parallel mode with setKind().
  • There is one input data set and one output data set.
  • The input and output schemas are as defined by setInputInterfaceSchema() and setOutputInterfaceSchema().
  • The data partitioning method is random.
33
With your override of runLocally(), you define what the operator does at run time. In this example implementation, the lines APT_InputCursor inCur and APT_OutputCursor outCur declare the input and output cursors, and the functions setupInputCursor() and setupOutputCursor() initialize them. The APT_InputAccessorTotype and APT_OutputAccessorTotype declarations, where type is the field type (Int32 and String in this example), set up the input and output field accessors. The example while statement loops over all the records and does these tasks for each record:
  • Prints out the initial contents of the two fields in the record.
  • Changes the contents of the fields and writes the new contents to the output record.
  • Prints out the new contents of the fields.
  • Writes the record to output.
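The per-record work done in the while loop can also be tried in isolation, without cursors or accessors. The following is a minimal sketch under stated assumptions: Record, transformRecord(), and transformAll() are hypothetical names used only for illustration, and plain vectors stand in for the input and output data sets. It does not use or reproduce any framework class.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for one record of the interface schema
// "record (iField:int32; sField:string)". This is not an APT type.
struct Record {
    std::int32_t iField;
    std::string sField;
};

// Applies the same change that the example loop makes to one record:
// prefix sField with "XXXXX" and add 100 to iField.
Record transformRecord(const Record& in) {
    Record out;
    out.sField = "XXXXX" + in.sField;
    out.iField = in.iField + 100;
    return out;
}

// Mirrors the structure of the while loop: take each input record,
// transform it, and append it to the output.
std::vector<Record> transformAll(const std::vector<Record>& input) {
    std::vector<Record> output;
    output.reserve(input.size());
    for (const Record& rec : input) {
        output.push_back(transformRecord(rec));
    }
    // Exactly one output record is written per input record.
    assert(output.size() == input.size());
    return output;
}
```

Keeping the field logic in a small function like this makes it easy to check the transformation on sample values before wiring it into an operator.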