Skip to content

Commit 88550e4

Browse files
fix(hash join): avoid emitting chunks that violate UpdateDelete ass… (risingwavelabs#8579)
1 parent dc76ad7 commit 88550e4

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

src/stream/src/executor/hash_join.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -922,19 +922,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
922922
}
923923
if degree == 0 {
924924
if let Some(chunk) =
925-
hashjoin_chunk_builder.forward_if_not_matched(op, row)
925+
hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
926926
{
927927
yield chunk;
928928
}
929929
} else if let Some(chunk) =
930-
hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
930+
hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
931931
{
932932
yield chunk;
933933
}
934934
// Insert back the state taken from ht.
935935
side_match.ht.update_state(key, matched_rows);
936936
} else if let Some(chunk) =
937-
hashjoin_chunk_builder.forward_if_not_matched(op, row)
937+
hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
938938
{
939939
yield chunk;
940940
}
@@ -990,19 +990,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
990990
}
991991
if degree == 0 {
992992
if let Some(chunk) =
993-
hashjoin_chunk_builder.forward_if_not_matched(op, row)
993+
hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
994994
{
995995
yield chunk;
996996
}
997997
} else if let Some(chunk) =
998-
hashjoin_chunk_builder.forward_exactly_once_if_matched(op, row)
998+
hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
999999
{
10001000
yield chunk;
10011001
}
10021002
// Insert back the state taken from ht.
10031003
side_match.ht.update_state(key, matched_rows);
10041004
} else if let Some(chunk) =
1005-
hashjoin_chunk_builder.forward_if_not_matched(op, row)
1005+
hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
10061006
{
10071007
yield chunk;
10081008
}

0 commit comments

Comments
 (0)