Skip to content

Commit 860e4b5

Browse files
committed
added drain pause/resume
Signed-off-by: Jeromy Cannon <[email protected]>
1 parent 6b7459c commit 860e4b5

File tree

1 file changed

+43
-16
lines changed

1 file changed

+43
-16
lines changed

src/core/k8.mjs

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -573,34 +573,52 @@ export class K8 {
573573
return new Promise((resolve, reject) => {
574574
const execInstance = new k8s.Exec(this.kubeConfig)
575575
const command = ['tar', 'cf', '-', '-C', srcDir, srcFile]
576-
const writerStream = fs.createWriteStream(tmpFile)
577-
const writerPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 })
576+
const outputFileStream = fs.createWriteStream(tmpFile)
577+
const outputPassthroughStream = new stream.PassThrough({ highWaterMark: 1024 * 1024 })
578578
const errStream = new stream.PassThrough()
579579
let additionalErrorMessageDetail = ''
580580

581581
// Use pipe() to automatically handle backpressure between streams
582-
writerPassthroughStream.pipe(writerStream);
582+
outputPassthroughStream.pipe(outputFileStream);
583+
584+
outputPassthroughStream.on('data', (chunk) => {
585+
const canWrite = outputFileStream.write(chunk); // Write chunk to file and check if buffer is full
586+
587+
if (!canWrite) {
588+
console.log(`Buffer is full, pausing data stream... for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`);
589+
outputPassthroughStream.pause(); // Pause the data stream if buffer is full
590+
}
591+
592+
})
593+
594+
outputFileStream.on('drain', () => {
595+
outputPassthroughStream.resume()
596+
this.logger.debug(`stream drained, resume write for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
597+
})
583598

584599
execInstance.exec(
585600
namespace,
586601
podName,
587602
containerName,
588603
command,
589-
writerStream,
604+
outputFileStream,
590605
errStream,
591606
null,
592607
false,
593608
async ({ status }) => {
594609
this.logger.debug(`copyFrom.callback(status)=${status}`)
595-
writerStream.close()
610+
outputFileStream.end()
611+
outputFileStream.close(() => {
612+
this.logger.debug(`finished closing writerStream copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
613+
})
596614
if (status === 'Failure') {
597615
self._deleteTempFile(tmpFile)
598616
}
599617
})
600618
.then(conn => {
601619
conn.on('close', async (code, reason) => {
602620
if (code !== 1000) { // code 1000 is the success code
603-
return reject(new FullstackTestingError(`failed to copy because of error (${code}): ${reason}`))
621+
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of error (${code}): ${reason}`))
604622
}
605623

606624
try {
@@ -622,40 +640,49 @@ export class K8 {
622640
}
623641
}
624642
} catch (e) {
625-
return reject(new FullstackTestingError(`failed to extract file: ${destPath}`, e))
643+
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to extract file: ${destPath}`, e))
626644
}
627645

628-
return reject(new FullstackTestingError(`failed to download file completely: ${destPath}${additionalErrorMessageDetail}`))
646+
return reject(new FullstackTestingError(`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} to download file completely: ${destPath}${additionalErrorMessageDetail}`))
629647
})
630648

631649
conn.on('error', (e) => {
632650
self._deleteTempFile(tmpFile)
633651
return reject(new FullstackTestingError(
634-
`failed to copy file ${destPath} because of connection error: ${e.message}`, e))
652+
`failed copying from ${podName}:${srcDir}/${srcFile} to ${destPath} because of connection error: ${e.message}`, e))
635653
})
636654
})
637655

638656
errStream.on('data', (data) => {
639-
return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, error: ${data.toString()}`))
657+
return reject(new FullstackTestingError(`error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, error: ${data.toString()}`))
640658
})
641659

642-
writerStream.on('close', () => {
660+
outputFileStream.on('close', () => {
643661
this.logger.debug(`finished copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
644662
})
645663

646-
writerStream.on('error', (err) => {
647-
return reject(new FullstackTestingError(`error encountered during download of file: ${destPath}, err: ${err.toString()}`, err))
664+
outputFileStream.on('error', (err) => {
665+
return reject(new FullstackTestingError(`writerStream error encountered copying from ${podName}:${srcDir}/${srcFile} to ${destPath}, err: ${err.toString()}`, err))
648666

649667
})
650668

651-
writerStream.on('end', () => {
669+
outputFileStream.on('end', () => {
652670
this.logger.debug(`writerStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
653671
})
654672

655-
writerStream.on('finish', () => {
656-
writerStream.end()
673+
outputPassthroughStream.on('end', () => {
674+
this.logger.debug(`writerPassthroughStream has ended for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
675+
})
676+
677+
outputFileStream.on('finish', () => {
678+
outputFileStream.end()
657679
this.logger.debug(`stopping copy, writerStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
658680
})
681+
682+
outputPassthroughStream.on('finish', () => {
683+
outputFileStream.end()
684+
this.logger.debug(`stopping copy, writerPassthroughStream has finished for copying from ${podName}:${srcDir}/${srcFile} to ${destPath}`)
685+
})
659686
})
660687
} catch (e) {
661688
throw new FullstackTestingError(

0 commit comments

Comments
 (0)