免费注册
帮助文档(华北一、二)

  • 3.1 JAVA - WordCount示例

    在examples中spark提供了这样几个java例子:

    仅以wordcount为例展示运行方法:

     
    spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn 
    --deploy-mode cluster /home/hadoop/spark/lib/spark-examples*.jar 
    /input/kv.txt /output/wordcount

    注解:

    1、在hdfs上面准备好/input/kv.txt数据,随便一个文本什么的都行。

    2、/input/kv.txt /output/wordcount为运行参数,如果是其他程序,需要替换为对应的参数。

    3、这里没有指定分配的资源,在用户实际测试过程中,需要根据测试时使用的运算量,合理的设定资源。

    利用eclipse开发过程spark java程序过程:

    ● 创建Java Project

    ● 创建lib目录把工程依赖包spark-assembly-1.4.1-hadoop2.6.0-cdh5.4.4.jar加入lib包内,并add to build path

    ● 完成功能代码

    代码内容如下:

     
      package test.java.spark.job;   
    import java.util.Arrays;   
    import java.util.List;   
    import java.util.regex.Pattern;   
    import org.apache.spark.SparkConf;   
    import org.apache.spark.api.java.JavaPairRDD;   
    import org.apache.spark.api.java.JavaRDD;   
    import org.apache.spark.api.java.JavaSparkContext;   
    import org.apache.spark.api.java.function.FlatMapFunction;   
    import org.apache.spark.api.java.function.Function2;   
    import org.apache.spark.api.java.function.PairFunction;   
    import scala.Tuple2;     
    public final class JavaWordCount {       
      private static final Pattern SPACE = Pattern.compile(" ");         
      public static void main(String[] args) throws Exception {           
      if (args.length < 2) {           
        System.err.println("Usage: JavaWordCount <input> <output>");           
        System.exit(1);         
      }           
      SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");         
      JavaSparkContext ctx = new JavaSparkContext(sparkConf);         
      JavaRDD<String> lines = ctx.textFile(args[0], 1);         
      JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {           
        @Override           
        public Iterable<String> call(String s) {             
        return Arrays.asList(SPACE.split(s));           
      }         
      });           
      JavaPairRDD<String, Integer> ones = words.mapToPair(new 
    PairFunction<String, String, Integer>() {           
      @Override           
      public Tuple2<String, Integer> call(String s) {             
        return new Tuple2<String, Integer>(s, 1);           
      }         
      });          
      JavaPairRDD<String, Integer> counts = ones.reduceByKey(new 
    Function2<Integer, Integer, Integer>() {           
      @Override           
      public Integer call(Integer i1, Integer i2) {              
        return i1 + i2;           
      }         
      });           
      List<Tuple2<String, Integer>> output = counts.collect();         
      counts.saveAsTextFile(args[1]);         
      for (Tuple2<?,?> tuple : output) {           
      System.out.println(tuple._1() + ": " + tuple._2());         
      }         
      ctx.stop();      
     }   
    }

    ● 导出工程文件

    ● 数据准备

    将一段文本上传到 hdfs的这个位置 /input/kv1.txt

     hdfs dfs –put kv1.txt /input/kv1.txt

    提交任务运行

     
    spark-submit --class test.java.spark.JavaWordCount --master yarn --deploy-
    mode client --num-executors 1 --driver-memory 1g --executor-memory 1g --
    executor-cores 1  /home/hadoop/sparkJavaExample.jar /input/kv1.txt 
    /output/wordcount

    注解:

    资源配置不要超过当前机器的配额。

    3.2 Scala - HiveFromSpark示例

    ● 安装sbt

     
    curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo 
    sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/ 
    sudo yum install sbt -y

    ● 构建代码

    以Spark example的HiveFromSpark为例:

     
    mkdir -p /data/HiveFromSpark/src/main/scala/com/ucloud/spark/examples 
    cd /data/HiveFromSpark/src/main/scala/com/ucloud/spark/examples 
    touch HiveFromSpark.scala;

    以Spark1.6.0、scala-2.10.5为例,可将下述代码添加进HiveFromSpark.scala。

     
    package com.ucloud.spark.examples   
    import com.google.common.io.{ByteStreams, Files} 
    import java.io.File 
    import org.apache.spark.{SparkConf, SparkContext} 
    import org.apache.spark.sql._ 
    import org.apache.spark.sql.hive.HiveContext   
    object HiveFromSpark {   
      case class Record(key: Int, value: String)     
      def main(args: Array[String]) {     
        val sparkConf = new SparkConf().setAppName("HiveFromSpark")     
        val sc = new SparkContext(sparkConf)       
        val hiveContext = new HiveContext(sc)     
        import hiveContext.implicits._     
        import hiveContext.sql       
        // Queries are expressed in HiveQL     
        println("Result of 'SELECT *': ")