Wednesday 15 August 2012

hadoop - mapreduce reducer size is wrong -


I am writing a simple MapReduce program, how many times each row appears in the input. My goal is to have two identical data in the directories. So my goal in reducing the stage is that each key actually appears twice (one in each input directory)

This is my code -

  Public class resultsValidator configured device {public static class tupals scanner mapper and lieutenant; Bytesable, Tapsible, Bitesrat, Longwaytate & gt; {Private longevative one = new long (1); @ Override Public Aid Map (Bitesable row, unrecognized ignore, context reference) throws IOException, interrupted; Exception {reference.write (line, one); }} Public Static Class TuplesSummer Reducer & Lt; Bitesable, Longweight, Bitesrat, LongVerable & gt; {@ Override throws IOException, disrupts Public Wide Low (bytesable line, iterative> langarative> values, context context); Exception {int sum = 0; (Value for long value: value) {sum + = value.get (); } Context.write (line, new long (sum)); }} Public Static Class Tuples Reducer Reducer & Lt; Bitesable, longweight, bite, null, viable & gt; {@ Override throws IOException, disrupts Public Wide Low (bytesable line, iterative> langarative> values, context context); Exception {int sum = 0; (Value for long value: value) {sum + = value.get (); } If (zodiac! = 2) {context.write (line, NullWritable.get ()); }}} Public at run (string [] args) IOException throws, Interrupted speed, ClassNotFoundException {Job Job = Job.getInstance (getConf ()); Path inputDir0 = new path (args [0]); Path inputDir1 = new path (args [1]); Path outputDir = new path (args [2]); Int radiusanum = integer .Portset (args [3]); If (outputDir.getFileSystem (getConf) is present (outputDir)) {new IOException ("Output directory" + output + "already exists"); } FileInputFormat.addInputPath (Job, inputDir0); FileInputFormat.addInputPath (Job, inputDir1); FileOutputFormat.setOutputPath (job, outputDir); Job.setJobName ("ResultsValidator"); Job.setJarByClass (ResultsValidator.class); Job.setMapperClass (TuplesScanner.class); Job.setCombinerClass (TuplesCombiner.class); Job.setReducerClass (TuplesReducer.class); Job.setNumReduceTasks (reducersNum); Job.setMapOutputKeyClass (BytesWritable.class); Job.setMapOutputValueClass (LongWritable.class); Job.setOutputKeyClass (BytesWritable.class); Job.setOutputValueClass (NullWritable.class); Job.setInputFormatClass (ResultsValidatorInputFormat.class); Job.setOutputFormatClass (ResultsValidatorOutputFormat.class); Return job End of waiting (true)? 0: 1; } Public static zero main (string [] args throws exceptions {int res = ToolRunner.run (new configuration), new resultant (), args); System.exit (race); }}  

I have not found the reason that due to lowering the stage I get a wrong number in the log, I think every reddener gets a number, which merge The number of shuffle made is equal to the number.

Where am I?


No comments:

Post a Comment