stream: normalized destroy() #30906

Closed
@ronag

I regularly use utilities for safe, normalized destruction of streams and stream-like objects. Given @mcollina's recent comment about making the destroy(err, cb) API public, I wanted to suggest (as an alternative?) including something like this in the stream utils.

If there is interest in something like this I could look into preparing a more complete PR and proposal.

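// Helpers the snippet relies on but does not show. These are assumptions:
// `eos` is taken to be stream.finished; `once` and `noop` are minimal stand-ins.
const { finished: eos } = require('stream');
const noop = () => {};
const once = fn => {
  let called = false;
  return (...args) => {
    if (called) return;
    called = true;
    fn(...args);
  };
};

module.exports = function destroy (stream, err, cb = noop) {
  // TODO: cb should not be optional, throw if missing?
  // Until that is decided, default to noop so process.nextTick(cb, er)
  // does not throw when destroy(stream) is called without a callback.
  let sync = true;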
  const callback = once(er => {
    if (sync) {
      process.nextTick(cb, er);
    } else {
      cb(er);
    }
  });
 
  const s = stream._writableState || stream._readableState;

  if (stream.destroyed || (s && s.destroyed)) {
    // TODO: Move this logic into eos?
    // TODO: Move this logic into destroy(err, callback)?
    if (s) {
      if (s.errorEmitted) {
        // TODO: cb with error? Save error in errored?
        // callback(s.errored);
        return callback();
      } else if (s.closeEmitted) {
        // TODO: s.closeEmitted doesn't exist yet.
        // TODO: it's still possible for stream to emit 'error' after 'close'.
        return callback();
      } else if (typeof s.closeEmitted !== 'boolean' && (s.endEmitted || s.finished)) {
        // TODO: This is not entirely correct for Duplex w/ halfOpen
        setImmediate(callback);
        stream.on('error', callback);
      } else {
        eos(stream, callback);
      }
      if (err && typeof stream.destroy === 'function') {
        // Forward error
        stream.destroy(err);
      }
    } else {
      // TODO: How to handle?
      callback(new Error('invalid stream'));
    }
  } else if (typeof stream.destroy === 'function' && stream.destroy.length === 2) {
    stream.destroy(err, callback);
    // We handle error through callback
    stream.on('error', noop);
  } else {
    eos(stream, callback);
    if (typeof stream.abort === 'function') {
      stream.abort();
    } else if (stream.req && typeof stream.req.abort === 'function') {
      // HTTP response.destroy() is slightly broken. Try to avoid.
      stream.req.abort();
    } else if (typeof stream.destroy === 'function') {
      stream.destroy(err);
    } else if (typeof stream.end === 'function') {
      stream.end();
    } else {
      callback(new Error('invalid stream'));
    }
  }
  sync = false;
}

It would be used something like this:

const { destroy } = require('stream');

const someStream = ...
... do stuff
destroy(someStream);
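
To make the callback form concrete, here is a minimal sketch of the calling pattern. It does not use the proposed helper itself: `destroySketch` is a hypothetical, reduced stand-in that only handles streams implementing destroy(err) and relies on stream.finished for completion. It illustrates the two properties the proposal aims for: the callback fires once, and never synchronously.

```js
const { PassThrough, finished } = require('stream');

// Hypothetical, reduced stand-in for the proposed helper. It only covers
// streams that implement destroy(err); the full proposal handles more cases.
function destroySketch (stream, err, cb) {
  let sync = true;
  let called = false;
  const callback = er => {
    if (called) return; // invoke cb at most once
    called = true;
    // If we are still inside destroySketch(), defer so cb is never sync.
    if (sync) process.nextTick(cb, er);
    else cb(er);
  };
  // finished() also installs an 'error' listener, so the forwarded error
  // is reported through the callback instead of being thrown.
  finished(stream, callback);
  stream.destroy(err);
  sync = false;
}

const events = [];
const someStream = new PassThrough();

destroySketch(someStream, new Error('boom'), er => {
  events.push(`callback: ${er && er.message}`);
});
events.push('after destroy() returned');
```

In this sketch the caller sees 'after destroy() returned' recorded before the callback runs, and the error passed to the helper arrives as the callback's argument rather than as an unhandled 'error' event.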

Labels: stream (issues and PRs related to the stream subsystem)
