Example: Writing a write-behind dumper class
This sample source code shows how to write a watcher (dumper) to handle failed write-behind updates.
//
//This sample program is provided AS IS and may be used, executed, copied and
//modified without royalty payment by customer (a) for its own instruction and
//study, (b) in order to develop applications designed to run with an IBM
//WebSphere product, either for customer's own internal use or for redistribution
//by customer, as part of such an application, in customer's own products. "
//
//5724-J34 (C) COPYRIGHT International Business Machines Corp. 2009
//All Rights Reserved * Licensed Materials - Property of IBM
//
package utils;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import com.ibm.websphere.objectgrid.BackingMap;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.UndefinedMapException;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventGroup;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
import com.ibm.websphere.objectgrid.writebehind.FailedUpdateElement;
import com.ibm.websphere.objectgrid.writebehind.WriteBehindLoaderConstants;
/**
* Write behind expects transactions to the Loader to succeed. If a transaction for a key fails then
* it inserts an entry in a Map called PREFIX + mapName. The application should be checking this
* map for entries to dump out write behind transaction failures. The application is responsible for
* analyzing and then removing these entries. These entries can be large as they include the key, before
* and after images of the value and the exception itself. Exceptions can easily be 20k on their own.
*
* The class is registered with the grid and an instance is created per primary shard in a JVM. It creates
* a single thread
* and that thread then checks each write behind error map for the shard, prints out the problem and
* then removes the entry.
*
* This means there will be one thread per shard. If the shard is moved to another JVM then the deactivate
* method stops the thread.
* @author bnewport
*
*/
public class WriteBehindDumper implements ObjectGridEventListener, ObjectGridEventGroup.ShardEvents,
Callable<Boolean>
{
static Logger logger = Logger.getLogger(WriteBehindDumper.class.getName());
ObjectGrid grid;
/**
* Thread pool to handle table checkers. If the application has it's own pool
* then change this to reuse the existing pool
*/
static ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2); // two threads to dump records
// the future for this shard
ScheduledFuture<Boolean> future;
// true if this shard is active
volatile boolean isShardActive;
/**
* Normal time between checking Maps for write behind errors
*/
final long BLOCKTIME_SECS = 20L;
/**
* An allocated session for this shard. No point in allocating them again and again
*/
Session session;
/**
* When a primary shard is activated then schedule the checks to periodically check
* the write behind error maps and print out any problems
*/
public void shardActivated(ObjectGrid grid)
{
try
{
this.grid = grid;
session = grid.getSession();
isShardActive = true;
future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS); // check every BLOCKTIME_SECS seconds initially
}
catch(ObjectGridException e)
{
throw new ObjectGridRuntimeException("Exception activating write dumper", e);
}
}
/**
* Mark shard as inactive and then cancel the checker
*/
public void shardDeactivate(ObjectGrid arg0)
{
isShardActive = false;
// if it's cancelled then cancel returns true
if(future.cancel(false) == false)
{
// otherwise just block until the checker completes
while(future.isDone() == false) // wait for the task to finish one way or the other
{
try
{
Thread.sleep(1000L); // check every second
}
catch(InterruptedException e)
{
}
}
}
}
/**
* Simple test to see if the map has write behind enabled and if so then return
* the name of the error map for it.
* @param mapName The map to test
* @return The name of the write behind error map if it exists otherwise null
*/
static public String getWriteBehindNameIfPossible(ObjectGrid grid, String mapName)
{
BackingMap map = grid.getMap(mapName);
if(map != null && map.getWriteBehind() != null)
{
return WriteBehindLoaderConstants.WRITE_BEHIND_FAILED_UPDATES_MAP_PREFIX + mapName;
}
else
return null;
}
/**
* This runs for each shard. It checks if each map has write behind enabled and if it does
* then it prints out any write behind
* transaction errors and then removes the record.
*/
public Boolean call()
{
logger.fine("Called for " + grid.toString());
try
{
// while the primary shard is present in this JVM
// only user defined maps are returned here, no system maps like write behind maps are in
// this list.
Iterator<String> iter = grid.getListOfMapNames().iterator();
boolean foundErrors = false;
// iterate over all the current Maps
while(iter.hasNext() && isShardActive)
{
String origName = iter.next();
// if it's a write behind error map
String name = getWriteBehindNameIfPossible(grid, origName);
if(name != null)
{
// try to remove blocks of N errors at a time
ObjectMap errorMap = null;
try
{
errorMap = session.getMap(name);
}
catch(UndefinedMapException e)
{
// at startup, the error maps may not exist yet, patience...
continue;
}
// try to dump out up to N records at once
session.begin();
for(int counter = 0; counter < 100; ++counter)
{
Integer seqKey = (Integer)errorMap.getNextKey(1L);
if(seqKey != null)
{
foundErrors = true;
FailedUpdateElement elem = (FailedUpdateElement)errorMap.get(seqKey);
//
// Your application should log the problem here
logger.info("WriteBehindDumper ( " + origName + ") for key (" + elem.getKey() + ") Exception: " +
elem.getThrowable().toString());
//
//
errorMap.remove(seqKey);
}
else
break;
}
session.commit();
}
} // do next map
// loop faster if there are errors
if(isShardActive)
{
// reschedule after one second if there were bad records
// otherwise, wait 20 seconds.
if(foundErrors)
future = pool.schedule(this, 1L, TimeUnit.SECONDS);
else
future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS);
}
}
catch(ObjectGridException e)
{
logger.fine("Exception in WriteBehindDumper" + e.toString());
e.printStackTrace();
//don't leave a transaction on the session.
if(session.isTransactionActive())
{
try { session.rollback(); } catch(Exception e2) {}
}
}
return true;
}
public void destroy() {
// TODO Auto-generated method stub
}
public void initialize(Session arg0) {
// TODO Auto-generated method stub
}
public void transactionBegin(String arg0, boolean arg1) {
// TODO Auto-generated method stub
}
public void transactionEnd(String arg0, boolean arg1, boolean arg2,
Collection arg3) {
// TODO Auto-generated method stub
}
}