import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
 * ParagraphRecordReader is a light wrapper around <code>LineRecordReader</code>
 * that overrides <code>next()</code> to return a full paragraph rather than a line.
 * A paragraph is a run of consecutive non-empty lines; it ends at the first
 * empty line or when the wrapped reader has no more lines.
 * Every other method delegates directly to the wrapped reader.
 *
 * NOTE(review): a paragraph that straddles an input-split boundary is presumably
 * returned in two pieces, because the wrapped reader stops at the end of its
 * split -- confirm against the job's input format.
 *
 * @author Philip M. White
 */
public class ParagraphRecordReader implements RecordReader<LongWritable, Text> {
	/** Single space appended after every line copied into a paragraph. */
	private static final byte[] SPACE = {' '};

	/** The line-oriented reader that does the actual I/O. */
	private final LineRecordReader lrr;

	/**
	 * Creates a paragraph reader over the given file split.
	 * @param job Job configuration handed to the wrapped <code>LineRecordReader</code>.
	 * @param split The split to read from.
	 * @throws IOException If the wrapped reader cannot be created.
	 */
	public ParagraphRecordReader(Configuration job, FileSplit split) throws IOException {
		lrr = new LineRecordReader(job, split);
	}

	/**
	 * Creates a paragraph reader over an already-open stream.
	 * All arguments are passed unchanged to the wrapped <code>LineRecordReader</code>.
	 */
	public ParagraphRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
		lrr = new LineRecordReader(in, offset, endOffset, maxLineLength);
	}

	/**
	 * Creates a paragraph reader over an already-open stream, taking line-reading
	 * settings from <code>job</code>.
	 * All arguments are passed unchanged to the wrapped <code>LineRecordReader</code>.
	 * @throws IOException If the wrapped reader cannot be created.
	 */
	public ParagraphRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
		lrr = new LineRecordReader(in, offset, endOffset, job);
	}

	/** Closes the wrapped reader. */
	public void close() throws IOException {
		lrr.close();
	}

	/** @return A new key object, as created by the wrapped reader. */
	public LongWritable createKey() {
		return lrr.createKey();
	}

	/** @return A new value object, as created by the wrapped reader. */
	public Text createValue() {
		return lrr.createValue();
	}

	/** @return The current position as reported by the wrapped reader. */
	public long getPos() throws IOException {
		return lrr.getPos();
	}

	/** @return The progress as reported by the wrapped reader. */
	public float getProgress() {
		return lrr.getProgress();
	}

	/**
	 * Fills <code>value</code> with text from the input until encountering either
	 * the end of input or an empty line.  Each line copied into <code>value</code>
	 * is followed by a single space, so a non-empty paragraph ends with a
	 * trailing space.  If the very first line read is empty, <code>value</code>
	 * is left empty and the call still returns true.
	 * <p>
	 * On a successful return, <code>key</code> holds the key that the wrapped
	 * reader produced for the first line consumed by this call, i.e. the start
	 * of the paragraph, rather than the key of the last line read.
	 *
	 * @param key Receives the key of the first line consumed by this call.
	 * @param value Cleared, then filled with the paragraph text.
	 * @return True iff at least one line was read from the wrapped reader.
	 * @throws IOException If the wrapped reader fails.
	 */
	public synchronized boolean next(LongWritable key, Text value) throws IOException {
		Text line = new Text();
		boolean gotSomething = false;
		long paragraphStart = 0;

		value.clear();
		while (lrr.next(key, line)) {
			if (!gotSomething) {
				// Remember where the paragraph begins: the wrapped reader is
				// handed the caller's key again for every following line.
				paragraphStart = key.get();
				gotSomething = true;
			}
			if (line.getLength() == 0) {
				// An empty line terminates the paragraph (and is consumed).
				break;
			}
			// Use getLength(): the backing array may be longer than the line.
			value.append(line.getBytes(), 0, line.getLength());
			value.append(SPACE, 0, SPACE.length);
		}
		if (gotSomething) {
			// Undo the updates made while reading the later lines.
			key.set(paragraphStart);
		}
		return gotSomething;
	}
}
