返回顶部
首页 > 资讯 > 数据库 >HBase和HDFS数据互导程序
  • 314
分享到

HBase和HDFS数据互导程序

2024-04-02 19:04:59 314人浏览 安东尼
摘要

下面说说JAVA api 提供的这些类的功能和他们之间有什么样的联系。1.HBaseConfiguration关系:org.apache.hadoop.hbase.HBaseConfiguration作用:




下面说说JAVA api 提供的这些类的功能和他们之间有什么样的联系。


1.HBaseConfiguration

关系:org.apache.hadoop.hbase.HBaseConfiguration

作用:通过此类可以对HBase进行配置

用法实例: Configuration config = HBaseConfiguration.create();

说明: HBaseConfiguration.create() 默认会从classpath 中查找 hbase-site.xml 中的配置信息,初始化 Configuration。

2.HBaseAdmin 类

关系:org.apache.hadoop.hbase.client.HBaseAdmin

作用:提供接口关系HBase 数据库中的表信息

用法:HBaseAdmin admin = new HBaseAdmin(config);

3.Descriptor类

关系:org.apache.hadoop.hbase.HTableDescriptor

作用:HTableDescriptor 类包含了表的名字以及表的列族信息

用法:HTableDescriptor htd =new HTableDescriptor(tablename);

             构造一个表描述符指定TableName对象。

             Htd.addFamily(new HColumnDescriptor(“myFamily”));

             将列家族给定的描述符

4.HTable

关系:org.apache.hadoop.hbase.client.HTable

作用:HTable 和 HBase 的表通信

用法:HTable tab = new HTable(config,Bytes.toBytes(tablename));

           ResultScanner sc = tab.getScanner(Bytes.toBytes(“familyName”));

说明:获取表内列族 familyNme 的所有数据。

5.Put

关系:org.apache.hadoop.hbase.client.Put

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

           Put put = new Put(row);

           p.add(family,qualifier,value);

说明:向表 tablename 添加 “family,qualifier,value”指定的值。

6.Get

关系:org.apache.hadoop.hbase.client.Get

作用:获取单个行的数据

用法:HTable table = new HTable(config,Bytes.toBytes(tablename));

           Get get = new Get(Bytes.toBytes(row));

           Result result = table.get(get);

说明:获取 tablename 表中 row 行的对应数据

7.ResultScanner

关系:Interface

作用:获取值的接口

用法:ResultScanner scanner = table.getScanner(Bytes.toBytes(family));

           For(Result rowResult : scanner){

                   Bytes[] str = rowResult.getValue(family,column);

}

说明:循环获取行中列值。



例1 HBase之读取HDFS数据写入HBase

package org.hadoop.hbase;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapReduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFORMat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseWriter {
 public static class WordCountHbaseMapper extends
   Mapper<Object, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);// 输出<key,value>为<word,one>
   }
  }
 }
 public static class WordCountHbaseReducer extends
   TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {// 遍历求和
    sum += val.get();
   }
   Put put = new Put(key.getBytes());//put实例化,每一个词存一行
   //列族为content,列修饰符为count,列值为数目
   put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
   context.write(new ImmutableBytesWritable(key.getBytes()), put);// 输出求和后的<key,value>
  }
 }
 
 public static void main(String[] args){
  String tablename = "wordcount";
  Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.ZooKeeper.quorum", "192.168.1.139");
    conf.set("hbase.zookeeper.property.clientPort", "2191");
  HBaseAdmin admin = null;
  try {
   admin = new HBaseAdmin(conf);
   if(admin.tableExists(tablename)){
    System.out.println("table exists!recreating.......");
    admin.disableTable(tablename);
    admin.deleteTable(tablename);
   }
   HTableDescriptor htd = new HTableDescriptor(tablename);
   HColumnDescriptor tcd = new HColumnDescriptor("content");
   htd.addFamily(tcd);//创建列族
   admin.createTable(htd);//创建表
   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
      if (otherArgs.length != 1) {
        System.err.println("Usage: WordCountHbaseWriter <in>");
        System.exit(2);
      }
      Job job = new Job(conf, "WordCountHbaseWriter");
  job.setNumReduceTasks(2);
      job.setjarByClass(WordCountHbaseWriter.class);
   //使用WordCountHbaseMapper类完成Map过程;
      job.setMapperClass(WordCountHbaseMapper.class);
      TableMapReduceUtil.initTableReducerJob(tablename, WordCountHbaseReducer.class, job);
      //设置任务数据的输入路径;
      FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
   //设置了Map过程的输出类型,其中设置key的输出类型为Text;
      job.setOutpuTKEyClass(Text.class);
   //设置了Map过程的输出类型,其中设置value的输出类型为IntWritable;
      job.setOutputValueClass(IntWritable.class);
   //调用job.waitForCompletion(true) 执行任务,执行成功后退出;
      System.exit(job.waitForCompletion(true) ? 0 : 1);
  } catch (Exception e) {
   e.printStackTrace();
  } finally{
   if(admin!=null)
    try {
     admin.close();
    } catch (IOException e) {
     e.printStackTrace();
    }
  }
  
 }
}


例2 HBase之读取HBase数据写入HDFS

package org.hadoop.hbase;
import java.io.IOException;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountHbaseReader {
 
 public static class WordCountHbaseReaderMapper extends 
    TableMapper<Text,Text>{
    @Override
    protected void map(ImmutableBytesWritable key,Result value,Context context)
            throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer("");
        for(Entry<byte[],byte[]> entry:value.getFamilyMap("content".getBytes()).entrySet()){
            String str =  new String(entry.getValue());
            //将字节数组转换为String类型
            if(str != null){
                sb.append(new String(entry.getKey()));
                sb.append(":");
                sb.append(str);
            }
            context.write(new Text(key.get()), new Text(new String(sb)));
        }
    }
}
 public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,Text>{
     private Text result = new Text();
     @Override
     protected void reduce(Text key, Iterable<Text> values,Context context)
             throws IOException, InterruptedException {
         for(Text val:values){
             result.set(val);
             context.write(key, result);
         }
     }
 }
 
 public static void main(String[] args) throws Exception {
     String tablename = "wordcount";
     Configuration conf = HBaseConfiguration.create();
     conf.set("hbase.zookeeper.quorum", "192.168.1.139");
     conf.set("hbase.zookeeper.property.clientPort", "2191");
     
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length != 1) {
       System.err.println("Usage: WordCountHbaseReader <out>");
       System.exit(2);
     }
     Job job = new Job(conf, "WordCountHbaseReader");
     job.setJarByClass(WordCountHbaseReader.class);
     //设置任务数据的输出路径;
     FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
     job.setReducerClass(WordCountHbaseReaderReduce.class);
     Scan scan = new Scan();
     TableMapReduceUtil.initTableMapperJob(tablename,scan,WordCountHbaseReaderMapper.class, Text.class, Text.class, job);
     //调用job.waitForCompletion(true) 执行任务,执行成功后退出;
     System.exit(job.waitForCompletion(true) ? 0 : 1);

 }
}


程序中用到hadoop的相关JAR包(如下图)及hbase所有jar包

HBase和HDFS数据互导程序

如果上面的API还不能满足你的要求,可以到下面这个网站里面Hbase全部API介绍

Http://www.yiibai.com/hbase/

 

您可能感兴趣的文档:

--结束END--

本文标题: HBase和HDFS数据互导程序

本文链接: https://lsjlt.com/news/44095.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • HBase和HDFS数据互导程序
    下面说说JAVA API 提供的这些类的功能和他们之间有什么样的联系。1.HBaseConfiguration关系:org.apache.hadoop.hbase.HBaseConfiguration作用:...
    99+
    2024-04-02
  • 怎么将HBase的数据导入HDFS
    这篇文章主要介绍了怎么将HBase的数据导入HDFS,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。实践:将HBase数据导入HDFS如果将要在MapReduce中使用的客户数...
    99+
    2023-06-02
  • HBase 与Hive数据交互整合过程详解
    Hive和Hbase整合理论1、为什么hive要和hbase整合2、整合的优缺点优点:(1).Hive方便地提供了Hive QL的接口来简化MapReduce的使用,  而HBase提供了低延迟的...
    99+
    2024-04-02
  • 通用MapReduce程序复制HBase表数据
    编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。 原始表test...
    99+
    2024-04-02
  • hdfs/hbase 程序利用Kerberos认证超过ticket_lifetime期限后异常
    问题描述 业务需要一个长期运行的程序,将上传的文件存放至HDFS,程序启动后,刚开始一切正常,执行一段时间(一般是一天,有的现场是三天),就会出现认证错误,用的JDK是1.8,hadoop-client,对应的版本是2.5.1,为...
    99+
    2016-11-12
    hdfs/hbase 程序利用Kerberos认证超过ticket_lifetime期限后异常
  • HDFS数据存储流程
    HDFS即Hadoop Distributed File System, HDFS存储数据的流程如下: client跟NameNode交互1.1、client 发消息给NameNode,NameNode检查client是否有写的权限。如果...
    99+
    2019-11-16
    HDFS数据存储流程
  • ORACLE和MYSQL怎么互相导入数据
    这篇文章主要讲解了“ORACLE和MYSQL怎么互相导入数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“ORACLE和MYSQL怎么互相导入数据”吧! ...
    99+
    2024-04-02
  • mysqlimport - MySQL 数据导入程序
    mysqlimport 客户端附带一个命令行界面,可帮助执行 LOAD DATA SQL 语句。 mysqlimport 的大多数选项直接响应 LOAD DATA 语法的子句调用 mysqlimport可以调用实用程序 mysqlimpor...
    99+
    2023-10-22
  • MapReduce将文本数据导入到HBase中
    整体描述:将本地文件的数据整理之后导入到hbase中在HBase中创建表数据格式MapReduce程序map程序package com.hadoop.mapreduce.test.map; im...
    99+
    2024-04-02
  • 怎么从hbase读取数据导入mongodb
    要将数据从HBase导入MongoDB,可以使用以下步骤:1. 首先,连接到HBase并查询要导出的数据。可以使用Java编程语言或...
    99+
    2023-08-23
    hbase mongodb
  • 如何使用sqoop从oracle导数据到Hbase
    这篇文章主要为大家展示了“如何使用sqoop从oracle导数据到Hbase”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何使用sqoop从oracle导数据...
    99+
    2024-04-02
  • MySql Oracle数据互导简例(P
    工作需要,简单的写了一个互导的小功能,对于量大的数据处理还有待优化多的不说了,直接上代码,需要的可以看看代码注释欢迎批评指正 :)##coding=utf8 #单线程,分批导入 #Author : Jeen @ 2013...
    99+
    2023-01-31
    数据 MySql Oracle
  • hbase之python利用thrift操作hbase数据和shell操作
    前沿:        以前都是用mongodb的,但是量大了,mongodb显得不那么靠谱,改成hbase撑起一个量级。HBase是Apache Hadoop的数据库...
    99+
    2024-04-02
  • 怎么在HDFS中组织和使用数据
    这篇文章主要为大家展示了“怎么在HDFS中组织和使用数据”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“怎么在HDFS中组织和使用数据”这篇文章吧。4.1 组织数据组织数据是使用Hadoop最具挑...
    99+
    2023-06-02
  • HDFS的元数据如何存储和访问
    HDFS(Hadoop分布式文件系统)的元数据存储在称为NameNode的主服务器上,这些元数据包括文件和目录的信息,例如文件名、文...
    99+
    2024-05-08
    HDFS
  • HDFS/HBase技术报告·分布式数据库设计架构的全面解析
    Hadoop生态的分布式数据库 1、什么是分布式数据库? 从狭义的理解就是分布式关系型数据库,主要特指目前热门的NewSQL。 从广义的理解,分库分表的传统关系型数据库,传统关系型数据库集群,关系型数据库的主从架构,分布式KV数据库(例如...
    99+
    2021-06-21
    HDFS/HBase技术报告·分布式数据库设计架构的全面解析
  • 「从零单排HBase 09」HBase的那些数据结构和算法
    而对HBase的学习,也离不开索引结构的学习,它使用了一种LSM树((Log-Structured Merge-Tree))的索引结构。 下面,我们就结合HBase的实现,来深入了解HBase的核心数据结构与算法,包括索引结构LSM树,内...
    99+
    2016-05-15
    「从零单排HBase 09」HBase的那些数据结构和算法
  • hbase怎么实现数据的时间序列化
    在HBase中,数据的时间序列化通常通过以下方式实现: 使用Timestamp作为数据的时间戳:在HBase中,每条数据都会有一...
    99+
    2024-03-05
    hbase
  • HBase中怎么备份和恢复数据
    在HBase中备份和恢复数据通常有两种方法: 使用HBase自带的工具备份和恢复数据: 备份数据:可以使用HBase的命令行工...
    99+
    2024-03-11
    Hbase
  • mysql数据库和oracle数据库之间互相导入备份的示例分析
    小编给大家分享一下mysql数据库和oracle数据库之间互相导入备份的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!把...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作