The DataGrid APIs support two common grid programming patterns: parallel map and parallel reduce.
Parallel Map
The parallel map pattern processes the entries for a set of keys and returns a result for each entry processed. The application builds a list of keys and, after invoking a map operation, receives a Map of key/result pairs. Each result is produced by applying an application-supplied function to the entry for that key.
MapGridAgent call flow
When the AgentManager.callMapAgent method is invoked with a collection of keys, the MapGridAgent instance is serialized and sent to each primary partition that the keys resolve to. This means that any instance data stored in the agent can be sent to the server. Each primary partition therefore has one instance of the agent. The process method is invoked for each instance one time for each key that resolves to the partition. The result of each process method is then serialized back to the client and returned to the caller in a Map instance, where the result is represented as the value in the map.
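The call flow above can be pictured with a small plain-Java simulation. This is only a sketch of the pattern, not the ObjectGrid implementation: the class name, the partitionFor routing rule, and the callMapSketch method are hypothetical names invented for illustration, and a lambda stands in for the agent's process method. It assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Plain-Java illustration only; none of these names are ObjectGrid APIs.
final class ParallelMapSketch {

    // Assumed routing rule: a key belongs to partition hash(key) mod partitionCount.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Groups the keys by partition, applies the function once per key on each
    // partition (partitions run in parallel), and merges everything into one Map.
    static <K, R> Map<K, R> callMapSketch(Collection<K> keys, int partitionCount,
                                          Function<K, R> process) {
        Map<Integer, List<K>> keysByPartition = new HashMap<>();
        for (K key : keys) {
            keysByPartition
                .computeIfAbsent(partitionFor(key, partitionCount), p -> new ArrayList<>())
                .add(key);
        }
        Map<K, R> merged = new ConcurrentHashMap<>();
        keysByPartition.values().parallelStream().forEach(partitionKeys -> {
            for (K key : partitionKeys) {
                merged.put(key, process.apply(key)); // one call per key
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in data: ssn -> age.
        Map<String, Integer> ages = Map.of("1", 30, "2", 41, "3", 17);
        Map<String, Integer> doubled =
            callMapSketch(List.of("1", "2"), 4, ssn -> ages.get(ssn) * 2);
        System.out.println(doubled);
    }
}
```

In the real product the per-key function is the process method of a serialized agent running on the server that owns the partition; here everything runs in one JVM so that the grouping and merging steps are visible.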
import com.ibm.websphere.projector.annotations.Entity;
import com.ibm.websphere.projector.annotations.Id;
@Entity
public class Person
{
@Id String ssn;
String firstName;
String surname;
int age;
}
The application-supplied function is written as a class that implements the MapGridAgent interface. The following example agent returns the age of a Person multiplied by two.
public class DoublePersonAgeAgent implements MapGridAgent, EntityAgentMixin
{
private static final long serialVersionUID = -2006093916067992974L;
int lowAge;
int highAge;
public Object process(Session s, ObjectMap map, Object key)
{
Person p = (Person)key;
return new Integer(p.age * 2);
}
public Map processAllEntries(Session s, ObjectMap map)
{
EntityManager em = s.getEntityManager();
Query q = em.createQuery("select p from Person p where p.age > ?1 and p.age < ?2");
q.setParameter(1, lowAge);
q.setParameter(2, highAge);
Iterator iter = q.getResultIterator();
Map<Person, Integer> rc = new HashMap<Person, Integer>();
while(iter.hasNext())
{
Person p = (Person)iter.next();
rc.put(p, (Integer)process(s, map, p));
}
return rc;
}
public Class getClassForEntity()
{
return Person.class;
}
}
The previous example shows the map agent for doubling the age of a Person. The process method is supplied with the Person to work with and returns double the age of that entry. The processAllEntries method is called for each partition, finds all Person objects with an age between lowAge and highAge, and returns their ages doubled.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
// make a list of keys
ArrayList<Person> keyList = new ArrayList<Person>();
Person p = new Person();
p.ssn = "1";
keyList.add(p);
p = new Person ();
p.ssn = "2";
keyList.add(p);
// get the results for those entries
Map<Tuple, Object> results = amgr.callMapAgent(agent, keyList);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
The previous example shows a client obtaining a Session and a reference to the Person Map. The agent operation is performed against a specific Map, and the AgentManager interface is retrieved from that Map. An instance of the agent to invoke is created, and any necessary state is added to the object by setting attributes; there are none in this case. A list of keys is then constructed, and the agent is invoked for that set of keys. The process method of the agent is invoked in parallel on each partition that holds some of the specified keys. A Map is returned that provides the merged results for the specified keys. In this case, the Map holds the doubled age for person 1 and the doubled age for person 2.
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
DoublePersonAgeAgent agent = new DoublePersonAgeAgent();
agent.lowAge = 20;
agent.highAge = 9999;
Map m = amgr.callMapAgent(agent);
The previous example shows the AgentManager being obtained for the Person Map, and the agent being constructed and initialized with the low and high ages for the Persons of interest. The agent is then invoked using the callMapAgent method. Notice that no keys are supplied. As a result, the ObjectGrid invokes the agent on every partition in the grid in parallel and returns the merged results to the client. The result contains every Person object in the grid with an age between lowAge and highAge, mapped to the doubled age of that Person. This example shows how the grid APIs can be used to run a query to find entities that match a certain condition. The agent is serialized and transported by the ObjectGrid to the partitions with the needed entries. The results are similarly serialized for transport back to the client.
Take care with the Map APIs. If the ObjectGrid hosts terabytes of objects and runs on many servers, returning every result could overwhelm the client machine. Use the Map APIs to process a small subset. If a large subset needs processing, use a reduce agent to do the processing in the data grid rather than on a client.
Parallel Reduction or aggregation agents
ReduceGridAgent call flow
When the AgentManager.callReduceAgent method is invoked with a collection of keys, the ReduceGridAgent instance is serialized and sent to each primary partition that the keys resolve to. This means that any instance data stored in the agent can be sent to the server. Each primary partition therefore has one instance of the agent. The reduce(Session s, ObjectMap map, Collection keys) method is invoked once per instance (partition) with the subset of keys that resolves to the partition. The result of each reduce method is then serialized back to the client. The reduceResults method is invoked on the client ReduceGridAgent instance with the collection of each result from each remote reduce invocation. The result from the reduceResults method is returned to the caller of the callReduceAgent method.
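The two-stage flow above (one reduce per partition, then a single reduceResults on the client) can be sketched in plain Java. This is an illustration under stated assumptions, not the product code: the class, the hash-based routing, and the callReduceSketch method are hypothetical, and an in-memory Map of ssn to age stands in for the grid. It assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration only; none of these names are ObjectGrid APIs.
final class ParallelReduceSketch {

    // Stand-in for the server-side reduce: sums the ages for the keys
    // that resolve to one partition.
    static int reducePartition(Map<String, Integer> agesBySsn, Collection<String> partitionKeys) {
        int sum = 0;
        for (String ssn : partitionKeys) {
            sum += agesBySsn.get(ssn);
        }
        return sum;
    }

    // Stand-in for the client-side reduceResults: combines one partial
    // sum per partition into the final answer.
    static int reduceResults(Collection<Integer> partials) {
        int sum = 0;
        for (int partial : partials) {
            sum += partial;
        }
        return sum;
    }

    static int callReduceSketch(Map<String, Integer> agesBySsn, Collection<String> keys,
                                int partitionCount) {
        // Assumed routing rule: hash(key) mod partitionCount.
        Map<Integer, List<String>> keysByPartition = new HashMap<>();
        for (String key : keys) {
            int partition = Math.floorMod(key.hashCode(), partitionCount);
            keysByPartition.computeIfAbsent(partition, p -> new ArrayList<>()).add(key);
        }
        // One reduce call per partition, run in parallel.
        List<Integer> partials = keysByPartition.values().parallelStream()
            .map(partitionKeys -> reducePartition(agesBySsn, partitionKeys))
            .collect(Collectors.toList());
        return reduceResults(partials);
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = Map.of("1", 30, "2", 41, "3", 17);
        System.out.println(callReduceSketch(ages, List.of("1", "2", "3"), 2));
    }
}
```

Only one value per partition travels back to the caller, which is why a reduce agent suits large subsets better than a map agent that returns one result per entry.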
package com.ibm.ws.objectgrid.test.agent.jdk5;
import java.util.Collection;
import java.util.Iterator;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.datagrid.EntryErrorValue;
import com.ibm.websphere.objectgrid.datagrid.ReduceGridAgent;
import com.ibm.websphere.objectgrid.query.ObjectQuery;
import com.ibm.websphere.samples.objectgrid.entityxmlgen.PersonFeature1Entity.PersonKey;
public class SumAgeReduceAgent implements ReduceGridAgent {
private static final long serialVersionUID = 2521080771723284899L;
/**
* Invoked on the server if a collection of keys is passed to
* AgentManager.callReduceAgent(). This is invoked on each primary shard
* where the key applies.
*/
public Object reduce(Session s, ObjectMap map, Collection keyList) {
try {
int sum = 0;
Iterator<PersonKey> iter = keyList.iterator();
while (iter.hasNext()) {
Object nextKey = iter.next();
PersonKey pk = (PersonKey) nextKey;
Person p = (Person) map.get(pk);
sum += p.age;
}
return sum;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
/**
* Invoked on the server if a collection of keys is NOT passed to
* AgentManager.callReduceAgent(). This is invoked on every primary shard.
*/
public Object reduce(Session s, ObjectMap map) {
ObjectQuery q = s
.createObjectQuery("select p from Person p where p.age > -1");
Iterator<Person> iter = q.getResultIterator();
int sum = 0;
while (iter.hasNext()) {
Object nextKey = iter.next();
Person p = (Person) nextKey;
sum += p.age;
}
return sum;
}
/**
* Invoked on the client to reduce the results from all partitions.
*/
public Object reduceResults(Collection results) {
// If we encounter an EntryErrorValue, then throw a RuntimeException
// to indicate that there was at least one failure and include each
// EntryErrorValue
// as part of the thrown exception.
Iterator<Integer> iter = results.iterator();
int sum = 0;
while (iter.hasNext()) {
Object nextResult = iter.next();
if (nextResult instanceof EntryErrorValue) {
EntryErrorValue eev = (EntryErrorValue) nextResult;
throw new RuntimeException(
"Error encountered on one of the partitions: "
+ nextResult, eev.getException());
}
sum += ((Integer) nextResult).intValue();
}
return new Integer(sum);
}
}
The previous example shows the agent. The agent has three important parts. The first, reduce(Session, ObjectMap, Collection), allows a specific set of entries to be processed without a query. It iterates over the set of entries, adding the ages, and returns the sum. The second, reduce(Session, ObjectMap), uses a query to select the entries to be aggregated and then sums all the matching Person ages. The third, reduceResults, aggregates the results from each partition into a single result. The ObjectGrid performs the entry aggregation in parallel across the grid. Each partition produces an intermediate result that must be aggregated with the intermediate results of the other partitions; the third method performs that task. In the following example, the agent is invoked with a list of keys, and the ages of the corresponding Person entries are aggregated:
Session s = grid.getSession();
ObjectMap map = s.getMap("Person");
AgentManager amgr = map.getAgentManager();
SumAgeReduceAgent agent = new SumAgeReduceAgent();
Person p = new Person();
p.ssn = "1";
ArrayList<Person> list = new ArrayList<Person>();
list.add(p);
p = new Person ();
p.ssn = "2";
list.add(p);
Integer v = (Integer)amgr.callReduceAgent(agent, list);
// Close the session (optional in Version 7.1.1 and later) for improved performance
s.close();
Agent functions
The agent is free to do ObjectMap or EntityManager operations within the local shard where it is running. The agent receives a Session and can add, update, query, read, or remove data from the partition the Session represents. Some applications only query data from the grid, but you can also write an agent that, for example, increments by 1 the ages of all Persons that match a certain query. A transaction is active on the Session when the agent is called, and it is committed when the agent returns unless an exception is thrown.
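The commit-on-return, roll-back-on-exception behavior described above can be illustrated with a plain-Java sketch. It does not use or reproduce the ObjectGrid transaction mechanism: the working-copy approach, the class name, and the runAgent and incrementAgesOver helpers are all assumptions made for illustration, with an in-memory Map of ssn to age standing in for one partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java illustration only; none of these names are ObjectGrid APIs.
final class AgentTransactionSketch {

    // Runs the agent body against a working copy of the partition data.
    // The copy replaces the store only if the body returns normally.
    static boolean runAgent(Map<String, Integer> store, Consumer<Map<String, Integer>> agentBody) {
        Map<String, Integer> workingCopy = new HashMap<>(store);
        try {
            agentBody.accept(workingCopy);
        } catch (RuntimeException e) {
            return false; // "rollback": the working copy is discarded
        }
        store.clear();
        store.putAll(workingCopy); // "commit"
        return true;
    }

    // Hypothetical agent body: adds 1 to every age above minAge.
    static Consumer<Map<String, Integer>> incrementAgesOver(int minAge) {
        return ages -> ages.replaceAll((ssn, age) -> age > minAge ? age + 1 : age);
    }

    public static void main(String[] args) {
        Map<String, Integer> partition = new HashMap<>();
        partition.put("1", 30);
        partition.put("2", 10);
        boolean committed = runAgent(partition, incrementAgesOver(20));
        System.out.println(committed + " " + partition);
    }
}
```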
Error handling
If a map agent is invoked with an unknown key, then the value that is returned for that key is an error object that implements the EntryErrorValue interface.
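A caller can therefore scan the returned Map for error values before using the results. The following sketch shows that check in plain Java. Because it is written to compile without the product JAR files, it uses a local marker interface, ErrorValueStandIn, as a hypothetical stand-in for EntryErrorValue; the class and the failedKeys method are likewise invented for illustration. It assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are ObjectGrid APIs.
final class MapAgentErrorSketch {

    // Local stand-in for the product's EntryErrorValue interface.
    interface ErrorValueStandIn { }

    // Returns the keys whose result is an error object instead of a real value.
    static List<Object> failedKeys(Map<?, ?> results) {
        List<Object> failed = new ArrayList<>();
        for (Map.Entry<?, ?> entry : results.entrySet()) {
            if (entry.getValue() instanceof ErrorValueStandIn) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Object> results = new LinkedHashMap<>();
        results.put("1", 60);
        results.put("no-such-key", new ErrorValueStandIn() { });
        System.out.println(failedKeys(results));
    }
}
```

With the real API the instanceof test would be made against EntryErrorValue, as the reduceResults method of SumAgeReduceAgent does.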
Transactions
A map agent runs in a separate transaction from the client. Agent invocations can be grouped into a single transaction. If an agent fails and throws an exception, the transaction is rolled back. Any agents that ran successfully in the same transaction are rolled back with the failed agent. The AgentManager then reruns, in a new transaction, the rolled-back agents that had run successfully.