import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * NGram scans a directory of text files and generates a list of N-grams.
 * More information: <a href="http://en.wikipedia.org/wiki/N-gram">N-gram on Wikipedia</a>.
 *
 * The class has two map/reduce stages: one to find all N-grams starting
 * with the desired word or phrase, and the next to filter out (prune)
 * those N-grams that do not occur frequently enough.
 *
 * @author Philip M. White
 */
public class NGram extends Configured implements Tool {
	/**
	 * Mapper of the first ("find") stage.
	 *
	 * For every occurrence of the desired phrase in an input paragraph it emits
	 * the phrase followed by the next (N-1) cleaned words as the key, with a
	 * count of 1 as the value.
	 */
	public static class FindJob_MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		/** Count emitted for every occurrence; one shared instance avoids an allocation per emitted record. */
		private static final IntWritable ONE = new IntWritable(1);

		private Text ngram = new Text();
		private String desiredPhrase;
		private int Nvalue;
		private static int paragraphNum = 0;
		private boolean caseSensitive;
		static enum Counters { INCOMPLETE_NGRAMS };
		
		/**
		 * Reads the search phrase ("mapper.desired-phrase"), the N-gram length
		 * ("mapper.N-value", default 3) and the case-sensitivity flag
		 * ("mapper.case-sensitive", default false) from the job configuration.
		 * @param conf	the configuration of the running job
		 */
		public void configure(JobConf conf) {
			desiredPhrase = conf.get("mapper.desired-phrase");
			Nvalue = conf.getInt("mapper.N-value", 3);
			caseSensitive = conf.getBoolean("mapper.case-sensitive", false);
		}

		/**
		 * Takes a string of text, breaks it up by whitespace, applies WordManipulator
		 * methods to each word, and returns the ArrayList of words.
		 * Words that are empty after cleaning are left out of the result.
		 * @param paragraph	a paragraph (or some other quantity) of plain text
		 * @return		a list of cleaned words from the paragraph
		 */
		public ArrayList<String> findWords(final String paragraph) {
			String[] rawWords = paragraph.split("\\s");
			ArrayList<String> words = new ArrayList<String>(rawWords.length);
			for (String rawWord : rawWords) {
				// The case fix is applied once, exactly as the driver does for the search phrase.
				String word = WordManipulator.strip(WordManipulator.fixCase(rawWord, caseSensitive), true);
				if (!word.isEmpty())
					words.add(word);
			}
			return words;
		}

		/**
		 * Emits one (N-gram, 1) pair for every occurrence of the desired phrase
		 * in the paragraph. Occurrences are found by plain substring search, and
		 * the search resumes right after the end of the previous match.
		 * If fewer than N-1 words follow an occurrence, the shorter N-gram is
		 * still emitted and the INCOMPLETE_NGRAMS counter is incremented.
		 *
		 * NOTE(review): the raw paragraph text is searched, while the driver
		 * passes the phrase through WordManipulator.fixCase/strip. If fixCase
		 * changes the case for case-insensitive runs, differently-cased
		 * occurrences in the text will not match -- confirm against WordManipulator.
		 *
		 * @param key		the key supplied by the input format (unused)
		 * @param value		one paragraph of text
		 * @param output	receives the (N-gram, 1) pairs
		 * @param reporter	used for the status message and the counter
		 */
		public void map(LongWritable key, Text value, 
				OutputCollector<Text, IntWritable> output, 
				Reporter reporter) throws IOException {
			reporter.setStatus("Processing paragraph #" + (++paragraphNum));
			// An empty phrase matches at every position without advancing the search,
			// which would loop forever; a missing phrase cannot be searched for at all.
			if (desiredPhrase == null || desiredPhrase.isEmpty())
				return;

			final String paragraph = value.toString();
			int searchFrom = 0;
			int match;
			while ((match = paragraph.indexOf(desiredPhrase, searchFrom)) != -1) {
				searchFrom = match + desiredPhrase.length();
				// these are all the remaining words after the desired phrase
				ArrayList<String> words = findWords(paragraph.substring(searchFrom));

				StringBuilder ngramText = new StringBuilder(desiredPhrase);
				// It's "Nvalue-1" because one of the values is the search string itself.
				int taken = 0;
				while (taken < Nvalue-1 && taken < words.size()) {
					ngramText.append(' ').append(words.get(taken));
					taken++;
				}
				if (taken < Nvalue-1)
					reporter.incrCounter(Counters.INCOMPLETE_NGRAMS, 1);

				ngram.set(ngramText.toString());
				output.collect(ngram, ONE);
			}
		}
	}
	
	/**
	 * Reducer of the first ("find") stage: adds up all the counts collected
	 * for one N-gram and emits the N-gram together with that total.
	 */
	public static class FindJob_ReduceClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
		/**
		 * @param key		the N-gram
		 * @param values	the individual counts recorded for this N-gram
		 * @param output	receives the (N-gram, total) pair
		 * @param reporter	unused
		 */
		public void reduce(Text key, Iterator<IntWritable> values,
				 OutputCollector<Text, IntWritable> output, 
				 Reporter reporter) throws IOException {
			int total = 0;
			for (; values.hasNext(); ) {
				IntWritable count = values.next();
				total += count.get();
			}
			IntWritable result = new IntWritable(total);
			output.collect(key, result);
		}
	}

	/**
	 * Mapper of the second ("prune") stage. Each input line is expected to be
	 * "&lt;N-gram&gt;&lt;TAB&gt;&lt;count&gt;"; the line is split at its last tab and
	 * re-emitted as an (N-gram, count) pair.
	 */
	public static class PruneJob_MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		/**
		 * Parses one line and emits it as (N-gram, count). Lines without a tab
		 * or with a count that is not an integer are reported on stderr and skipped.
		 * @param key		the key supplied by the input format (unused)
		 * @param value		one line of text
		 * @param output	receives the parsed (N-gram, count) pair
		 * @param reporter	unused
		 */
		public void map(LongWritable key, Text value, 
				OutputCollector<Text, IntWritable> output, 
				Reporter reporter) throws IOException {
			String line = value.toString();
			// The last tab is used because the N-gram text itself could contain tabs.
			int tab = line.lastIndexOf('\t');
			if (tab == -1) {
				System.err.println("PruneJob_MapClass::map() encountered a value without a tab character!");
				return;
			}
			String ngramText = line.substring(0, tab);
			String countText = line.substring(tab + 1).trim();
			int count;
			try {
				count = Integer.parseInt(countText);
			} catch (NumberFormatException except) {
				// Treat a malformed count like a missing tab: report it and skip the line.
				System.err.println("PruneJob_MapClass::map() encountered a count that is not an integer: " + countText);
				return;
			}
			output.collect(new Text(ngramText), new IntWritable(count));
		}
	}

	/**
	 * Reducer of the second ("prune") stage: drops every N-gram whose
	 * frequency is below the configured minimum and passes the rest through.
	 */
	public static class PruneJob_ReduceClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
		/** Minimum frequency an N-gram needs in order to be kept; set per instance in configure(). */
		private int minFreq;

		/**
		 * Reads the cutoff from "reducer.min-freq"; when the key is absent the
		 * cutoff is 0.
		 * @param conf	the configuration of the running job
		 */
		public void configure(JobConf conf) {
			minFreq = conf.getInt("reducer.min-freq", 0);
		}

		/**
		 * Emits the N-gram with its frequency unless the frequency is below the cutoff.
		 * All values of the key are added up, so a key that arrives with several
		 * counts is judged by its total rather than by whichever count comes first.
		 * @param key		the N-gram
		 * @param values	the count(s) recorded for this N-gram
		 * @param output	receives the (N-gram, frequency) pair when it is kept
		 * @param reporter	unused
		 */
		public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
			int freq = 0;
			while (values.hasNext()) {
				freq += values.next().get();
			}
			if (freq < minFreq)
				return;
			output.collect(key, new IntWritable(freq));
		}
	}

	/**
	 * Prints the command-line synopsis, the description of the tool-specific
	 * options and the generic Hadoop command usage to standard output.
	 * @return	always -1, so that callers can return it directly as an exit status
	 */
	static int printUsage() {
		final String[] usageLines = {
			"ngram [-m <find-maps>] [-r <find-reduces>] [-n <min-freq>] [-c] <input> <output> <phrase> <N>",
			"\t-n <min-freq>       the minimum cutoff for the number of occurrences of the N-gram",
			"\t-c                  search for N-grams is case-sensitive"
		};
		for (String usageLine : usageLines) {
			System.out.println(usageLine);
		}
		ToolRunner.printGenericCommandUsage(System.out);
		return -1;
	}
	
	/**
	 * The main driver for the N-gram map/reduce program.
	 * Invoke this method to submit the map/reduce jobs.
	 *
	 * Parses the command line, runs the "find" job into a temporary directory,
	 * runs the "prune" job from that directory into the requested output
	 * directory, and finally removes the temporary directory.
	 *
	 * @param args	[-m &lt;find-maps&gt;] [-r &lt;find-reduces&gt;] [-n &lt;min-freq&gt;] [-c]
	 *            	&lt;input&gt; &lt;output&gt; &lt;phrase&gt; &lt;N&gt;
	 * @return	0 when both jobs were run, or the value of printUsage() (-1)
	 *        	when the command line is invalid
	 * @throws Exception when a job or a file system operation throws, e.g.
	 *                   because of communication problems with the job tracker
	 */
	public int run(String[] args) throws Exception {
		JobConf ngram_find_conf = new JobConf(getConf(), NGram.class),
			ngram_prune_conf = new JobConf(getConf(), NGram.class);
		int min_freq = 0;
		boolean caseSensitive = false;

		List<String> other_args = new ArrayList<String>();
		for(int i=0; i < args.length; ++i) {
			try {
				if ("-m".equals(args[i])) {
					ngram_find_conf.setNumMapTasks(Integer.parseInt(args[++i]));
				} else if ("-r".equals(args[i])) {
					ngram_find_conf.setNumReduceTasks(Integer.parseInt(args[++i]));
				} else if ("-n".equals(args[i])) {
					min_freq = Integer.parseInt(args[++i]);
				} else if ("-c".equals(args[i])) {
					caseSensitive = true;
				} else {
					other_args.add(args[i]);
				}
			} catch (NumberFormatException except) {
				// i already points at the option's value here
				System.out.println("ERROR: Integer expected instead of " + args[i]);
				return printUsage();
			} catch (ArrayIndexOutOfBoundsException except) {
				// i has run past the end, so args[i-1] is the option that lacks its value
				System.out.println("ERROR: Required parameter missing from " + args[i-1]);
				return printUsage();
			}
		}
		if (other_args.size() != 4) {
			System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 4.");
			return printUsage();
		}

		// Validate <N> the same way as the numeric options instead of letting
		// a NumberFormatException escape to the caller.
		int nValue;
		try {
			nValue = Integer.parseInt(other_args.get(3));
		} catch (NumberFormatException except) {
			System.out.println("ERROR: Integer expected instead of " + other_args.get(3));
			return printUsage();
		}

		// The mapper searches for the cleaned phrase; a phrase that is empty
		// after cleaning can never produce a meaningful N-gram.
		String phrase = WordManipulator.strip(WordManipulator.fixCase(other_args.get(2), caseSensitive), true);
		if (phrase.isEmpty()) {
			System.out.println("ERROR: The phrase is empty after cleanup: " + other_args.get(2));
			return printUsage();
		}

		// We have two jobs:
		// 1. Find N-grams
		// 2. Prune the N-grams that occur fewer than min-freq times

		Path tempDir = new Path("ngram-tmp-"+Integer.toString(new Random().nextInt()));
		try {
			ngram_find_conf.setJobName("ngram-find");
			ngram_find_conf.setInputFormat(ParagraphInputFormat.class);
			ngram_find_conf.setOutputKeyClass(Text.class);
			ngram_find_conf.setOutputValueClass(IntWritable.class);
			ngram_find_conf.setMapperClass(FindJob_MapClass.class);
			//ngram_find_conf.setCombinerClass(FindJob_ReduceClass.class);
			ngram_find_conf.setReducerClass(FindJob_ReduceClass.class);

			FileInputFormat.setInputPaths(ngram_find_conf, other_args.get(0));
			FileOutputFormat.setOutputPath(ngram_find_conf, tempDir);
			ngram_find_conf.set("mapper.desired-phrase", phrase);
			ngram_find_conf.setInt("mapper.N-value", nValue);
			ngram_find_conf.setBoolean("mapper.case-sensitive", caseSensitive);

			JobClient.runJob(ngram_find_conf);

			// Why do we have two jobs?
			// No reason other than to demonstrate how to do it to my Cloud Computing class.
			// To make this code production-quality, put prune-reduce into find-reduce and
			// remove the prune stage completely.

			// job #2
			ngram_prune_conf.setJobName("ngram-prune");
			ngram_prune_conf.setInt("reducer.min-freq", min_freq);
			ngram_prune_conf.setOutputKeyClass(Text.class);
			ngram_prune_conf.setOutputValueClass(IntWritable.class);
			ngram_prune_conf.setMapperClass(PruneJob_MapClass.class);
			//ngram_prune_conf.setCombinerClass(PruneJob_ReduceClass.class);
			ngram_prune_conf.setReducerClass(PruneJob_ReduceClass.class);
			FileInputFormat.setInputPaths(ngram_prune_conf, tempDir);
			FileOutputFormat.setOutputPath(ngram_prune_conf, new Path(other_args.get(1)));
			JobClient.runJob(ngram_prune_conf);
		} finally {
			// Remove the intermediate output even when one of the jobs threw.
			FileSystem.get(ngram_prune_conf).delete(tempDir, true);
		}
		return 0;
	}
	
	
	/**
	 * Command-line entry point: hands the arguments to ToolRunner together
	 * with a fresh Configuration and a new NGram tool, then exits the JVM
	 * with the status that the tool returned.
	 * @param args	the command-line arguments
	 */
	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new Configuration(), new NGram(), args));
	}

}
