Wednesday, September 25, 2013

Concurrent merge sorting of a large number of different types of files in Java

I recently worked on a customer project, part of which required sorting and merging multiple large files based on a timestamp field in the files. These files were Call Data Record (CDR) files, which are basically CSV files containing usage records for all 2G, 3G and GPRS subscribers for each day. There were about 2000 files in total, with a varying number of 2G, 3G and GPRS files, and the total size was roughly 200 GB.

Although this could have been done using BigMemory, the customer wanted a quick and dirty way of doing it.

Each of the files had a particular naming convention that identifies its type: the first two or three characters tell us what type of file it is. Example file names –

3G
'GMI02A_13082900_0183'
'GMI02A_13082900_0184'
  
2G
'RTPAHLR1_13082700_8853'
'RTPAHLR1_13082701_8854'
'RTPAHLR1_13082702_8855'
'RTPAHLR1_13082703_8856'
'RTPAHLR1_13082704_8857'

GPRS
'MOU05_1308270005'
'MOU05_1308270020'
'MOU05_1308270035'

As you can see, the characters before the first underscore identify the type of file. The characters after the first underscore are timestamps. If we sort and group the files by these timestamps, it is likely that the file contents will cover the same range of timestamps. An interesting thing to note is that there can be multiple files per timestamp; for 2G and 3G, the sequence number after the second underscore distinguishes them.

A single sample line from each of the CSVs is shown below (with dummy data) –

2G
5x944324461a1b18d08d7ab25d9fa1595f,'ABCDFE12_13050112_9998',' ','','4','43',' ',' ','','','466974301229529','886983095821','','',' ',' ','','','2013-05-01 11:10:58.000',307,' ','0','0',' ','','','                        ',' ',' ','','',' ',' ','',' ',' ',' ','','1','2','1',’6899066686','','','','1','2','1','6899066686',' ',' ','','','                ',' ',' ','','','      ','08-0-9-3    ','08-0-9-3    ','                                  ',' ABCDFE12   ','17','0','  ',' ',' ',0,'IGMRL01','2-15','ONXDEM','7-25','0','','','778158',0,0,'','',' ',,'','','',,,'','','','','','','',' ',0,'','','','','','','',' ',' ','','',' ','  ',' ','  ','8DCA28F284000007','1','1','1 ','656935***319','','','','','','',' ','  ','  ','2013-05-01 11:04:58.000',307,' ','',' ',181,'1','1','1 ','656935***319',' ','        ',' ',' ',' ','  ','  ','0','2','0','1','D9110935388202','','  ','','','',' ','',' ','  ','  ',,'','',' ',' ',' ',' ',' ',' ','  ','','','','','653095821','456683539688','6544066686','0823206559','','','','','','','654066686','34222206559','',''

GPRS
5x944324461a1f18d093bfb25d9fa1595f,'MCCDSC_1305011035','18','0','3','1339655','466974301229529','2013-05-01 10:03:37.000','355026050423934','886983095821','133.22.29.4','31.39.11.321,'11892013','INTERNET','CMSC099.M22dd66.GPRS','1','111.334.23.12','413427','239','58844','1','8 ','46697 ','0','1','2',0,0,'02','FF','00','00','00','FE','00','00','00','00','00','4A','00','02','02','93','96','83','FE','74','81','FF','FF','00','00','00','2013-05-01 09:56:37.000','2013-05-01 10:03:03.000',386,'0 ','0',0,0,'00','00','00','00','00','00','00','00','00','00','00','00',,,0,'0','','0','0','','','00','0',0,0,0,0,,,,,'5630934411','7331295359688','00','00','00','3','1','1','0','0','0','',' ','',' ','','',' ','01306800','Samsung','Galaxy','Handset','1','','','','','','0','','0','0'

3G
5x944324461a1f18d093ba1595f,'DGCCSD_13050112_2938',1,1,1,1,1,1,1,1,0,2013-05-01 12:55:08.000',0,'2013-05-01 12:48:04.000','11','00','413B','55','0AD8','2013-05-01 12:47:59.000','03','FF','  ','FFFF','','466974104424565','','','','','','','','FF','939644958               ','05','06','5430***660','05','06','65535',65535,,,'65535',,'FF',65535,,,'00','03','FFFF','67822059982110','123974700000830','','0000','0000','0000','0000','0000','0000','07','886983***686','05','05','8488',21111,466,97,'19546',886935***416,'05',21401,466,97,'FFFFFFFFFFFFFFFF',,'FF','FF','00000000','  ','3','8','2013-05-01 12:50:49.000','2013-05-01 12:48:08.000','F5B9','8130***660','05','06','00','    ','FFFF','FF',0,'1233804097','05','04',0,0,0,0,0,0,'886935***374',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,'  ','2013-05-01 12:47:59.000',,'',,,'00',0,0,0,0,0,0,0,0,0,'413B','55','0AD8','','FF','10','FF','00',0,0,'','  ','  ',,0,0,0,0,0,161,161.11,'01','0',0,'000000','08',,53,'1246',0,0,'FFFF','FFFFF','01','FF',436,62871450,'00','1','00','','  ','  ','FF','FF','FF','FF','FF','FF','FF',0,'2013-05-01 12:47:58.000',2,'00',,,,,,,,,'',,'      ','FFFF','FF','00','FF','FF','00','FF','2013-05-01 12:47:59.000','642***660',’1243408079','823***686','124191389688','121***660','066208079','','','',''

The idea was to read 3 files (one of each type) at a time and insert their lines into a shared sorted buffer. Once the reading is done, write out the sorted buffer into an output file, clear it and move on to the next batch of 3 files. Another requirement was that the merged data should not all be sorted into a single output file, but should be split across multiple output files. So the file writer thread must keep track of the number of lines written and roll over to the next output file.

The problem with this approach is that not all files will cover the same timestamps.
For example, 2G might run up to timestamp T5, 3G up to T7 and GPRS only up to T4.
In this case we can only flush data up to timestamp T4 and must retain everything after that in the buffer, since the next batch of files might contain lines for those timestamps that still need to be sorted. This puts additional memory pressure on the process.

The approach I used was to do a modified concurrent external merge sort.
  1. Based on the producer-consumer pattern
  2. 3 threads read from the 3 files in parallel (2G, 3G and GPRS) and insert into a shared task buffer
  3. 1 thread consumes from the buffer
  4. Reader threads read line by line and insert into a data structure that internally sorts on the timestamp, via a custom comparator, on every insert. It is basically a ConcurrentSkipListMap backed by ConcurrentLinkedQueues. The key of the Map is the timestamp, and the value is the list of lines associated with that timestamp.
  5. After each reader thread finishes inserting into the task queue, it waits on a CyclicBarrier. The last thread to reach the barrier notifies the consumer that the reading and sorting of the batch into the queue is complete
  6. The consumer is woken up and simply spits out the sorted Map into a CSV
  7. Once the file is written, the CyclicBarrier is reset and the cycle is repeated for the next batch of files


Here is a snippet of the sorted map
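
A minimal illustrative version of that buffer is sketched below; the class and method names are placeholders rather than the exact code in the linked repository, and natural String ordering stands in for the custom comparator since timestamps like '2013-05-01 11:10:58.000' already sort correctly as strings.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;

// Shared sorted buffer: a ConcurrentSkipListMap keyed by timestamp, each value
// holding a ConcurrentLinkedQueue of the raw CSV lines for that timestamp.
public class SortedLineBuffer {

    private final ConcurrentSkipListMap<String, Queue<String>> buffer =
            new ConcurrentSkipListMap<String, Queue<String>>();

    // Called concurrently by the reader threads.
    public void add(String timestamp, String csvLine) {
        Queue<String> lines = buffer.get(timestamp);
        if (lines == null) {
            Queue<String> fresh = new ConcurrentLinkedQueue<String>();
            lines = buffer.putIfAbsent(timestamp, fresh);
            if (lines == null) {
                lines = fresh;
            }
        }
        lines.add(csvLine);
    }

    // Called by the single writer thread once a batch has been fully read.
    public ConcurrentSkipListMap<String, Queue<String>> snapshot() {
        return buffer;
    }

    public void clear() {
        buffer.clear();
    }
}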

A snippet of the reader (producer) thread looks like this
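
Sketched here with illustrative file handling, CSV parsing and timestamp column index (these are assumptions, not the original code):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.CyclicBarrier;

// Reader (producer) task: reads one CDR file line by line, extracts the timestamp
// column, inserts the line into the shared sorted buffer and then waits on the barrier.
public class CdrReaderTask implements Runnable {

    private final String filePath;
    private final int timestampColumn;     // index of the column holding e.g. "2013-05-01 11:10:58.000"
    private final SortedLineBuffer buffer; // the shared sorted buffer sketched above
    private final CyclicBarrier barrier;

    public CdrReaderTask(String filePath, int timestampColumn,
                         SortedLineBuffer buffer, CyclicBarrier barrier) {
        this.filePath = filePath;
        this.timestampColumn = timestampColumn;
        this.buffer = buffer;
        this.barrier = barrier;
    }

    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new FileReader(filePath));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    String timestamp = line.split(",")[timestampColumn].replace("'", "").trim();
                    buffer.add(timestamp, line);
                }
            } finally {
                reader.close();
            }
            // The last thread to arrive here triggers the barrier action that wakes the writer.
            barrier.await();
        } catch (Exception e) {
            throw new RuntimeException("Failed reading " + filePath, e);
        }
    }
}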

And a snippet of the writer (consumer) thread looks like this
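
Sketched below with an illustrative output naming scheme and rollover policy:

import java.io.FileWriter;
import java.io.PrintWriter;
import java.util.Map;
import java.util.Queue;

// Writer (consumer): walks the sorted map in timestamp order, writes each line to the
// current output CSV and rolls over to a new file once maxLinesPerFile is reached.
public class SortedCsvWriter {

    private final String outputPrefix;
    private final int maxLinesPerFile;
    private int fileIndex = 0;
    private int linesWritten = 0;
    private PrintWriter out;

    public SortedCsvWriter(String outputPrefix, int maxLinesPerFile) {
        this.outputPrefix = outputPrefix;
        this.maxLinesPerFile = maxLinesPerFile;
    }

    public void writeBatch(SortedLineBuffer buffer) throws Exception {
        for (Map.Entry<String, Queue<String>> entry : buffer.snapshot().entrySet()) {
            for (String line : entry.getValue()) {
                if (out == null || linesWritten >= maxLinesPerFile) {
                    rollOver();
                }
                out.println(line);
                linesWritten++;
            }
        }
        if (out != null) {
            out.flush();
        }
        buffer.clear();   // ready for the next batch of files
    }

    private void rollOver() throws Exception {
        if (out != null) {
            out.close();
        }
        out = new PrintWriter(new FileWriter(outputPrefix + "_" + (fileIndex++) + ".csv"));
        linesWritten = 0;
    }
}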

A sample flow of the threads with CyclicBarrier coordination is as follows –
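
In code form, the wiring for one batch might look roughly like this; the file names, column indexes and line limit are made up, and for brevity the barrier action flushes the buffer directly instead of signalling a separate consumer thread:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One batch: three reader threads feed the shared buffer; the CyclicBarrier's action
// (run by the last reader to arrive) flushes the sorted buffer to disk.
public class MergeSortDriver {

    public static void main(String[] args) {
        final SortedLineBuffer buffer = new SortedLineBuffer();
        final SortedCsvWriter writer = new SortedCsvWriter("sorted_output", 500000);

        CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
            public void run() {
                try {
                    writer.writeBatch(buffer);   // spit out the sorted map and clear it
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });

        ExecutorService pool = Executors.newFixedThreadPool(3);
        pool.submit(new CdrReaderTask("RTPAHLR1_13082700_8853", 18, buffer, barrier));
        pool.submit(new CdrReaderTask("GMI02A_13082900_0183", 10, buffer, barrier));
        pool.submit(new CdrReaderTask("MOU05_1308270005", 7, buffer, barrier));
        pool.shutdown();

        // For the full run, the barrier is reused (it resets automatically once all
        // parties arrive) and the next batch of files is submitted in the same way.
    }
}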


On a 4-core machine with 35 GB of RAM and about 120 GB worth of data files, this took roughly 2 hours to complete. This solution worked for my use case; however, ideally you would read only a predefined number of lines into the buffer, rather than the entire file, to avoid heap pressure.
On a side note, I tested this with the CMS and G1 garbage collectors and found CMS to be much more performant and predictable for my use case.
The entire code base is available here: https://github.com/sourabhghose/LargeFileMergeSort

Thursday, December 27, 2012

Clustered Spring SessionRegistry: Spring Security Concurrent sessions in a clustered environment



I was recently working with a client whose application has a login module built on Spring Security, which was to be clustered using web sessions. The requirement was that if a user logs into the system twice, the previous session should be invalidated and redirected to an error page.
Spring Security provides this functionality out of the box using the ConcurrentSessionFilter. This filter makes use of an internal session registry that keeps track of which users have logged in and their session details.

However, while integrating with Terracotta web sessions we found that this did not work. After investigation we found that the default session registry implementation was not clustered, i.e. it was creating a local copy of the registry on each server. Thus the user was able to log in multiple times.

In order to make the SessionRegistry work in a distributed environment or behind a load balancer, we created a custom session registry on top of Ehcache. All the session details are now stored in Ehcache and replicated across the servers. Hence a login session created on one server is now visible to the other servers and we can invalidate the previous session.
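
A minimal sketch of what such a registry can look like, assuming the Spring Security 3.x SessionRegistry interface; the two-cache layout and the cache names below are illustrative, not the exact code in the attached project (principal objects must be Serializable with a proper equals/hashCode):

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

import org.springframework.security.core.session.SessionInformation;
import org.springframework.security.core.session.SessionRegistry;

public class EhcacheSessionRegistry implements SessionRegistry {

    private final Cache sessionIds;    // sessionId -> SessionInformation
    private final Cache principals;    // principal -> Set<String> of session ids

    public EhcacheSessionRegistry(CacheManager cacheManager) {
        this.sessionIds = cacheManager.getCache("sessionIds");
        this.principals = cacheManager.getCache("principals");
    }

    public List<Object> getAllPrincipals() {
        return new ArrayList<Object>(principals.getKeys());
    }

    @SuppressWarnings("unchecked")
    public List<SessionInformation> getAllSessions(Object principal, boolean includeExpiredSessions) {
        List<SessionInformation> result = new ArrayList<SessionInformation>();
        Element element = principals.get(principal);
        if (element == null) {
            return result;
        }
        for (String sessionId : (Set<String>) element.getObjectValue()) {
            SessionInformation info = getSessionInformation(sessionId);
            if (info != null && (includeExpiredSessions || !info.isExpired())) {
                result.add(info);
            }
        }
        return result;
    }

    public SessionInformation getSessionInformation(String sessionId) {
        Element element = sessionIds.get(sessionId);
        return element == null ? null : (SessionInformation) element.getObjectValue();
    }

    public void refreshLastRequest(String sessionId) {
        SessionInformation info = getSessionInformation(sessionId);
        if (info != null) {
            info.refreshLastRequest();
            sessionIds.put(new Element(sessionId, info));
        }
    }

    @SuppressWarnings("unchecked")
    public void registerNewSession(String sessionId, Object principal) {
        sessionIds.put(new Element(sessionId, new SessionInformation(principal, sessionId, new Date())));
        Element element = principals.get(principal);
        Set<String> ids = element == null ? new HashSet<String>() : (Set<String>) element.getObjectValue();
        ids.add(sessionId);
        principals.put(new Element(principal, ids));
    }

    @SuppressWarnings("unchecked")
    public void removeSessionInformation(String sessionId) {
        SessionInformation info = getSessionInformation(sessionId);
        if (info == null) {
            return;
        }
        sessionIds.remove(sessionId);
        Element element = principals.get(info.getPrincipal());
        if (element != null) {
            Set<String> ids = (Set<String>) element.getObjectValue();
            ids.remove(sessionId);
            principals.put(new Element(info.getPrincipal(), ids));
        }
    }
}

With the two caches configured as Terracotta-clustered (or otherwise replicated) caches in ehcache.xml, every server sees the same registry contents.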

The attached project was tested on Terracotta 3.7.2, JBoss 7 and Spring 3.0.5.

Download the code here.

Tuesday, September 4, 2012

Plugging in Ehcache into iBATIS


As you probably know, Ehcache is Hibernate's default second-level cache. Integrating Ehcache into iBATIS is relatively easy using iBATIS's CacheController interface. Through the CacheController interface you can plug in your own custom caching solution or any third-party caching solution. The javadoc for CacheController is here. In order to plug in Ehcache, you must implement the CacheController interface as follows.

Implementing the CacheController:

import java.io.File;
import java.net.URL;
import java.util.Properties;

import net.sf.ehcache.Element;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibatis.sqlmap.engine.cache.CacheModel;
import com.ibatis.sqlmap.engine.cache.CacheController;


public class EhcacheIbatisCacheController implements CacheController {
    final private static Logger logger = LoggerFactory.getLogger(EhcacheIbatisCacheController.class);
    
    /** EhCache CacheManager. */
    private CacheManager cacheManager;

    /**
     * Flush a cache model.
     * @param cacheModel - the model to flush.
     */
    public void flush(CacheModel cacheModel) {
        getCache(cacheModel).removeAll();
    }

    /**
     * Get an object from a cache model.
     * @param cacheModel - the model.
     * @param key        - the key to the object.
     * @return the object if in the cache.
     */
    public Object getObject(CacheModel cacheModel, Object key) {
        Object result = null;
        try {
            Element element = getCache(cacheModel).get(key.toString());
            if (element != null) {
                result = element.getObjectValue();
            }
        }
        catch(Exception e) {
            // treat a failed lookup as a cache miss; the caller falls back to the database
            logger.debug("cache lookup failed, will check in db", e);
        }
        return result;

    }

    /**
     * Put an object into a cache model.
     * @param cacheModel - the model to add the object to.
     * @param key        - the key to the object.
     * @param object     - the value to add.
     */
    public void putObject(CacheModel cacheModel, Object key, Object object) {
        getCache(cacheModel).put(new Element(key.toString(), object));
    }

    /**
     * Remove an object from a cache model.
     * @param cacheModel - the model to remove the object from.
     * @param key        - the key to the object.
     * @return the removed object.
     */
    public Object removeObject(CacheModel cacheModel, Object key) {
        Object result = this.getObject(cacheModel, key.toString());
        getCache(cacheModel).remove(key.toString());
        return result;
    }

    /**
     * Configure a cache controller. Initialize the Cache Manager of Ehcache
     * @param props - the properties object containing configuration information.
     */
    public void setProperties(Properties props) {

        String configFile = props.getProperty("configFile");
        File file = new File(configFile);
        if(file.exists()) {
            cacheManager = CacheManager.create(file.getAbsolutePath());
        }
        else {
            URL url = getClass().getResource(configFile);
            cacheManager = CacheManager.create(url);
        }
    }

    /**
     * Gets a ehcache based on an iBatis cache Model.
     * @param cacheModel - the cache model.
     * @return the Cache.
     */
    private Cache getCache(CacheModel cacheModel) {
        String cacheName = cacheModel.getId();
        return cacheManager.getCache(cacheName);
    }

}

Each method provides access to the CacheModel that controls the cache so you can access parameters in the CacheModel when required.

Registering EhcacheIbatisController with iBATIS: 


CacheModels and CacheControllers must be registered in the iBATIS XML. First, create a type alias for the Ehcache controller as follows:
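
A typical registration (the alias name is illustrative; the type must be the fully qualified name of the controller class above) looks roughly like this in your SqlMapConfig or SqlMap file:

<typeAlias alias="EhcacheController" type="EhcacheIbatisCacheController"/>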


Now we need to apply the Cache controller to a CacheModel definition in the xml.
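
A cacheModel entry of the following shape should work; the id, flush settings and statement name are illustrative, while the configFile property matches what setProperties() above expects:

<cacheModel id="productCache" type="EhcacheController" readOnly="true" serialize="false">
    <flushInterval hours="24"/>
    <flushOnExecute statement="insertProduct"/>
    <property name="configFile" value="/ehcache.xml"/>
</cacheModel>

Statements that should be cached then reference it via cacheModel="productCache".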







Sunday, October 16, 2011

EHCache - Write behind example

What is Write-Behind?

Write-behind is asynchronous writing of data to the underlying database. When data is written to the cache, instead of writing to the database at the same time, the cache saves the data into a queue and lets a background thread write it to the database later.

This is a transformative capability because now you can:
  1. Delay writes to the database until a later, configurable time
  2. Use write coalescing, which means that if there are multiple updates to the same key in the queue, only the latest one is written
  3. Batch multiple write operations
  4. Specify the number of retry attempts in case of a write failure
Here is an introductory video.

In order to use write-behind, you first need to implement the CacheWriter interface:
import java.util.Collection;

import net.sf.ehcache.CacheEntry;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.writer.CacheWriter;
import net.sf.ehcache.writer.writebehind.operations.SingleOperationType;

/*
 * This class handles writing to the database or your backend persistence storage.
 */
public class EhcacheWriteBehindClass implements CacheWriter {

    @Override
    public CacheWriter clone(Ehcache cache) throws CloneNotSupportedException {
        throw new CloneNotSupportedException("EhcacheWriteBehindClass cannot be cloned!");
    }

    @Override
    public void delete(CacheEntry entry) throws CacheException {
        // Remove the corresponding row from the database here
    }

    @Override
    public void deleteAll(Collection<CacheEntry> entries) throws CacheException {
        // Batched version of delete()
    }

    @Override
    public void dispose() throws CacheException {
        // You can close database connections here
    }

    @Override
    public void init() {
        // You can initialize the database connection here
    }

    @Override
    public void write(Element element) throws CacheException {
        // Typically you would write the key/value to your database here
        System.out.println("Write : Key is " + element.getKey());
        System.out.println("Write : Value is " + element.getValue());
    }

    @Override
    public void writeAll(Collection<Element> elements) throws CacheException {
        // Called when writeBatching is enabled; write the whole batch in one go
        System.out.println("Write All");
    }

    @Override
    public void throwAway(Element element, SingleOperationType operationType, RuntimeException e) {
        // Called when an operation has exhausted its retry attempts and is dropped
    }
}

This class is instantiated by the CacheWriterFactory:

import java.util.Properties;

import net.sf.ehcache.Ehcache;
import net.sf.ehcache.writer.CacheWriter;
import net.sf.ehcache.writer.CacheWriterFactory;

public class WriteBehindClassFactory extends CacheWriterFactory {

    @Override
    public CacheWriter createCacheWriter(Ehcache cache, Properties properties) {
        return new EhcacheWriteBehindClass();
    }
}

Now register the factory in the ehcache.xml as follows:
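
A minimal sketch of such a configuration is shown below; the cache name matches the test class further down, while the sizing, delay, batching and retry values are purely illustrative:

<ehcache>
    <cache name="writeBehindCache"
           maxElementsInMemory="10000"
           eternal="false"
           timeToLiveSeconds="300">
        <cacheWriter writeMode="write-behind"
                     minWriteDelay="1"
                     maxWriteDelay="5"
                     writeCoalescing="true"
                     writeBatching="true"
                     writeBatchSize="10"
                     retryAttempts="2"
                     retryAttemptDelaySeconds="2">
            <cacheWriterFactory class="WriteBehindClassFactory"/>
        </cacheWriter>
    </cache>
</ehcache>
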
In order to use this write behind functionality, your class would look like this:
 
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;

public class EhcacheWriteBehindTest {

    public static void main(String[] args) throws Exception {
        // pass in the number of objects you want to generate, default is 100
        int numberOfObjects = Integer.parseInt(args.length == 0 ? "100" : args[0]);
        System.out.println(numberOfObjects);
        // create the CacheManager
        CacheManager cacheManager = CacheManager.getInstance();
        // get a handle on the Cache - "writeBehindCache" is the name of a cache in the ehcache.xml file
        Cache myCache = cacheManager.getCache("writeBehindCache");

        // iterate through numberOfObjects and use the counter as the key; the value does not matter at this time
        for (int i = 0; i < numberOfObjects; i++) {
            String key = Integer.toString(i);
            if (!checkInCache(key, myCache)) {
                // note, we use the putWithWriter method and not the put method; the key and the value must be serializable
                myCache.putWithWriter(new Element(key, "Value"));
                System.out.println(key + " NOT in cache!!!");
            } else {
                System.out.println("Put with writer ... value1");
                myCache.putWithWriter(new Element(key, "Value1"));
            }
        }

        // keep the JVM alive so the background write-behind thread has time to drain the queue
        while (true) {
            Thread.sleep(1000);
        }
    }

    // check to see if the key is in the cache
    private static boolean checkInCache(String key, Cache myCache) throws Exception {
        Element element = myCache.get(key);
        boolean returnValue = false;
        if (element != null) {
            System.out.println(key + " is in the cache!!!");
            returnValue = true;
        }
        return returnValue;
    }
}

That's it! For a detailed explanation of the configuration options involved, have a look at this.

The limitation of this is that if your JVM goes down, your write-behind queue is lost. To avoid this you can use Ehcache clustered with Terracotta, which uses the Terracotta Server Array. In this case the queue is maintained on the Terracotta Server Array, which provides HA features. If one client JVM goes down, any changes it put into the write-behind queue can still be picked up by threads in the other clustered JVMs and will therefore be applied to the database without any data loss.


The Terracotta Server Array is an enterprise feature and can be configured very easily. You can download a trial version from here.


The only change you need to make in this app to make it clustered is in the ehcache.xml. Your ehcache.xml would now look like this:
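
Again as a sketch, the clustered version adds a terracottaConfig element pointing at the server array and marks the cache as Terracotta-clustered (the remaining attributes are the same illustrative values as before):

<ehcache>
    <terracottaConfig url="localhost:9510"/>

    <cache name="writeBehindCache"
           maxElementsInMemory="10000"
           eternal="false"
           timeToLiveSeconds="300">
        <cacheWriter writeMode="write-behind"
                     minWriteDelay="1"
                     maxWriteDelay="5"
                     writeCoalescing="true"
                     writeBatching="true"
                     writeBatchSize="10"
                     retryAttempts="2"
                     retryAttemptDelaySeconds="2">
            <cacheWriterFactory class="WriteBehindClassFactory"/>
        </cacheWriter>
        <terracotta/>
    </cache>
</ehcache>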

The terracottaConfig url="localhost:9510" element points to where your Terracotta Server Array runs.

Wednesday, August 17, 2011

How to keep the Database in sync with your cache?


There are a few different ways to achieve this. You can put the onus on the cache to fetch the data periodically or when it determines that the data is stale, or you can put the onus on the underlying database to "push" updates periodically or whenever the data changes.

Read Heavy use cases

Cache -> DB


1.     The most straightforward way is to set the Time To Live (TTL) and Time To Idle (TTI) on the cache so that the data expires periodically. The next request will result in a cache miss and your application will pull the current value from the underlying database and put it back into the cache.
A few things to note here:
a.     There is a window during which the data in the cache is not in sync with the underlying database
b.     A cache miss results in a performance hit.

This is called read-through caching.



2.     An alternative approach is to perform cache updates or invalidation periodically - use a batch process (which could be scheduled using the open source Quartz scheduler) running at periodic intervals to either invalidate or update the cache. You could do this using Ehcache's SelfPopulatingCache, as sketched below.
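
A rough sketch of the SelfPopulatingCache idea, assuming a cache named productCache is defined in ehcache.xml; loadFromDatabase() is a stand-in for your own DAO or JDBC lookup:

import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.constructs.blocking.CacheEntryFactory;
import net.sf.ehcache.constructs.blocking.SelfPopulatingCache;

// Wraps an existing cache so that a cache miss (or a scheduled refresh) pulls the
// current value from the database via the CacheEntryFactory.
public class ProductCacheFactory {

    public static SelfPopulatingCache create(CacheManager cacheManager) {
        Ehcache backing = cacheManager.getCache("productCache");
        return new SelfPopulatingCache(backing, new CacheEntryFactory() {
            public Object createEntry(Object key) throws Exception {
                return loadFromDatabase(key);   // the real database read goes here
            }
        });
    }

    private static Object loadFromDatabase(Object key) {
        return "value-for-" + key;              // placeholder
    }
}

A Quartz job can then call refresh() on the SelfPopulatingCache at the desired interval to re-pull every cached key from the database.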


DB->Cache


1.     You could also transfer the syncing onus to the underlying database itself. For example, Oracle AQ provides a way to register a callback when a database update happens. This can be leveraged to either invalidate or update the cache store.

2.     Alternatively, you could use middleware technologies such as GoldenGate or JMS to capture DB changes as they occur and "push" notifications into the memory store.

 

Write Heavy use cases


1.     There are scenarios that require frequent updates to the stored data, where every update to the cached data must invoke a simultaneous update to the database. This is the write-through feature provided by Ehcache. However, updates to the database are almost always slower, so this slows the effective update rate to the cache and thus overall performance. When many write requests come in at the same time, the database can easily become a bottleneck or, even worse, be killed by heavy writes in a short period of time. The write-behind feature provided by Ehcache allows quick cache writes while keeping the cache and the database eventually consistent. The idea is that when writing data into the cache, instead of writing to the database at the same time, the write-behind cache saves the changed data into a queue and lets a background thread do the writing later. Therefore, the cache write can proceed without waiting for the database write and thus finishes much faster. Any data that has been changed is eventually persisted to the database, and in the meantime any read from the cache still gets the latest data.

In the case of Terracotta, the Terracotta Server Array maintains the write-behind queue. A thread on each JVM checks the shared queue and saves each data change left in it.

2.     Finally, you could also make your application update the cache and the DB simultaneously. It is advisable to use transactions and perform this in the following manner (a sketch follows the list below):
a.     Start a transaction
b.     Update the database
c.      Update the cache
d.     Commit the transaction
Some points to remember: your update code is directly aware of the cache, and there is a performance impact since your update latency reflects both the DB and the cache update time.
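
As a rough sketch of steps a-d, assuming Spring's programmatic TransactionTemplate for the transaction boundary; ProductDao here is a placeholder for your own persistence layer, and note that a plain, non-transactional cache will not roll back with the database:

import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;

import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

public class CacheAwareUpdater {

    public interface ProductDao {
        void update(String id, String value);   // placeholder for a real DAO method
    }

    private final TransactionTemplate tx;       // a. starts/commits the transaction
    private final ProductDao dao;               // b. updates the database
    private final Cache cache;                  // c. updates the cache

    public CacheAwareUpdater(TransactionTemplate tx, ProductDao dao, Cache cache) {
        this.tx = tx;
        this.dao = dao;
        this.cache = cache;
    }

    public void update(final String id, final String value) {
        tx.execute(new TransactionCallbackWithoutResult() {     // a. start the transaction
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                dao.update(id, value);                           // b. update the database
                cache.put(new Element(id, value));               // c. update the cache
            }
        });                                                      // d. commit on successful return
    }
}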