Skip to content

MetadataTables

Minero Aoki edited this page Nov 18, 2021 · 1 revision

各テーブルの意味

prism_tables

カラム 意味
prism_table_id PK
schema_name Redshiftでのスキーマ名から末尾の _x を除いたもの
table_name Redshiftでのテーブル名
create_time 作成時刻
merge_interval マージ間隔(秒)
physical_schema_name S3上でスキーマ名に当たる部分
physical_table_name S3上でテーブル名に当たる部分

ユニーク制約:

  • (schema_name, table_name)

更新アプリケーション:

  • strload-ops
    • ストリーム初期化のときにinsertする。

Prismテーブル(Prism管理下にあるSpectrumテーブル)を表現するテーブル。 対応するデータオブジェクトはS3の s3://PRISM_BUCKET/HASH.schema_name.table_name/ 以下に存在する。

テーブルのリネームに対応できるように physical_{schema,table}_name があとで追加された。 physical_schema, physical_tableは片方だけでも設定できる。

prism_small_objects

カラム 意味
prism_small_object_id PK
prism_staging_object_id FK: prism_staging_objects: 処理元のstaging object ID
prism_partition_id FK: prism_partitions
delayed このパーティションが締められたあとに処理されたオブジェクトであればtrue
content_length オブジェクトサイズ
upload_start_time S3へのアップロードを開始した時刻

ユニーク制約:

  • (prism_staging_object_id, prism_partition_id)

更新アプリケーション:

  • Prism Stream:
    • 新しいsmall objectを作成したときにinsert

prism_partitions

カラム 意味
prism_partition_id PK
prism_table_id FK: prism_tables
partition_date パーティション日付
current_manifest_version 現在使われているパーティションマニフェスト。作成時のデフォルト値は -1(= "new" partition)。0 ならばliveでカタログ反映済み、0より大きいならmergedパーティション
ongoing_manifest_version 使われていない
desired_manifest_version 次に切り替えるべきパーティションマニフェスト。Prism Mergeが新しいmerged objectを作成したときに更新する。作成時のデフォルト値は 0
last_live_object_id このパーティションで最大のlive objectのsmall_object_id。非nullならばこのパーティションは「締まっている」。CatalogCmdが処理開始時に更新する
switched マージ済みオブジェクトに切り替えられ(ようとし)ているときに真。CatalogCmdが処理開始時にtrueに更新する

ユニーク制約:

  • (prism_table_id, partition_date)

更新アプリケーション:

  • Prism Stream
    • パーティションに含まれる最初のsmall objectを作成したときにinsertする。
  • Prism Merge
    • マージが完了したとき、そのパーティションのdesired_manifest_versionを、マージ済みで最大のupper_boundに更新する。
  • CatalogCmd
    • パーティションが「前日」になったあと初回の実行で last_live_object_id をセットする。
    • パーティションが締められてすべてのlive objectsがマージされたら switched をtrueに更新する。
    • 最初のパーティション反映が完了したとき current_manifest_version を 0 に更新する。
    • merged partitionの更新が完了したとき current_manifest_version を desired_manifest_version に更新する。

Prismテーブルのパーティションを表現するテーブル。 対応するメタデータはGlue Catalogに存在する。

prism_merge_jobs

カラム 意味
prism_merge_job_id プライマリキー
prism_partition_id パーティションの ID
schedule_time ジョブの実行予定時刻
ongoing_mark マージジョブが未実行の場合は0。マージジョブが実行中の場合はprism_merge_job_idと同じ値
heartbeat_time ジョブが継続中であることを表明するためにワーカーが継続的に更新する時刻。一定時間以上経過していた場合は担当のワーカーが突然死したと判断される。
(ただし現在の実装では継続的な更新はしておらず、ジョブの開始時にしかセットしていない)

ユニーク制約:

  • (prism_partition_id, ongoing_mark) …… 未開始のジョブがパーティションにつき高々1つしかないことを保証する。

更新アプリケーション:

  • Prism Stream
    • パーティションに含まれる最初のsmall objectを作成したときにinsertする。
  • Prism Merge
    • ジョブを開始するときongoing_mark, heartbeat_timeを更新する。
    • 積み残しがあるとき同じパーティションの行をinsertする。
    • ジョブが正常終了したときdeleteする。

Prism Mergeジョブを表すテーブル。 1ジョブに対して1行作成され、ジョブが完了すると削除される。

ユニーク制約があるので、同一パーティションに対応する未実行のジョブは高々1つしか存在できない。 実行中のジョブであれば、同一パーティションに対応するものでも複数存在できる。

prism_merge_ranges

カラム 意味
prism_merge_range_id PK
prism_partition_id FK: prism_partitions
lower_bound merged objectに含まれるsmall_object_idの下限
upper_bound merged objectに含まれるsmall_object_idの上限
content_length merged objectのサイズ
create_time この行の作成時刻
update_time この行の更新時刻

ユニーク制約:

  • (prism_partition_id, lower_bound)
  • (prism_partition_id, upper_bound)

チェック制約:

  • lower_bound IS NOT NULL OR lower_bound IS NULL AND upper_bound = 0
  • upper_bound > lower_bound

更新アプリケーション:

  • Prism Merge
    • merged objectを作成完了したときに対応する行をupsertする。
    • またそのとき、パーティションに対応する行がなかったら (lower_bound, upper_bound) = (null, 0) の行を作成する。

merged objectを表現するテーブル。prism_merge_rangesの1行に対して、 S3には対応するmerged object part-lower_bound-upper_bound.parquet (各boundは0埋め19桁)が存在する。

パーティションに対応する行がinsertされるとき、 同時に次のような (lower_bound, upper_bound) = (null, 0) の行が作成されるため、 1パーティションにつき必ず0行または2行以上が存在する。1行のときはない。

dwhctl_r 21:39 => select * from prism_merge_ranges where prism_partition_id = 786508561 order by 1;
 prism_merge_range_id | prism_partition_id | lower_bound | upper_bound | content_length |        create_time         |        update_time         
----------------------+--------------------+-------------+-------------+----------------+----------------------------+----------------------------
              5327399 |          786508561 |             |           0 |              0 | 2020-04-23 15:59:05.58764  | 2020-04-23 15:59:05.58764
              5327400 |          786508561 |           0 |   779346548 |      179987272 | 2020-04-23 15:59:05.58764  | 2020-04-23 15:59:05.58764
              5328881 |          786508561 |   779346548 |   779375934 |      134324486 | 2020-04-23 16:53:32.563936 | 2020-04-23 17:46:00.141485

Parquetマージ処理全体における各テーブルの状態遷移

1. Prism Streamがパーティションの最初のオブジェクトを処理する

  • small objectをS3にアップロードしたらprism_small_objectsの行を追加する。
  • そのsmall objectが対応するprism_partitionsの行を追加する。
  • そのパーティションに紐付くprism_merge_jobsの行を作成する。

▼prism_partitions

 prism_partition_id | prism_table_id | partition_date | current_manifest_version | ongoing_manifest_version | desired_manifest_version | last_live_object_id | switched 
--------------------+----------------+----------------+--------------------------+--------------------------+--------------------------+---------------------+----------
          787268948 |            387 | 2020-05-19     |                       -1 |                       -1 |                        0 |                     | f

▼prism_merge_jobs

 prism_merge_job_id | prism_partition_id |       schedule_time       | ongoing_mark | heartbeat_time 
--------------------+--------------------+---------------------------+--------------+----------------
          788136822 |          787268948 | 2020-04-24 20:37:27.25887 |            0 | 

2. Prism Streamが既存のパーティションのオブジェクトを処理する

  • small objectをS3にアップロードしたらprism_small_objectsの行を追加する。

3. スケジュール時間となり、Prism Mergeジョブが起動

  • Prism Mergeはprism_merge_jobsをポーリングしていて、 schedule_time < now() となった行を順番に処理する。
  • prism_merge_jobs.ongoing_mark に自分の prism_merge_job_id を埋められたら(ロックできたら)処理を開始できる。
  • prism_merge_jobs.heartbeat_time に現在時刻を埋めて実際のマージ処理を開始する。

▼prism_merge_jobs

 prism_merge_job_id | prism_partition_id |       schedule_time        | ongoing_mark |       heartbeat_time       
--------------------+--------------------+----------------------------+--------------+----------------------------
          788209556 |          786508561 | 2020-04-24 07:59:04.846598 |    788209556 | 2020-04-24 09:13:02.850943
  • merged objectをS3にアップロードしたら prism_merge_ranges に行を追加する。
  • 新しい範囲のとき
    • create_time = update_time
    • set lower_bound = 前のrangeのupper_bound
    • set upper_bound = merged objectに含めた最大のsmall_object_id
  • 継ぎ足しのとき
    • create_time < update_time
    • set upper_bound = merged objectに含めた最大のsmall_object_id
  • マージ済みオブジェクトのサイズまたはsmall object数が閾値を越えたときは、同じパーティションを処理するprism_merge_jobsの行を作成する(積み残しリトライ)。
  • マージジョブが終了したらprism_merge_jobsの行はDELETEされる
    • どっかにログを残したい……

▼prism_merge_ranges

dwhctl_r 18:16 => select * from prism_merge_ranges where prism_partition_id = 786508561 order by 1;
 prism_merge_range_id | prism_partition_id | lower_bound | upper_bound | content_length |        create_time         |        update_time         
----------------------+--------------------+-------------+-------------+----------------+----------------------------+----------------------------
              5327399 |          786508561 |             |           0 |              0 | 2020-04-23 15:59:05.58764  | 2020-04-23 15:59:05.58764
              5327400 |          786508561 |           0 |   779346548 |      179987272 | 2020-04-23 15:59:05.58764  | 2020-04-23 15:59:05.58764
              5328881 |          786508561 |   779346548 |   779375934 |      134324486 | 2020-04-23 16:53:32.563936 | 2020-04-23 17:46:00.141485
              5329976 |          786508561 |   779375934 |   779416791 |      157931610 | 2020-04-23 17:48:26.041433 | 2020-04-23 18:42:56.9196
              5331579 |          786508561 |   779416791 |   779469041 |      181037249 | 2020-04-23 19:36:47.885103 | 2020-04-23 21:28:04.130142
              5333394 |          786508561 |   779469041 |   779508214 |      228912040 | 2020-04-23 21:29:15.239126 | 2020-04-23 22:24:23.089588
              5334123 |          786508561 |   779508214 |   779545659 |      250083953 | 2020-04-23 22:24:42.127684 | 2020-04-23 23:26:37.085766
              5334994 |          786508561 |   779545659 |   779571957 |      253467235 | 2020-04-23 23:27:20.25861  | 2020-04-24 00:29:06.487441
以下略

3-2. マージオブジェクトの分割(積み残しリトライ)

マージ処理を実行するとき、対象パーティションに紐づくsmall objectが多すぎる場合、 もしくはmerged objectのサイズが大きくなりすぎる場合は、 マージ対象のsmall objectを一気に全部マージせず、一部を積み残す。

大量のオブジェクトマージを避ける理由は、ジョブの実行時間が長くなるとジョブの 失敗確率が上がるし、リトライにかかる時間が増加して実行効率の無駄が増えるためである。 またmerged object のサイズが大きくなりすぎるとSpectrumアクセスが特定ノードに偏るので、 できるだけ同じくらいのサイズのオブジェクトがたくさん並んでいるほうがよい。

積み残しがある場合は、マージジョブ完了時に新しいprism_merge_jobs行をinsertしてリトライする。 またこのときongoing_markは0に戻る。

pv_logやimpressionsのような巨大なログでは積み残しリトライが何度も発生するため、 prism_merge_jobsをモニタリングしていると、同じパーティションのマージジョブが ずっと実行されているように見える。しかし実際には違うジョブである。

4. 定期起動するCatalogCmdがパーティション情報をGlue Catalogに反映

  • 「締められていて(closed, last_live_object_id is not null )」、liveオブジェクトがすべてマージ済みのパーティションのswitchedをtrueに切り替える。
    • liveオブジェクトのマージが完了しないうちは切り替えられない。
    • 一度もマージが起こっていないパーティションは締められても切り替えられない。
  • 前日以前の、「締められていない(not closed, last_live_object_id is null )」パーティションを締める(パーティション締め処理)。
    • set last_live_object_id = そのパーティションで現在最大のsmall_object_id
    • この処理の以降に作成されるsmall objectはdelayed objectになる。
  • すべてのテーブルの定義をGlue Catalogに反映する
    • 新しいテーブルやカラムはここで追加される
  • not switchedパーティションについて、liveオブジェクトのみを使ってGlue Catalogのパーティション情報を作成・更新する
    • 新しいパーティションはこのとき作成され、参照可能になる。
  • switchedパーティションについて、live, mergedオブジェクトを使ってGlue Catalogのパーティション情報を作成・更新する
    • current_manifest_version < desired_manifest_version であるパーティションが更新対象になる
    • delayed objectはマージされて切り替わることで参照可能になる。

merged objectが参照されるようになるにはパーティションを締めたうえでswitchしないといけないので、 暦日が切り替わってから最低2回はCatalogCmdが走らないとパーティションは切り替わらない。 (マージジョブが動いて、liveオブジェクトがすべてマージ完了している必要もある)

4-2. 各種処理が走るタイミング

  • Glue Catalogに新しいテーブル・カラムが作成されるタイミング
    • メタデータを更新したあと、初回のCatalogCmd実行のとき。
  • Glue Catalogに新しいパーティションが作成されるタイミング
    • 新しいパーティションに始めてlive objectが発生したあと初回のCatalogCmd実行のとき。
  • パーティションが締められるタイミング
    • 日付が変わって初回のCatalogCmd実行のとき。
  • パーティションにmergedが使われるようになるタイミング
    • パーティションが締められてliveがすべてマージされたあと初回のCatalogCmd実行のとき。