数据导入Hadoop的多种方法
立即解锁
发布时间: 2025-08-25 00:55:09 订阅数: 6 

# 数据导入 Hadoop 的多种方法
## 1. HBase 数据导入 HDFS
### 1.1 问题与解决方案
当需要将 HBase 中的客户数据用于 MapReduce 并与 HDFS 中的数据结合时,有两种选择。一种是编写 MapReduce 作业,在 map 或 reduce 代码中直接从 HBase 拉取数据;另一种是将 HBase 数据直接转储到 HDFS 中,特别是当计划在多个 MapReduce 作业中使用该数据,且 HBase 数据不可变或很少更改时,后一种方法更有用。
HBase 提供了一个 Export 类,可以将 HBase 数据以 SequenceFile 格式导入 HDFS。同时,还可以通过代码读取导入的 HBase 数据。
### 1.2 操作步骤
1. **启动并加载数据到 HBase**:在开始之前,需要启动 HBase 并加载一些数据。可以使用以下命令运行加载器,将示例股票数据加载到 HBase 中:
```bash
$ hip hip.ch5.hbase.HBaseWriter \
--input test-data/stocks.txt
```
使用 HBase shell 查看加载结果:
```bash
$ hbase shell
hbase(main):012:0> list
TABLE
stocks_example
1 row(s) in 0.0100 seconds
hbase(main):007:0> scan 'stocks_example'
ROW
COLUMN+CELL
AAPL2000-01-03
column=details:stockAvro, timestamp=1322315975123,...
AAPL2001-01-02
column=details:stockAvro, timestamp=1322315975123,...
...
```
2. **将 HBase 数据导出到 HDFS**:使用 HBase 的 Export 类导出数据。以下命令可以导出整个 HBase 表:
```bash
$ hip org.apache.hadoop.hbase.mapreduce.Export \
stocks_example \
output
```
也可以只导出单个列族并压缩输出:
```bash
$ hip org.apache.hadoop.hbase.mapreduce.Export \
-D hbase.mapreduce.scan.column.family=details \
-D mapred.output.compress=true \
-D mapred.output.compression.codec=\
org.apache.hadoop.io.compress.SnappyCodec \
stocks_example output
```
3. **读取导出的数据**:以下是读取 HBase SequenceFile 并提取 Avro 股票记录的代码:
```java
import static com.manning.hip.ch5.HBaseWriteAvroStock.*;
public class HBaseExportedStockReader {
public static void main(String... args) throws IOException {
read(new Path(args[0]));
}
public static void read(Path inputPath) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
SequenceFile.Reader reader =
new SequenceFile.Reader(fs, inputPath, conf);
HBaseScanAvroStock.AvroStockReader stockReader =
new HBaseScanAvroStock.AvroStockReader();
try {
ImmutableBytesWritable key = new ImmutableBytesWritable();
Result value = new Result();
while (reader.next(key, value)) {
Stock stock = stockReader.decode(value.getValue(
STOCK_DETAILS_COLUMN_FAMILY_AS_BYTES,
STOCK_COLUMN_QUALIFIER_AS_BYTES));
System.out.println(new String(key.get()) + ": " +
ToStringBuilder
.reflectionToString(stock, ToStringStyle.SIMPLE_STYLE));
}
} finally {
reader.close();
}
}
}
```
运行代码查看结果:
```bash
$ hip hip.ch5.hbase.ExportedReader \
--input output/part-m-00000
AAPL2000-01-03: AAPL,2000-01-03,104.87,...
AAPL2001-01-02: AAPL,2001-01-02,14.88,...
AAPL2002-01-02: AAPL,2002-01-02,22.05,...
...
```
## 2. 以 HBase 为数据源的 MapReduce 作业
### 2.1 问题与解决方案
内置的 HBase 导出器使用 SequenceFile 写出 HBase 数据,这种格式仅受 Java 支持,不支持模式演变,且仅支持 Hadoop 文件系统作为数据接收器。如果想对 HBase 数据提取有更多控制,可能需要使用 HBase 的 TableInputFormat 类,直接在 MapReduce 作业中操作 HBase。
### 2.2 操作步骤
以下是一个使用 TableInputFormat 类从 HBase 读取数据的 MapReduce 作业示例:
```java
public class HBaseSourceMapReduce extends
TableMapper<Text, DoubleWritable> {
private HBaseScanAvroStock.AvroStockReader stockReader;
private Text outputKey = new Text();
private DoubleWritable outputValue = new DoubleWritable();
@Override
protected void setup(
Context context)
throws IOException, InterruptedException {
stockReader = new HBaseScanAvroStock.AvroStockReader();
}
@Override
public void map(ImmutableBytesWritable row, Result columns
```
0
0
复制全文
相关推荐









