ぽこやかざん雑記

データエンジニア / 下町モルモット / 広島カープファン / 深夜の馬鹿力 / おくやま

BigQueryで取得した時系列データをPub/Subに流す

Pub/Sub に対して "Hello World" といったメッセージをメッセージを流すサンプルは公式からも出ているが、もう少し凝った時系列データを流したい時のメモ。
ローカル環境で BigQuery に対してクエリを実行し、クエリ結果をレコードごとに Pub/Sub に流すコードを Python で実装していく。

実現したいこと

検証などで BigQuery 上の TIMESTAMP 型のカラムの値によって Pub/Sub にレコードを流すタイミングを制御したい場合がある。
具体的には TIMESTAMP 型のカラムの値が 2023-11-01 HH:MM:SS レコードが複数あったとして、
2023-11-01 00:15:00 までのデータを Pub/Sub に流し、それ以降のデータはまだ流さない」といったように、擬似的に時系列データの投入タイミングを制御していく。

使用するデータ

bigquery-public-data.chicago_taxi_trips.taxi_trips のデータを取得する。
こちらのテーブルに対して以下のようなクエリを実行して 2023-11-01 分のデータを取得する。

SELECT
    unique_key,
    DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
    trip_seconds,
    trip_miles,
    trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst

trip_start_timestamp の値によって Pub/Sub へのデータ投入タイミングを制御する。
trip_start_timestamp の値は 15 分ごとに丸め込まれているので、実行の流れを以下のようにしていく。

  • trip_start_timestamp の値が 2023-11-01 00:00:00 のデータを Pub/Sub に流し1分間スリープ
  • trip_start_timestamp の値が 2023-11-01 00:15:00 のデータを Pub/Sub に流し1分間スリープ
  • trip_start_timestamp の値が 2023-11-01 00:30:00 のデータを Pub/Sub に流し1分間スリープ
  • ...

事前準備

pip で google-cloud-bigquerygoogle-cloud-pubsub をインストール。

$ pip install google-cloud-bigquery
$ pip install google-cloud-pubsub

Pub/Sub のトピックを作成(トピック名: test-pokoyakazan-topic) - Add a default subscriptio にチェックを入れる(${トピック名}-subというサブスクリプションが自動でできる)

全体のコード

引数として、project_id と、投入先となる Pub/Sub の topic_id を受け取る。

bq_to_pubsub.py

import argparse
from datetime import datetime
import json
import time

from google.cloud import bigquery, pubsub_v1

SLEEP_SECOND = 30


def run_bq_query(project_id, query):
    bq_client = bigquery.Client(project=project_id)
    query_job = bq_client.query(query=query)
    query_results = list(query_job.result())
    return query_results


def json_serial(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))


def msg_publish(project_id, topic_id, msg):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_msg = msg.encode("utf-8")
    future = publisher.publish(topic_path, publish_msg)
    message_id = future.result(timeout=300)
    print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")


def publish_query_results_to_pubsub(project_id, topic_id, query_results):
    index = 0
    for hour in range(0, 24):
        for minute in range(0, 60, 15):
            pseudo_now = datetime(
                2023, 11, 1, hour, minute
            )
            while True:
                row = query_results[index]
                trip_start_timestamp_jst = row['trip_start_timestamp_jst']
                if trip_start_timestamp_jst != pseudo_now:
                    break
                row_dict = dict(row.items())
                msg = json.dumps(row_dict, default=json_serial)
                msg_publish(project_id, topic_id, msg)
                index += 1

            print(f'Sleep {SLEEP_SECOND} seconds')
            print(f'Now: {pseudo_now}')
            time.sleep(SLEEP_SECOND)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("project_id", help="Google Cloud project ID")
    parser.add_argument("topic_id", help="Pub/Sub topic ID")
    args = parser.parse_args()

    with open("sql/get_taxi_trips.sql", mode='r') as rf:
        query = rf.read()
    query_results = run_bq_query(args.project_id, query)

    publish_query_results_to_pubsub(args.project_id, args.topic_id, query_results)

sql/get_taxi_trips.sql の中身

SELECT
    unique_key,
    DATETIME(trip_start_timestamp, 'Asia/Tokyo') AS trip_start_timestamp_jst,
    trip_seconds,
    trip_miles,
    trip_total,
FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
WHERE DATE(trip_start_timestamp, 'Asia/Tokyo') = "2023-11-01"
ORDER BY trip_start_timestamp_jst

BigQuery 実行部分

BigQuery のクエリを query として受け取り BigQuery クライアント経由でクエリを実行している。

def run_bq_query(project_id, query):
    bq_client = bigquery.Client(project=project_id)
    query_job = bq_client.query(query=query)
    query_results = list(query_job.result())
    return query_results

Pub/Sub 投入部分

投入順序の制御

15分間隔で擬似的な時計を進めて、15分間分投入すると SLEEP_SECOND 秒スリープする。

def publish_query_results_to_pubsub(project_id, topic_id, query_results):
    index = 0
    for hour in range(0, 24):
        for minute in range(0, 60, 15):
            pseudo_now = datetime(
                2023, 11, 1, hour, minute
            )
            while True:
                row = query_results[index]
                trip_start_timestamp_jst = row['trip_start_timestamp_jst']
                if trip_start_timestamp_jst != pseudo_now:
                    break
                row_dict = dict(row.items())
                msg = json.dumps(row_dict, default=json_serial)
                msg_publish(project_id, topic_id, msg)
                index += 1

            print(f'Sleep {SLEEP_SECOND} seconds')
            print(f'Now: {pseudo_now}')
            time.sleep(SLEEP_SECOND)

注意点として、datetime 型の trip_start_timestamp_jstJSON にダンプするために以下の関数で STRING に変換している。

def json_serial(obj):
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))

実際の投入

受け取ったメッセージをバイト変換して Pub/Sub トピックに投げる。

def msg_publish(project_id, topic_id, msg):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_msg = msg.encode("utf-8")
    future = publisher.publish(topic_path, publish_msg)
    message_id = future.result(timeout=300)
    print(f"Published {publish_msg.decode()} to {topic_path}: {message_id}")

実行

Pub/Sub に投入されたデータを PULL するためのサブスクライバーを用意する。

sub.py

import argparse

from google.cloud import pubsub_v1


def msg_pull(project_id, subscription_id):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message):
        print(f"Received {message}.")
        message.ack()
        print(f"Acknowledged {message.message_id}.")

    future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print(f"Listening for messages on {subscription_path}..\n")

    try:
        future.result(timeout=300)
    except:
        future.cancel()
        future.result()

    subscriber.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "project_id", help="Google Cloud project ID"
    )
    parser.add_argument(
        "subscription_id", help="Pub/Sub subscription ID"
    )
    args = parser.parse_args()
    msg_pull(args.project_id, args.subscription_id)

サブスクライバー(sub.py)の実行

$ python3 sub.py ${project_id} test-pokoyakazan-topic-sub

パブリッシャー(bq_to_pubsub.py)の実行

$ python3 bq_to_pubsub.py ${project_id} test-pokoyakazan-topic