-
Notifications
You must be signed in to change notification settings - Fork 1
MetadataTables
カラム | 意味 |
---|---|
prism_table_id | PK |
schema_name | Redshiftでのスキーマ名から末尾の _x を除いたもの |
table_name | Redshiftでのテーブル名 |
create_time | 作成時刻 |
merge_interval | マージ間隔(秒) |
physical_schema_name | S3上でスキーマ名に当たる部分 |
physical_table_name | S3上でテーブル名に当たる部分 |
ユニーク制約:
- (schema_name, table_name)
更新アプリケーション:
- strload-ops
- ストリーム初期化のときにinsertする。
Prismテーブル(Prism管理下にあるSpectrumテーブル)を表現するテーブル。
対応するデータオブジェクトはS3の s3://PRISM_BUCKET/HASH.schema_name.table_name/
以下に存在する。
テーブルのリネームに対応できるように physical_{schema,table}_name
があとで追加された。
physical_schema, physical_tableは片方だけでも設定できる。
カラム | 意味 |
---|---|
prism_small_object_id | PK |
prism_staging_object_id | FK: prism_staging_objects: 処理元のstaging object ID |
prism_partition_id | FK: prism_partitions |
delayed | このパーティションが締められたあとに処理されたオブジェクトであればtrue |
content_length | オブジェクトサイズ |
upload_start_time | S3へのアップロードを開始した時刻 |
ユニーク制約:
- (prism_staging_object_id, prism_partition_id)
更新アプリケーション:
- Prism Stream:
- 新しいsmall objectを作成したときにinsert
カラム | 意味 |
---|---|
prism_partition_id | PK |
prism_table_id | FK: prism_tables |
partition_date | パーティション日付 |
current_manifest_version | 現在使われているパーティションマニフェスト。作成時のデフォルト値は -1(= "new" partition)。0 ならばliveでカタログ反映済み、0より大きいならmergedパーティション |
ongoing_manifest_version | 使われていない |
desired_manifest_version | 次に切り替えるべきパーティションマニフェスト。Prism Mergeが新しいmerged objectを作成したときに更新する。作成時のデフォルト値は 0 |
last_live_object_id | このパーティションで最大のlive objectのsmall_object_id。非nullならばこのパーティションは「締まっている」。CatalogCmdが処理開始時に更新する |
switched | マージ済みオブジェクトに切り替えられ(ようとし)ているときに真。CatalogCmdが処理開始時にtrueに更新する |
ユニーク制約:
- (prism_table_id, partition_date)
更新アプリケーション:
- Prism Stream
- パーティションに含まれる最初のsmall objectを作成したときにinsertする。
- Prism Merge
- マージが完了したとき、そのパーティションのdesired_manifest_versionを、マージ済みで最大のupper_boundに更新する。
- CatalogCmd
- パーティションが「前日」になったあと初回の実行で last_live_object_id をセットする。
- パーティションが締められてすべてのlive objectsがマージされたら switched をtrueに更新する。
- 最初のパーティション反映が完了したとき current_manifest_version を 0 に更新する。
- merged partitionの更新が完了したとき current_manifest_version を desired_manifest_version に更新する。
Prismテーブルのパーティションを表現するテーブル。 対応するメタデータはGlue Catalogに存在する。
カラム | 意味 |
---|---|
prism_merge_job_id | プライマリキー |
prism_partition_id | パーティションの ID |
schedule_time | ジョブの実行予定時刻 |
ongoing_mark | マージジョブが未実行の場合は0。マージジョブが実行中の場合はprism_merge_job_idと同じ値 |
heartbeat_time | ジョブが継続中であることを表明するためにワーカーが継続的に更新する時刻。一定時間以上経過していた場合は担当のワーカーが突然死したと判断される。 (ただし現在の実装では継続的な更新はしておらず、ジョブの開始時にしかセットしていない) |
ユニーク制約:
- (prism_partition_id, ongoing_mark) …… 未開始のジョブがパーティションにつき高々1つしかないことを保証する。
更新アプリケーション:
- Prism Stream
- パーティションに含まれる最初のsmall objectを作成したときにinsertする。
- Prism Merge
- ジョブを開始するときongoing_mark, heartbeat_timeを更新する。
- 積み残しがあるとき同じパーティションの行をinsertする。
- ジョブが正常終了したときdeleteする。
Prism Mergeジョブを表すテーブル。 1ジョブに対して1行作成され、ジョブが完了すると削除される。
ユニーク制約があるので、同一パーティションに対応する未実行のジョブは高々1つしか存在できない。 実行中のジョブであれば、同一パーティションに対応するものでも複数存在できる。
カラム | 意味 |
---|---|
prism_merge_range_id | PK |
prism_partition_id | FK: prism_partitions |
lower_bound | merged objectに含まれるsmall_object_idの下限 |
upper_bound | merged objectに含まれるsmall_object_idの上限 |
content_length | merged objectのサイズ |
create_time | この行の作成時刻 |
update_time | この行の更新時刻 |
ユニーク制約:
- (prism_partition_id, lower_bound)
- (prism_partition_id, upper_bound)
チェック制約:
- lower_bound IS NOT NULL OR lower_bound IS NULL AND upper_bound = 0
- upper_bound > lower_bound
更新アプリケーション:
- Prism Merge
- merged objectを作成完了したときに対応する行をupsertする。
- またそのとき、パーティションに対応する行がなかったら (lower_bound, upper_bound) = (null, 0) の行を作成する。
merged objectを表現するテーブル。prism_merge_rangesの1行に対して、
S3には対応するmerged object part-lower_bound-upper_bound.parquet
(各boundは0埋め19桁)が存在する。
パーティションに対応する行がinsertされるとき、 同時に次のような (lower_bound, upper_bound) = (null, 0) の行が作成されるため、 1パーティションにつき必ず0行または2行以上が存在する。1行のときはない。
dwhctl_r 21:39 => select * from prism_merge_ranges where prism_partition_id = 786508561 order by 1;
prism_merge_range_id | prism_partition_id | lower_bound | upper_bound | content_length | create_time | update_time
----------------------+--------------------+-------------+-------------+----------------+----------------------------+----------------------------
5327399 | 786508561 | | 0 | 0 | 2020-04-23 15:59:05.58764 | 2020-04-23 15:59:05.58764
5327400 | 786508561 | 0 | 779346548 | 179987272 | 2020-04-23 15:59:05.58764 | 2020-04-23 15:59:05.58764
5328881 | 786508561 | 779346548 | 779375934 | 134324486 | 2020-04-23 16:53:32.563936 | 2020-04-23 17:46:00.141485
- small objectをS3にアップロードしたらprism_small_objectsの行を追加する。
- そのsmall objectが対応するprism_partitionsの行を追加する。
- そのパーティションに紐付くprism_merge_jobsの行を作成する。
- このときschedule_timeも埋められて、ジョブがスケジュールされる。
- ただし、前述のユニーク制約により、既にジョブが予定されており、それが未実行であった場合はユニーク制約違反が起きる。この場合、なにもしない。 つまり、未予約であれば新規に予約し、既に予約済みであればそれに相乗りする(何も更新しない)という動きをする。
- ジョブを追加する DML: https://github.com/cookpad/prism/blob/bbd8e03b64051c9686f4502f404b66f6a5a30187/shared/src/main/resources/org/bricolages/streaming/stream/PrismMergeJobMapper.xml#L12
▼prism_partitions
prism_partition_id | prism_table_id | partition_date | current_manifest_version | ongoing_manifest_version | desired_manifest_version | last_live_object_id | switched
--------------------+----------------+----------------+--------------------------+--------------------------+--------------------------+---------------------+----------
787268948 | 387 | 2020-05-19 | -1 | -1 | 0 | | f
▼prism_merge_jobs
prism_merge_job_id | prism_partition_id | schedule_time | ongoing_mark | heartbeat_time
--------------------+--------------------+---------------------------+--------------+----------------
788136822 | 787268948 | 2020-04-24 20:37:27.25887 | 0 |
- small objectをS3にアップロードしたらprism_small_objectsの行を追加する。
- Prism Mergeはprism_merge_jobsをポーリングしていて、
schedule_time < now()
となった行を順番に処理する。 - prism_merge_jobs.ongoing_mark に自分の prism_merge_job_id を埋められたら(ロックできたら)処理を開始できる。
- prism_merge_jobs.heartbeat_time に現在時刻を埋めて実際のマージ処理を開始する。
▼prism_merge_jobs
prism_merge_job_id | prism_partition_id | schedule_time | ongoing_mark | heartbeat_time
--------------------+--------------------+----------------------------+--------------+----------------------------
788209556 | 786508561 | 2020-04-24 07:59:04.846598 | 788209556 | 2020-04-24 09:13:02.850943
- merged objectをS3にアップロードしたら prism_merge_ranges に行を追加する。
- 新しい範囲のとき
- create_time = update_time
- set lower_bound = 前のrangeのupper_bound
- set upper_bound = merged objectに含めた最大のsmall_object_id
- 継ぎ足しのとき
- create_time < update_time
- set upper_bound = merged objectに含めた最大のsmall_object_id
- マージ済みオブジェクトのサイズまたはsmall object数が閾値を越えたときは、同じパーティションを処理するprism_merge_jobsの行を作成する(積み残しリトライ)。
- マージジョブが終了したらprism_merge_jobsの行はDELETEされる
- どっかにログを残したい……
▼prism_merge_ranges
dwhctl_r 18:16 => select * from prism_merge_ranges where prism_partition_id = 786508561 order by 1;
prism_merge_range_id | prism_partition_id | lower_bound | upper_bound | content_length | create_time | update_time
----------------------+--------------------+-------------+-------------+----------------+----------------------------+----------------------------
5327399 | 786508561 | | 0 | 0 | 2020-04-23 15:59:05.58764 | 2020-04-23 15:59:05.58764
5327400 | 786508561 | 0 | 779346548 | 179987272 | 2020-04-23 15:59:05.58764 | 2020-04-23 15:59:05.58764
5328881 | 786508561 | 779346548 | 779375934 | 134324486 | 2020-04-23 16:53:32.563936 | 2020-04-23 17:46:00.141485
5329976 | 786508561 | 779375934 | 779416791 | 157931610 | 2020-04-23 17:48:26.041433 | 2020-04-23 18:42:56.9196
5331579 | 786508561 | 779416791 | 779469041 | 181037249 | 2020-04-23 19:36:47.885103 | 2020-04-23 21:28:04.130142
5333394 | 786508561 | 779469041 | 779508214 | 228912040 | 2020-04-23 21:29:15.239126 | 2020-04-23 22:24:23.089588
5334123 | 786508561 | 779508214 | 779545659 | 250083953 | 2020-04-23 22:24:42.127684 | 2020-04-23 23:26:37.085766
5334994 | 786508561 | 779545659 | 779571957 | 253467235 | 2020-04-23 23:27:20.25861 | 2020-04-24 00:29:06.487441
以下略
マージ処理を実行するとき、対象パーティションに紐づくsmall objectが多すぎる場合、 もしくはmerged objectのサイズが大きくなりすぎる場合は、 マージ対象のsmall objectを一気に全部マージせず、一部を積み残す。
大量のオブジェクトマージを避ける理由は、ジョブの実行時間が長くなるとジョブの 失敗確率が上がるし、リトライにかかる時間が増加して実行効率の無駄が増えるためである。 またmerged object のサイズが大きくなりすぎるとSpectrumアクセスが特定ノードに偏るので、 できるだけ同じくらいのサイズのオブジェクトがたくさん並んでいるほうがよい。
積み残しがある場合は、マージジョブ完了時に新しいprism_merge_jobs行をinsertしてリトライする。 またこのときongoing_markは0に戻る。
pv_logやimpressionsのような巨大なログでは積み残しリトライが何度も発生するため、 prism_merge_jobsをモニタリングしていると、同じパーティションのマージジョブが ずっと実行されているように見える。しかし実際には違うジョブである。
- small object の個数が多すぎる場合:
- small object の合計サイズが大きすぎた場合:
- リトライをしている箇所:
- 「締められていて(closed,
last_live_object_id is not null
)」、liveオブジェクトがすべてマージ済みのパーティションのswitchedをtrueに切り替える。- liveオブジェクトのマージが完了しないうちは切り替えられない。
- 一度もマージが起こっていないパーティションは締められても切り替えられない。
- 前日以前の、「締められていない(not closed,
last_live_object_id is null
)」パーティションを締める(パーティション締め処理)。- set last_live_object_id = そのパーティションで現在最大のsmall_object_id
- この処理の以降に作成されるsmall objectはdelayed objectになる。
- すべてのテーブルの定義をGlue Catalogに反映する
- 新しいテーブルやカラムはここで追加される
- not switchedパーティションについて、liveオブジェクトのみを使ってGlue Catalogのパーティション情報を作成・更新する
- 新しいパーティションはこのとき作成され、参照可能になる。
- switchedパーティションについて、live, mergedオブジェクトを使ってGlue Catalogのパーティション情報を作成・更新する
-
current_manifest_version < desired_manifest_version
であるパーティションが更新対象になる - delayed objectはマージされて切り替わることで参照可能になる。
-
merged objectが参照されるようになるにはパーティションを締めたうえでswitchしないといけないので、 暦日が切り替わってから最低2回はCatalogCmdが走らないとパーティションは切り替わらない。 (マージジョブが動いて、liveオブジェクトがすべてマージ完了している必要もある)
- Glue Catalogに新しいテーブル・カラムが作成されるタイミング
- メタデータを更新したあと、初回のCatalogCmd実行のとき。
- Glue Catalogに新しいパーティションが作成されるタイミング
- 新しいパーティションに始めてlive objectが発生したあと初回のCatalogCmd実行のとき。
- パーティションが締められるタイミング
- 日付が変わって初回のCatalogCmd実行のとき。
- パーティションにmergedが使われるようになるタイミング
- パーティションが締められてliveがすべてマージされたあと初回のCatalogCmd実行のとき。