
  • The following steps use flume-1.6.0 as an example.

    Limitations

    ● Only Hive tables stored in ORC format are supported

    ● Bucketed tables are supported, for example:

     
    CREATE EXTERNAL TABLE stocks (     
      date STRING,     
      open DOUBLE,     
      high DOUBLE,     
      low DOUBLE,     
      close DOUBLE,     
      volume BIGINT,     
      adj_close DOUBLE) 
    PARTITIONED BY(year STRING) 
    CLUSTERED BY (date) into 3 buckets 
    STORED AS ORC;

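    ● Partitioning is optional

    ● The data source supports only two formats: CSV and JSON

    ● The Hive sink is compatible with Hive 1.0.0

    ● Add the following settings to the metastore configuration, then restart the metastore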

     
    hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 
    hive.compactor.initiator.on = true 
    hive.compactor.worker.threads = 5

    Flume configuration

    ● flume.conf

     
    a1.sources = src1 
    a1.channels = chan1 
    a1.sinks = sink1 
    a1.sources.src1.type = spooldir 
    a1.sources.src1.channels = chan1 
    a1.sources.src1.spoolDir = /root/stk 
    a1.sources.src1.interceptors = skipHeadI dateI 
    a1.sources.src1.interceptors.skipHeadI.type = regex_filter 
    a1.sources.src1.interceptors.skipHeadI.regex = ^Date.* 
    a1.sources.src1.interceptors.skipHeadI.excludeEvents = true 
    a1.sources.src1.interceptors.dateI.type = regex_extractor 
    a1.sources.src1.interceptors.dateI.regex = ^(\\d+)-.* 
    a1.sources.src1.interceptors.dateI.serializers = y 
    a1.sources.src1.interceptors.dateI.serializers.y.name = year 
    a1.channels.chan1.type = memory 
    a1.channels.chan1.capacity = 1000 
    a1.channels.chan1.transactionCapacity = 100 
    a1.sinks.sink1.type = hive       
    a1.sinks.sink1.channel = chan1 
    a1.sinks.sink1.hive.metastore = thrift://ip1:9083,thrift://ip2:9083 
    a1.sinks.sink1.hive.database = default 
    a1.sinks.sink1.hive.table = stocks 
    a1.sinks.sink1.hive.partition = %{year} 
    a1.sinks.sink1.hive.txnsPerBatchAsk = 2 
    a1.sinks.sink1.batchSize = 10 
    a1.sinks.sink1.serializer = delimited 
    a1.sinks.sink1.serializer.delimiter = , 
    a1.sinks.sink1.serializer.fieldnames = date,open,high,low,close,volume,adj_close

    Note:
    In a1.sinks.sink1.hive.metastore = thrift://ip1:9083,thrift://ip2:9083, replace ip1 and ip2 with the actual IP addresses of your metastore hosts.
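
    The two interceptors in the configuration above can be illustrated with a small Python sketch. It is not Flume code; it only mimics, under simplifying assumptions, what regex_filter (with excludeEvents = true) and regex_extractor do to each line. The function name process and the tuple it returns are hypothetical and exist only for this illustration.

```python
import re

# Patterns copied from flume.conf (the properties file writes \d as \\d).
SKIP_HEADER = re.compile(r"^Date.*")
YEAR = re.compile(r"^(\d+)-.*")


def process(line):
    """Return (headers, body) for a kept event, or None if it is dropped."""
    # regex_filter with excludeEvents = true: drop events that match.
    if SKIP_HEADER.search(line):
        return None
    headers = {}
    # regex_extractor: store the first capture group under the header "year".
    match = YEAR.search(line)
    if match:
        headers["year"] = match.group(1)
    return headers, line


lines = [
    "Date,Open,High,Low,Close,Volume,Adj Close",
    "2006-07-21,75.489998,75.50,74.50,74.860001,8372500,59.86873",
]
events = [event for event in (process(l) for l in lines) if event is not None]
for headers, body in events:
    print(headers, body)
```

    The CSV header line is dropped, and the data line keeps its body unchanged while gaining a year header, which the sink can then use to pick the partition.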

    Download the dependency package

    http://downloadupt.infile.inspurcloud.cn/hivesink.gz

    Extract the JAR files into Flume's lib directory, then start Flume.

    Sample file

     
    Date,Open,High,Low,Close,Volume,Adj Close 
    2006-07-21,75.489998,75.50,74.50,74.860001,8372500,59.86873 
    2006-07-20,75.730003,75.879997,75.199997,75.480003,12214900,60.364573 
    2006-07-19,76.00,77.059998,76.00,76.07,14536900,60.836418
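
    As a rough illustration of how one line of the sample file lines up with the serializer.fieldnames setting, the following Python sketch splits a line on the configured delimiter and pairs each value with a column name. It is only an approximation of the delimited serializer; the helper name to_record is hypothetical, and raising an error on a column-count mismatch is an assumption of this sketch, not documented sink behavior.

```python
# Values taken from flume.conf: serializer.fieldnames and serializer.delimiter.
FIELDNAMES = "date,open,high,low,close,volume,adj_close".split(",")
DELIMITER = ","


def to_record(line):
    """Pair each delimited value in the line with its column name."""
    values = line.strip().split(DELIMITER)
    if len(values) != len(FIELDNAMES):
        # Assumption of this sketch: reject lines with the wrong column count.
        raise ValueError(
            "expected %d fields, got %d" % (len(FIELDNAMES), len(values))
        )
    return dict(zip(FIELDNAMES, values))


record = to_record("2006-07-21,75.489998,75.50,74.50,74.860001,8372500,59.86873")
for name in FIELDNAMES:
    print(name, "=", record[name])
```

    Running a few lines of a new data file through a check like this before handing it to Flume can catch a wrong column order or a stray delimiter early.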

    Start command

     bin/flume-ng agent -n a1 -f conf/flume.conf

    -n specifies the name of the agent to start, as defined in the configuration file
    -f specifies the configuration file
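
    Once the agent is running, files placed in the spoolDir (/root/stk in the configuration above) are picked up by the spooling directory source. That source expects a file to be complete and unchanged once it appears in the directory, and file names must not be reused. One way to respect this is to write the file elsewhere and then rename it into place. The Python sketch below shows the idea; the function name drop_into_spool, the staging location (the parent of the spool directory, assumed to be on the same filesystem) and the file name stocks-2006.csv are assumptions made for illustration.

```python
import os
import tempfile


def drop_into_spool(text, spool_dir, name):
    """Write text to a staging file, then rename it into spool_dir."""
    final_path = os.path.join(spool_dir, name)
    if os.path.exists(final_path):
        # File names in the spool directory must not be reused.
        raise FileExistsError(final_path)
    # Stage next to the spool directory so the rename stays on one filesystem.
    parent = os.path.dirname(os.path.abspath(spool_dir))
    fd, tmp_path = tempfile.mkstemp(dir=parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(text)
    # The file only becomes visible in spool_dir after it is fully written.
    os.replace(tmp_path, final_path)
    return final_path


# Demo against a throwaway directory that stands in for /root/stk.
demo_root = tempfile.mkdtemp()
demo_spool = os.path.join(demo_root, "stk")
os.makedirs(demo_spool)
sample = (
    "Date,Open,High,Low,Close,Volume,Adj Close\n"
    "2006-07-21,75.489998,75.50,74.50,74.860001,8372500,59.86873\n"
)
print(drop_into_spool(sample, demo_spool, "stocks-2006.csv"))
```

    For real use, pass the actual spool directory instead of the demo directory.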

