今回はhadoopのhiveとhdfs、そしてsparkを連携させてみます。

[hdspark@node01 ~]$ hive



Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-1.2.1.jar!/hive-log4j.properties

hive> use mydb;
OK
Time taken: 0.075 seconds
hive> SELECT COUNT(*) FROM uryo;



30240
Time taken: 5.134 seconds, Fetched: 1 row(s)


hiveの項で作成したuryoテーブルを利用します。

hive> INSERT OVERWRITE DIRECTORY '/output/'
    > SELECT text,time,SUM(uryo) FROM uryo GROUP BY text,time ORDER BY text,time;



Time taken: 5.683 seconds


SELCT集計結果をhdfs上のファイルに出力します。

[hdspark@node01 ~]$ hadoop fs -ls -R /output/
-rwxr-xr-x   1 hdspark supergroup     302257 2015-12-22 00:43 /output/000000_0

[hdspark@node01 ~]$ hadoop fs -cat /output/000000_0 | more
一里塚橋00:0070
一里塚橋00:1070
一里塚橋00:2070
一里塚橋00:3070




出力された結果をhadoop上から確認しました。

[hdspark@node01 ~]$ hadoop fs -mv /output/000000_0 /output/hive_output
[hdspark@node01 ~]$ hadoop fs -ls -R /output
-rwxr-xr-x   1 hdspark supergroup     302257 2015-12-22 00:43 /output/hive_output

今度はspark上から確認してみます。

[hdspark@node01 ~]$ /opt/spark/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.




sparkコンソールに入ります。

scala> val hive_output = sc.textFile("hdfs://127.0.0.1:9000/output/hive_output")
hive_output: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> hive_output.count()
res1: Long = 15120


hdfs上のファイルを参照し行数をカウント

scala> hive_output.filter(line => line.contains("渋谷")).foreach(println)
渋谷橋00:0030
渋谷橋00:1030
渋谷橋00:2030
渋谷橋00:3029
渋谷橋00:4030
渋谷橋00:5028




scala> hive_output.filter(line => line.contains("渋谷")).map(line => line.split(001")).map(parts => (parts(0), parts(2).toInt)).reduceByKey(_ + _,1).foreach(println)
(渋谷橋,4189)

sparkのフィルタ機能、集計機能を利用してみました。
結果は良好です。