2016年8月11日木曜日

Apache Beam in Debian で Word count

Apache Beamでword countしてみました。

なぜ Apache Beam なのか : Dataflow のライバル参入を促す理由 http://googlecloudplatform-japan.blogspot.jp/2016/05/apache-beam-dataflow.html

Cloud Dataflow の Python SDK がベータに http://googlecloudplatform-japan.blogspot.jp/2016/08/cloud-dataflow-python-sdk.html

Dataflow Quickstarts https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja

Apache Beam は、Google Cloud Dataflow をオープンソース化したプロジェクトです。

http://beam.incubator.apache.org/

ビッグデータ処理の最も抽象化の高い層に位置するフレームワークです。

ビックデータのバッチ処理とストリーミング処理双方をサポートし、 処理基盤としてApache Sparkなどを使えます。

また、Googleの各種ビッグデータ処理サービスと連携して機能します。

現在は、Apache incubator プロジェクトで、ドキュメントは必要最小限です。 また、日本語の資料も少ないのが現状です。

Mavenを日常的に使っていればすぐに試せますが、Hello worldがサクッとできるREADMEが用意されているわけではないので、まとめました。

環境について

今回は、Cloud Dataflow を処理基盤(Runner)に指定しました。 そのため、Dataflowジョブを実行するプロジェクト内に、GCEインスタンス、Storageの作業バケット/ディレクトリを作りました。

maven-3.3.Xをインストールする

apt-get install mavenでは、3.0.5など古いのが入ました。これだとBeamのビルドができません。

以下を参考に、最新のMavenをインストールします。

http://maven.apache.org/install.html http://maven.apache.org/download.cgi

$ curl -OL http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
$ unzip apache-maven-3.3.9-bin.zip
$ sudo mv apache-maven-3.3.9 /opt
$ PATH=$PATH:/opt/apache-maven-3.3.9/bin
$ mvn -v

Maven3のはじめかた https://www.gitbook.com/book/kengotoda/what-is-maven/details

Beam のセットアップ

githubから取得して、実行してみます。

$ git clone https://github.com/apache/incubator-beam.git
$ cd incubator-beam
$ mvn test

Spark関連のテストでエラーが出てしまいました。 が、コンパイルは通っているようなので、一旦進めました。

 -------------------------------------------------------
   T E S T S
 -------------------------------------------------------
 Running org.apache.beam.runners.spark.EmptyInputTest

サンプルを動かす

example内にサンプルコードがあります。 ここにもpom.xmlが設定してあるので、必要な箇所を修正すれば、実際に動かすことができます。

GCP Dataflowをrunnerとして設定する

基本、コメント欄のコピペでいいのですが、以下のsetTempLocationが使えなくなっています。

    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");

setGcpTempLocationを使用します。

dataflowOptions.setGcpTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");

また、必要なライブラリをimportします。

+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -28,6 +28,9 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TypeDescriptors;

+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
+

あとは、プロジェクトID, Storageのパスを設定します。

 import java.util.Arrays;

 /**
@@ -53,8 +56,14 @@ public class MinimalWordCountJava8 {
     // for more details.
     //   options.as(FlinkPipelineOptions.class)
     //      .setRunner(FlinkRunner.class);
+    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+
+    dataflowOptions.setRunner(BlockingDataflowRunner.class);
+    dataflowOptions.setProject("project-id");
+    dataflowOptions.setGcpTempLocation("gs://");

-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create(dataflowOptions);

     p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
      .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
@@ -66,7 +75,7 @@ public class MinimalWordCountJava8 {
          .withOutputType(TypeDescriptors.strings()))

      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     .apply(TextIO.Write.to("gs://"));

     p.run();
   }

compileして確認します。

$ mvn compile

認証設定をする

JavaからCloud Storageにアクセスするために、認証情報を設定します。 まず実行してみます。

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCountJava8

認証がうまくいってないと、ERRORメッセージが出ます。解消方法の例も出るので、参考にします。

今回は、GCEのインスタンスからアクセスしたので、gcloudコマンドで認証をしました。

$ gcloud auth login

再度、上記コマンドで実行します。SUCCESSになったら、GCP上で結果を確認します。

  • Cloud Dataflowのジョブ一覧に出てきます。
  • アウトプットは、Storageにでます。

上記でうまくいかない場合は、GCP SDKの認証方法が幾つかあるので、この辺りが参考になりそうです。

Google Cloud Dataflowを試してみた http://qiita.com/harukasan/items/019da8a6e76b341f6c73

Mavenリポジトリにもあります

すでに、Mavenリポジトリに上がっているので、ライブラリはそのまま使うことも可能です。

https://mvnrepository.com/artifact/org.apache.beam

0 件のコメント:

コメントを投稿