Skip to content

Commit ce411b4

Browse files
committed
comments - functional global hash computation + comments fixes
1 parent 35fe7ea commit ce411b4

File tree

2 files changed

+34
-17
lines changed

2 files changed

+34
-17
lines changed

orchestration/src/main/scala/ai/chronon/orchestration/RepoIndex.scala

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
5252

5353
/**
5454
* @param fileHashes map of all the file names to their hashes in the branch.
55-
* @param newNodes nodes that aren't present in the index's [[fileHashToContent]] table. New nodes typically.
55+
* @param newNodes nodes that aren't already present in the index. De-duped across branches.
5656
* @param branch branch on which the user is making the changes.
5757
* @param dryRun when true shows potential versions that will be assigned to nodes without modifying the index.
5858
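* @return version updates obtained by joining the new (or, when dryRun is true, potential) versions with the existing versions of the branch and of main.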
@@ -74,8 +74,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
7474
*
7575
* we memoize results in the `globalHashes` map to avoid recomputing the global hash of a node more than once
7676
*/
77-
val globalHashes = mutable.Map.empty[Name, GlobalHash]
78-
def computeGlobalHash(name: Name): GlobalHash = {
77+
def computeGlobalHash(name: Name, globalHashes: mutable.Map[Name, GlobalHash]): GlobalHash = {
7978

8079
if (globalHashes.contains(name)) return globalHashes(name)
8180

@@ -107,7 +106,7 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
107106
// recursively compute parent hashes
108107
val parentHashes = parents
109108
.map { parent =>
110-
val parentHash = computeGlobalHash(parent).hash
109+
val parentHash = computeGlobalHash(parent, globalHashes).hash
111110
s"${parent.name}:$parentHash"
112111
}
113112
.mkString(",")
@@ -123,7 +122,9 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
123122
globalHash
124123
}
125124

126-
fileHashes.foreach { case (name, _) => computeGlobalHash(name) }
125+
val globalHashes = mutable.Map.empty[Name, GlobalHash]
126+
// populate globalHashes for every file in the branch - computeGlobalHash memoizes into this map as it recurses through parents
127+
fileHashes.foreach { case (name, _) => computeGlobalHash(name, globalHashes) }
127128

128129
val existingVersions = branchVersionIndex.getOrElse(branch, mutable.Map.empty)
129130
val mainVersions = branchVersionIndex.getOrElse(Branch.main, mutable.Map.empty)
@@ -146,19 +147,20 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
146147
branchToFileHash.update(branch, enrichedFileHashes)
147148
branchVersionIndex.update(branch, newVersions)
148149

149-
return VersionUpdate.join(newVersions, existingVersions, mainVersions)
150-
}
150+
VersionUpdate.join(newVersions, existingVersions, mainVersions)
151151

152-
// dry run - don't insert into any members of the index
153-
val newVersions = mutable.Map.empty[Name, Version]
154-
globalHashes.foreach {
155-
case (name, globalHash) =>
156-
val versionIndex = versionSequencer.potentialIndex(name, globalHash)
157-
newVersions.update(name, Version("v" + versionIndex.toString))
158-
}
152+
} else {
153+
154+
// dry run - don't insert into any members of the index
155+
val newVersions = mutable.Map.empty[Name, Version]
156+
globalHashes.foreach {
157+
case (name, globalHash) =>
158+
val versionIndex = versionSequencer.potentialIndex(name, globalHash)
159+
newVersions.update(name, Version("v" + versionIndex.toString))
160+
}
159161

160-
val versionUpdates = VersionUpdate.join(newVersions, existingVersions, mainVersions)
161-
versionUpdates
162+
VersionUpdate.join(newVersions, existingVersions, mainVersions)
163+
}
162164
}
163165

164166
def addFiles(fileHashes: mutable.Map[Name, FileHash], updatedFiles: Map[String, String], branch: Branch): Unit = {
@@ -171,7 +173,9 @@ class RepoIndex[T >: Null](proc: ConfProcessor[T]) extends Logging {
171173
addNodes(fileHashes, nodes, branch)
172174
}
173175

174-
// returns the contents of the files not present in the index
176+
/**
177+
* returns the names of the files whose contents haven't been found in the index across any of the versions
178+
*/
175179
def diff(incomingFileHashes: mutable.Map[Name, FileHash]): Seq[Name] = {
176180

177181
incomingFileHashes

orchestration/src/test/scala/ai/chronon/orchestration/test/RepoIndexSpec.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,19 @@ class RepoIndexSpec extends AnyFlatSpec with Matchers with Logging {
164164
)
165165

166166
updateIndex(branchConfs5, Branch.main, "new sq3 and gb3, un-deleted gb2")
167+
168+
val branchConfs6 = Seq(
169+
TestConf("sq1", "v1", "4g", Seq.empty, Seq("t1")) -> "v0",
170+
TestConf("sq3", "v1", "4g", Seq.empty, Seq("t3")) -> "v0",
171+
TestConf("gb1", "v1", "4g", Seq("t1")) -> "v0",
172+
TestConf("gb3", "v1", "4g", Seq("t3")) -> "v0",
173+
TestConf("gb2", "v1", "8g", Seq("t2")) -> "v0",
174+
TestConf("j1", "v1", "4g", Seq("gb1", "gb2", "gb3"), Seq("table_j1")) -> "v3",
175+
TestConf("m1", "v1", "4g", Seq("j1"), Seq("table_m1")) -> "v3",
176+
TestConf("m2", "v1", "4g", Seq("j1"), Seq("table_m2")) -> "v0",
177+
)
178+
179+
updateIndex(branchConfs6, Branch.main, "m2 is added")
167180
}
168181

169182

0 commit comments

Comments
 (0)