Pub/Sub に対して "Hello World" といったメッセージをメッセージを流すサンプルは公式からも出ているが、もう少し凝った時系列データを流したい時のメモ。
ローカル環境で BigQuery に対してクエリを実行し、クエリ結果をレコードごとに Pub/Sub に流すコードを Python で実装していく。
実現したいこと
検証などで BigQuery 上の TIMESTAMP 型のカラムの値によって Pub/Sub にレコードを流すタイミングを制御したい場合がある。
具体的には TIMESTAMP 型のカラムの値が 2023-11-01 HH:MM:SS
レコードが複数あったとして、
「2023-11-01 00:15:00
までのデータを Pub/Sub に流し、それ以降のデータはまだ流さない」といったように、擬似的に時系列データの投入タイミングを制御していく。
使用するデータ
bigquery-public-data.chicago_taxi_trips.taxi_trips のデータを取得する。
こちらのテーブルに対して以下のようなクエリを実行して 2023-11-01 分のデータを取得する。
SELECT unique_key, DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst, trip_seconds, trip_miles, trip_total, FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01" ORDER BY trip_start_timestamp_jst
trip_start_timestamp
の値によって Pub/Sub へのデータ投入タイミングを制御する。
trip_start_timestamp
の値は 15 分ごとに丸め込まれているので、実行の流れを以下のようにしていく。
trip_start_timestamp
の値が2023-11-01 00:00:00
のデータを Pub/Sub に流し1分間スリープtrip_start_timestamp
の値が2023-11-01 00:15:00
のデータを Pub/Sub に流し1分間スリープtrip_start_timestamp
の値が2023-11-01 00:30:00
のデータを Pub/Sub に流し1分間スリープ- ...
事前準備
pip で google-cloud-bigquery
と google-cloud-pubsub
をインストール。
$ pip install google-cloud-bigquery $ pip install google-cloud-pubsub
Pub/Sub のトピックを作成(トピック名: test-pokoyakazan-topic
)
- Add a default subscriptio
にチェックを入れる(${トピック名}-sub
というサブスクリプションが自動でできる)
全体のコード
引数として、project_id
と、投入先となる Pub/Sub の topic_id
を受け取る。
bq_to_pubsub.py
import argparse from datetime import datetime import json import time from google.cloud import bigquery, pubsub_v1 SLEEP_SECOND = 30 def run_bq_query(project_id, query): bq_client = bigquery.Client(project=project_id) query_job = bq_client.query(query=query) query_results = list(query_job.result()) return query_results def json_serial(obj): if isinstance(obj, datetime): return obj.isoformat() raise TypeError("Type %s not serializable" % type(obj)) def msg_publish(project_id, topic_id, msg): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) publish_msg = msg.encode("utf-8") future = publisher.publish(topic_path, publish_msg) message_id = future.result(timeout=300) print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}") def publish_query_results_to_pubsub(project_id, topic_id, query_results): index = 0 for hour in range(0, 24): for minute in range(0, 60, 15): pseudo_now = datetime( 2023, 11, 1, hour, minute ) while True: row = query_results[index] trip_start_timestamp_jst = row['trip_start_timestamp_jst'] if trip_start_timestamp_jst != pseudo_now: break row_dict = dict(row.items()) msg = json.dumps(row_dict, default=json_serial) msg_publish(project_id, topic_id, msg) index += 1 print(f'Sleep {SLEEP_SECOND} seconds') print(f'Now: {pseudo_now}') time.sleep(SLEEP_SECOND) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("project_id", help="Google Cloud project ID") parser.add_argument("topic_id", help="Pub/Sub topic ID") args = parser.parse_args() with open("sql/get_taxi_trips.sql", mode='r') as rf: query = rf.read() query_results = run_bq_query(args.project_id, query) publish_query_results_to_pubsub(args.project_id, args.topic_id, query_results)
sql/get_taxi_trips.sql
の中身
SELECT unique_key, DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst, trip_seconds, trip_miles, trip_total, FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips` WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01" ORDER BY trip_start_timestamp_jst
BigQuery 実行部分
BigQuery のクエリを query
として受け取り BigQuery クライアント経由でクエリを実行している。
def run_bq_query(project_id, query): bq_client = bigquery.Client(project=project_id) query_job = bq_client.query(query=query) query_results = list(query_job.result()) return query_results
Pub/Sub 投入部分
投入順序の制御
15分間隔で擬似的な時計を進めて、15分間分投入すると SLEEP_SECOND
秒スリープする。
def publish_query_results_to_pubsub(project_id, topic_id, query_results): index = 0 for hour in range(0, 24): for minute in range(0, 60, 15): pseudo_now = datetime( 2023, 11, 1, hour, minute ) while True: row = query_results[index] trip_start_timestamp_jst = row['trip_start_timestamp_jst'] if trip_start_timestamp_jst != pseudo_now: break row_dict = dict(row.items()) msg = json.dumps(row_dict, default=json_serial) msg_publish(project_id, topic_id, msg) index += 1 print(f'Sleep {SLEEP_SECOND} seconds') print(f'Now: {pseudo_now}') time.sleep(SLEEP_SECOND)
注意点として、datetime
型の trip_start_timestamp_jst
を JSON にダンプするために以下の関数で STRING に変換している。
def json_serial(obj): if isinstance(obj, datetime): return obj.isoformat() raise TypeError("Type %s not serializable" % type(obj))
実際の投入
受け取ったメッセージをバイト変換して Pub/Sub トピックに投げる。
def msg_publish(project_id, topic_id, msg): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_id) publish_msg = msg.encode("utf-8") future = publisher.publish(topic_path, publish_msg) message_id = future.result(timeout=300) print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")
実行
Pub/Sub に投入されたデータを PULL するためのサブスクライバーを用意する。
sub.py
import argparse from google.cloud import pubsub_v1 def msg_pull(project_id, subscription_id): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_id) def callback(message): print(f"Received {message}.") message.ack() print(f"Acknowledged {message.message_id}.") future = subscriber.subscribe( subscription_path, callback=callback ) print(f"Listening for messages on {subscription_path}..\n") try: future.result(timeout=300) except: future.cancel() future.result() subscriber.close() if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "project_id", help="Google Cloud project ID" ) parser.add_argument( "subscription_id", help="Pub/Sub subscription ID" ) args = parser.parse_args() msg_pull(args.project_id, args.subscription_id)
サブスクライバー(sub.py
)の実行
$ python3 sub.py ${project_id} test-pokoyakazan-topic-sub
パブリッシャー(bq_to_pubsub.py
)の実行
$ python3 bq_to_pubsub.py ${project_id} test-pokoyakazan-topic