Skip to content

Nodejs transaction redesign feature branch 2 adding commit #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: nodejs-transaction-change-run
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 117 additions & 111 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,117 +161,7 @@ class Transaction extends DatastoreRequest {
: () => {};
const gaxOptions =
typeof gaxOptionsOrCallback === 'object' ? gaxOptionsOrCallback : {};

if (this.skipCommit) {
setImmediate(callback);
return;
}

const keys: Entities = {};

this.modifiedEntities_
// Reverse the order of the queue to respect the "last queued request
// wins" behavior.
.reverse()
// Limit the operations we're going to send through to only the most
// recently queued operations. E.g., if a user tries to save with the
// same key they just asked to be deleted, the delete request will be
// ignored, giving preference to the save operation.
.filter((modifiedEntity: Entity) => {
const key = modifiedEntity.entity.key;

if (!entity.isKeyComplete(key)) return true;

const stringifiedKey = JSON.stringify(modifiedEntity.entity.key);

if (!keys[stringifiedKey]) {
keys[stringifiedKey] = true;
return true;
}

return false;
})
// Group entities together by method: `save` mutations, then `delete`.
// Note: `save` mutations being first is required to maintain order when
// assigning IDs to incomplete keys.
.sort((a, b) => {
return a.method < b.method ? 1 : a.method > b.method ? -1 : 0;
})
// Group arguments together so that we only make one call to each
// method. This is important for `DatastoreRequest.save`, especially, as
// that method handles assigning auto-generated IDs to the original keys
// passed in. When we eventually execute the `save` method's API
// callback, having all the keys together is necessary to maintain
// order.
.reduce((acc: Entities, entityObject: Entity) => {
const lastEntityObject = acc[acc.length - 1];
const sameMethod =
lastEntityObject && entityObject.method === lastEntityObject.method;

if (!lastEntityObject || !sameMethod) {
acc.push(entityObject);
} else {
lastEntityObject.args = lastEntityObject.args.concat(
entityObject.args
);
}

return acc;
}, [])
// Call each of the mutational methods (DatastoreRequest[save,delete])
// to build up a `req` array on this instance. This will also build up a
// `callbacks` array, that is the same callback that would run if we
// were using `save` and `delete` outside of a transaction, to process
// the response from the API.
.forEach(
(modifiedEntity: {method: string; args: {reverse: () => void}}) => {
const method = modifiedEntity.method;
const args = modifiedEntity.args.reverse();
Datastore.prototype[method].call(this, args, () => {});
}
);

// Take the `req` array built previously, and merge them into one request to
// send as the final transactional commit.
const reqOpts = {
mutations: this.requests_
.map((x: {mutations: google.datastore.v1.Mutation}) => x.mutations)
.reduce(
(a: {concat: (arg0: Entity) => void}, b: Entity) => a.concat(b),
[]
),
};

this.request_(
{
client: 'DatastoreClient',
method: 'commit',
reqOpts,
gaxOpts: gaxOptions || {},
},
(err, resp) => {
if (err) {
// Rollback automatically for the user.
this.rollback(() => {
// Provide the error & API response from the failed commit to the
// user. Even a failed rollback should be transparent. RE:
// https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1369#discussion_r66833976
callback(err, resp);
});
return;
}

// The `callbacks` array was built previously. These are the callbacks
// that handle the API response normally when using the
// DatastoreRequest.save and .delete methods.
this.requestCallbacks_.forEach(
(cb: (arg0: null, arg1: Entity) => void) => {
cb(null, resp);
}
);
callback(null, resp);
}
);
this.runCommit(gaxOptions, callback);
}

/**
Expand Down Expand Up @@ -561,6 +451,122 @@ class Transaction extends DatastoreRequest {
});
}

private runCommit(
gaxOptions: CallOptions,
callback: CommitCallback
): void | Promise<CommitResponse> {
if (this.skipCommit) {
setImmediate(callback);
return;
}

const keys: Entities = {};

this.modifiedEntities_
// Reverse the order of the queue to respect the "last queued request
// wins" behavior.
.reverse()
// Limit the operations we're going to send through to only the most
// recently queued operations. E.g., if a user tries to save with the
// same key they just asked to be deleted, the delete request will be
// ignored, giving preference to the save operation.
.filter((modifiedEntity: Entity) => {
const key = modifiedEntity.entity.key;

if (!entity.isKeyComplete(key)) return true;

const stringifiedKey = JSON.stringify(modifiedEntity.entity.key);

if (!keys[stringifiedKey]) {
keys[stringifiedKey] = true;
return true;
}

return false;
})
// Group entities together by method: `save` mutations, then `delete`.
// Note: `save` mutations being first is required to maintain order when
// assigning IDs to incomplete keys.
.sort((a, b) => {
return a.method < b.method ? 1 : a.method > b.method ? -1 : 0;
})
// Group arguments together so that we only make one call to each
// method. This is important for `DatastoreRequest.save`, especially, as
// that method handles assigning auto-generated IDs to the original keys
// passed in. When we eventually execute the `save` method's API
// callback, having all the keys together is necessary to maintain
// order.
.reduce((acc: Entities, entityObject: Entity) => {
const lastEntityObject = acc[acc.length - 1];
const sameMethod =
lastEntityObject && entityObject.method === lastEntityObject.method;

if (!lastEntityObject || !sameMethod) {
acc.push(entityObject);
} else {
lastEntityObject.args = lastEntityObject.args.concat(
entityObject.args
);
}

return acc;
}, [])
// Call each of the mutational methods (DatastoreRequest[save,delete])
// to build up a `req` array on this instance. This will also build up a
// `callbacks` array, that is the same callback that would run if we
// were using `save` and `delete` outside of a transaction, to process
// the response from the API.
.forEach(
(modifiedEntity: {method: string; args: {reverse: () => void}}) => {
const method = modifiedEntity.method;
const args = modifiedEntity.args.reverse();
Datastore.prototype[method].call(this, args, () => {});
}
);

// Take the `req` array built previously, and merge them into one request to
// send as the final transactional commit.
const reqOpts = {
mutations: this.requests_
.map((x: {mutations: google.datastore.v1.Mutation}) => x.mutations)
.reduce(
(a: {concat: (arg0: Entity) => void}, b: Entity) => a.concat(b),
[]
),
};

this.request_(
{
client: 'DatastoreClient',
method: 'commit',
reqOpts,
gaxOpts: gaxOptions || {},
},
(err, resp) => {
if (err) {
// Rollback automatically for the user.
this.rollback(() => {
// Provide the error & API response from the failed commit to the
// user. Even a failed rollback should be transparent. RE:
// https://github.com/GoogleCloudPlatform/google-cloud-node/pull/1369#discussion_r66833976
callback(err, resp);
});
return;
}

// The `callbacks` array was built previously. These are the callbacks
// that handle the API response normally when using the
// DatastoreRequest.save and .delete methods.
this.requestCallbacks_.forEach(
(cb: (arg0: null, arg1: Entity) => void) => {
cb(null, resp);
}
);
callback(null, resp);
}
);
}

/**
* This function parses results from a beginTransaction call
*
Expand Down
Loading