Let's walk through the classes provided by the HBase Java API, what each one does, and how they relate to one another.
1. HBaseConfiguration
Class: org.apache.hadoop.hbase.HBaseConfiguration
Purpose: configures the HBase client.
Usage example: Configuration config = HBaseConfiguration.create();
Note: by default, HBaseConfiguration.create() looks for hbase-site.xml on the classpath and uses it to initialize the Configuration.
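As a quick illustration, here is a minimal sketch showing that settings can also be overridden programmatically on top of hbase-site.xml, which is exactly what the examples later in this article do. The ZooKeeper host name below is a placeholder for your own cluster:
Configuration config = HBaseConfiguration.create();
// values set here override whatever hbase-site.xml provides
config.set("hbase.zookeeper.quorum", "zk-host");            // placeholder hostname
config.set("hbase.zookeeper.property.clientPort", "2181");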
2. HBaseAdmin
Class: org.apache.hadoop.hbase.client.HBaseAdmin
Purpose: provides an interface for managing table metadata in the HBase database (creating, enabling, disabling, and deleting tables).
Usage: HBaseAdmin admin = new HBaseAdmin(config);
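For example, a small sketch (using the same older HBaseAdmin API as the rest of this article; the table name is a placeholder) that checks whether a table exists and lists all tables:
HBaseAdmin admin = new HBaseAdmin(config);
try {
    if (admin.tableExists("mytable")) {              // placeholder table name
        System.out.println("mytable exists");
    }
    for (HTableDescriptor td : admin.listTables()) { // enumerate all tables
        System.out.println(td.getNameAsString());
    }
} finally {
    admin.close();                                   // always release the connection
}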
3. HTableDescriptor
Class: org.apache.hadoop.hbase.HTableDescriptor
Purpose: HTableDescriptor holds a table's name and its column family definitions.
Usage: HTableDescriptor htd = new HTableDescriptor(tablename);
Constructs a table descriptor for the given table name.
htd.addFamily(new HColumnDescriptor("myFamily"));
Adds the column family described by the given HColumnDescriptor.
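Putting the descriptor classes together with HBaseAdmin, a minimal sketch for creating a table looks like this (table and family names are placeholders); Example 1 below uses the same pattern:
HTableDescriptor htd = new HTableDescriptor("mytable");   // placeholder table name
htd.addFamily(new HColumnDescriptor("myFamily"));         // one column family
HBaseAdmin admin = new HBaseAdmin(config);
admin.createTable(htd);                                   // create the table in HBase
admin.close();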
4. HTable
Class: org.apache.hadoop.hbase.client.HTable
Purpose: HTable is the client handle used to communicate with an HBase table.
Usage: HTable tab = new HTable(config, Bytes.toBytes(tablename));
ResultScanner sc = tab.getScanner(Bytes.toBytes("familyName"));
Note: returns a scanner over all data in the column family familyName of the table.
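A short sketch of opening a table handle and closing it when done (assuming the older HTable constructor used throughout this article; the table name is a placeholder):
HTable table = new HTable(config, "mytable");   // placeholder table name
try {
    // read and write operations (Put, Get, Scan) go through this handle
} finally {
    table.close();                              // flush buffered writes and release resources
}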
5. Put
Class: org.apache.hadoop.hbase.client.Put
Purpose: writes data into a single row.
Usage: HTable table = new HTable(config, Bytes.toBytes(tablename));
Put put = new Put(row);
put.add(family, qualifier, value);
table.put(put);
Note: writes the cell specified by (family, qualifier, value) into row row of table tablename.
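For example, a minimal sketch of writing one cell (the row key, family, qualifier, and value below are placeholders):
HTable table = new HTable(config, "mytable");
Put put = new Put(Bytes.toBytes("row1"));                       // row key
put.add(Bytes.toBytes("myFamily"), Bytes.toBytes("qualifier"),  // family and qualifier
        Bytes.toBytes("value"));                                // cell value
table.put(put);                                                 // send the mutation to HBase
table.close();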
6. Get
Class: org.apache.hadoop.hbase.client.Get
Purpose: reads the data of a single row.
Usage: HTable table = new HTable(config, Bytes.toBytes(tablename));
Get get = new Get(Bytes.toBytes(row));
Result result = table.get(get);
Note: fetches the data stored in row row of table tablename.
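A sketch of reading that cell back (same placeholder names as in the Put sketch above):
HTable table = new HTable(config, "mytable");
Get get = new Get(Bytes.toBytes("row1"));
Result result = table.get(get);
byte[] value = result.getValue(Bytes.toBytes("myFamily"), Bytes.toBytes("qualifier"));
System.out.println(Bytes.toString(value));   // prints the stored value if the row exists
table.close();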
7. ResultScanner
Class: org.apache.hadoop.hbase.client.ResultScanner (an interface)
Purpose: the interface used to iterate over the results of a scan.
Usage: ResultScanner scanner = table.getScanner(Bytes.toBytes(family));
for (Result rowResult : scanner) {
byte[] str = rowResult.getValue(family, column);
}
Note: loops over the scanned rows and reads the value of the given column from each row.
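A fuller sketch of a scan loop, continuing from the table handle above, with the scanner closed once iteration is finished (family and qualifier names are placeholders):
ResultScanner scanner = table.getScanner(Bytes.toBytes("myFamily"));
try {
    for (Result rowResult : scanner) {
        byte[] value = rowResult.getValue(Bytes.toBytes("myFamily"), Bytes.toBytes("qualifier"));
        System.out.println(Bytes.toString(rowResult.getRow()) + " -> " + Bytes.toString(value));
    }
} finally {
    scanner.close();   // release server-side scanner resources
}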
Example 1: reading data from HDFS and writing it into HBase
package org.hadoop.hbase;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseWriter {
public static class WordCountHbaseMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);// emit <word, one> as the <key, value> pair
}
}
}
public static class WordCountHbaseReducer extends
TableReducer<Text, IntWritable, ImmutableBytesWritable> {
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {// sum up all counts for this word
sum += val.get();
}
// one Put per word: the word is the row key (use the Text's actual bytes, not its backing array)
Put put = new Put(Bytes.toBytes(key.toString()));
// column family "content", qualifier "count", value is the count as a string
put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);// emit the summed <key, value>
}
}
public static void main(String[] args){
String tablename = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.ZooKeeper.quorum", "192.168.1.139");
conf.set("hbase.zookeeper.property.clientPort", "2191");
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
if(admin.tableExists(tablename)){
System.out.println("table exists!recreating.......");
admin.disableTable(tablename);
admin.deleteTable(tablename);
}
HTableDescriptor htd = new HTableDescriptor(tablename);
HColumnDescriptor tcd = new HColumnDescriptor("content");
htd.addFamily(tcd);// add the "content" column family to the table descriptor
admin.createTable(htd);// create the table
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage: WordCountHbaseWriter <in>");
System.exit(2);
}
Job job = new Job(conf, "WordCountHbaseWriter");
job.setNumReduceTasks(2);
job.setJarByClass(WordCountHbaseWriter.class);
// use WordCountHbaseMapper for the map phase
job.setMapperClass(WordCountHbaseMapper.class);
TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);
// set the job's input path
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// the map output key type is Text
job.setOutputKeyClass(Text.class);
// the map output value type is IntWritable
job.setOutputValueClass(IntWritable.class);
// run the job with job.waitForCompletion(true) and exit with its status
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
} finally{
if(admin!=null)
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Example 2: reading data from HBase and writing it to HDFS
package org.hadoop.hbase;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseReader {
public static class WordCountHbaseReaderMapper extends
TableMapper<Text,Text>{
@Override
protected void map(ImmutableBytesWritable key,Result value,Context context)
throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer("");
for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
// convert the value byte array to a String
String str = new String(entry.getValue());
if(str != null){
sb.append(new String(entry.getKey()));
sb.append(":");
sb.append(str);
}
}
// emit one record per row after all qualifiers have been appended
context.write(new Text(key.get()), new Text(sb.toString()));
}
}
public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,Text>{
private Text result = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
for(Text val:values){
result.set(val);
context.write(key, result);
}
}
}
public static void main(String[] args) throws Exception {
String tablename = "wordcount";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.1.139");
conf.set("hbase.zookeeper.property.clientPort", "2191");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 1) {
System.err.println("Usage: WordCountHbaseReader <out>");
System.exit(2);
}
Job job = new Job(conf, "WordCountHbaseReader");
job.setJarByClass(WordCountHbaseReader.class);
//set the job's output path
FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
job.setReducerClass(WordCountHbaseReaderReduce.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
//run the job with job.waitForCompletion(true) and exit with its status
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Both programs need the relevant Hadoop JARs as well as all of the HBase JARs on the classpath.
If the APIs above do not cover your needs, a complete HBase API reference is available at:
http://www.yiibai.com/hbase/