今回はhadoopのhiveとhdfs、そしてsparkを連携させてみます。
[hdspark@node01 ~]$ hive
・
・
・
Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-1.2.1.jar!/hive-log4j.properties
hive> use mydb;
OK
Time taken: 0.075 seconds
hive> SELECT COUNT(*) FROM uryo;
・
・
・
30240
Time taken: 5.134 seconds, Fetched: 1 row(s)
hiveの項で作成したuryoテーブルを利用します。
hive> INSERT OVERWRITE DIRECTORY '/output/'
> SELECT text,time,SUM(uryo) FROM uryo GROUP BY text,time ORDER BY text,time;
・
・
・
Time taken: 5.683 seconds
SELCT集計結果をhdfs上のファイルに出力します。
[hdspark@node01 ~]$ hadoop fs -ls -R /output/
-rwxr-xr-x 1 hdspark supergroup 302257 2015-12-22 00:43 /output/000000_0
[hdspark@node01 ~]$ hadoop fs -cat /output/000000_0 | more
一里塚橋00:0070
一里塚橋00:1070
一里塚橋00:2070
一里塚橋00:3070
・
・
・
出力された結果をhadoop上から確認しました。
[hdspark@node01 ~]$ hadoop fs -mv /output/000000_0 /output/hive_output
[hdspark@node01 ~]$ hadoop fs -ls -R /output
-rwxr-xr-x 1 hdspark supergroup 302257 2015-12-22 00:43 /output/hive_output
今度はspark上から確認してみます。
[hdspark@node01 ~]$ /opt/spark/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.5.1
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
・
・
・
sparkコンソールに入ります。
scala> val hive_output = sc.textFile("hdfs://127.0.0.1:9000/output/hive_output")
hive_output: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
scala> hive_output.count()
res1: Long = 15120
hdfs上のファイルを参照し行数をカウント
scala> hive_output.filter(line => line.contains("渋谷")).foreach(println)
渋谷橋00:0030
渋谷橋00:1030
渋谷橋00:2030
渋谷橋00:3029
渋谷橋00:4030
渋谷橋00:5028
・
・
・
scala> hive_output.filter(line => line.contains("渋谷")).map(line => line.split(001")).map(parts => (parts(0), parts(2).toInt)).reduceByKey(_ + _,1).foreach(println)
(渋谷橋,4189)
sparkのフィルタ機能、集計機能を利用してみました。
結果は良好です。
[hdspark@node01 ~]$ hive
・
・
・
Logging initialized using configuration in jar:file:/opt/hive/lib/hive-common-1.2.1.jar!/hive-log4j.properties
hive> use mydb;
OK
Time taken: 0.075 seconds
hive> SELECT COUNT(*) FROM uryo;
・
・
・
30240
Time taken: 5.134 seconds, Fetched: 1 row(s)
hiveの項で作成したuryoテーブルを利用します。
hive> INSERT OVERWRITE DIRECTORY '/output/'
> SELECT text,time,SUM(uryo) FROM uryo GROUP BY text,time ORDER BY text,time;
・
・
・
Time taken: 5.683 seconds
SELCT集計結果をhdfs上のファイルに出力します。
[hdspark@node01 ~]$ hadoop fs -ls -R /output/
-rwxr-xr-x 1 hdspark supergroup 302257 2015-12-22 00:43 /output/000000_0
[hdspark@node01 ~]$ hadoop fs -cat /output/000000_0 | more
一里塚橋00:0070
一里塚橋00:1070
一里塚橋00:2070
一里塚橋00:3070
・
・
・
出力された結果をhadoop上から確認しました。
[hdspark@node01 ~]$ hadoop fs -mv /output/000000_0 /output/hive_output
[hdspark@node01 ~]$ hadoop fs -ls -R /output
-rwxr-xr-x 1 hdspark supergroup 302257 2015-12-22 00:43 /output/hive_output
今度はspark上から確認してみます。
[hdspark@node01 ~]$ /opt/spark/bin/spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.5.1
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_65)
Type in expressions to have them evaluated.
Type :help for more information.
・
・
・
sparkコンソールに入ります。
scala> val hive_output = sc.textFile("hdfs://127.0.0.1:9000/output/hive_output")
hive_output: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
scala> hive_output.count()
res1: Long = 15120
hdfs上のファイルを参照し行数をカウント
scala> hive_output.filter(line => line.contains("渋谷")).foreach(println)
渋谷橋00:0030
渋谷橋00:1030
渋谷橋00:2030
渋谷橋00:3029
渋谷橋00:4030
渋谷橋00:5028
・
・
・
scala> hive_output.filter(line => line.contains("渋谷")).map(line => line.split(001")).map(parts => (parts(0), parts(2).toInt)).reduceByKey(_ + _,1).foreach(println)
(渋谷橋,4189)
sparkのフィルタ機能、集計機能を利用してみました。
結果は良好です。




