Exception in Reducer in Hadoop when run on Cluster

I have a MapReduce program that runs perfectly in stand-alone mode, but when I run it on the Hadoop cluster at my school, an exception occurs in the reducer. I have no clue which exception it is. I know this much because when I wrap the reducer body in a try/catch, the job passes but produces empty output; without the try/catch, the job fails. Since it is a school cluster, I do not have access to the job tracker or any log files; whatever I find has to be found programmatically. Is there a way to find out, at run time, which exception occurred on Hadoop?

Following are snippets of my code:

public static class RowMPreMap extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {

    private Text keyText = new Text();
    private Text valText = new Text();

    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        // Input: (lineNo, lineContent)

        // Split each line using the separator defined for the dataset.
        String[] line = value.toString().split(Settings.INPUT_SEPERATOR);

        keyText.set(line[0]);
        valText.set(line[1] + "," + line[2]);

        // Output: (userid, "movieid,rating")
        output.collect(keyText, valText);
    }
}

public static class RowMPreReduce extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    private Text valText = new Text();

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        // Input: (userid, List<movieid, rating>)

        float sum = 0.0F;
        int totalRatingCount = 0;

        ArrayList<String> movieID = new ArrayList<String>();
        ArrayList<Float> rating = new ArrayList<Float>();

        while (values.hasNext()) {
            String[] movieRatingPair = values.next().toString().split(",");
            movieID.add(movieRatingPair[0]);
            Float parseRating = Float.parseFloat(movieRatingPair[1]);
            rating.add(parseRating);

            sum += parseRating;
            totalRatingCount++;
        }

        float average = sum / totalRatingCount;

        for (int i = 0; i < movieID.size(); i++) {
            valText.set("M " + key.toString() + " " + movieID.get(i) + " "
                    + (rating.get(i) - average));
            output.collect(null, valText);
        }

        // Output: (null, <M userid, movieid, normalizedrating>)
    }
}

The exception happens in the reducer above. Below is the job configuration:

public void normalizeM() throws IOException, InterruptedException {
    JobConf conf1 = new JobConf(UVDriver.class);
    conf1.setMapperClass(RowMPreMap.class);
    conf1.setReducerClass(RowMPreReduce.class);
    conf1.setJarByClass(UVDriver.class);

    conf1.setMapOutputKeyClass(Text.class);
    conf1.setMapOutputValueClass(Text.class);

    conf1.setOutputKeyClass(Text.class);
    conf1.setOutputValueClass(Text.class);

    conf1.setKeepFailedTaskFiles(true);

    conf1.setInputFormat(TextInputFormat.class);
    conf1.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(conf1, new Path(Settings.INPUT_PATH));
    FileOutputFormat.setOutputPath(conf1, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH_TEMP));

    JobConf conf2 = new JobConf(UVDriver.class);
    conf2.setMapperClass(ColMPreMap.class);
    conf2.setReducerClass(ColMPreReduce.class);
    conf2.setJarByClass(UVDriver.class);

    conf2.setMapOutputKeyClass(Text.class);
    conf2.setMapOutputValueClass(Text.class);

    conf2.setOutputKeyClass(Text.class);
    conf2.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf2, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH_TEMP));
    FileOutputFormat.setOutputPath(conf2, new Path(Settings.TEMP_PATH + "/"
            + Settings.NORMALIZE_DATA_PATH));

    Job job1 = new Job(conf1);
    Job job2 = new Job(conf2);

    JobControl jobControl = new JobControl("jobControl");
    jobControl.addJob(job1);
    jobControl.addJob(job2);
    job2.addDependingJob(job1);
    handleRun(jobControl);

}
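
The handleRun helper is not shown in the post. A minimal sketch of what such a helper could look like, assuming it simply runs the JobControl on a background thread and polls until every job in the group has finished (the method body here is my guess, not the original author's code):

    // Hypothetical helper (not part of the original post): runs the
    // JobControl on a daemon thread and blocks until all jobs finish.
    private static void handleRun(JobControl control)
            throws InterruptedException {
        Thread runner = new Thread(control);
        runner.setDaemon(true);
        runner.start();

        while (!control.allFinished()) {
            Thread.sleep(1000); // poll once per second
        }
        control.stop();
    }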

Answers


I caught the exception in the reducer and wrote the stack trace to a file on the file system. I know this is the dirtiest possible way of doing it, but I have no other option at this point. Following is the code, in case it helps anyone in the future; put it in the catch block.

                String valueString = "";
                while (values.hasNext()) {
                    valueString += values.next().toString();
                }

                StringWriter sw = new StringWriter();
                e.printStackTrace(new PrintWriter(sw));
                String exceptionAsString = sw.toString();

                Path pt = new Path("errorfile");
                FileSystem fs = FileSystem.get(new Configuration());
                BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
                br.write(exceptionAsString + "\nkey: " + key.toString() + "\nvalues: " + valueString);
                br.close();

Suggestions for doing this in a cleaner way are welcome.
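
One caveat with the snippet above: every reduce task writes to the same fixed path, so parallel tasks (and retried attempts) will overwrite one another's error file. Including the task attempt ID in the path avoids that. A sketch, assuming the old-API configure() hook and the mapred.task.id property; the taskId field name is mine:

    // Sketch: make the error file unique per task attempt so parallel
    // reducers and speculative/retried attempts don't clobber each other.
    private String taskId = "unknown-task";

    @Override
    public void configure(JobConf job) {
        // mapred.task.id holds the attempt id,
        // e.g. attempt_201401010000_0001_r_000003_0
        taskId = job.get("mapred.task.id", taskId);
    }

    // ... then, in the catch block:
    Path pt = new Path("errors/" + taskId);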

On a side note, I eventually found that it was a NumberFormatException. Counters would not have helped me identify it. Later I realized that the input is being split differently in stand-alone mode and on the cluster, and I have yet to find the reason why.
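
In hindsight, guarding the parse would have kept the job alive and named the bad records at the same time. A sketch of the reducer's loop body with such a guard, counting failures so they surface in the job counters (the group and counter names here are my own choices):

    // Sketch: skip records whose rating doesn't parse instead of
    // letting the NumberFormatException kill the task.
    String[] movieRatingPair = values.next().toString().split(",");
    if (movieRatingPair.length < 2) {
        reporter.incrCounter("Normalize", "MALFORMED_RECORDS", 1);
        continue;
    }
    try {
        float parsed = Float.parseFloat(movieRatingPair[1]);
        movieID.add(movieRatingPair[0]);
        rating.add(parsed);
        sum += parsed;
        totalRatingCount++;
    } catch (NumberFormatException e) {
        reporter.incrCounter("Normalize", "BAD_RATING_VALUES", 1);
    }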


Even if you don't have access to the server, you can get the counters for a job:

Counters counters = job.getCounters();

and dump the set of counters to your local console. Among other things, the counters show how many records were read by and written from the mappers and reducers; a counter with a value of zero points to the problem location in your workflow. You can also instrument your own counters to help debug and monitor the flow.
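
For example, a custom counter incremented from the reducer via the old-API Reporter is readable from the driver even without shell access to the cluster. A minimal sketch, assuming the job is run with JobClient.runJob rather than JobControl, and with group/counter names of my own choosing:

    // In the reducer, wherever a suspect record is seen:
    reporter.incrCounter("Debug", "SUSPECT_RECORDS", 1);

    // In the driver, after the job finishes:
    RunningJob running = JobClient.runJob(conf1);
    Counters counters = running.getCounters();
    System.out.println("suspect records: "
            + counters.findCounter("Debug", "SUSPECT_RECORDS").getCounter());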

