Apache Beamでword count
してみました。
なぜ Apache Beam なのか : Dataflow のライバル参入を促す理由 http://googlecloudplatform-japan.blogspot.jp/2016/05/apache-beam-dataflow.html
Cloud Dataflow の Python SDK がベータに http://googlecloudplatform-japan.blogspot.jp/2016/08/cloud-dataflow-python-sdk.html
Dataflow Quickstarts https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven?hl=ja
Apache Beam は、Google Cloud Dataflow をオープンソース化したプロジェクトです。
http://beam.incubator.apache.org/
ビッグデータ処理の最も抽象化の高い層に位置するフレームワークです。
ビックデータのバッチ処理とストリーミング処理双方をサポートし、 処理基盤としてApache Sparkなどを使えます。
また、Googleの各種ビッグデータ処理サービスと連携して機能します。
現在は、Apache incubator プロジェクトで、ドキュメントは必要最小限です。 また、日本語の資料も少ないのが現状です。
Mavenを日常的に使っていればすぐに試せますが、Hello worldがサクッとできるREADMEが用意されているわけではないので、まとめました。
環境について
今回は、Cloud Dataflow を処理基盤(Runner)に指定しました。 そのため、Dataflowジョブを実行するプロジェクト内に、GCEインスタンス、Storageの作業バケット/ディレクトリを作りました。
maven-3.3.Xをインストールする
apt-get install mavenでは、3.0.5など古いのが入ました。これだとBeamのビルドができません。
以下を参考に、最新のMavenをインストールします。
http://maven.apache.org/install.html http://maven.apache.org/download.cgi
$ curl -OL http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.zip
$ unzip apache-maven-3.3.9-bin.zip
$ sudo mv apache-maven-3.3.9 /opt
$ PATH=$PATH:/opt/apache-maven-3.3.9/bin
$ mvn -v
Maven3のはじめかた https://www.gitbook.com/book/kengotoda/what-is-maven/details
Beam のセットアップ
githubから取得して、実行してみます。
$ git clone https://github.com/apache/incubator-beam.git
$ cd incubator-beam
$ mvn test
Spark関連のテストでエラーが出てしまいました。 が、コンパイルは通っているようなので、一旦進めました。
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running org.apache.beam.runners.spark.EmptyInputTest
サンプルを動かす
example内にサンプルコードがあります。 ここにもpom.xmlが設定してあるので、必要な箇所を修正すれば、実際に動かすことができます。
GCP Dataflowをrunnerとして設定する
基本、コメント欄のコピペでいいのですが、以下のsetTempLocationが使えなくなっています。
// dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
setGcpTempLocationを使用します。
dataflowOptions.setGcpTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
また、必要なライブラリをimport
します。
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -28,6 +28,9 @@ import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
+
あとは、プロジェクトID, Storageのパスを設定します。
import java.util.Arrays;
/**
@@ -53,8 +56,14 @@ public class MinimalWordCountJava8 {
// for more details.
// options.as(FlinkPipelineOptions.class)
// .setRunner(FlinkRunner.class);
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+
+ dataflowOptions.setRunner(BlockingDataflowRunner.class);
+ dataflowOptions.setProject("project-id");
+ dataflowOptions.setGcpTempLocation("gs://");
- Pipeline p = Pipeline.create(options);
+ Pipeline p = Pipeline.create(dataflowOptions);
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
@@ -66,7 +75,7 @@ public class MinimalWordCountJava8 {
.withOutputType(TypeDescriptors.strings()))
// CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
- .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+ .apply(TextIO.Write.to("gs://"));
p.run();
}
compileして確認します。
$ mvn compile
認証設定をする
JavaからCloud Storageにアクセスするために、認証情報を設定します。 まず実行してみます。
$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCountJava8
認証がうまくいってないと、ERRORメッセージが出ます。解消方法の例も出るので、参考にします。
今回は、GCEのインスタンスからアクセスしたので、gcloudコマンドで認証をしました。
$ gcloud auth login
再度、上記コマンドで実行します。SUCCESSになったら、GCP上で結果を確認します。
- Cloud Dataflowのジョブ一覧に出てきます。
- アウトプットは、Storageにでます。
上記でうまくいかない場合は、GCP SDKの認証方法が幾つかあるので、この辺りが参考になりそうです。
Google Cloud Dataflowを試してみた http://qiita.com/harukasan/items/019da8a6e76b341f6c73
Mavenリポジトリにもあります
すでに、Mavenリポジトリに上がっているので、ライブラリはそのまま使うことも可能です。
0 件のコメント:
コメントを投稿