/**
* (C) 2010-2011 Alibaba Group Holding Limited.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* version 2 as published by the Free Software Foundation.
*
*/
package com.taobao.datax.plugins.writer.hdfswriter;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import com.taobao.datax.common.exception.DataExchangeException;
import com.taobao.datax.common.exception.ExceptionTracker;
import com.taobao.datax.common.plugin.Line;
import com.taobao.datax.common.plugin.LineReceiver;
import com.taobao.datax.common.plugin.PluginParam;
import com.taobao.datax.common.plugin.PluginStatus;
import com.taobao.datax.common.plugin.Writer;
import com.taobao.datax.plugins.common.DFSUtils;
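/**
 * HdfsWriter is a DataX writer plugin that dumps {@link Line} records
 * received from a {@link LineReceiver} into HDFS, either as plain or
 * compressed text files or as Hadoop {@link SequenceFile}s, selected by the
 * 'filetype' parameter (TXT, TXT_COMP, SEQ, SEQ_COMP).
 *
 * A rough sketch of the parameters this plugin reads (the keys are the
 * ParamKey references used below; the values are only illustrative):
 *
 * <pre>
 * dir         = hdfs://taobao/dw   # target directory (or file when concurrency == 1)
 * prefixname  = bazhen.csy         # file name, or file-name prefix when split
 * fileType    = TXT                # TXT | TXT_COMP | SEQ | SEQ_COMP
 * fieldSplit  = \u0001             # field separator
 * lineSplit   = \n                 # record separator
 * encoding    = UTF-8
 * bufferSize  = 8192
 * concurrency = 10                 # number of concurrent output files
 * delMode     = 3                  # pre-write cleanup policy, see prepare()
 * </pre>
 */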
public class HdfsWriter extends Writer {
private static final Logger logger = Logger.getLogger(HdfsWriter.class);
private FileSystem fs;
private Path p = null;
private char FIELD_SPLIT = '\u0001';
private char LINE_SPLIT = '\n';
private int BUFFER_SIZE = 8 * 1024;
private String ENCODING = "UTF-8";
private String delMode = "3";
private String hadoop_conf = "";
private int concurrency = 10;
private char[] nullChars = null;
private static char[] searchChars = new char[2];
private DfsWriterStrategy dfsWriterStrategy = null;
static {
Thread.currentThread().setContextClassLoader(HdfsWriter.class.getClassLoader());
}
/*
 * NOTE: if the user sets the parameter 'splitnum' to 1, hdfswriter does no
 * splitting and uses dir + prefixname as the fixed HDFS file name. For
 * example, with dir = hdfs://taobao/dw, prefixname = bazhen.csy and
 * splitnum = 1, hdfswriter dumps to the single file
 * hdfs://taobao/dw/bazhen.csy.
 *
 * In all other cases prefixname is only a file-name prefix. For example,
 * with dir = hdfs://taobao/dw, prefixname = bazhen.csy and splitnum = 2, the
 * generated file names are hdfs://taobao/dw/bazhen.csy-0 and
 * hdfs://taobao/dw/bazhen.csy-1, where the suffix is the thread number.
 */
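/**
 * Prepares the target location before any data is written. The cleanup
 * behaviour follows 'delMode' as implemented below: with a single writer
 * (concurrency == 1) the fixed target file dir/prefixname is deleted;
 * otherwise delMode "4" deletes every file under dir, while delMode "3"
 * deletes only the files matching dir/prefixname-*. The file system handle
 * opened here is closed again in the finally block.
 */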
@Override
public int prepare(PluginParam param) {
String dir = param.getValue(ParamKey.dir);
String ugi = param.getValue(ParamKey.ugi, null);
String prefixname = param.getValue(ParamKey.prefixname,
"prefix");
delMode = param.getValue(ParamKey.delMode, this.delMode);
concurrency = param.getIntValue(ParamKey.concurrency, 1);
hadoop_conf = param.getValue(ParamKey.hadoop_conf, "");
if (dir.endsWith("*")) {
dir = dir.substring(0, dir.lastIndexOf("*"));
}
if (dir.endsWith("/")) {
dir = dir.substring(0, dir.lastIndexOf("/"));
}
Path rootpath = new Path(dir);
try {
fs = DFSUtils.createFileSystem(new URI(dir),
DFSUtils.getConf(dir, ugi, hadoop_conf));
/* No splitting: dir + prefixname is the absolute target file name. */
if (concurrency == 1) {
DFSUtils.deleteFile(fs, new Path(dir + "/" + prefixname), true);
}
/* Splitting: dir is treated as a directory and cleaned up according to delMode. */
else {
if ("4".equals(delMode))
DFSUtils.deleteFiles(fs, rootpath, true, true);
else if ("3".equals(delMode))
DFSUtils.deleteFiles(fs, new Path(dir + "/" + prefixname
+ "-*"), true, true);
}
} catch (Exception e) {
logger.error(ExceptionTracker.trace(e));
throw new DataExchangeException(String.format(
"HdfsWriter failed to prepare the target on the file system: %s, %s",
e.getMessage(), e.getCause()));
} finally {
closeAll();
}
return PluginStatus.SUCCESS.value();
}
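/** Delegates splitting of the job parameters to {@link HdfsFileSplitter}. */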
@Override
public List<PluginParam> split(PluginParam param) {
HdfsFileSplitter splitter = new HdfsFileSplitter();
splitter.setParam(param);
splitter.init();
return splitter.split();
}
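/**
 * Reads the writer parameters (separators, encoding, buffer size, null
 * replacement, target dir), opens the HDFS file system, and picks the
 * {@link DfsWriterStrategy} matching the 'filetype' parameter:
 * SEQ / SEQ_COMP use the sequence-file strategy, TXT_COMP the compressed
 * text strategy and TXT the plain text strategy.
 */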
@Override
public int init() {
FIELD_SPLIT = param.getCharValue(ParamKey.fieldSplit,
FIELD_SPLIT);
ENCODING = param.getValue(ParamKey.encoding, ENCODING);
LINE_SPLIT = param.getCharValue(ParamKey.lineSplit,
LINE_SPLIT);
searchChars[0] = FIELD_SPLIT;
searchChars[1] = LINE_SPLIT;
BUFFER_SIZE = param.getIntValue(ParamKey.bufferSize,
BUFFER_SIZE);
delMode = param.getValue(ParamKey.delMode, this.delMode);
nullChars = param.getValue(ParamKey.nullChar, "")
.toCharArray();
hadoop_conf = param.getValue(ParamKey.hadoop_conf, "");
String ugi = param.getValue(ParamKey.ugi, null);
String dir = param.getValue(ParamKey.dir);
try {
fs = DFSUtils.createFileSystem(new URI(dir),
DFSUtils.getConf(dir, ugi, hadoop_conf));
} catch (Exception e) {
logger.error(ExceptionTracker.trace(e));
closeAll();
throw new DataExchangeException(String.format(
"HdfsWriter failed to initialize the file system: %s, %s",
e.getMessage(), e.getCause()));
}
if (dir != null) {
p = new Path(dir);
} else {
closeAll();
throw new DataExchangeException("Can't find the param ["
+ ParamKey.dir + "] in hdfs-writer-param.");
}
String filetype = param.getValue(ParamKey.fileType, "TXT");
if ("SEQ".equalsIgnoreCase(filetype)
|| "SEQ_COMP".equalsIgnoreCase(filetype))
dfsWriterStrategy = new DfsWriterSequeueFileStrategy();
else if ("TXT_COMP".equalsIgnoreCase(filetype))
dfsWriterStrategy = new DfsWriterTextFileStrategy(true);
else if ("TXT".equalsIgnoreCase(filetype))
dfsWriterStrategy = new DfsWriterTextFileStrategy(false);
else {
closeAll();
throw new DataExchangeException(
"HdfsWriter cannot recognize filetype: " + filetype);
}
return PluginStatus.SUCCESS.value();
}
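/**
 * Opens the output through the selected writer strategy. With delMode "2"
 * the target path is deleted first, so the new file replaces any old one.
 */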
@Override
public int connect() {
if (p == null) {
closeAll();
throw new DataExchangeException(
"HdfsWriter can't connect: the file system was not initialized.");
}
try {
if ("2".equals(delMode))
DFSUtils.deleteFile(fs, p, true);
dfsWriterStrategy.open();
getMonitor().setStatus(PluginStatus.CONNECT);
return PluginStatus.SUCCESS.value();
} catch (Exception ex) {
closeAll();
logger.error(ExceptionTracker.trace(ex));
throw new DataExchangeException(String.format(
"HdfsWriter failed to connect to the file system: %s, %s",
ex.getMessage(), ex.getCause()));
}
}
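/**
 * Pulls lines from the receiver and hands them to the writer strategy; the
 * strategy and the file system handle are closed afterwards in all cases.
 */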
@Override
public int startWrite(LineReceiver receiver) {
getMonitor().setStatus(PluginStatus.WRITE);
try {
dfsWriterStrategy.write(receiver);
} catch (Exception ex) {
throw new DataExchangeException(String.format(
"Errors occurred while starting to write: %s, %s",
ex.getMessage(), ex.getCause()));
} finally {
dfsWriterStrategy.close();
closeAll();
}
return PluginStatus.SUCCESS.value();
}
@Override
public int commit() {
return PluginStatus.SUCCESS.value();
}
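/** Closes the file system handle and marks the write phase as finished. */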
@Override
public int finish() {
closeAll();
getMonitor().setStatus(PluginStatus.WRITE_OVER);
return PluginStatus.SUCCESS.value();
}
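/**
 * Closes the shared HDFS file system handle, wrapping any failure in a
 * DataExchangeException.
 */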
private void closeAll() {
try {
IOUtils.closeStream(fs);
} catch (Exception e) {
throw new DataExchangeException(String.format(
"HdfsWriter failed to close the file system: %s, %s",
e.getMessage(), e.getCause()));
}
}
@Override
public int cleanup() {
closeAll();
return PluginStatus.SUCCESS.value();
}
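/**
 * Strategy for dumping received lines into a concrete HDFS file format;
 * the implementations below cover sequence files and plain or compressed
 * text files.
 */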
public interface DfsWriterStrategy {
void open();
void write(LineReceiver receiver);
void close();
}
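/**
 * Writes records as a Hadoop {@link SequenceFile}, optionally compressed,
 * using the configured key and value classes.
 */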
class DfsWriterSequeueFileStrategy implements DfsWriterStrategy {
private Configuration conf = null;
private SequenceFile.Writer writer = null;
private Writable key = null;
private Writable value = null;
private boolean compressed = false;
private String keyClassName = null;
private String valueClassName = null;
priva