
3.1 Java - WordCount Example

Spark ships with several Java example programs in its examples module.
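One way to see the full list (a sketch, assuming a single examples jar under /home/hadoop/spark/lib, matching the submit command below) is to list the compiled classes inside the jar:

# List the bundled Java example classes (jar path and name are assumptions)
jar tf /home/hadoop/spark/lib/spark-examples*.jar | grep 'org/apache/spark/examples/Java' | grep -v '\$'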

Here we take WordCount as an example to show how to run them:

     
    spark-submit --class org.apache.spark.examples.JavaWordCount --master yarn 
    --deploy-mode cluster /home/hadoop/spark/lib/spark-examples*.jar 
    /input/kv.txt /output/wordcount

Notes:

1. Prepare the /input/kv.txt data on HDFS in advance; any text file will do (a preparation sketch follows these notes).

2. /input/kv.txt and /output/wordcount are the program arguments; for other programs, replace them with the corresponding arguments.

3. No resources are explicitly allocated here; in actual testing, set the resources appropriately for the workload being tested.
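The following is a minimal sketch for preparing the sample input mentioned in note 1 (the file contents are arbitrary):

# Create a small text file and upload it to HDFS as /input/kv.txt
echo "hello spark hello hadoop" > kv.txt
hdfs dfs -mkdir -p /input
hdfs dfs -put kv.txt /input/kv.txt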

Developing the Spark Java program with Eclipse:

● Create a Java Project

● Create a lib directory, put the dependency spark-assembly-1.4.1-hadoop2.6.0-cdh5.4.4.jar into it, and add it to the build path

● Write the application code

The code is as follows:

     
package test.java.spark.job;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class JavaWordCount {

  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: JavaWordCount <input> <output>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    // Read the input file and split each line into words
    JavaRDD<String> lines = ctx.textFile(args[0], 1);
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
      }
    });

    // Map each word to a (word, 1) pair
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    // Sum the counts for each word
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    // Save the result to the output path and also print it to the driver output
    List<Tuple2<String, Integer>> output = counts.collect();
    counts.saveAsTextFile(args[1]);
    for (Tuple2<?, ?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }

    ctx.stop();
  }
}

● Export the project as a jar file

● Prepare the data

Upload a text file to /input/kv1.txt on HDFS:

hdfs dfs -put kv1.txt /input/kv1.txt

● Submit the job to run:

     
spark-submit --class test.java.spark.job.JavaWordCount --master yarn 
--deploy-mode client --num-executors 1 --driver-memory 1g --executor-memory 1g 
--executor-cores 1 /home/hadoop/sparkJavaExample.jar /input/kv1.txt 
/output/wordcount

Notes:

Do not request more resources than the current machine's quota allows.
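Once the job succeeds, the word counts are written to the output path passed on the command line (/output/wordcount above). A quick way to inspect them:

# List the result files and print the first few lines
hdfs dfs -ls /output/wordcount
hdfs dfs -cat /output/wordcount/part-* | head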

3.2 Scala - HiveFromSpark Example

● Install sbt

     
    curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo 
    sudo mv bintray-sbt-rpm.repo /etc/yum.repos.d/ 
    sudo yum install sbt -y
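To confirm that sbt is installed, you can print its version (the first run may take a while because sbt downloads its own dependencies):

# Print the sbt version
sbt sbtVersion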

● Write the code

Using HiveFromSpark from the Spark examples as a reference:

     
    mkdir -p /data/HiveFromSpark/src/main/scala/com/ucloud/spark/examples 
    cd /data/HiveFromSpark/src/main/scala/com/ucloud/spark/examples 
    touch HiveFromSpark.scala;

Taking Spark 1.6.0 and Scala 2.10.5 as an example, add the following code to HiveFromSpark.scala.

     
package com.ucloud.spark.examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object HiveFromSpark {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveFromSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    import hiveContext.implicits._
    import hiveContext.sql

    // Queries are expressed in HiveQL. The src table is assumed to already exist in Hive.
    println("Result of 'SELECT *': ")
    sql("SELECT * FROM src").collect().foreach(println)

    // Aggregation queries are also supported.
    val count = sql("SELECT COUNT(*) FROM src").collect().head.getLong(0)
    println(s"COUNT(*): $count")

    // The results of SQL queries are themselves RDDs and support all normal RDD functions. The
    // items in the RDD are of type Row, which allows you to access each column by ordinal.
    val rddFromSql = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    println("Result of RDD.map:")
    rddFromSql.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }.collect().foreach(println)

    // You can also register RDDs as temporary tables within a HiveContext.
    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    rdd.toDF().registerTempTable("records")

    // Queries can then join RDD data with data stored in Hive.
    println("Result of SELECT *:")
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println)

    sc.stop()
  }
}
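Note that the program queries a Hive table named src, which must already exist. The sketch below shows one way to create and populate it; the sample-file path is an assumption and should be adjusted to your Hive installation (any key/value text file also works):

# Create the src table and load sample data (paths are assumptions)
hive -e "CREATE TABLE IF NOT EXISTS src (key INT, value STRING);"
hive -e "LOAD DATA LOCAL INPATH '/home/hadoop/hive/examples/files/kv1.txt' INTO TABLE src;"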

● Create the sbt build file

     
    cd /data/HiveFromSpark/ 
    touch  HiveFromSpark.sbt;

● Add the following to the file:

     
    name := "HiveFromSpark" 
    version := "1.0" 
    scalaVersion := "2.10.5"   
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" 
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.0" 
    libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0"

● Compile

     
    cd /data/HiveFromSpark/ 
    sbt package

Because the build has to download its dependencies from Maven, compilation time depends on your network connection; please be patient. The compiled jar is located at /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar.

● Run

Client mode:

     
    spark-submit --class com.ucloud.spark.examples.HiveFromSpark --master yarn 
    --deploy-mode client --num-executors 4 --executor-cores 1 
    /data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar

Cluster mode:

     
spark-submit --class com.ucloud.spark.examples.HiveFromSpark --master yarn 
--deploy-mode cluster --num-executors 4 --executor-cores 1 
--files /home/hadoop/hive/conf/hive-site.xml 
--jars /home/hadoop/spark/lib/datanucleus-api-jdo-3.2.6.jar,/home/hadoop/spark/lib/datanucleus-rdbms-3.2.9.jar,/home/hadoop/spark/lib/datanucleus-core-3.2.10.jar 
/data/HiveFromSpark/target/scala-2.10/hivefromspark_2.10-1.0.jar
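In cluster mode the driver runs inside YARN, so the println output does not appear in the local console. Assuming YARN log aggregation is enabled, the driver log can be fetched after the application finishes (replace the ID with the one reported by spark-submit):

# Fetch the aggregated logs of the finished application
yarn logs -applicationId <application_id>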

3.3 Python - Pi Example

Note: see the example programs under examples/src/main/python in the Spark installation directory.
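For example, assuming SPARK_HOME points at the Spark installation directory (as in the submit command below), the bundled Python examples can be listed with:

# List the Python example programs shipped with Spark
ls $SPARK_HOME/examples/src/main/python/*.py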

Example code:

     
from __future__ import print_function

import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        # Sample a random point in the square [-1, 1] x [-1, 1] and
        # return 1 if it falls inside the unit circle, 0 otherwise
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    # Monte Carlo estimate: the fraction of points inside the circle approximates pi/4
    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

Submit the Spark Pi job with the following command:

     
spark-submit --master yarn --deploy-mode client --num-executors 4 
--executor-cores 1 --executor-memory 2G 
$SPARK_HOME/examples/src/main/python/pi.py 100

Eventually the console log will contain a result similar to "Pi is roughly 3.141039".
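In client mode the result is printed among the driver's console output; a simple way to filter out just that line (a sketch reusing the command above) is:

# Keep only the result line from the driver output
spark-submit --master yarn --deploy-mode client \
  $SPARK_HOME/examples/src/main/python/pi.py 100 2>&1 | grep "Pi is roughly"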

