Dive into Apache Arrow(その2)- pg2arrow

前回のエントリApache Arrow のフォーマットについて調べていたが、これのゴールは、外部テーブル(Foreign Table)を介してApache Arrowファイルを読み出し、高速に集計・解析処理を実行する事にある。
特にPG-Stromの場合はSSD-to-GPU Direct SQLという飛び道具が使えるため、NVME-SSD上のApache Arrowファイルを直接GPUへ転送し、列指向データをGPUの大量のコアでぐわーーっと処理するという構成が取れるハズである。

で、FDWモジュールがApache Arrowファイルを読むためには、まずメタデータを解読してどの列がどういったデータ型を持っており、どこにどういう形式で配置されているのか特定できる必要がある。
そのために書いたコードを元に、先ずPostgreSQLのデータをApache Arrowファイルとしてダンプするためのツールを作ってみた。



ある程度 psqlpg_dump のオプションを参考にしたので、PostgreSQL使いの人ならそれほど違和感なく使えるはず。
データベースやユーザ名などの指定は共通。-c COMMAND-f FILENAMEで指定したSQLを実行し、その結果を-o FILENAMEで指定したファイルへ書き出す。

$ ./pg2arrow --help
  pg2arrow [OPTION]... [DBNAME [USERNAME]]

General options:
  -d, --dbname=DBNAME     database name to connect to
  -c, --command=COMMAND   SQL command to run
  -f, --file=FILENAME     SQL command from file
      (-c and -f are exclusive, either of them must be specified)
  -o, --output=FILENAME   result file in Apache Arrow format
      (default creates a temporary file)

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each
      (default is 512MB)

Connection options:
  -h, --host=HOSTNAME     database server host
  -p, --port=PORT         database server port
  -U, --username=USERNAME database user name
  -w, --no-password       never prompt for password
  -W, --password          force password prompt

Debug options:
      --dump=FILENAME     dump information of arrow file
      --progress          shows progress of the job.

Report bugs to <pgstrom@heterodb.com>.



hogehogeテーブルの定義は以下の通り。c列は複合型(composite type)で、内部的にサブフィールドを持つ。

postgres=# \d hogehoge
                  Table "public.hogehoge"
 Column |       Type       | Collation | Nullable | Default
 id     | integer          |           | not null |
 a      | bigint           |           | not null |
 b      | double precision |           | not null |
 c      | comp             |           |          |
 d      | text             |           |          |
 e      | double precision |           |          |
 ymd    | date             |           |          |
    "hogehoge_pkey" PRIMARY KEY, btree (id)

postgres=# \dS comp
                Composite type "public.comp"
 Column |       Type       | Collation | Nullable | Default
 x      | integer          |           |          |
 y      | double precision |           |          |
 z      | numeric          |           |          |
 memo   | text             |           |          |


$ ./pg2arrow postgres -o /tmp/hogehoge -c "SELECT * FROM hogehoge ORDER BY ymd"


以下のように、PyArrowを用いて Apache Arrow 形式のファイルを読み出す事ができる。

>>> import pyarrow as pa
>>> X = pa.RecordBatchFileReader("/tmp/hogehoge").read_all()
>>> X.schema
id: int32
a: int64
b: double
c: struct<x: int32, y: double, z: decimal(30, 11), memo: string>
  child 0, x: int32
  child 1, y: double
  child 2, z: decimal(30, 11)
  child 3, memo: string
d: string
e: double
ymd: date32[day]

DBテーブルの定義に準じて、Apache Arrowとしてのスキーマが作られている事が分かる。


>>> A = pd.read_sql(sql="SELECT * FROM hogehoge LIMIT 1000", con="postgresql://localhost/postgres")
>>> B = pa.Table.from_pandas(A)
>>> B.schema
id: int64
a: int64
b: double
c: string
d: string
e: double
ymd: date32[day]
__index_level_0__: int64
{b'pandas': b'{"index_columns": ["__index_level_0__"], "column_indexes": [{"fi'
            b'eld_name": null, "name": null, "numpy_type": "object", "pandas_t'
            b'ype": "unicode", "metadata": {"encoding": "UTF-8"}}], "columns":'
            b' [{"field_name": "id", "name": "id", "numpy_type": "int64", "pan'
            b'das_type": "int64", "metadata": null}, {"field_name": "a", "name'
            b'": "a", "numpy_type": "int64", "pandas_type": "int64", "metadata'
            b'": null}, {"field_name": "b", "name": "b", "numpy_type": "float6'
            b'4", "pandas_type": "float64", "metadata": null}, {"field_name": '
            b'"c", "name": "c", "numpy_type": "object", "pandas_type": "unicod'
            b'e", "metadata": null}, {"field_name": "d", "name": "d", "numpy_t'
            b'ype": "object", "pandas_type": "unicode", "metadata": null}, {"f'
            b'ield_name": "e", "name": "e", "numpy_type": "float64", "pandas_t'
            b'ype": "float64", "metadata": null}, {"field_name": "ymd", "name"'
            b': "ymd", "numpy_type": "object", "pandas_type": "date", "metadat'
            b'a": null}, {"field_name": "__index_level_0__", "name": null, "nu'
            b'mpy_type": "int64", "pandas_type": "int64", "metadata": null}], '
            b'"pandas_version": "0.22.0"}'}


>>> X.to_pandas()
       id     a          b                                                  c  \
0      24  3379  96.200935  {'memo': '1ff1de774005f8da13f42943881c655f', '...
1    2041  2208  71.122772  {'memo': '3416a75f4cea9109507cacd8e2f2aefc', '...
2    2042  7040  54.081142  {'memo': 'a1d0c6e83f027327d8461063f4ac58a6', '...
3    2043  1635  92.302224  {'memo': '17e62166fc8586dfa4d1bc0e1742c08b', '...
4    2044  3295  58.273429  {'memo': 'f7177163c833dff4b38fc8d2872f1ec6', '...
5    2045  9671  58.085893  {'memo': '6c8349cc7260ae62e3b1396831a8398f', '...
        :          :              :
                                    d          e        ymd
0    f4c1893e352a4e08d1a3b3b444c2d692  34.916724 2025-08-08
1    13f5d46a9f51e30dac02b33b74e9043e  11.098757 2018-09-26
2    b041a9cb97b220c1b073266af31cb45f  81.570772 2025-03-02
3    77e882ed95bf5838abd1b4336a7d2fdc  16.990162 2016-12-29
4    f78d9fde23891ae293f07c576982155b   7.017451 2020-10-08
5    4dfe9131c7ee3a44583a8d21d5ca26a2   3.979350 2023-03-09


ひとまず、pg2arrowを使う事で PostgreSQLApache Arrow へデータを変換する流れができた。

次は本来の目標である、FDWを使って Apache Arrow ⇒ PostgreSQL へのデータの流れを作る事。
