AthenaでS3のデータをSQLで操作する
この記事はAmazon Web Services Advent Calendar 2017 24日目の記事です。
Amazon Athana概要
Amazon Athanaとは、S3のデータをSQLを利用して簡単に分析可能なサービスで、実行したクエリに対してのみ料金が発生します。 サーバレスで、EMR等立ち上げる必要なく、S3のデータを分析出来ます。
Amazon Athena – Amazon S3上のデータに対話的にSQLクエリを | Amazon Web Services ブログ
データ形式
- primitive_type
- TINYINT
- SMALLINT
- INT
- BIGINT
- BOOLEAN
- DOUBLE
- STRING
- TIMESTAMP
- DECIMAL
- DATE
- VARCHAR
- array_type
- ARRAY < data_type >
- map_type
- MAP < primitive_type, data_type >
- struct_type
- STRUCT < col_name : data_type [COMMENT col_comment] [, ...] >
CSVやTSVファイルではprimitive_typeで事足りるが、JSONファイル等だと、array_typeやmap_type、struct_typeが役に立つ。
サポートしているフォーマット
- CSV
- TSV
- Apache Avro
- ORC
- Apache Parquet
- Logstash log files
- Apache WebServer log files
- CloudTrail log files
パーティション
パーティションを利用することで、各クエリでスキャンされるデータの量を制限することができ、パフォーマンスが向上し、コストが削減されます。
パーティションを簡単に利用するには、フォルダ名にkey=date
の様な形式で保存されている必要があります。
aws s3 ls s3://elasticmapreduce/samples/hive-ads/tables/impressions/ PRE dt=2009-04-12-13-00/ PRE dt=2009-04-12-13-05/ PRE dt=2009-04-12-13-10/ PRE dt=2009-04-12-13-15/ PRE dt=2009-04-12-13-20/ PRE dt=2009-04-12-14-00/ PRE dt=2009-04-12-14-05/ PRE dt=2009-04-12-14-10/ PRE dt=2009-04-12-14-15/ PRE dt=2009-04-12-14-20/ PRE dt=2009-04-12-15-00/ PRE dt=2009-04-12-15-05/
- CRATE TABLEでの指定
PARTITIONED BY
で指定する
CREATE EXTERNAL TABLE impressions ( requestBeginTime string, adId string, impressionId string, referrer string, userAgent string, userCookie string, ip string, number string, processId string, browserCookie string, requestEndTime string, timers struct<modelLookup:string, requestTime:string>, threadId string, hostname string, sessionId string) PARTITIONED BY (dt string) ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe' with serdeproperties ( 'paths'='requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip' ) LOCATION 's3://elasticmapreduce/samples/hive-ads/tables/impressions/' ;
nginxのログで実践
logをS3に格納
fluentdを利用したりする方法は割愛します。以下のような記事を参考に、s3://bucket/logs/dt=20171224/access.log
と配置した想定で行います。
テーブルの作成
- Athanaのコンソールから Create Table > Manuallyと選択します。
- Databaseはとりあえず、
sampledb
を選択し、Table Nameを入力し、s3のパスをnginxをアップロードしたパスs3://bucket/logs/
を入力します。
- Data Formatは
Apache Web Logs
を選択し、 Regexに以下のパターンを入力します。
^(\d+\.\d+\.\d+\.\d+) - - \[(.*)\] "(GET|POST|DELETE|PUT|PATCH|HEAD) (.+) (HTTP\/.+) (\d{3}) (\d{3}) "(-|.+)" "(.*)"
- Bulk add columnsを選択し、以下のフィールドパターンを入力します。
remote_address string, date_time date, method string, url string, http_version string, status_code int, body_bytes_sent int, referer string, user_agent string
- Partition Column nameに
dt
と入力し、Column typeはstring
を指定します。
以下のようなTableが作成されれば成功です。AthanaからSQLを実行して、結果を取得することが出来るようになります。
CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.nginx_logs ( `remote_address` string, `date_time` date, `method` string, `url` string, `http_version` string, `status_code` int, `body_bytes_sent` int, `referer` string, `user_agent` string ) PARTITIONED BY ( dt string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1', 'input.regex' = '^(\\d+\\.\\d+\\.\\d+\\.\\d+) - - \\[(.*)\\] \"(GET|POST|DELETE|PUT|PATCH|HEAD) (.+) (HTTP\\/.+) (\\d{3}) (\\d{3}) \"(-|.+)\" \"(.*)\"' ) LOCATION 's3://bucket/logs/' TBLPROPERTIES ('has_encrypted_data'='false');
パーティショニングされたS3のデータをロードするには Load Partition
(MSCK REPAIR TABLE
)を実行する必要があります。
Python(boto3)からクエリを叩いて結果を取得する
Athanaで実行した結果をプログラムから得る場合には、JDBCかAPIで取得する事ができます。
Connecting to Amazon Athena with JDBC - Amazon Athena
ただ、JDBCの場合はまだまだ発展途上であり、足りない機能等あるので、APIで取得する方法を紹介します。
- boto3が必要
import boto3 athena = boto3.client("athena") sql = """ SELECT id FROM schema."table" WHERE dt = DATE_FORMAT(NOW(), '%Y%m%d') """ # start_query_executionでクエリを実行 response = athena.start_query_execution( QueryString=sql, QueryExecutionContext={ "Database": "schema" }, ResultConfiguration={ "OutputLocation": "s3://athena_log/", "EncryptionConfiguration": { "EncryptionOption": "SSE_S3" } } ) # QueryExecutionIdを取得 result = athena.get_query_results( QueryExecutionId=response["QueryExecutionId"] ) # QueryExecutionIdを元にクエリ結果を取得 # 次ページあれば"NextToken"に値が入る result = athena.get_query_results( QueryExecutionId=response["QueryExecutionId"], NextToken=response["NextToken"] )
Athena — Boto 3 Docs 1.5.6 documentation
注意点
クエリを実行すると、Historyに履歴が残り、実行結果も保存されるわけですが、これはAthana利用時に最初に指定した、Query result location
のS3のバケット内に、メタデータと結果が保存される仕組みのようで、S3からデータを消さない限りは、永久的に残っていきます。 頻繁に実行したり、S3の課金等が気になるのであれば、S3側のバケットの設定でファイルの有効期限を指定しておくと良いかと思います。