import java.io.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RecordReader;

/**
 * An input format for plain files that hands each split to a
 * {@link ParagraphRecordReader}, so that records are read one paragraph
 * at a time.
 *
 * The class is modelled on Hadoop's TextInputFormat; the only substantive
 * difference is the reader returned from
 * {@link #getRecordReader(InputSplit, JobConf, Reporter)}.
 */
public class ParagraphInputFormat extends FileInputFormat<LongWritable, Text>
	implements JobConfigurable {

	/**
	 * Maps file paths to compression codecs. Set by {@link #configure(JobConf)};
	 * created on demand in {@link #isSplitable(FileSystem, Path)} if
	 * configure() has not been called.
	 */
	private CompressionCodecFactory compressionCodecs = null;

	/**
	 * Builds the codec factory from the job configuration.
	 *
	 * @param conf the job configuration used to discover compression codecs
	 */
	public void configure(JobConf conf) {
		compressionCodecs = new CompressionCodecFactory(conf);
	}

	/**
	 * Decides whether a file may be divided into several splits.
	 * A file is splittable only when no compression codec matches its path.
	 *
	 * @param fs   the file system that holds the file
	 * @param file the path of the file being considered
	 * @return true if no codec is registered for the file, false otherwise
	 */
	protected boolean isSplitable(FileSystem fs, Path file) {
		CompressionCodecFactory codecs = compressionCodecs;
		if (codecs == null) {
			// configure() was never invoked (e.g. the format was created
			// directly rather than through the framework). Fall back to the
			// file system's configuration instead of failing with a
			// NullPointerException.
			codecs = new CompressionCodecFactory(fs.getConf());
			compressionCodecs = codecs;
		}
		return codecs.getCodec(file) == null;
	}

	/**
	 * Creates the reader for one split and publishes the split description
	 * as the task status.
	 *
	 * @param genericSplit the split to read; it is cast to {@link FileSplit}
	 * @param job          the job configuration passed on to the reader
	 * @param reporter     receives the split's string form as status text
	 * @return a new {@link ParagraphRecordReader} over the given split
	 * @throws IOException declared by this method; presumably raised when
	 *                     the reader cannot be constructed
	 */
	public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
		throws IOException {

		reporter.setStatus(genericSplit.toString());
		return new ParagraphRecordReader(job, (FileSplit) genericSplit);
	}
}
