Skip to content

Commit 03f2b1f

Browse files
authored
feat: add wrapper for reading table data using Storage API (#431)
Add support for easily reading tables using the BigQuery Storage API instead of the BigQuery API. This provides increased performance and reduced memory usage for most use cases, and it lets users either keep using the same interface they are used to from our main library or fetch data directly via a new veneer on the BigQuery Storage Read API.
1 parent 2ff0553 commit 03f2b1f

File tree

13 files changed

+1679
-5
lines changed

13 files changed

+1679
-5
lines changed

package.json

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,21 @@
2727
"precompile": "gts clean"
2828
},
2929
"dependencies": {
30+
"@google-cloud/paginator": "^5.0.0",
31+
"apache-arrow": "^14.0.2",
32+
"core-js": "^3.37.1",
3033
"extend": "^3.0.2",
31-
"google-gax": "^4.3.1",
32-
"google-auth-library": "^9.6.3"
34+
"google-auth-library": "^9.6.3",
35+
"google-gax": "^4.3.1"
3336
},
3437
"peerDependencies": {
3538
"protobufjs": "^7.2.4"
3639
},
3740
"devDependencies": {
38-
"@google-cloud/bigquery": "^7.0.0",
41+
"@google-cloud/bigquery": "^7.5.2",
3942
"@types/extend": "^3.0.4",
4043
"@types/mocha": "^9.0.0",
41-
"@types/node": "^20.0.0",
44+
"@types/node": "^20.16.5",
4245
"@types/sinon": "^17.0.0",
4346
"@types/uuid": "^9.0.1",
4447
"c8": "^9.0.0",
@@ -55,7 +58,7 @@
5558
"nise": "6.0.0",
5659
"path-to-regexp": "6.3.0",
5760
"ts-loader": "^9.0.0",
58-
"typescript": "^5.1.6",
61+
"typescript": "^5.5.3",
5962
"uuid": "^9.0.0",
6063
"webpack": "^5.0.0",
6164
"webpack-cli": "^5.0.0"

src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import * as v1 from './v1';
2020
import * as v1beta1 from './v1beta1';
2121
import * as v1alpha from './v1alpha';
2222
import * as managedwriter from './managedwriter';
23+
import * as reader from './reader';
2324
const BigQueryReadClient = v1.BigQueryReadClient;
2425
type BigQueryReadClient = v1.BigQueryReadClient;
2526
const BigQueryWriteClient = v1.BigQueryWriteClient;
@@ -28,6 +29,8 @@ const BigQueryStorageClient = v1beta1.BigQueryStorageClient;
2829
type BigQueryStorageClient = v1beta1.BigQueryStorageClient;
2930
const WriterClient = managedwriter.WriterClient;
3031
type WriterClient = managedwriter.WriterClient;
32+
const ReadClient = reader.ReadClient;
33+
type ReadClient = reader.ReadClient;
3134
export {
3235
v1,
3336
BigQueryReadClient,
@@ -37,6 +40,8 @@ export {
3740
BigQueryWriteClient,
3841
managedwriter,
3942
WriterClient,
43+
reader,
44+
ReadClient,
4045
};
4146
// For compatibility with JavaScript libraries we need to provide this default export:
4247
// tslint:disable-next-line no-default-export
@@ -46,6 +51,8 @@ export default {
4651
BigQueryWriteClient,
4752
managedwriter,
4853
WriterClient,
54+
reader,
55+
ReadClient,
4956
};
5057
import * as protos from '../protos/protos';
5158
export {protos};

src/reader/arrow_reader.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {ResourceStream} from '@google-cloud/paginator';
16+
import {RecordBatch} from 'apache-arrow';
17+
18+
import * as protos from '../../protos/protos';
19+
import {TableReference, ReadClient} from './read_client';
20+
import {logger} from '../util/logger';
21+
import {
22+
ArrowRawTransform,
23+
ArrowRecordBatchTransform,
24+
ArrowRecordReaderTransform,
25+
} from './arrow_transform';
26+
import {ReadSession, GetStreamOptions} from './read_session';
27+
import {ArrowFormat} from './data_format';
28+
29+
type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
30+
31+
/**
32+
* A BigQuery Storage API Reader that can be used to read data
33+
* from BigQuery Tables using the Storage API in Arrow format.
34+
*
35+
* @class
36+
* @memberof reader
37+
*/
38+
export class ArrowTableReader {
  // Target table; used for session creation and trace logging.
  private _tableRef: TableReference;
  // Underlying Storage Read API session, fixed to Arrow format.
  private _session: ReadSession;

  /**
   * Creates a new ArrowTableReader instance. Usually created via
   * ReadClient.createArrowTableReader().
   *
   * @param {ReadClient} readClient - Storage Read Client.
   * @param {TableReference} tableRef - target table to read data from.
   */
  constructor(readClient: ReadClient, tableRef: TableReference) {
    this._tableRef = tableRef;
    this._session = new ReadSession(readClient, tableRef, ArrowFormat);
  }

  // Internal trace logging, tagged with the table id for correlation.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private trace(msg: string, ...otherArgs: any[]) {
    logger(
      'arrow_table_reader',
      `[table: ${this._tableRef.tableId}]`,
      msg,
      ...otherArgs
    );
  }

  /**
   * Returns the metadata of the underlying read session, if a session
   * has already been created (e.g. by a previous getStream call);
   * otherwise null/undefined.
   */
  getSessionInfo(): ReadSessionInfo | undefined | null {
    return this._session.getSessionInfo();
  }

  /**
   * Get a byte stream of Arrow Record Batch.
   *
   * @param {GetStreamOptions} options
   */
  async getStream(
    options?: GetStreamOptions
  ): Promise<ResourceStream<Uint8Array>> {
    this.trace('getStream', options);
    const stream = await this._session.getStream(options);
    return stream.pipe(new ArrowRawTransform()) as ResourceStream<Uint8Array>;
  }

  /**
   * Get a stream of Arrow RecordBatch objects.
   *
   * @param {GetStreamOptions} options
   */
  async getRecordBatchStream(
    options?: GetStreamOptions
  ): Promise<ResourceStream<RecordBatch>> {
    this.trace('getRecordBatchStream', options);
    const stream = await this._session.getStream(options);
    const info = this._session.getSessionInfo();
    // NOTE(review): `info!` assumes getStream() above always populates the
    // session metadata before returning — confirm in ReadSession.getStream.
    return stream
      .pipe(new ArrowRawTransform())
      .pipe(new ArrowRecordReaderTransform(info!))
      .pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>;
  }

  // Closes the underlying read session; streams already obtained may end.
  close() {
    this._session.close();
  }
}

src/reader/arrow_transform.ts

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {Transform, TransformCallback} from 'stream';
16+
import {
17+
RecordBatchReader,
18+
RecordBatch,
19+
RecordBatchStreamReader,
20+
Vector,
21+
} from 'apache-arrow';
22+
import * as protos from '../../protos/protos';
23+
24+
type ReadRowsResponse =
25+
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
26+
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
27+
28+
interface TableCell {
29+
v?: any;
30+
}
31+
interface TableRow {
32+
f?: Array<TableCell>;
33+
}
34+
35+
/**
36+
* ArrowRawTransform implements a node stream Transform that reads
37+
* ReadRowsResponse from BigQuery Storage Read API and convert
38+
* a raw Arrow Record Batch.
39+
*/
40+
export class ArrowRawTransform extends Transform {
41+
constructor() {
42+
super({
43+
readableObjectMode: false,
44+
writableObjectMode: true,
45+
});
46+
}
47+
48+
_transform(
49+
response: ReadRowsResponse,
50+
_: BufferEncoding,
51+
callback: TransformCallback
52+
): void {
53+
if (
54+
!(
55+
response.arrowRecordBatch &&
56+
response.arrowRecordBatch.serializedRecordBatch
57+
)
58+
) {
59+
callback(null);
60+
return;
61+
}
62+
callback(null, response.arrowRecordBatch?.serializedRecordBatch);
63+
}
64+
}
65+
66+
/**
67+
* ArrowRecordReaderTransform implements a node stream Transform that reads
68+
* a byte stream of raw Arrow Record Batch and convert to a stream of Arrow
69+
* RecordBatchStreamReader.
70+
*/
71+
export class ArrowRecordReaderTransform extends Transform {
72+
private session: ReadSession;
73+
74+
constructor(session: ReadSession) {
75+
super({
76+
objectMode: true,
77+
});
78+
this.session = session;
79+
}
80+
81+
_transform(
82+
serializedRecordBatch: Uint8Array,
83+
_: BufferEncoding,
84+
callback: TransformCallback
85+
): void {
86+
const buf = Buffer.concat([
87+
this.session.arrowSchema?.serializedSchema as Uint8Array,
88+
serializedRecordBatch,
89+
]);
90+
const reader = RecordBatchReader.from(buf);
91+
callback(null, reader);
92+
}
93+
}
94+
95+
/**
96+
* ArrowRecordBatchTransform implements a node stream Transform that reads
97+
* a RecordBatchStreamReader and convert a stream of Arrow RecordBatch.
98+
*/
99+
export class ArrowRecordBatchTransform extends Transform {
100+
constructor() {
101+
super({
102+
objectMode: true,
103+
});
104+
}
105+
106+
_transform(
107+
reader: RecordBatchStreamReader,
108+
_: BufferEncoding,
109+
callback: TransformCallback
110+
): void {
111+
const batches = reader.readAll();
112+
for (const row of batches) {
113+
this.push(row);
114+
}
115+
callback(null);
116+
}
117+
}
118+
119+
/**
120+
* ArrowRecordBatchTableRowTransform implements a node stream Transform that reads
121+
* an Arrow RecordBatch and convert a stream of BigQuery TableRow.
122+
*/
123+
export class ArrowRecordBatchTableRowTransform extends Transform {
  constructor() {
    super({
      objectMode: true,
    });
  }

  // Converts one Arrow RecordBatch into BigQuery TableRow objects
  // ({f: [{v: ...}, ...]}), pushing one row object per batch row.
  _transform(
    batch: RecordBatch,
    _: BufferEncoding,
    callback: TransformCallback
  ): void {
    // Pre-allocate every row with a cell slot per column, so cells can be
    // filled column-by-column below (better locality per column).
    const rows = new Array(batch.numRows);
    for (let i = 0; i < batch.numRows; i++) {
      rows[i] = {
        f: new Array(batch.numCols),
      };
    }
    // Iterate column-major: select each column, then fill that cell for
    // every row.
    for (let j = 0; j < batch.numCols; j++) {
      const column = batch.selectAt([j]);
      const columnName = column.schema.fields[0].name;
      for (let i = 0; i < batch.numRows; i++) {
        const fieldData = column.get(i);
        // NOTE(review): assumes toJSON() keys the value by the column
        // name for a single-column selection — confirm against the
        // apache-arrow version in use.
        const fieldValue = fieldData?.toJSON()[columnName];
        rows[i].f[j] = {
          v: convertArrowValue(fieldValue),
        };
      }
    }
    // Emit rows only after the whole batch is converted.
    for (let i = 0; i < batch.numRows; i++) {
      this.push(rows[i]);
    }
    callback(null);
  }
}
158+
159+
function convertArrowValue(fieldValue: any): any {
160+
if (typeof fieldValue === 'object') {
161+
if (fieldValue instanceof Vector) {
162+
const arr = fieldValue.toJSON();
163+
return arr.map((v: any) => {
164+
return {v: convertArrowValue(v)};
165+
});
166+
}
167+
const tableRow: TableRow = {f: []};
168+
Object.keys(fieldValue).forEach(key => {
169+
tableRow.f?.push({
170+
v: convertArrowValue(fieldValue[key]),
171+
});
172+
});
173+
return tableRow;
174+
}
175+
return fieldValue;
176+
}

src/reader/data_format.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import * as protos from '../../protos/protos';
16+
17+
// Serialization format for rows returned by a Storage Read API session,
// derived from the proto definition so it stays in sync with the API.
export type DataFormat =
  protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat'];
// Value-side companion of the DataFormat type (type/value declaration
// merging): exposes the proto enum under the same name.
const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;

/**
 * Return data in Apache Arrow format.
 *
 * @memberof reader
 */
export const ArrowFormat: DataFormat = 'ARROW';

/**
 * Return data in Apache Avro format.
 *
 * @memberof reader
 */
export const AvroFormat: DataFormat = 'AVRO';

0 commit comments

Comments
 (0)