Friday 14 September 2012

Fixing "Inconsistent split cardinality" problem in Mahout Matrix Multiplication

I was trying out Mahout's matrix multiplication facility, which can be invoked via the times() method of a DistributedRowMatrix instance:

final Configuration conf = new Configuration();
DistributedRowMatrix A = new DistributedRowMatrix(
                  new Path("/A"), new Path("/tmp/A"), 100, 60000);
A.setConf(conf);
DistributedRowMatrix B = new DistributedRowMatrix(
                  new Path("/B"), new Path("/tmp/B"), 100, 30);
B.setConf(conf);
DistributedRowMatrix C = A.times(B);  // C = A'*B

I got the following exception:

12/09/01 23:33:46 ERROR security.UserGroupInformation: PriviledgedActionException as:ahmed cause:java.io.IOException: Inconsistent split cardinality from child 1 (2/1)
Exception in thread "main" java.io.IOException: Inconsistent split cardinality from child 1 (2/1)
    at org.apache.hadoop.mapred.join.Parser$CNode.getSplits(Parser.java:369)
    at org.apache.hadoop.mapred.join.CompositeInputFormat.getSplits(CompositeInputFormat.java:117)
    at org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:989)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:981)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
    at org.apache.mahout.math.hadoop.DistributedRowMatrix.times(DistributedRowMatrix.java:189)
    at baselines.ReconstructionError.clacReconstructionErr(ReconstructionError.java:45)
    at baselines.Main.runRandomSelection(Main.java:108)
    at baselines.Main.main(Main.java:154)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)


Checking out the source of MatrixMultiplicationJob, I found that the job's input format is a CompositeInputFormat over the two matrices being multiplied. The CompositeInputFormat joins the input records of the two matrices on the row number and hands the mapper an IntWritable (the row number i) and a TupleWritable (the combined record of row #i of the first matrix and row #i of the second matrix). For the CompositeInputFormat to be able to join the two matrices, both must be sorted and partitioned identically. That was the cause of the exception above: the two matrices were not partitioned the same way. The first matrix was written as a single file (its original format), while the second matrix was the output of a previous MapReduce job that ran with more than one reducer.
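For reference, here is roughly how such a join is wired up with the old mapred API. This is a simplified sketch of the kind of configuration MatrixMultiplicationJob performs internally, not the exact Mahout source; the paths and the "inner" join type here are illustrative:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

JobConf joinConf = new JobConf();
// Build a join expression over the two matrices' sequence files; the join
// only works if both inputs are sorted and partitioned identically.
joinConf.set("mapred.join.expr", CompositeInputFormat.compose(
    "inner", SequenceFileInputFormat.class,
    new Path("/A"), new Path("/B")));
joinConf.setInputFormat(CompositeInputFormat.class);
// Each map() call then receives (IntWritable row, TupleWritable rows),
// where rows.get(0) is row i of A and rows.get(1) is row i of B.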

Although this is not an uncommon scenario, the documentation of the times() method mentions nothing about this limitation. My solution was to repartition one of the matrices using the same partitioner as the other (Hadoop's default hash partitioner with the same number of reducers). In my case, one of the matrices (A) was much larger than the other (B), so to minimize the repartitioning overhead I decided to repartition the small one (B). One way to find the number of A's partitions is to count the files under A's directory, since each reducer generates a single file; see the sketch below.
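For example, a small helper along these lines (my own countPartitions, not part of Mahout) can infer the partition count. It assumes the reducers' output files follow Hadoop's usual part-* naming convention:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public static int countPartitions(Path matrixDir, Configuration conf) throws Exception {
  FileSystem fs = matrixDir.getFileSystem(conf);
  int partitions = 0;
  for (FileStatus status : fs.listStatus(matrixDir)) {
    // Each reducer writes a single part-* file; skip _SUCCESS, _logs, etc.
    if (status.getPath().getName().startsWith("part-")) {
      partitions++;
    }
  }
  return partitions;
}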

The repartitioning method is simple: we just use the identity Mapper and the identity Reducer and set the number of reducers to the number of A's partitions. Here is my implementation:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.math.VectorWritable;

public static void repartitionMatrix(Path input, Path output, int numPartitions) throws Exception {
  final Configuration conf = new Configuration();
  Job job = new Job(conf);
  job.setJarByClass(Helpers.class);
  FileInputFormat.addInputPaths(job, input.toString());
  FileOutputFormat.setOutputPath(job, output);
  // The identity Mapper and Reducer pass the (row, vector) records through
  // unchanged; the shuffle does the actual repartitioning.
  job.setMapperClass(Mapper.class);
  job.setReducerClass(Reducer.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  // One reducer per target partition, to match A's partition count.
  job.setNumReduceTasks(numPartitions);
  job.waitForCompletion(false);
  if (!job.isSuccessful())
    throw new RuntimeException("Repartition job was unsuccessful");
}
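
Putting it together, the multiplication from the beginning of the post then becomes something like the following. The /B-repartitioned path is arbitrary, and countPartitions is the hypothetical helper sketched above:

// Repartition B to match A's partitioning, then multiply.
int numPartitions = countPartitions(new Path("/A"), conf);
repartitionMatrix(new Path("/B"), new Path("/B-repartitioned"), numPartitions);

DistributedRowMatrix Brep = new DistributedRowMatrix(
                  new Path("/B-repartitioned"), new Path("/tmp/B-repartitioned"), 100, 30);
Brep.setConf(conf);
DistributedRowMatrix C = A.times(Brep);  // both inputs now produce the same number of splits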

