I recently worked on a customer project, part of which required sorting and merging multiple large files based on a timestamp field in the files. These files were Call Data Record (CDR) files, which are essentially CSV files containing usage records for all 2G, 3G and GPRS subscribers for each day. There were about 2000 files in total, with a varying mix of 2G, 3G and GPRS files, and the total size was roughly 200 GB.
Although this could have been done using BigMemory, the customer wanted a quick and dirty way of doing it.
Each file followed a naming convention that indicates its type: the first two or three characters tell us what kind of file it is. Example file names –
3G
'GMI02A_13082900_0183'
'GMI02A_13082900_0184'
2G
'RTPAHLR1_13082700_8853'
'RTPAHLR1_13082701_8854'
'RTPAHLR1_13082702_8855'
'RTPAHLR1_13082703_8856'
'RTPAHLR1_13082704_8857'
GPRS
'MOU05_1308270005'
'MOU05_1308270020'
'MOU05_1308270035'
As you can see, the characters before the first underscore identify the type of file, and the characters after the first underscore are a timestamp. If we sort and group the files by these timestamps, it is likely that the file contents will cover the same range of timestamps. An interesting thing to note is that there can be multiple files per timestamp; for 2G and 3G, the digits after the second underscore are a sequence number that distinguishes them.
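To make that concrete, a small helper along these lines could classify a file and pull out its timestamp token. This is a minimal sketch with made-up prefix mappings, not code from the actual project:

import java.util.HashMap;
import java.util.Map;

// Minimal sketch (illustrative names): classify a CDR file by the characters before the
// first underscore and extract the timestamp token that follows it.
public class CdrFileName {

    private static final Map<String, String> TYPE_BY_PREFIX = new HashMap<String, String>();
    static {
        TYPE_BY_PREFIX.put("GMI02A", "3G");
        TYPE_BY_PREFIX.put("RTPAHLR1", "2G");
        TYPE_BY_PREFIX.put("MOU05", "GPRS");
    }

    public static String typeOf(String fileName) {
        return TYPE_BY_PREFIX.get(fileName.substring(0, fileName.indexOf('_')));
    }

    public static String timestampOf(String fileName) {
        // Second token, e.g. "13082900" in "GMI02A_13082900_0183"
        return fileName.split("_")[1];
    }
}

In the real project the prefix-to-type mapping would of course come from configuration rather than a hard-coded map.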
A single sample line from each type of CSV looked as follows (with dummy data) –
2G
5x944324461a1b18d08d7ab25d9fa1595f,'ABCDFE12_13050112_9998','
','','4','43',' ',' ','','','466974301229529','886983095821','','',' ','
','','','2013-05-01 11:10:58.000',307,' ','0','0',' ','','',' ',' ',' ','','',' ','
','',' ',' ',' ','','1','2','1','6899066686','','','','1','2','1','6899066686','
',' ','','',' ',' ','
','','',' ','08-0-9-3 ','08-0-9-3 ',' ',' ABCDFE12 ','17','0','
',' ',' ',0,'IGMRL01','2-15','ONXDEM','7-25','0','','','778158',0,0,'','','
',,'','','',,,'','','','','','','',' ',0,'','','','','','','',' ',' ','','','
',' ',' ',' ','8DCA28F284000007','1','1','1 ','656935***319','','','','','','','
',' ','
','2013-05-01 11:04:58.000',307,' ','',' ',181,'1','1','1 ','656935***319','
',' ',' ',' ',' ',' ','
','0','2','0','1','D9110935388202','','
','','','',' ','',' ',' ',' ',,'','',' ',' ',' ',' ',' ',' ',' ','','','','','653095821','456683539688','6544066686','0823206559','','','','','','','654066686','34222206559','',''
GPRS
5x944324461a1f18d093bfb25d9fa1595f,'MCCDSC_1305011035','18','0','3','1339655','466974301229529','2013-05-01
10:03:37.000','355026050423934','886983095821','133.22.29.4','31.39.11.321,'11892013','INTERNET','CMSC099.M22dd66.GPRS','1','111.334.23.12','413427','239','58844','1','8
','46697 ','0','1','2',0,0,'02','FF','00','00','00','FE','00','00','00','00','00','4A','00','02','02','93','96','83','FE','74','81','FF','FF','00','00','00','2013-05-01
09:56:37.000','2013-05-01 10:03:03.000',386,'0
','0',0,0,'00','00','00','00','00','00','00','00','00','00','00','00',,,0,'0','','0','0','','','00','0',0,0,0,0,,,,,'5630934411','7331295359688','00','00','00','3','1','1','0','0','0','','
','',' ','','',' ','01306800','Samsung','Galaxy','Handset','1','','','','','','0','','0','0'
3G
5x944324461a1f18d093ba1595f,'DGCCSD_13050112_2938',1,1,1,1,1,1,1,1,0,2013-05-01
12:55:08.000',0,'2013-05-01
12:48:04.000','11','00','413B','55','0AD8','2013-05-01
12:47:59.000','03','FF','
','FFFF','','466974104424565','','','','','','','','FF','939644958 ','05','06','5430***660','05','06','65535',65535,,,'65535',,'FF',65535,,,'00','03','FFFF','67822059982110','123974700000830','','0000','0000','0000','0000','0000','0000','07','886983***686','05','05','8488',21111,466,97,'19546',886935***416,'05',21401,466,97,'FFFFFFFFFFFFFFFF',,'FF','FF','00000000',' ','3','8','2013-05-01 12:50:49.000','2013-05-01
12:48:08.000','F5B9','8130***660','05','06','00',' ','FFFF','FF',0,'1233804097','05','04',0,0,0,0,0,0,'886935***374',0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,' ','2013-05-01
12:47:59.000',,'',,,'00',0,0,0,0,0,0,0,0,0,'413B','55','0AD8','','FF','10','FF','00',0,0,'',' ','
',,0,0,0,0,0,161,161.11,'01','0',0,'000000','08',,53,'1246',0,0,'FFFF','FFFFF','01','FF',436,62871450,'00','1','00','',' ','
','FF','FF','FF','FF','FF','FF','FF',0,'2013-05-01
12:47:58.000',2,'00',,,,,,,,,'',,'
','FFFF','FF','00','FF','FF','00','FF','2013-05-01
12:47:59.000','642***660','1243408079','823***686','124191389688','121***660','066208079','','','',''
The idea was to read 3 files at a time (one of each type) and insert them into a shared sorted buffer. Once the reading is done, write the sorted buffer out to an output file, clear it, and move on to the next batch of 3 files. Another requirement was that the data should not be merge sorted into a single output file, but split across multiple output files; so the file writer thread must keep track of the number of lines written and roll over to the next output file.
The problem with this approach is that not all files will cover the same range of timestamps. For example, the 2G file might run up to timestamp T5, the 3G file up to T7 and the GPRS file only up to T4. In this case we can only flush data up to timestamp T4 and must retain everything after it in the buffer, since the next batch of files might contain lines for those later timestamps that still need to be sorted in. This puts additional memory pressure on the process.
The approach I used was a modified concurrent external merge sort –
- Based on the producer-consumer pattern.
- 3 threads read from the 3 files in parallel (2G, 3G and GPRS) and insert into a shared task buffer.
- 1 thread consumes from the buffer.
- Reader threads read line by line and insert into a data structure that internally sorts on timestamp, via a custom comparator, on each insert. It is basically a ConcurrentSkipListMap backed by several ConcurrentLinkedQueues: the key of the Map is the timestamp, and the value is the list of lines associated with that timestamp.
- After each reader thread finishes inserting into the task queue, it waits on a CyclicBarrier. The last thread to reach the barrier notifies the Consumer that reading and sorting into the queue are complete.
- The Consumer is awakened and simply writes the sorted Map out into a CSV.
- Once the file is written, the CyclicBarrier is reset and the cycle is repeated for the next batch of files.
Here is a snippet of the sorted map
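What follows is a minimal sketch of that structure rather than the exact code from the repository; the class name is a placeholder, and the map here relies on the natural String ordering where the real code plugs in a custom timestamp comparator:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;

// Shared sorted buffer: a ConcurrentSkipListMap keyed by timestamp, each value being a
// ConcurrentLinkedQueue of the CSV lines carrying that timestamp. The map keeps its keys
// sorted on every insert, so draining it in entry order yields globally sorted output.
public class SortedBuffer {

    // Natural ordering works for "yyyy-MM-dd HH:mm:ss.SSS" strings; a custom
    // Comparator<String> could be passed to the constructor instead.
    private final ConcurrentSkipListMap<String, Queue<String>> map =
            new ConcurrentSkipListMap<String, Queue<String>>();

    public void add(String timestamp, String line) {
        Queue<String> lines = map.get(timestamp);
        if (lines == null) {
            Queue<String> fresh = new ConcurrentLinkedQueue<String>();
            Queue<String> existing = map.putIfAbsent(timestamp, fresh);
            lines = (existing == null) ? fresh : existing;
        }
        lines.add(line);
    }

    public ConcurrentSkipListMap<String, Queue<String>> view() {
        return map;
    }
}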
A snippet of the reader (producer) thread looks like this
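Again, this is a simplified sketch rather than the repository code; the timestamp column handling and barrier wiring are stand-ins:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.CyclicBarrier;

// Producer: one instance per file type (2G, 3G, GPRS). Reads its file line by line,
// inserts each line into the shared sorted buffer keyed by its timestamp, then waits
// on the CyclicBarrier.
public class ReaderTask implements Runnable {

    private final String filePath;
    private final int timestampColumn; // which CSV column holds the timestamp for this type
    private final SortedBuffer buffer;
    private final CyclicBarrier barrier;

    public ReaderTask(String filePath, int timestampColumn, SortedBuffer buffer, CyclicBarrier barrier) {
        this.filePath = filePath;
        this.timestampColumn = timestampColumn;
        this.buffer = buffer;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                buffer.add(line.split(",")[timestampColumn], line);
            }
            // The last reader to arrive here trips the barrier, which signals the writer.
            barrier.await();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}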
And a snippet of the writer (consumer) thread looks like this
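As with the reader, a simplified sketch; the output file naming and roll-over threshold are placeholders:

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;

// Consumer: drains the sorted map in timestamp order, rolls over to a new output file
// once maxLinesPerFile lines have been written, then clears the buffer for the next batch.
public class WriterTask implements Runnable {

    private final SortedBuffer buffer;
    private final int maxLinesPerFile;
    private int fileIndex = 0;

    public WriterTask(SortedBuffer buffer, int maxLinesPerFile) {
        this.buffer = buffer;
        this.maxLinesPerFile = maxLinesPerFile;
    }

    @Override
    public void run() {
        try {
            int linesWritten = 0;
            BufferedWriter out = newOutputFile();
            for (Map.Entry<String, Queue<String>> entry : buffer.view().entrySet()) {
                for (String line : entry.getValue()) {
                    out.write(line);
                    out.newLine();
                    if (++linesWritten >= maxLinesPerFile) {
                        out.close();
                        out = newOutputFile();
                        linesWritten = 0;
                    }
                }
            }
            out.close();
            buffer.view().clear(); // ready for the next batch of files
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private BufferedWriter newOutputFile() throws IOException {
        return new BufferedWriter(new FileWriter("sorted_output_" + fileIndex++ + ".csv"));
    }
}

In the simplest wiring the writer's run() can be registered as the CyclicBarrier's barrier action, so it executes as soon as the last reader arrives; an explicit wait/notify between the last reader and a dedicated writer thread works just as well.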
A sample flow of the threads with CyclicBarrier coordination is as follows –
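- The main thread creates a CyclicBarrier for the 3 reader threads, starts one reader per file type for the current batch, and starts the writer thread, which waits to be signalled.
- Each reader inserts its lines into the shared sorted map and then calls barrier.await().
- The last reader to arrive trips the barrier and notifies the writer that the batch has been fully read and sorted.
- The writer wakes up, drains the sorted map into the output CSVs (rolling over at the configured line limit) and clears it.
- The CyclicBarrier is reset and the cycle repeats for the next batch of 2G, 3G and GPRS files.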
On a 4-core, 35 GB machine with 120 GB worth of data files, this took about 2 hours to complete. This solution worked for my use case; optimally, however, you would read only a predefined number of lines into the buffer at a time, rather than the entire file, to avoid heap pressure.
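One way that could look (my own assumption, not something from the repository) is a bounded read pass that pulls at most a fixed number of lines from an already-open reader and reports how many it read, so the caller knows when the file is exhausted. It reuses the SortedBuffer sketch from above:

import java.io.BufferedReader;
import java.io.IOException;

// Bounded read pass: insert at most maxLines from the reader into the shared buffer,
// returning how many lines were actually read (0 signals end of file).
public final class BoundedReader {

    public static int readBatch(BufferedReader reader, SortedBuffer buffer,
                                int timestampColumn, int maxLines) throws IOException {
        String line;
        int count = 0;
        while (count < maxLines && (line = reader.readLine()) != null) {
            buffer.add(line.split(",")[timestampColumn], line);
            count++;
        }
        return count;
    }
}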
On a side note, I tested this with both the CMS and G1 garbage collectors, and I found CMS to be much more performant and predictable for my use case.
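For reference, switching between the two collectors is just a launch flag; something along these lines, where the heap sizes and jar name are illustrative:

java -Xms24g -Xmx24g -XX:+UseConcMarkSweepGC -jar large-file-merge-sort.jar
java -Xms24g -Xmx24g -XX:+UseG1GC -jar large-file-merge-sort.jar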
The entire code base is available here: https://github.com/sourabhghose/LargeFileMergeSort