のんびりSEの議事録

プログラミング系のポストからアプリに関してのポストなどをしていきます。まれにアニメ・マンガなど

AthenaでS3のデータをSQLで操作する

この記事はAmazon Web Services Advent Calendar 2017 24日目の記事です。

Amazon Athana概要

Amazon Athanaとは、S3のデータをSQLを利用して簡単に分析可能なサービスで、実行したクエリに対してのみ料金が発生します。 サーバレスで、EMR等立ち上げる必要なく、S3のデータを分析出来ます。

Amazon Athena – Amazon S3上のデータに対話的にSQLクエリを | Amazon Web Services ブログ

データ形式

  • primitive_type
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • DOUBLE
    • STRING
    • TIMESTAMP
    • DECIMAL
    • DATE
    • VARCHAR
  • array_type
    • ARRAY < data_type >
  • map_type
    • MAP < primitive_type, data_type >
  • struct_type
    • STRUCT < col_name : data_type [COMMENT col_comment] [, ...] >

CSVやTSVファイルではprimitive_typeで事足りるが、JSONファイル等だと、array_typeやmap_type、struct_typeが役に立つ。

サポートしているフォーマット

  • CSV
  • TSV
  • Apache Avro
  • ORC
  • Apache Parquet
  • Logstash log files
  • Apache WebServer log files
  • CloudTrail log files

パーティション

パーティションを利用することで、各クエリでスキャンされるデータの量を制限することができ、パフォーマンスが向上し、コストが削減されます。

パーティションを簡単に利用するには、フォルダ名にkey=dateの様な形式で保存されている必要があります。

aws s3 ls s3://elasticmapreduce/samples/hive-ads/tables/impressions/

    PRE dt=2009-04-12-13-00/
    PRE dt=2009-04-12-13-05/
    PRE dt=2009-04-12-13-10/
    PRE dt=2009-04-12-13-15/
    PRE dt=2009-04-12-13-20/
    PRE dt=2009-04-12-14-00/
    PRE dt=2009-04-12-14-05/
    PRE dt=2009-04-12-14-10/
    PRE dt=2009-04-12-14-15/
    PRE dt=2009-04-12-14-20/
    PRE dt=2009-04-12-15-00/
    PRE dt=2009-04-12-15-05/
  • CRATE TABLEでの指定

PARTITIONED BYで指定する

CREATE EXTERNAL TABLE impressions (
    requestBeginTime string,
    adId string,
    impressionId string,
    referrer string,
    userAgent string,
    userCookie string,
    ip string,
    number string,
    processId string,
    browserCookie string,
    requestEndTime string,
    timers struct<modelLookup:string, requestTime:string>,
    threadId string,
    hostname string,
    sessionId string)
PARTITIONED BY (dt string)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
    with serdeproperties ( 'paths'='requestBeginTime, adId, impressionId, referrer, userAgent, userCookie, ip' )
LOCATION 's3://elasticmapreduce/samples/hive-ads/tables/impressions/' ;

dev.classmethod.jp

nginxのログで実践

logをS3に格納

fluentdを利用したりする方法は割愛します。以下のような記事を参考に、s3://bucket/logs/dt=20171224/access.log と配置した想定で行います。

developer.smartnews.com

テーブルの作成

  • Athanaのコンソールから Create Table > Manuallyと選択します。
  • Databaseはとりあえず、sampledbを選択し、Table Nameを入力し、s3のパスをnginxをアップロードしたパス s3://bucket/logs/ を入力します。

f:id:tatsu_tora:20171224170304p:plain

  • Data Formatは Apache Web Logs を選択し、 Regexに以下のパターンを入力します。
^(\d+\.\d+\.\d+\.\d+) - - \[(.*)\] "(GET|POST|DELETE|PUT|PATCH|HEAD) (.+) (HTTP\/.+) (\d{3}) (\d{3}) "(-|.+)" "(.*)"

f:id:tatsu_tora:20171224170739p:plain

  • Bulk add columnsを選択し、以下のフィールドパターンを入力します。
remote_address string, date_time date, method string, url string, http_version string, status_code int, body_bytes_sent int, referer string, user_agent string

f:id:tatsu_tora:20171224171430p:plain

  • Partition Column nameにdtと入力し、Column typeはstringを指定します。

f:id:tatsu_tora:20171224171844p:plain

以下のようなTableが作成されれば成功です。AthanaからSQLを実行して、結果を取得することが出来るようになります。

CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.nginx_logs (
  `remote_address` string,
  `date_time` date,
  `method` string,
  `url` string,
  `http_version` string,
  `status_code` int,
  `body_bytes_sent` int,
  `referer` string,
  `user_agent` string 
) PARTITIONED BY (
  dt string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1',
  'input.regex' = '^(\\d+\\.\\d+\\.\\d+\\.\\d+) - - \\[(.*)\\] \"(GET|POST|DELETE|PUT|PATCH|HEAD) (.+) (HTTP\\/.+) (\\d{3}) (\\d{3}) \"(-|.+)\" \"(.*)\"'
) LOCATION 's3://bucket/logs/'
TBLPROPERTIES ('has_encrypted_data'='false');

パーティショニングされたS3のデータをロードするには Load Partition (MSCK REPAIR TABLE)を実行する必要があります。

Python(boto3)からクエリを叩いて結果を取得する

Athanaで実行した結果をプログラムから得る場合には、JDBCAPIで取得する事ができます。

Connecting to Amazon Athena with JDBC - Amazon Athena

ただ、JDBCの場合はまだまだ発展途上であり、足りない機能等あるので、APIで取得する方法を紹介します。

  • boto3が必要
import boto3

athena = boto3.client("athena") 

sql = """
SELECT id
FROM schema."table"
WHERE dt = DATE_FORMAT(NOW(), '%Y%m%d')
"""

# start_query_executionでクエリを実行
response = athena.start_query_execution(
    QueryString=sql,
    QueryExecutionContext={ "Database": "schema" },
    ResultConfiguration={
            "OutputLocation": "s3://athena_log/",
            "EncryptionConfiguration": { "EncryptionOption": "SSE_S3" }
    }
)

# QueryExecutionIdを取得
result = athena.get_query_results(
    QueryExecutionId=response["QueryExecutionId"]
)

# QueryExecutionIdを元にクエリ結果を取得
# 次ページあれば"NextToken"に値が入る
result = athena.get_query_results(
    QueryExecutionId=response["QueryExecutionId"],
    NextToken=response["NextToken"]
)

Athena — Boto 3 Docs 1.5.6 documentation

注意点

クエリを実行すると、Historyに履歴が残り、実行結果も保存されるわけですが、これはAthana利用時に最初に指定した、Query result location のS3のバケット内に、メタデータと結果が保存される仕組みのようで、S3からデータを消さない限りは、永久的に残っていきます。 頻繁に実行したり、S3の課金等が気になるのであれば、S3側のバケットの設定でファイルの有効期限を指定しておくと良いかと思います。