
Operating HDFS with the Java API

2016-08-19 10:58:06   Author: MangoCool   Source: MangoCool

I wrote this once when I first started learning Hadoop, but two years have flown by and my memory has faded. When I needed it again recently, it still took me two hours of gathering and sorting through material before the code ran properly. Two years ago that would not have bothered me, but now I see no reason to waste that kind of time, so I am writing it all down.


Dependencies: JDK 1.7, hadoop-2.7.2

Development environment: IntelliJ IDEA (ideaIU-14.1.4)

Test environment: Windows 7

Create a Maven project Upload2HiveThrift and add the necessary dependencies to pom.xml (for the HDFS code below, hadoop-common and hadoop-hdfs are the essential ones; the others serve the wider project):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xbdp.upload2hive</groupId>
    <artifactId>upload2hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.2.2</version>
        </dependency>

    </dependencies>

</project>


The Oper2Hdfs.java class:

package com.xbdp.hdfs;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
 * Created by MANGOCOOL on 2016/8/18.
 */
public class Oper2Hdfs {

    static Configuration conf = new Configuration();
    static FileSystem fs;
    static String path = "/home/hadoop/SW/hadoop/etc/hadoop/";
    static String hdfsUrl = "hdfs://h8:9000/";

    static
    {
        // If these config files are placed under the project's resources
        // directory, no path is needed; they are picked up by default.
        conf.addResource(new Path(path + "core-site.xml"));
        conf.addResource(new Path(path + "hdfs-site.xml"));
        conf.addResource(new Path(path + "mapred-site.xml"));

        // Set fs.defaultFS explicitly; without it you may get
        // java.lang.IllegalArgumentException: Wrong FS: hdfs://master:9000/xxx, expected: file:///
        // Alternatively, copy the cluster's core-site.xml into the project so the
        // hdfs file system is recognized when the configuration is loaded.
        // When the config files are loaded as above, this line can be omitted;
        // even a standby namenode in the cluster is detected automatically.
        conf.set("fs.defaultFS", hdfsUrl);

        // Set fs.hdfs.impl and fs.file.impl, otherwise you may hit
        // java.io.IOException: No FileSystem for scheme: hdfs
        // The same can be configured in core-default.xml:
        // <property>
        //   <name>fs.hdfs.impl</name>
        //   <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
        //   <description>The FileSystem for hdfs: uris.</description>
        // </property>
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        try {
            //fs = FileSystem.get(new URI(hdfsUrl), conf, "hadoop");// get an HDFS handle as a specific user
            fs = FileSystem.get(conf);// with the configuration loaded above, this simpler call suffices
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Upload a file to HDFS
     * @param localPath
     * @param file
     * @throws IOException
     */
    private static void upload2Hdfs(String localPath, String file) throws IOException
    {
        String dst = hdfsUrl + file;
        InputStream in = new BufferedInputStream(new FileInputStream(localPath));
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            public void progress() {

            }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }

    /**
     * Read a file from HDFS
     * @param hdfsPath
     * @param localPath
     * @throws IOException
     */
    private static void readFromHdfs(String hdfsPath, String localPath) throws IOException
    {
        FSDataInputStream hdfsInStream = fs.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
        byte[] ioBuffer = new byte[1024];
        int readLen = hdfsInStream.read(ioBuffer);
        while(-1 != readLen){
            out.write(ioBuffer, 0, readLen);
            readLen = hdfsInStream.read(ioBuffer);
        }
        out.close();
        hdfsInStream.close();
    }

    /**
     * Delete a file from HDFS
     * @param hdfsPath
     * @return
     * @throws IOException
     */
    private static boolean deleteFromHdfs(String hdfsPath) throws IOException
    {
        boolean flag = true;
        Path path = new Path(hdfsPath);
        if(fs.exists(path))
        {
            // delete(path, true) removes the path immediately and recursively;
            // deleteOnExit(path) would only remove it when the FileSystem is closed
            fs.delete(path, true);
        } else
        {
            flag = false;
            System.out.println("Path does not exist!");
        }
        return flag;
    }

    /**
     * Create an HDFS directory
     * @param hdfsDir
     * @throws IOException
     */
    public static void createDir(String hdfsDir) throws IOException
    {
        Path path = new Path(hdfsDir);
        fs.mkdirs(path);
        System.out.println("new dir \t" + conf.get("fs.default.name") + " | " + hdfsDir);
    }

    /**
     * List files and directories on HDFS
     * @param hdfsDir
     * @throws IOException
     */
    private static void getDirFromHdfs(String hdfsDir) throws IOException
    {
        FileStatus fileList[] = fs.listStatus(new Path(hdfsDir));
        int size = fileList.length;
        for(int i = 0; i < size; i++){
            System.out.println("name:" + fileList[i].getPath().getName() + "\tsize:" + fileList[i].getLen());
        }
    }

    /**
     * main method
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");
        try {
            createDir("/test");

            String localPath = "E:\\Program Files\\XX-Net-2.9.2/LICENSE.txt";
            String file = "test/LICENSE.txt";
            upload2Hdfs(localPath, file);

            String hdfsPath = hdfsUrl + "test/LICENSE.txt";
            localPath = "/home/LICENSE.txt";
            readFromHdfs(hdfsPath, localPath);

            String hdfsDir = hdfsUrl + "/test";
            getDirFromHdfs(hdfsDir);

            hdfsPath = hdfsUrl + "test/";
            deleteFromHdfs(hdfsPath);

        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if(fs != null)
                fs.close();
        }
    }
}
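
As an aside, the manual read loop in readFromHdfs can be replaced by the same IOUtils.copyBytes helper already used in upload2Hdfs. A minimal sketch, assuming the same static fs handle and imports as the class above (readFromHdfsSimple is just an illustrative name):

    private static void readFromHdfsSimple(String hdfsPath, String localPath) throws IOException
    {
        // open the HDFS file and copy it to the local path; IOUtils handles
        // the buffer loop and closes both streams because the last argument is true
        FSDataInputStream hdfsInStream = fs.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
        IOUtils.copyBytes(hdfsInStream, out, 1024, true);
    }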


Problems encountered:

1. java.io.IOException: No FileSystem for scheme: hdfs

java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at FileCopyToHdfs.readFromHdfs(FileCopyToHdfs.java:65)
        at FileCopyToHdfs.main(FileCopyToHdfs.java:26)

Adding the following code fixes it:

conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

2. java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V

Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(II[BI[BIILjava/lang/String;JZ)V
	at org.apache.hadoop.util.NativeCrc32.nativeComputeChunkedSumsByteArray(Native Method)
	at org.apache.hadoop.util.NativeCrc32.calculateChunkedSumsByteArray(NativeCrc32.java:86)
	at org.apache.hadoop.util.DataChecksum.calculateChunkedSums(DataChecksum.java:430)
	at org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:202)
	at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:163)
	at org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:144)
	at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2254)
	at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
	at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
	at com.xbdp.hdfs.Oper2hdfs.uploadToHdfs(Oper2hdfs.java:68)
	at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:143)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

This is caused by a hadoop.dll version mismatch: Hadoop releases before and after 2.4 need different DLLs. Download the matching one and put it into the hadoop/bin directory on your Windows machine.

Download address: https://github.com/steveloughran/winutils

Don't forget to add this line of code:

System.setProperty("hadoop.home.dir", "E:\\Program Files\\hadoop-2.7.0");

It is also best to put the downloaded winutils.exe into hadoop/bin.
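
To fail fast when the native binaries are missing, a small check before any FileSystem call helps. A hedged sketch, reusing the install path from above (adjust it to your machine):

import java.io.File;

// verify winutils.exe is present under hadoop.home.dir before touching HDFS;
// the path below is an example for this setup, not a fixed location
String hadoopHome = "E:\\Program Files\\hadoop-2.7.0";
System.setProperty("hadoop.home.dir", hadoopHome);
File winutils = new File(hadoopHome, "bin\\winutils.exe");
if (!winutils.exists()) {
    throw new IllegalStateException("winutils.exe not found at " + winutils
            + ", download it from https://github.com/steveloughran/winutils");
}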

3. java.io.FileNotFoundException: \home (access denied)

java.io.FileNotFoundException: \home (access denied)
	at java.io.FileOutputStream.open(Native Method)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
	at java.io.FileOutputStream.<init>(FileOutputStream.java:110)
	at com.xbdp.hdfs.Oper2hdfs.readFromHdfs(Oper2hdfs.java:79)
	at com.xbdp.hdfs.Oper2hdfs.main(Oper2hdfs.java:149)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

This one is simple: the code is trying to write a file, but localPath only names a directory, so the operating system refuses access. Append the file name to the path and it works.
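
For example, with the readFromHdfs method above (the paths are only illustrations):

// wrong: localPath names a directory, so FileOutputStream is refused
readFromHdfs(hdfsUrl + "test/LICENSE.txt", "/home");
// right: include the target file name in the local path
readFromHdfs(hdfsUrl + "test/LICENSE.txt", "/home/LICENSE.txt");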
