hadoop上传文件功能实例代码

作者:mrr 时间:2021-11-01 00:57:59 

hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的Java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。 

然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR。其实,这个调度平台就是使用了quartz。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。 

首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase。 

然后,还要知道hdfs中块(block)的概念,默认为64MB。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用Hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。 

然后,再就是要知道namenode和datanode,这个在之前的博文已经介绍过,下面看看cm环境中hdfs的管理者(namenode)和工作者(datanode),如下 

hadoop上传文件功能实例代码

在yarn环境中是可以有多个nameNode的。此环境中没有SecondaryNameNode,当然也可以有。 

好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。

一、java实现上传本地文件至hdfs

这里,可以直接使用hdfs提供的java api即可实现,代码如下:


package com.bjpowernode.hdfs.local;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
* ClassName:UploadLocalFileToHdfs <br/>
* Function: 本地文件上传至hdfs. <br/>
* Date:  2016年3月28日 下午10:06:05 <br/>
* @author qiyongkang
* @version
* @since JDK 1.6
* @see  
*/
public class UploadLocalFileToHdfs {
public static void main(String[] args) {
 Configuration conf = new Configuration();
 String localDir = "/home/qiyongkang";
 String hdfsDir = "/qiyongkang";
 try{
  Path localPath = new Path(localDir);
  Path hdfsPath = new Path(hdfsDir);
  FileSystem hdfs = FileSystem.get(conf);
  hdfs.copyFromLocalFile(localPath, hdfsPath);
 }catch(Exception e){
  e.printStackTrace();
 }
}
}

注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。 

打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,然后执行hadoop fs -ls /qiyongkang便可看到: 

hadoop上传文件功能实例代码

二、java实现上传ftp上的文件至hdfs

首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。 

其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。 

下面,直接贴出代码:


package com.bjpowernode.hdfs.ftp;
import java.io.InputStream;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* ClassName:UploadFtpFileToHdfs <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2016年3月28日 下午10:50:37 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class UploadFtpFileToHdfs {
public static void main(String[] args) {
 Configuration conf = new Configuration();
 loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);
}
/**
 *
 * loadFromFtpToHdfs:将数据从ftp上传到hdfs上. <br/>
 *
 * @author qiyongkang
 * @param ip
 * @param username
 * @param password
 * @param filePath
 * @param outputPath
 * @param conf
 * @return
 * @since JDK 1.6
 */
private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,
  String outputPath, Configuration conf) {
 FTPClient ftp = new FTPClient();
 InputStream inputStream = null;
 FSDataOutputStream outputStream = null;
 boolean flag = true;
 try {
  ftp.connect(ip);
  ftp.login(username, password);
  ftp.setFileType(FTP.BINARY_FILE_TYPE);
  ftp.setControlEncoding("UTF-8");
  int reply = ftp.getReplyCode();
  if (!FTPReply.isPositiveCompletion(reply)) {
   ftp.disconnect();
  }
  FTPFile[] files = ftp.listFiles(filePath);
  FileSystem hdfs = FileSystem.get(conf);
  for (FTPFile file : files) {
   if (!(file.getName().equals(".") || file.getName().equals(".."))) {
    inputStream = ftp.retrieveFileStream(filePath + file.getName());
    outputStream = hdfs.create(new Path(outputPath + file.getName()));
    IOUtils.copyBytes(inputStream, outputStream, conf, false);
    if (inputStream != null) {
     inputStream.close();
     ftp.completePendingCommand();
    }
   }
  }
  ftp.disconnect();
 } catch (Exception e) {
  flag = false;
  e.printStackTrace();
 }
 return flag;
}
}

然后同样打包上传后执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到: 

hadoop上传文件功能实例代码

总结

以上所述是小编给大家介绍的hadoop上传文件功能实例代码网站的支持!

来源:http://blog.sina.com.cn/s/blog_9c6852670102wwuz.html

标签:java,hadoop
0
投稿

猜你喜欢

  • c语言10个经典小程序

    2023-11-03 01:11:35
  • Java事务管理学习之Hibernate详解

    2023-06-07 03:14:09
  • java实现面板之间切换功能

    2021-12-03 15:06:39
  • Eclipse+Java+Swing实现斗地主游戏(代码)

    2023-08-17 18:10:40
  • springboot整合shiro实现记住我功能

    2023-07-29 20:21:34
  • Spring SpringMVC在启动完成后执行方法源码解析

    2023-01-01 12:46:57
  • C#中winform实现自动触发鼠标、键盘事件的方法

    2022-02-23 22:50:58
  • Android车载多媒体开发MediaSession框架示例详解

    2022-08-10 19:49:20
  • docker网络配置过程详解介绍

    2023-02-07 23:04:15
  • Java数据结构之链表相关知识总结

    2023-11-02 00:29:28
  • 使用IDEA配置Maven搭建开发框架ssm教程

    2023-04-05 13:33:21
  • Android仿360悬浮小球自定义view实现示例

    2021-10-22 10:16:14
  • 浅谈Java线程间通信之wait/notify

    2022-06-09 11:26:19
  • C#实现异步编程的方法

    2022-03-01 22:43:27
  • React Native与Android 原生通信的方法

    2021-08-11 19:02:56
  • MyBatis中XML 映射文件中常见的标签说明

    2023-01-07 08:02:59
  • C#编程中设置程序只可被运行一次的方法

    2022-08-09 08:36:32
  • winform拦截关闭按钮触发的事件示例

    2022-05-19 00:06:14
  • Android 支付宝支付、微信支付、银联支付 整合第三方支付接入方法(后台订单支付API设计)

    2023-08-28 01:36:01
  • c#异常处理示例分享

    2022-09-10 23:20:54
  • asp之家 软件编程 m.aspxhome.com