[Java programming language only]

Example: Writing a write-behind dumper class

This sample source code shows how to write a watcher (dumper) to handle failed write-behind updates.

//
//This sample program is provided AS IS and may be used, executed, copied and
//modified without royalty payment by customer (a) for its own instruction and
//study, (b) in order to develop applications designed to run with an IBM
//WebSphere product, either for customer's own internal use or for redistribution
//by customer, as part of such an application, in customer's own products. "
//
//5724-J34 (C) COPYRIGHT International Business Machines Corp. 2009
//All Rights Reserved * Licensed Materials - Property of IBM
//
package utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.ibm.websphere.objectgrid.BackingMap;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.UndefinedMapException;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventGroup;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
import com.ibm.websphere.objectgrid.writebehind.FailedUpdateElement;
import com.ibm.websphere.objectgrid.writebehind.WriteBehindLoaderConstants;

/**
 * Write behind expects transactions to the Loader to succeed. If a transaction for a key fails then
 * it inserts an entry in a Map called PREFIX + mapName. The application should be checking this
 * map for entries to dump out write behind transaction failures. The application is responsible for
 * analyzing and then removing these entries. These entries can be large as they include the key, before
 * and after images of the value and the exception itself. Exceptions can easily be 20k on their own.
 * 
 * The class is registered with the grid and an instance is created per primary shard in a JVM. It creates 
 * a single thread
 * and that thread then checks each write behind error map for the shard, prints out the problem and 
 * then removes the entry.
 * 
 * This means there will be one thread per shard. If the shard is moved to another JVM then the deactivate 
 * method stops the thread.
 * @author bnewport
 *
 */
public class WriteBehindDumper implements ObjectGridEventListener, ObjectGridEventGroup.ShardEvents,
 Callable<Boolean>
{
	static Logger logger = Logger.getLogger(WriteBehindDumper.class.getName());
	
	ObjectGrid grid;
	
	/**
	 * Thread pool to handle table checkers. If the application has it's own pool
	 * then change this to reuse the existing pool
	 */
	static ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2); // two threads to dump records

	// the future for this shard
	ScheduledFuture<Boolean> future;
	
	// true if this shard is active
	volatile boolean isShardActive;

	/**
	 * Normal time between checking Maps for write behind errors
	 */
	final long BLOCKTIME_SECS = 20L;
	
	/**
	 * An allocated session for this shard. No point in allocating them again and again
	 */
	Session session;
	/**
	 * When a primary shard is activated then schedule the checks to periodically check
	 * the write behind error maps and print out any problems
	 */
	public void shardActivated(ObjectGrid grid) 
	{
		try
		{
			this.grid = grid;
			session = grid.getSession();
	
			isShardActive = true;
			future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS); // check every BLOCKTIME_SECS seconds initially
		}
		catch(ObjectGridException e)
		{
			throw new ObjectGridRuntimeException("Exception activating write dumper", e);
		}
	}

	/**
	 * Mark shard as inactive and then cancel the checker
	 */
	public void shardDeactivate(ObjectGrid arg0) 
	{
		isShardActive = false;
		// if it's cancelled then cancel returns true
		if(future.cancel(false) == false)
		{
			// otherwise just block until the checker completes
			while(future.isDone() == false) // wait for the task to finish one way or the other
			{
				try
				{
					Thread.sleep(1000L); // check every second
				}
				catch(InterruptedException e)
				{
				}
			}
		}
	}

	/**
	 * Simple test to see if the map has write behind enabled and if so then return
	 * the name of the error map for it.
	 * @param mapName The map to test
	 * @return The name of the write behind error map if it exists otherwise null
	 */
	static public String getWriteBehindNameIfPossible(ObjectGrid grid, String mapName)
	{
		BackingMap map = grid.getMap(mapName);
		if(map != null && map.getWriteBehind() != null)
		{
			return WriteBehindLoaderConstants.WRITE_BEHIND_FAILED_UPDATES_MAP_PREFIX + mapName;
		}
		else
			return null;
	}

	/**
	 * This runs for each shard. It checks if each map has write behind enabled and if it does 
   * then it prints out any write behind
	 * transaction errors and then removes the record.
	 */
	public Boolean call()
	{
		logger.fine("Called for " + grid.toString());
		try
		{
			// while the primary shard is present in this JVM
			// only user defined maps are returned here, no system maps like write behind maps are in
			// this list.
			Iterator<String> iter = grid.getListOfMapNames().iterator();
			boolean foundErrors = false;
			// iterate over all the current Maps
			while(iter.hasNext() && isShardActive)
			{
				String origName = iter.next();
				
				// if it's a write behind error map
				String name = getWriteBehindNameIfPossible(grid, origName);
				if(name != null) 
				{
					// try to remove blocks of N errors at a time
					ObjectMap errorMap = null;
					try
					{
						errorMap = session.getMap(name);
					}
					catch(UndefinedMapException e)
					{
						// at startup, the error maps may not exist yet, patience...
						continue;
					}
					// try to dump out up to N records at once
					session.begin();
					for(int counter = 0; counter < 100; ++counter)
					{
						Integer seqKey = (Integer)errorMap.getNextKey(1L);
						if(seqKey != null)
						{
							foundErrors = true;
							FailedUpdateElement elem = (FailedUpdateElement)errorMap.get(seqKey);
							//
							// Your application should log the problem here
							logger.info("WriteBehindDumper ( " + origName + ") for key (" + elem.getKey() + ") Exception: " + 
								elem.getThrowable().toString());
							//
							//
							errorMap.remove(seqKey);
						}
						else
							break;
					}
					session.commit();
				}
			} // do next map
			// loop faster if there are errors
			if(isShardActive)
			{
				// reschedule after one second if there were bad records
				// otherwise, wait 20 seconds.
				if(foundErrors)
					future = pool.schedule(this, 1L, TimeUnit.SECONDS);
				else
					future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS);
			}
		}
		catch(ObjectGridException e)
		{
			logger.fine("Exception in WriteBehindDumper" + e.toString());
			e.printStackTrace();
			
			//don't leave a transaction on the session.
			if(session.isTransactionActive())
			{
				try { session.rollback(); } catch(Exception e2) {}
			}
		}
		return true;
	}
	
	public void destroy() {
		// TODO Auto-generated method stub

	}

	public void initialize(Session arg0) {
		// TODO Auto-generated method stub

	}

	public void transactionBegin(String arg0, boolean arg1) {
		// TODO Auto-generated method stub

	}

	public void transactionEnd(String arg0, boolean arg1, boolean arg2,
			Collection arg3) {
		// TODO Auto-generated method stub

	}
}