IBM InfoSphere Streams Version 4.1.0

Toolkit com.ibm.streamsx.dps 2.0.9

General Information

The Distributed Process Store (DPS) toolkit enables multiple applications running processing elements (PEs) on one or more machines to share application-specific state information. The shared information is stored in an external Redis® NoSQL key-value (K/V) data store. The toolkit thus allows non-fused SPL, C++ and Java™ operators running on different machines to share information.

The toolkit officially supports Redis 2.8.21 and later, and Redis 3.x. Using the DPS toolkit therefore requires an installed and configured Redis server infrastructure. Note that Redis is an open source offering that requires acceptance of its BSD license. See the Redis home page for information on how to download, install and configure a Redis server or cluster.

The toolkit consists of a set of native functions that provide access to shared state, plus locking functions that provide safe, concurrent access to the shared data. These functions can be used from anywhere inside a Streams application: SPL code, SPL functions, SPL native functions, or C++ primitive operators. They also have counterparts written in Java that can be used from a Java primitive operator. The actual state information is stored separately in the Redis distributed back-end in-memory store, which is transparent to the Streams application.

Conceptual Overview

The following diagram provides a conceptual overview of the interaction between the Streams application, the DPS toolkit, and the Redis distributed back-end in-memory store. Each green box represents a processing element (PE) of the application. These interconnected PEs have access to the DPS logical container as well as full access to the data store APIs. In the logical view layer, code abstraction is done for these four important aspects:
  • Serialization required before storing the user data in the store
  • Deserialization required after fetching the data from the store and just before returning it to the user
  • In-memory database abstraction in order to integrate the Redis store with the Distributed Process Store
  • Platform dependent implementation necessary to communicate with the Redis back-end in-memory store
Any PE, regardless of whether it originally created the store and populated it with its original contents (if any), can safely access the store (shown in the physical view layer) to save and retrieve contents. To access the store, the PE requires a valid store name or a store handle.
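
The serialization and deserialization steps listed above can be illustrated with a minimal sketch. It is not the DPS implementation: the class name SerializingStoreSketch and its methods are hypothetical, an in-process map stands in for the Redis back end, and the byte encoding (UTF-8 keys, 4-byte integer values) is an assumption chosen only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a serialize-on-put / deserialize-on-get layer.
// An in-process map stands in for the Redis back end.
class SerializingStoreSketch {
    // The "back end" only ever sees opaque bytes.
    private final Map<ByteBuffer, byte[]> backEnd = new HashMap<>();

    // Assumed encoding: keys as UTF-8 bytes.
    static byte[] encodeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Assumed encoding: values as 4-byte big-endian integers.
    static byte[] encodeValue(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static int decodeValue(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt();
    }

    // Serialize just before handing the pair to the back end.
    void put(String key, int value) {
        backEnd.put(ByteBuffer.wrap(encodeKey(key)), encodeValue(value));
    }

    // Deserialize just before returning the value to the caller.
    Integer get(String key) {
        byte[] raw = backEnd.get(ByteBuffer.wrap(encodeKey(key)));
        return raw == null ? null : decodeValue(raw);
    }
}
```

The caller works with typed keys and values only; the byte representation never leaves the logical layer.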

As shown in the figure above, an instance of the DPS accommodates one or more user-created stores. Each store can hold an unlimited number of data items stored as key-value pairs, created using any SPL type as a key and any SPL type as a value. Additionally, a DPS instance contains a store factory that can manufacture stores on demand. Any store can be accessed concurrently by any number of PEs running on different machines. To ensure safe access (that is, store operations do not overwrite each other), each DPS instance contains a lock factory that lets you create sharable locks. A PE acquires such a lock before a critical store operation and releases it once that operation has completed.
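
The store factory idea can be sketched as a name-to-handle registry. This is a conceptual model only: StoreFactorySketch and its methods are hypothetical names, and the convention that a handle of 0 means "no store" is an assumption of the sketch, not a statement about the toolkit's behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a factory that hands out stores by name.
class StoreFactorySketch {
    private final Map<String, Long> handlesByName = new HashMap<>();
    private long nextHandle = 1; // 0 is reserved in this sketch for "no store"

    // Creates a new store; returns 0 if the name is already taken.
    synchronized long createStore(String name) {
        if (handlesByName.containsKey(name)) {
            return 0;
        }
        long handle = nextHandle++;
        handlesByName.put(name, handle);
        return handle;
    }

    // Returns the existing handle, or creates the store if it is missing.
    synchronized long createOrGetStore(String name) {
        Long existing = handlesByName.get(name);
        return existing != null ? existing : createStore(name);
    }

    // Looks up a store created by any caller; returns 0 if not found.
    synchronized long findStore(String name) {
        return handlesByName.getOrDefault(name, 0L);
    }

    // Removes the store with the given handle, if it exists.
    synchronized boolean removeStore(long handle) {
        return handlesByName.values().remove(handle);
    }
}
```

A PE that did not create a store can still reach it by name through findStore or createOrGetStore, which mirrors the statement that any PE holding a valid store name or handle can access the store.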

Toolkit overview

The functions provided by the DPS toolkit are divided into three categories:

  • User-created data stores
  • TTL global store, in which pairs can be configured to expire after a certain time period
  • Distributed lock API

Note: The toolkit provides the aforementioned functionality as native functions that can be used from C++ and SPL applications and through a fully object-oriented Java API. The comments below apply regardless of the programming language being used. See the "Getting Started" section below for an example. The native function documentation and Javadoc have more details on how to use the toolkit within SPL and Java applications respectively.

User Created Stores

A DPS store is a container in which key-value pairs can be created, retrieved, updated and deleted. Keys and values must be an SPL primitive type, such as rstring or int32, or an SPL composite type, such as a map. You can perform Create, Read, Update, Delete (CRUD) operations simultaneously on numerous stores that you own, share, and manage within the context of one or more Streams applications. You can create your own stores and share application-specific data across multiple Streams processing elements (PEs) and across multiple Streams applications running on one or more machines. The key-value pairs are kept inside each individual user-created store. Applications can create and use many such user-specified stores at the same time. There is also support for running arbitrary one-way data store commands from within your application. See the dpsRunDataStoreCommand() functions. The core functions needed to manage the lifecycle of a DPS store in an SPL application are in the com.ibm.streamsx.store.distributed namespace. These functions allow you to create, get and remove stores; put, get and check key-value pairs; iterate over a store; and serialize or deserialize stores.

Time to Live (TTL) Store

The toolkit also provides a set of functions for storing key-value pairs that are configured to expire after a certain time period. When the pre-assigned TTL value expires, these data items are automatically removed from the data store. Key-value pairs stored and obtained using these functions do not belong to any user-created store. Instead, they are stored in the top-level global area of the configured Redis back end. When an application needs to put data items with a TTL value and get them back, it can use these functions without having to create individual stores. These functions all have a "TTL" suffix. These ephemeral key-value pairs can only be used with the TTL functions and cannot be used with any function that takes a store name or store id as an argument.

When a non-zero TTL value is passed to these functions, the key-value pair is removed automatically at the end of its specified lifetime. A TTL value of zero keeps the key-value pair in the back-end data store forever, or until it is removed manually using the dpsRemoveTTL() function.

These functions will return true or false indicating whether the operation succeeded or not. In the case of a false return value, an error code and an error string can be obtained. In the Redis back-end data store, TTL based data items can coexist with other user created stores containing data items that could live forever. Such TTL based APIs only provide a limited set of functions (put, get, has, and remove), but at the same time will have a slightly lower overhead than the feature rich non-TTL based functions.
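
The TTL semantics described above (a non-zero TTL expires the pair, a TTL of zero keeps it until it is removed) can be modelled with a short sketch. This is not the DPS API: TtlStoreSketch and its methods are hypothetical, the store lives in process memory rather than in Redis, the clock is injected so that expiry can be demonstrated without waiting, and the use of seconds as the TTL unit is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical in-process model of TTL-based put/get/has/remove.
class TtlStoreSketch {
    private static final class Entry {
        final String value;
        final long expiresAtMillis; // Long.MAX_VALUE means "never expires"

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final LongSupplier clockMillis;

    TtlStoreSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    // ttlSeconds == 0 keeps the pair until it is removed explicitly.
    boolean put(String key, String value, long ttlSeconds) {
        if (ttlSeconds < 0) {
            return false; // report failure through the return value
        }
        long expiresAt = ttlSeconds == 0
                ? Long.MAX_VALUE
                : clockMillis.getAsLong() + ttlSeconds * 1000L;
        entries.put(key, new Entry(value, expiresAt));
        return true;
    }

    // Returns null if the key is absent or its lifetime has ended.
    String get(String key) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clockMillis.getAsLong() >= entry.expiresAtMillis) {
            entries.remove(key); // lazily drop the expired pair
            return null;
        }
        return entry.value;
    }

    boolean has(String key) {
        return get(key) != null;
    }

    boolean remove(String key) {
        return entries.remove(key) != null;
    }
}
```

In an application the clock would typically be System::currentTimeMillis; a manually advanced clock makes the expiry behavior easy to verify.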

Distributed Locking API

The DPS Toolkit also provides distributed locking functions that let multiple threads and multiple processes access stores safely without overwriting each other's changes. This is achieved using a trust-based cooperative locking scheme: a participant gains exclusive access to a store, performs a set of transaction-based store activities, and then releases the lock.
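
The cooperative scheme, together with the lease time and maximum wait time that appear in the Getting Started example, can be modelled as follows. This is a conceptual sketch, not the toolkit's locking code: LeaseLockSketch and its methods are hypothetical, the lock state lives in one process instead of in Redis, and the assumption that an expired lease makes the lock available to other requesters is made for the purpose of the sketch.

```java
import java.util.function.LongSupplier;

// Hypothetical model of a cooperative lock with a lease and a bounded wait.
class LeaseLockSketch {
    private final LongSupplier clockMillis;
    private String owner;           // null when the lock is free
    private long leaseEndsAtMillis; // meaningful only while owner != null

    LeaseLockSketch(LongSupplier clockMillis) {
        this.clockMillis = clockMillis;
    }

    // Single attempt: succeeds if the lock is free or the lease has run out.
    synchronized boolean tryAcquire(String requester, long leaseMillis) {
        long now = clockMillis.getAsLong();
        boolean available = owner == null || now >= leaseEndsAtMillis;
        if (!available) {
            return false;
        }
        owner = requester;
        leaseEndsAtMillis = now + leaseMillis;
        return true;
    }

    // Retries until the lock is obtained or maxWaitMillis has elapsed.
    boolean acquire(String requester, long leaseMillis, long maxWaitMillis) {
        long deadline = clockMillis.getAsLong() + maxWaitMillis;
        while (true) {
            if (tryAcquire(requester, leaseMillis)) {
                return true;
            }
            if (clockMillis.getAsLong() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(1); // back off briefly before retrying
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Only the current owner can release the lock.
    synchronized boolean release(String requester) {
        if (owner == null || !owner.equals(requester)) {
            return false;
        }
        owner = null;
        return true;
    }
}
```

The scheme is trust based: nothing stops a participant from touching the store without holding the lock, so every participant must follow the acquire, operate, release sequence. The lease bounds how long a crashed holder can block the others.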

Using the toolkit in SPL applications

All the functionality of the toolkit is available via the native functions in the com.ibm.streamsx.store.distributed and com.ibm.streamsx.lock.distributed namespaces. You need to include these namespaces in your SPL application via a use directive.

Using the toolkit in Java applications

Ensure that <dps_toolkit_home>/impl/java/lib/dps-helper.jar, which contains the Java implementation of the DPS functions, is accessible to your application. Packages of interest are com.ibm.streamsx.dps and com.ibm.streamsx.dl. See the Javadoc for full documentation.

Using the toolkit in C++ applications

Include the C++ header file DistributedProcessStoreWrappers.h found in impl/include. This is the main entry point for the C++ functions, which are in the C++ namespace com::ibm::streamsx::store::distributed.

Getting Started

The following snippets demonstrate the basic usage of the toolkit from SPL and Java. Usage from C++ is very similar to the SPL example below.

SPL:

rstring dummyRstring = "";
uint32 dummyUint32 = 0u;
mutable uint64 err = 0ul;
mutable uint64 dbStore_handle = 0ul;
dbStore_handle = dpsCreateStore("myDBStore1", dummyRstring, dummyUint32, err);

if (err == 0ul) { // no error occurred
	// create a lock for the store
	mutable uint64 lock_id = dlCreateOrGetLock("My db store lock", err);
	// Acquire the newly created lock, specifying a lease time and the maximum time to wait to acquire the lock.
	float64 max_wait = 10.0;
	float64 lease_time = 10.0;
	dlAcquireLock(lock_id, lease_time, max_wait, err);
	// add a key/value pair to the store
	mutable boolean result = true;
	rstring key = "IBM";
	uint32 value = 399u;
	err = 0ul;
	result = dpsPut(dbStore_handle, key, value, err);

	if (err != 0ul) {
		// use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString() as needed
	}
	// finished our store operations, release the lock
	err = 0ul;
	dlReleaseLock(lock_id, err);
}

Java:

StoreFactory sf = DistributedStores.getStoreFactory();
Store store = null;

try {
    // specify the SPL types for the keys and values in the store
    String keyType = "rstring";
    String valueType = "int32";
    store = sf.createOrGetStore("Java Test Store1", keyType, valueType);
} catch (StoreFactoryException sfe) {
    // use sfe.getErrorCode() and sfe.getErrorMessage() for more info
}

...
// once ready to access the store,
// get the lock for the store, which may have been created previously
LockFactory lf = DistributedLocks.getLockFactory();
Lock myLock = lf.createOrGetLock("Lock_For_Test_Store1");

// Acquire the lock
try {
    myLock.acquireLock();
} catch (LockException le) {
    System.out.print("Unable to acquire the lock named 'Lock_For_Test_Store1'");
    System.out.println(" Error = " + le.getErrorCode() + ", Error msg = " + le.getErrorMessage());
    throw le;
}

// perform store operations
store.put("IBM", 39);
store.put("Lenovo", 50);
// release the lock when finished
myLock.releaseLock();

Note that error checking in the above examples is minimal.

Error Handling in C++ and SPL

Most C++ and SPL functions include a mutable err parameter that will contain the result of executing the function. If an error occurs, this variable's value will be non-zero. It is the caller's responsibility to provide a mutable parameter to contain the error code and to check its value afterwards.

In addition, there are functions that can be called when an error occurs to return the last error code and a message describing the error. The correct function to call after an operation depends on the operation that was last executed.
  • For errors relating to the store functions, use dpsGetLastStoreErrorCode() and dpsGetLastStoreErrorString().
  • For errors related to the TTL functions, use dpsGetLastStoreErrorCodeTTL() and dpsGetLastStoreErrorStringTTL().
  • For locking related errors, use dlGetLastDistributedLockErrorCode() and dlGetLastDistributedLockErrorString().

The following example shows how to check for errors after a function call, in this case after creating a lock:

mutable uint64 err = 0ul;
mutable uint64 lock_id = dlCreateOrGetLock("My Sentinel Lock1", err);

if (err != 0ul) {
	rstring msg = dlGetLastDistributedLockErrorString();
	uint64 rc = dlGetLastDistributedLockErrorCode();
	printStringLn("Error creating lock, rc = " + (rstring)(rc) + ", msg =" + msg );
}

Additional Examples

To specifically learn how to call the DPS APIs from SPL native functions, C++ and Java primitive operators, download the SPL-Examples-For-Beginners-In-Streams4x.tar.gz package from developerWorks®. The examples in that package explain how to call the DPS APIs from different parts of a Streams application. In particular, you may want to refer to the following three examples within the package:

  • 058_data_sharing_between_non_fused_spl_custom_and_cpp_primitive_operators
  • 061_data_sharing_between_non_fused_spl_custom_operators_and_a_native_function
  • 062_data_sharing_between_non_fused_spl_custom_and_java_primitive_operators

Redis configuration

The DPS Toolkit supports the Redis version 2.8.21 server and above (referred to as the DPS Toolkit redis implementation); and the Redis version 3.0.0 server (including support for the new cluster feature) and above (referred to as the DPS Toolkit redis-cluster implementation).

IMPORTANT TO NOTE:
  • With the Redis v2.8.x data store, you can manually configure and start multiple instances of Redis to listen and run on different ports and machines. The DPS toolkit will internally treat these Redis v2.8.x multiple instances as multiple shards and distribute the key-value pairs among those instances to improve performance with increased memory capacity. Use of the Redis v2.8.x store in any form (single instance or multiple instances) must be identified as “redis” in the DPS toolkit configuration file.
  • The new Redis 3.x release is based on a completely new architecture with a built-in clustering and fault tolerance facility. The Redis v3.x data store offers an elaborate method to run multiple instances across one or more machines in a master/slave configuration for load balancing as well as for fault tolerance features. Use of the Redis v3.x store in any form (single instance mode or cluster mode) must be identified as “redis-cluster” in the DPS toolkit configuration file.
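
The idea of treating several Redis v2.8.x instances as shards can be illustrated with a minimal sketch. The toolkit's actual distribution logic is not described here, so the approach below (a CRC32 checksum of the key, modulo the number of servers) is only one common way to do it, named plainly as an assumption. ShardPickerSketch and the server names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical client-side shard selection: hash the key, pick a server.
class ShardPickerSketch {
    private final List<String> servers;

    ShardPickerSketch(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("at least one server is required");
        }
        this.servers = new ArrayList<>(servers);
    }

    // The same key always maps to the same server for a fixed server list.
    String serverFor(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        int index = (int) (crc.getValue() % servers.size());
        return servers.get(index);
    }
}
```

Because the mapping depends on the number of servers, adding or removing an instance in a scheme like this changes where most keys land. That is one reason the server list is fixed in configuration rather than changed while applications are running.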

Prerequisites

In addition to Redis, the DPS toolkit requires installation of one of the following supported operating system and gcc version combinations:

On Linux x86_64:
  • RedHat Enterprise Linux 6.4 (or an equivalent CentOS version) + gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC)
  • RedHat Enterprise Linux 7.1 (or an equivalent CentOS version) + gcc version 4.8.3 20140911 (Red Hat 4.8.3-9) (GCC)
  • SUSE Linux Enterprise Server (SLES) 11 + gcc version 4.3.4
On Linux for IBM® Power® 8 Little Endian:
  • RedHat Enterprise Linux 7.1 + gcc version 4.8.3 20140911 (Red Hat 4.8.3-9) (GCC)
On Linux for IBM Power 7 Big Endian:
  • RedHat Enterprise Linux 6.4 (or an equivalent CentOS version) + gcc version 4.4.7 20120313 (Red Hat 4.4.7-3) (GCC)
Additionally, one of the following Redis store versions is required:
  • Redis v2.8.21 or higher
  • Redis v3.0.0 or higher
See http://redis.io/topics/admin for Redis store Linux administration requirements.

Notes for Redis 2.8.x (redis) store

With the Redis 2.8.x store, the DPS toolkit supports a multi-server setup. If you have multiple Redis servers configured, using the DPS Time To Live (TTL) APIs will result in better scalability of your put/get requests than using the APIs that take the user created store id as a function argument. Choose the APIs according to your functional and scaling needs. When configuring the system to run multiple Redis server instances on a single machine for scaling purposes, take into account the total number of CPU cores available on that single machine. Start only a suitable number of Redis server instances on a single machine to give them enough CPU cycles to handle your peak workload of put/get requests. If you require more CPU cores than are available on a single machine, run Redis server instances on multiple machines.

While running multiple Redis server instances either on a single machine or on multiple machines, the DPS logic will automatically shard (or distribute) the put/get workload to different Redis server instances.

Notes for Redis 3.x (redis cluster)

Redis version 3.0.0 and higher provides a new feature to install and run a Redis cluster. You can install multiple Redis instances and configure them with a replication factor. For every master Redis instance, you can have one or more slave instances. When the DPS toolkit writes data into the Redis v3.x cluster, the data is automatically sharded and distributed among all the master Redis instances running on your Redis cluster. Also, with the Redis v3.x store, when a master Redis instance fails, its slave copy automatically takes over to ensure as much continuous operation as possible on a subset of the Redis nodes. Note that high availability requires three or more instances.
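
The automatic sharding across master instances follows the key-to-slot scheme documented in the Redis Cluster specification: a key's slot is the CRC16 (XMODEM variant) of the key, modulo 16384, and each master serves a range of slots. The sketch below computes that slot. It illustrates the Redis scheme and is not code from the DPS toolkit; RedisSlotSketch is a hypothetical name, and the sketch ignores the hash tag rule (the special handling of a {...} section inside a key).

```java
import java.nio.charset.StandardCharsets;

// Sketch of the Redis Cluster key-to-slot mapping (hash tags not handled).
class RedisSlotSketch {
    static final int SLOT_COUNT = 16384;

    // CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no reflection.
    static int crc16(byte[] data) {
        int crc = 0;
        for (byte b : data) {
            crc ^= (b & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 0x8000) != 0 ? (crc << 1) ^ 0x1021 : crc << 1;
                crc &= 0xFFFF; // keep the register at 16 bits
            }
        }
        return crc;
    }

    // Slot that a key belongs to, in the range 0..16383.
    static int hashSlot(String key) {
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```

The Redis Cluster specification gives 0x31C3 as the CRC16 of the string "123456789", which is a convenient check value for an implementation such as this one.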

Installing the Client

To run and use the DPS toolkit, the NoSQL store client software must be available on all the machines where the Streams applications will run. The DPS toolkit manages the client installation for you by including the Redis client libraries (.so files) in the toolkit installation. The DPS toolkit uses the hiredis C client and the hiredis-cluster wrapper include files for accessing the Redis NoSQL store. The library files are included in the impl/ext/lib sub-directory of the DPS toolkit.

Configuring the DPS toolkit

Copy the DPS toolkit sample configuration file from STREAMS_INSTALL/samples/com.ibm.streamsx.dps/dp_test_1/etc/no-sql-kv-store-servers.cfg to your-project-directory/etc/no-sql-kv-store-servers.cfg. If using the Redis v2.8.x store, specify redis in the configuration file; if using the Redis v3.x store, specify redis-cluster in the configuration file. Each time you create a new SPL project, copy the configuration file from the example provided in the DPS toolkit to the etc directory inside your SPL project directory. Then edit the file as needed for your application.

Reference information

DPS Java API Reference

Version
2.0.9
Required Product Version
4.0.0.0

Namespaces

com.ibm.streamsx.lock.distributed
Functions
com.ibm.streamsx.store.distributed
Functions