のんびりSEの議事録

プログラミング系のポストからアプリに関してのポストなどをしていきます。まれにアニメ・マンガなど

Amazon EMRでSparkをやってみて

8月中はSparkを利用したレコメンドシステムの構築を主にやっていたので、今回はそのことについて記載していきます。

Sparkとは

BigDataを高速に分散処理を行うフレームワークで、以下のパッケージで構成されている。

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • MLib(Machine Learning Library)
  • GraphX

サポートしている言語

1.6以降からデータフレームの機能が追加され、以降アップデートの際、RDDで提供されている機能が徐々に移植されている。
2.0以降はMLibでRDDはメンテナンスモードに入り、新規機能の追加はRDDでは行われなくなる模様。

EMR on Spark

EMRとは

Amazon EMR は、AWSビッグデータフレームワーク (Apache HadoopApache Spark など) の実行を簡素化して、大量のデータを処理および分析するマネージド型クラスタープラットフォーム
S3を始め、AWS上のサービスの連携がデフォルトでついてくる
1度構築したクラスタは再利用可能(CLIもアウトプット可能)

クラスタの作成

  1. コンソール -> EMR -> クラスタの作成
  2. 一般設定 > クラスタ名を入力、必要に応じてログを記録するS3バケットを入力
  3. ソフトウェア設定 > アプリケーションのSparkを選択
  4. ハードウェア構成 > 試しで動かす程度なら、最小のインスタンスで1インスタンスに指定しておくと良いかと
  5. セキュリティとアクセス > デフォルト設定にしておくことで、ロールがない場合は自動的に作成してくれる

f:id:tatsu_tora:20170910005611p:plain

ステップの追加

f:id:tatsu_tora:20170910010436p:plain

予め実行Jar(PySparkの場合はPythonスクリプトファイル)をS3にアップロードしておく必要がある

Command Runner

ステップ実行の際に、クラスタ内のcommand-runner.jarが利用される。 各コマンドは下記ドキュメントを参照。

docs.aws.amazon.com

クラスタの削除

クラスタを削除すると、クラスタに利用したインスタンスも削除されるので、この時点で課金されなくなる。 履歴として削除した状態のものも一定期間残る。

チューニングに関して

WebUI

EMRクラスタの詳細でウェブ接続の有効化をし、指示どおりにブラウザとSSHで設定すると、アクセス出来る。
主に以下のような項目が確認できる

  • スケジューラのステージおよびタスクのリスト
  • RDDサイズおよびメモリの使用量の概要
  • 環境情報
  • 実行中のexecutorの情報

キャッシュ

RDDを再利用する際に、このキャッシュ機能を利用するかしないかでパフォーマンスが大きく変わる事がある。
キャッシュを利用すると、RDDが永続化されるので、時には明示的に破棄しておかないと、OutOfMemoryErrorになることもある。

キャッシュ構造については以下の記事が詳しく書かれている。

qiita.com

キャッシュの効かせ方は、それぞれの以下の通り

  • DataFrame
df.cache()     # キャッシュ利用
df.is_cached   # キャッシュ確認
df.unpersist() # キャッシュ廃棄
spark.sql("CACHE [LAZY] TABLE [db_name.]table_name")  # キャッシュ利用
spark.sql("UNCACHE TABLE [db_name.]table_name")       # キャッシュ廃棄

S3データロード

EMR上のSpark内で直接S3を利用するより、Command-Runnerのコマンドにs3-dist-cpというコマンドがあるので、それを利用し、事前にS3からクラスタ内のHDFSにコピーし、それをロードする方が、S3へのネットワークを気にしないで済むので、データロードが早くなる。

docs.aws.amazon.com

Sparkの設定

spark.executor.meory、spark.executor.cores、spark.dynamicAllocation.enabledについてはEMR側でデフォルトで設定されるので、EMRに任せておいて良さそう

docs.aws.amazon.com

SQL周りの設定では以下の項目の調整で数分短縮の効果あり

  • spark.rdd.compress
  • spark.sql.shuffle.partitions
  • spark.sql.inMemoryColumnarStorage.batchSize
  • spark.sql.broadcastTimeout
  • spark.sql.files.openCostInBytes

社内勉強会で発表した資料も上げているので、合わせて記載していきます。

speakerdeck.com

当初、GraphXとSpark Streamingは使用していなかったので、スライドでは説明がないですが、 グラフ等の画像を出力してくれるのが、GraphXで、ストリーム処理を行うのがSpark Streamingとのことですが、これらはまだそこまで詳しくないです。。。