17
17
18
18
package org .apache .streampark .flink .packer .pipeline .impl
19
19
20
+ import com .github .dockerjava .api .DockerClient
20
21
import org .apache .streampark .common .fs .LfsOperator
21
22
import org .apache .streampark .common .util .ThreadUtils
22
23
import org .apache .streampark .flink .kubernetes .PodTemplateTool
@@ -34,6 +35,7 @@ import org.apache.commons.lang3.StringUtils
34
35
import java .io .File
35
36
import java .util .concurrent .{LinkedBlockingQueue , ThreadPoolExecutor , TimeUnit }
36
37
38
+ import org .apache .streampark .common .util .Implicits ._
37
39
import scala .concurrent .{ExecutionContext , Future }
38
40
39
41
/** Building pipeline for flink kubernetes-native application mode */
@@ -139,29 +141,34 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
139
141
execStep(5 ) {
140
142
usingDockerClient {
141
143
dockerClient =>
142
- val pullImageCmd = {
143
- // when the register address prefix is explicitly identified on base image tag,
144
- // the user's pre-saved docker register auth info would be used.
145
- val pullImageCmdState =
146
- dockerConf.registerAddress != null && ! baseImageTag.startsWith(
147
- dockerConf.registerAddress)
148
- if (pullImageCmdState) {
149
- dockerClient.pullImageCmd(baseImageTag)
150
- } else {
151
- dockerClient
152
- .pullImageCmd(baseImageTag)
153
- .withAuthConfig(dockerConf.toAuthConf)
144
+ val imgExists = dockerClient.listImagesCmd().exec().exists(_.getRepoTags.exists(_.contains(baseImageTag)))
145
+ if (imgExists) {
146
+ logInfo(s " found local docker image $baseImageTag, no need to pull from remote. " )
147
+ } else {
148
+ val pullImageCmd = {
149
+ // when the register address prefix is explicitly identified on base image tag,
150
+ // the user's pre-saved docker register auth info would be used.
151
+ val pullImageCmdState =
152
+ dockerConf.registerAddress != null && ! baseImageTag.startsWith(
153
+ dockerConf.registerAddress)
154
+ if (pullImageCmdState) {
155
+ dockerClient.pullImageCmd(baseImageTag)
156
+ } else {
157
+ dockerClient
158
+ .pullImageCmd(baseImageTag)
159
+ .withAuthConfig(dockerConf.toAuthConf)
160
+ }
154
161
}
162
+ val pullCmdCallback = pullImageCmd
163
+ .asInstanceOf [HackPullImageCmd ]
164
+ .start(watchDockerPullProcess {
165
+ pullRsp =>
166
+ dockerProcess.pull.update(pullRsp)
167
+ Future (dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
168
+ })
169
+ pullCmdCallback.awaitCompletion
170
+ logInfo(s " Already pulled docker image from remote register, imageTag= $baseImageTag" )
155
171
}
156
- val pullCmdCallback = pullImageCmd
157
- .asInstanceOf [HackPullImageCmd ]
158
- .start(watchDockerPullProcess {
159
- pullRsp =>
160
- dockerProcess.pull.update(pullRsp)
161
- Future (dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
162
- })
163
- pullCmdCallback.awaitCompletion
164
- logInfo(s " Already pulled docker image from remote register, imageTag= $baseImageTag" )
165
172
}(err => throw new Exception (s " Pull docker image failed, imageTag= $baseImageTag" , err))
166
173
}.getOrElse(throw getError.exception)
167
174
0 commit comments