にせねこメモ

はてなダイアリーがUTF-8じゃないので移ってきました。

Amazon Timestreamからboto3で一度に大量のデータを取得しようとしたが空データが返ってきた

問題の概要

AWSAmazon Timestreamというデータベースにセンサーデータを蓄積している。このデータをローカルにダウンロードして利用したい。
Pythonとboto3ライブラリを利用して、Timestreamのデータベースからそこそこ大量のデータを取得しようとしたところ、レスポンスのRowsが空だった。
データがある程度小さくなるクエリでやってみると、問題なくデータが取得できるのが確認できた。

原因

TimestreamQuety.Client.query()を呼び出す際に、MaxRowsキーワード引数を指定しないと、取得結果のサイズが1MB以上になる場合に、Rowsが空の状態でレスポンスが返ってくる。その代わり、NextTokenがレスポンスに含まれる。

Otherwise, the initial invocation of Query only returns a NextToken , which can then be used in subsequent calls to fetch the result set. To resume pagination, provide the NextToken value in the subsequent command.

TimestreamQuery — Boto3 Docs 1.21.37 documentation

要するに、取得するデータがでかいと1回で取得することができないようになっている。

対応

ページネーションする。

TimestreamQuety.Client.query()NextTokenを指定して呼び出すと、サイズが1MB未満になるような個数のデータ(Rows)が返される。
更にまだ未取得のデータがある場合には、次のデータに対応するNextTokenもレスポンスに含まれる。データをすべて取得してしまった場合には、レスポンスにはNextTokenが存在しないので、終了判定にも使える。


実際にはこういう手順になる。

  1. 取得したいデータが1MB以上になる場合、最初に呼び出すとNextTokenと空のRowsが得られる。
  2. このNextTokenを利用して1MB程度分のデータ(Rows)と次のデータに対応するNextTokenを得る。
  3. これをNextTokenがレスポンスに含まれなくなるまで続け、今までに返されたRowsをすべて合体させれば欲しいデータが得られる。

Python 3コード

Rowsだけ連結して返すみたいなPython 3コードを示す。

import json
import boto3
from botocore.config import Config

#リージョン名, ID, secret keyは省略
config = Config(region_name = '……') 
config.endpoint_discovery_enabled = True
timestream_query_client = boto3.client('timestream-query', 
        aws_access_key_id="……",
        aws_secret_access_key="……",
        config=config)

#クエリの例
QUERY = """SELECT "time", "measure_name", "measure_value::double" FROM "mydatabase"."mytable" AND time between '2021-01-01 00:00' and '2022-01-01 00:00' ORDER BY time ASC""" 

#データ取得用の関数
def getdata(next_token = None):
    if next_token: #next_tokenが指定された場合はNextTokenを指定してクエリ
        result = timestream_query_client.query(
            QueryString=QUERY,
            NextToken=next_token
        )
    else: #next_tokenなし=初回呼び出し
        result = timestream_query_client.query(
            QueryString=QUERY
        )
    
    ret_rows = result["Rows"] #list of data
    if "NextToken" in result: #次のページ(未取得データ)が存在する場合
        return ret_rows + getdata(result["NextToken"]) #再帰呼び出し
    else: #欲しいデータはすべて取得した場合
        return ret_rows 

rows = getdata()
print(len(rows))
print(rows[0])
問題点

途中経過を保存していないので、ダウンロード途中で失敗した際に、やり直すときには最初から取得しなおしになる。
かなりたくさんのデータを取ってくる必要があるのであれば、呼び出しの度に毎回(あるいは何回かに一度)データとNextTokenを保存するなどとして、レジュームができるようにした方がよさそう。

また、再帰呼び出ししてるけど末尾再帰になってないので再帰深くなると大変かもしれない。メモリも食うし。普通にループで処理した方が適してるかも。