DataGrid API example
The DataGrid APIs support two common grid programming patterns: parallel map and parallel reduce.
Parallel Map
The parallel map processes the entries for a set of keys and returns a result for each entry processed. The application builds a list of keys, invokes a map operation, and receives a Map of key/result pairs. Each result is produced by applying an application-supplied function to the entry for that key.
MapGridAgent call flow
When the AgentManager.callMapAgent method is invoked with a collection of keys, the MapGridAgent instance is serialized and sent to each primary partition that the keys resolve to. This means that any instance data stored in the agent can be sent to the server. Each primary partition therefore has one instance of the agent, and on each partition the process method is invoked once for every key that resolves to that partition. The result of each process method is then serialized back to the client and returned to the caller in a Map instance, in which each key maps to its result.
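The call flow above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not the eXtreme Scale implementation: each partition is an in-memory Map, the hypothetical partitionFor method stands in for the grid's key routing with a hash-modulo rule, serialization is omitted, and partitions are visited one after another instead of in parallel. The class and method names (MapCallFlowSketch, load, simulateMapCall) are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the map-agent call flow; no eXtreme Scale classes are used.
// All names here are hypothetical.
class MapCallFlowSketch {
    // Hypothetical routing rule standing in for the grid's key-to-partition mapping.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Spreads entries across partitions using the same routing rule.
    static <K, V> List<Map<K, V>> load(Map<K, V> data, int partitionCount) {
        List<Map<K, V>> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<K, V> e : data.entrySet()) {
            partitions.get(partitionFor(e.getKey(), partitionCount)).put(e.getKey(), e.getValue());
        }
        return partitions;
    }

    // Groups the keys by partition, applies the per-key function once for each key
    // on the partition that owns it, and merges every result into one map.
    static <K, V, R> Map<K, R> simulateMapCall(List<Map<K, V>> partitions, List<K> keys,
                                               BiFunction<Map<K, V>, K, R> process) {
        Map<Integer, List<K>> keysByPartition = new HashMap<>();
        for (K key : keys) {
            keysByPartition.computeIfAbsent(partitionFor(key, partitions.size()),
                                            p -> new ArrayList<>()).add(key);
        }
        Map<K, R> merged = new LinkedHashMap<>();
        // The grid visits partitions in parallel; this loop visits them one at a time.
        for (Map.Entry<Integer, List<K>> group : keysByPartition.entrySet()) {
            Map<K, V> partition = partitions.get(group.getKey());
            for (K key : group.getValue()) {
                merged.put(key, process.apply(partition, key));
            }
        }
        return merged;
    }
}
```

With ages "1"=30, "2"=41, and "3"=7 loaded into three partitions, calling simulateMapCall(parts, Arrays.asList("1", "2"), (part, key) -> part.get(key) * 2) yields a map containing 1=60 and 2=82; key "3" is never processed because it was not requested.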
The examples in this section use the following Person entity:
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
@Id String ssn;
String firstName;
String surname;
int age;
}
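The application-supplied function is written as a class that implements the MapGridAgent interface. The following example agent returns the age of a Person multiplied by two:
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntityAgentMixin;
import com.ibm.websphere.objectgrid.datagrid.MapGridAgent;
import com.ibm.websphere.objectgrid.em.EntityManager;
import com.ibm.websphere.objectgrid.em.Query;
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin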
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return Integer.valueOf(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Integer> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
The previous example shows the map agent that doubles a Person's age. The process method is supplied with the Person to work with and returns double the age of that entry. The processAllEntries method is called once for each partition; it finds all Person objects with an age strictly between lowAge and highAge and returns their ages doubled.
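The processAllEntries logic can be modeled in plain Java as follows. The sketch assumes hypothetical PersonRecord and DoubleAgeSketch classes in place of the Person entity and the real agent, keys the results by ssn instead of by Person entity to keep the model small, and uses a sequential loop where the grid would dispatch to every partition in parallel. Note the strict comparisons, which match the > and < in the agent's query.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the Person entity.
class PersonRecord {
    final String ssn;
    final int age;

    PersonRecord(String ssn, int age) {
        this.ssn = ssn;
        this.age = age;
    }
}

// Hypothetical model of DoublePersonAgeAgent; not the MapGridAgent API.
class DoubleAgeSketch {
    final int lowAge;
    final int highAge;

    DoubleAgeSketch(int lowAge, int highAge) {
        this.lowAge = lowAge;
        this.highAge = highAge;
    }

    // Mirrors process(): double the age of a single entry.
    Integer process(PersonRecord p) {
        return p.age * 2;
    }

    // Mirrors processAllEntries(): keep entries with lowAge < age < highAge.
    Map<String, Integer> processAllEntries(List<PersonRecord> partition) {
        Map<String, Integer> rc = new HashMap<>();
        for (PersonRecord p : partition) {
            if (p.age > lowAge && p.age < highAge) {
                rc.put(p.ssn, process(p));
            }
        }
        return rc;
    }

    // Models the client side of a call with no keys:
    // run on every partition, then merge the per-partition maps.
    static Map<String, Integer> callOnAllPartitions(DoubleAgeSketch agent,
                                                    List<List<PersonRecord>> partitions) {
        Map<String, Integer> merged = new HashMap<>();
        for (List<PersonRecord> partition : partitions) {
            merged.putAll(agent.processAllEntries(partition));
        }
        return merged;
    }
}
```

With lowAge set to 20, a Person whose age is exactly 20 is excluded, because the bounds are exclusive.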
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// make a list of keys
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person();
p.ssn = "2";
keyList.add(p);
// get the results for those entries
Map<Tuple, Object> results = amgr.callMapAgent(agent, keyList);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
The previous example shows a client obtaining a Session and a reference to the Person Map. The agent operation is performed against a specific Map, and the AgentManager interface is retrieved from that Map. An instance of the agent to invoke is created, and any necessary state is added to the object by setting attributes; there are none in this case. A list of keys is then constructed, and the agent is invoked for that set of keys. The agent's process method is invoked in parallel on each partition that holds some of the specified keys. A Map is returned that provides the merged results for the specified keys. In this case, the Map holds the doubled age of person 1 and the doubled age of person 2.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
The previous example shows the AgentManager being obtained for the Person Map, and the agent being constructed and initialized with the low and high ages for the Persons of interest. The agent is then invoked by using the callMapAgent method. Notice that no keys are supplied. As a result, the ObjectGrid invokes the agent on every partition in the grid in parallel and returns the merged results to the client. The returned Map contains every Person object in the grid whose age is between the low and high values, mapped to that Person's age doubled. This example shows how the grid APIs can be used to run a query that finds the entities matching a given condition. The ObjectGrid serializes the agent and transports it to the partitions that hold the needed entries, and the results are similarly serialized for transport back to the client. Take care with the Map APIs: if the ObjectGrid hosts terabytes of objects and runs on many servers, this processing could overwhelm the client machines. Use the Map APIs to process a small subset. If a large subset needs processing, use a reduce agent to do the processing out in the data grid rather than on the client.
Parallel Reduction or aggregation agents
A reduction (aggregation) agent processes a set of entries in parallel across the grid and combines them into a single result, for example:
- Minimum value
- Maximum value
- Some other business-specific function
ReduceGridAgent call flow
When the AgentManager.callReduceAgent method is invoked with a collection of keys, the ReduceGridAgent instance is serialized and sent to each primary partition that the keys resolve to. This means that any instance data stored in the agent can be sent to the server. Each primary partition therefore has one instance of the agent. The reduce(Session s, ObjectMap map, Collection keys) method is invoked once per instance (partition) with the subset of keys that resolves to the partition. The result of each reduce method is then serialized back to the client. The reduceResults method is invoked on the client ReduceGridAgent instance with the collection of each result from each remote reduce invocation. The result from the reduceResults method is returned to the caller of the callReduceAgent method.
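The reduce call flow can be sketched in plain Java. This is a model, not the ReduceGridAgent API: each partition is an in-memory Map from ssn to age, the hypothetical partitionFor method routes keys with a hash-modulo rule, reduce runs once per partition with only the keys routed there, and reduceResults combines the intermediate sums on the client side. Skipping unknown keys is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the reduce-agent call flow; not the ReduceGridAgent API.
// All names here are hypothetical.
class ReduceCallFlowSketch {
    // Hypothetical routing rule standing in for the grid's key-to-partition mapping.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // "Server side": runs once per partition with only the keys routed to it.
    static int reduce(Map<String, Integer> partition, Collection<String> keys) {
        int sum = 0;
        for (String key : keys) {
            Integer age = partition.get(key);
            if (age != null) { // this sketch skips unknown keys
                sum += age;
            }
        }
        return sum;
    }

    // "Client side": combines the intermediate result from every partition.
    static int reduceResults(Collection<Integer> results) {
        int total = 0;
        for (int r : results) {
            total += r;
        }
        return total;
    }

    // Models a keyed reduce call: group keys, reduce per partition, combine.
    static int simulateReduceCall(List<Map<String, Integer>> partitions, List<String> keys) {
        Map<Integer, List<String>> keysByPartition = new HashMap<>();
        for (String key : keys) {
            keysByPartition.computeIfAbsent(partitionFor(key, partitions.size()),
                                            p -> new ArrayList<>()).add(key);
        }
        List<Integer> intermediate = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : keysByPartition.entrySet()) {
            intermediate.add(reduce(partitions.get(group.getKey()), group.getValue()));
        }
        return reduceResults(intermediate);
    }
}
```

Only one small integer per partition crosses back to the client, which is why the text recommends reduce agents for large subsets.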
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Invoked on the server if a collection of keys is passed to
* AgentManager.callReduceAgent(). This is invoked on each primary shard
* where the key applies.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while (iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Invoked on the server if a collection of keys is NOT passed to
* AgentManager.callReduceAgent(). This is invoked on every primary shard.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Invoked on the client to reduce the results from all partitions.
*/
public Object reduceResults(Collection results) {
// If an EntryErrorValue is encountered, throw a RuntimeException to
// indicate that at least one partition failed, and use the exception
// carried by that EntryErrorValue as the cause.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Error encountered on one of the partitions: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return Integer.valueOf(sum);
}
}
The previous example shows the agent, which has three important parts. The first, reduce(Session, ObjectMap, Collection), processes a specific set of entries without a query: it iterates over the given keys, adds up the ages, and returns the sum. The second, reduce(Session, ObjectMap), uses a query to select the entries to aggregate and then sums the ages of all matching Person objects. The third, reduceResults, aggregates the results from each partition into a single result. The ObjectGrid performs the entry aggregation in parallel across the grid, so each partition produces an intermediate result that must be combined with the intermediate results of the other partitions; reduceResults performs that task. In the following example, the agent is invoked with a list of two keys, and the ages of the Person entries with keys "1" and "2" are summed:
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
Agent functions
The agent is free to perform ObjectMap or EntityManager operations within the local shard where it runs. The agent receives a Session and can add, update, query, read, or remove data in the partition that the Session represents. Some applications only query data from the grid, but you can also write an agent that, for example, increments by 1 the age of every Person that matches a certain query. A transaction is active on the Session when the agent is called, and it is committed when the agent returns unless an exception is thrown.
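The commit-on-return, roll-back-on-exception behavior can be modeled with a working copy of the partition, as in this plain-Java sketch. It is an illustration under assumptions, not how eXtreme Scale implements transactions: the partition is an in-memory Map from ssn to age, the agent body is a Consumer, any RuntimeException counts as a failure, and AgentTransactionSketch and its methods are hypothetical names. The example body increments every age above a threshold by 1, echoing the query-driven update described above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of "commit when the agent returns, roll back if it throws".
// Not how eXtreme Scale implements transactions; all names are hypothetical.
class AgentTransactionSketch {
    // Runs the agent body against a working copy; the copy replaces the
    // partition contents only if the body returns normally.
    static boolean runInTransaction(Map<String, Integer> partition,
                                    Consumer<Map<String, Integer>> agentBody) {
        Map<String, Integer> working = new HashMap<>(partition);
        try {
            agentBody.accept(working);
        } catch (RuntimeException e) {
            return false; // rolled back: the partition is left unchanged
        }
        partition.clear();
        partition.putAll(working);
        return true; // committed
    }

    // Example body: add 1 to every age above minAge (a query-style update).
    static Consumer<Map<String, Integer>> incrementAgesAbove(int minAge) {
        return data -> data.replaceAll((ssn, age) -> age > minAge ? age + 1 : age);
    }
}
```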
Error handling
If a map agent is invoked with an unknown key, the value returned for that key is an error object that implements the EntryErrorValue interface.
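A client typically checks each value in the returned Map before using it. The following plain-Java sketch models that pattern; MissingKeyError is a hypothetical stand-in for an object that implements EntryErrorValue, and doubleAges is a hypothetical method that plays the role of the age-doubling map agent from the earlier example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an object that implements EntryErrorValue.
class MissingKeyError {
    final Object key;

    MissingKeyError(Object key) {
        this.key = key;
    }

    @Override
    public String toString() {
        return "no entry for key " + key;
    }
}

// Hypothetical model of a map-agent call that doubles ages.
class MapAgentErrorSketch {
    // Known keys map to the doubled age; unknown keys map to an error object.
    static Map<String, Object> doubleAges(Map<String, Integer> data, List<String> keys) {
        Map<String, Object> results = new LinkedHashMap<>();
        for (String key : keys) {
            Integer age = data.get(key);
            if (age == null) {
                results.put(key, new MissingKeyError(key));
            } else {
                results.put(key, age * 2);
            }
        }
        return results;
    }

    // Client-side check: count the values that are error objects.
    static int countErrors(Map<String, Object> results) {
        int errors = 0;
        for (Object value : results.values()) {
            if (value instanceof MissingKeyError) {
                errors++;
            }
        }
        return errors;
    }
}
```

In real client code, the equivalent check is an instanceof EntryErrorValue test on each value, as the reduceResults method of SumAgeReduceAgent does for reduce results.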
Transactions
A map agent runs in a separate transaction from the client. Agent invocations might be grouped into a single transaction. If an agent fails and throws an exception, the transaction is rolled back, and any agents that ran successfully in that transaction are rolled back along with the failed agent. The AgentManager then reruns, in a new transaction, the rolled-back agents that had run successfully.
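The rollback-and-rerun rule can be modeled as a loop, shown here as a plain-Java sketch. It assumes that every agent in the group is attempted before the rollback decision is made, that the failed agent is not rerun, and that a rolled-back transaction simply discards a working copy of the partition. These are modeling assumptions, not documented AgentManager internals; GroupedAgentRetrySketch and runGroup are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of grouped agents with rollback and rerun; all names are hypothetical.
class GroupedAgentRetrySketch {
    // Runs the agents together in one transaction against a working copy. If any agent
    // throws, the copy is discarded (rollback) and only the agents that had succeeded
    // are rerun together in a new transaction. Returns the number of transactions used.
    static int runGroup(Map<String, Integer> partition,
                        List<Consumer<Map<String, Integer>>> agents) {
        List<Consumer<Map<String, Integer>>> pending = new ArrayList<>(agents);
        int transactions = 0;
        while (!pending.isEmpty()) {
            transactions++;
            Map<String, Integer> working = new HashMap<>(partition);
            List<Consumer<Map<String, Integer>>> succeeded = new ArrayList<>();
            boolean failed = false;
            for (Consumer<Map<String, Integer>> agent : pending) {
                try {
                    agent.accept(working);
                    succeeded.add(agent);
                } catch (RuntimeException e) {
                    failed = true; // assumption: the failed agent is not rerun
                }
            }
            if (!failed) {
                partition.clear();
                partition.putAll(working); // commit
                return transactions;
            }
            pending = succeeded; // roll back, then rerun only the successful agents
        }
        return transactions;
    }
}
```

With three agents where the second throws, the first transaction is rolled back, and the first and third agents are rerun and committed in a second transaction. The loop always terminates because each failed round strictly shrinks the set of pending agents.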