编译支持 spark 读写 oss(cdh 5.x)

共 17233字,需浏览 35分钟

 ·

2021-07-28 11:07

作者:来世愿做友人_A

来源:SegmentFault 思否社区

前言

背景:使用 spark 读取 hdfs 文件写入到 oss
hadoop : 2.6.0-cdh5.15.1
spark : 2.4.1
增加了注意点和坑点

编译 hadoop-aliyun

hadoop 高版本已经默认支持 aliyun-oss 的访问,而本版本不支持,需要编译支持下

  • 拉取 hadoop trunk 分支代码,copy hadoop-tools/hadoop-aliyun 模块代码到 cdh 对应的项目模块中
  • 修改 hadoop-tools pom.xml
    • <module>hadoop-aliyun</module> 添加 hadoop-aliyun 子 module
  • 修改根 pom.xml 中的 java 版本为 1.8,hadoop-aliyun 使用了 1.8 的 lambda 语法,也可以直接修改代码支持
  • 修改 hadoop-aliyun pom.xml,修改 version,以及相关的 oss,http 依赖包,使用 shade 插件将相关依赖打进去
  • 代码修改
    • import org.apache.commons.lang3 改为 import org.apache.commons.lang
    • 复制(cdh版本) hadoop-aws 模块下的 BlockingThreadPoolExecutorService 和 SemaphoredDelegatingExecutor 两个类 到 org.apache.hadoop.util 目录下
  • 编译模块 hadoop-aliyun
    • mvn clean package -pl hadoop-tools/hadoop-aliyun
最终的配置文件如下
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-project</artifactId>
    <version>2.6.0-cdh5.15.1</version>
    <relativePath>../../hadoop-project</relativePath>
  </parent>
  <artifactId>hadoop-aliyun</artifactId>
  <name>Apache Hadoop Aliyun OSS support</name>
  <packaging>jar</packaging>

  <properties>
    <file.encoding>UTF-8</file.encoding>
    <downloadSources>true</downloadSources>
  </properties>

  <profiles>
    <profile>
      <id>tests-off</id>
      <activation>
        <file>
          <missing>src/test/resources/auth-keys.xml</missing>
        </file>
      </activation>
      <properties>
        <maven.test.skip>true</maven.test.skip>
      </properties>
    </profile>
    <profile>
      <id>tests-on</id>
      <activation>
        <file>
          <exists>src/test/resources/auth-keys.xml</exists>
        </file>
      </activation>
      <properties>
        <maven.test.skip>false</maven.test.skip>
      </properties>
    </profile>
  </profiles>

  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>findbugs-maven-plugin</artifactId>
        <configuration>
          <findbugsXmlOutput>true</findbugsXmlOutput>
          <xmlOutput>true</xmlOutput>
          <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
          </excludeFilterFile>
          <effort>Max</effort>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>deplist</id>
            <phase>compile</phase>
            <goals>
              <goal>list</goal>
            </goals>
            <configuration>
              <!-- build a shellprofile -->
              <outputFile>
                ${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt
              </outputFile>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <id>shade-aliyun-sdk-oss</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <shadedArtifactAttached>false</shadedArtifactAttached>
              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
              <createDependencyReducedPom>true</createDependencyReducedPom>
              <createSourcesJar>true</createSourcesJar>
              <relocations>
                <relocation>
                  <pattern>org.apache.http</pattern>
                  <shadedPattern>com.xxx.thirdparty.org.apache.http</shadedPattern>
                </relocation>
              </relocations>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpcore</artifactId>
      <version>4.4.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
        </exclusion>
      </exclusions>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-distcp</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-server-tests</artifactId>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-examples</artifactId>
      <scope>test</scope>
      <type>jar</type>
    </dependency>
  </dependencies>

</project>

spark 读写取 oss 文件

val inputPath = "hdfs:///xxx"
val outputPath = "oss://bucket/OSS_FILES"

val conf = new SparkConf()
    conf.set("spark.hadoop.fs.oss.endpoint""oss-cn-xxx")
    conf.set("spark.hadoop.fs.oss.accessKeyId""xxx")
    conf.set("spark.hadoop.fs.oss.accessKeySecret""xxx")
    conf.set("spark.hadoop.fs.oss.impl""org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
    conf.set("spark.hadoop.fs.oss.buffer.dir""/tmp/oss")
    conf.set("spark.hadoop.fs.oss.connection.secure.enabled""false")
    conf.set("spark.hadoop.fs.oss.connection.maximum""2048")
    
spark.write.format("orc").mode("overwrite").save(outputPath)
其它以 spark sql 以及 hdfs 读取 oss 的方式可以参考后面的第三个链接

spark submit

spark-submit \
--class org.example.HdfsToOSS \
--master yarn \
--deploy-mode cluster \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 3G \
--driver-cores 1  \
--driver-memory 3G \
--conf "spark.driver.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--conf "spark.executor.extraClassPath=hadoop-common-2.6.0-cdh5.15.1.jar" \
--jars ./hadoop-aliyun-2.6.0-cdh5.15.1.jar,./hadoop-common-2.6.0-cdh5.15.1.jar \
./spark-2.4-worker-1.0-SNAPSHOT.jar
注意下 extraClassPath 这个参数,如果没有特殊的配置,spark 会默认加载自身的 hadoop-common 包,如果版本不对,可能会导致 ClassNotFound,需要 extraClassPath 指定,会优先加载

才疏学浅,如果有错误的地方,欢迎指正


点击左下角阅读原文,到 SegmentFault 思否社区 和文章作者展开更多互动和交流,扫描下方”二维码“或在“公众号后台回复“ 入群 ”即可加入我们的技术交流群,收获更多的技术文章~

- END -


浏览 25
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报