Skip to content

Commit e6b06a8

Browse files
AirbyteEstimateTraceMessage (#18875)
* `AirbyteEstimateTraceMessage` * Add PR number * fix method name * Lint * Lint * fix merge * Update docs/understanding-airbyte/airbyte-protocol.md Co-authored-by: Davin Chia <[email protected]> * `EstimateType` sub type in python * lint Co-authored-by: Davin Chia <[email protected]>
1 parent 29676e1 commit e6b06a8

File tree

4 files changed

+109
-2
lines changed

4 files changed

+109
-2
lines changed

airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class Config:
8181

8282
class TraceType(Enum):
8383
ERROR = "ERROR"
84+
ESTIMATE = "ESTIMATE"
8485

8586

8687
class FailureType(Enum):
@@ -98,6 +99,28 @@ class Config:
9899
failure_type: Optional[FailureType] = Field(None, description="The type of error")
99100

100101

102+
class EstimateType(Enum):
103+
STREAM = "STREAM"
104+
SYNC = "SYNC"
105+
106+
107+
class AirbyteEstimateTraceMessage(BaseModel):
108+
class Config:
109+
extra = Extra.allow
110+
111+
name: str = Field(..., description="The name of the stream")
112+
type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
113+
namespace: Optional[str] = Field(None, description="The namespace of the stream")
114+
row_estimate: Optional[int] = Field(
115+
None,
116+
description="The estimated number of rows to be emitted by this sync for this stream",
117+
)
118+
byte_estimate: Optional[int] = Field(
119+
None,
120+
description="The estimated number of bytes to be emitted by this sync for this stream",
121+
)
122+
123+
101124
class OrchestratorType(Enum):
102125
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"
103126

@@ -213,6 +236,10 @@ class Config:
213236
type: TraceType = Field(..., description="the type of trace message", title="trace type")
214237
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
215238
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
239+
estimate: Optional[AirbyteEstimateTraceMessage] = Field(
240+
None,
241+
description="Estimate trace message: a guess at how much data will be produced in this sync",
242+
)
216243

217244

218245
class AirbyteControlMessage(BaseModel):

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec
250250
*/
251251
private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) {
252252
switch (traceMessage.getType()) {
253+
case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType);
253254
case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType);
254255
default -> log.warn("Invalid message type for trace message: {}", traceMessage);
255256
}
@@ -263,6 +264,11 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage
263264
}
264265
}
265266

267+
@SuppressWarnings("PMD") // until method is implemented
268+
private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) {
269+
270+
}
271+
266272
private short getStreamIndex(final String streamName) {
267273
if (!streamNameToIndex.containsKey(streamName)) {
268274
streamNameToIndex.put(streamName, nextStreamIndex);

airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
title: AirbyteProtocol
55
type: object
66
description: AirbyteProtocol structs
7-
version: 0.3.1
7+
version: 0.3.2
88
properties:
99
airbyte_message:
1010
"$ref": "#/definitions/AirbyteMessage"
@@ -174,12 +174,16 @@ definitions:
174174
type: string
175175
enum:
176176
- ERROR
177+
- ESTIMATE
177178
emitted_at:
178179
description: "the time in ms that the message was emitted"
179180
type: number
180181
error:
181182
description: "error trace message: the error object"
182183
"$ref": "#/definitions/AirbyteErrorTraceMessage"
184+
estimate:
185+
description: "Estimate trace message: a guess at how much data will be produced in this sync"
186+
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
183187
AirbyteErrorTraceMessage:
184188
type: object
185189
additionalProperties: true
@@ -201,6 +205,32 @@ definitions:
201205
enum:
202206
- system_error
203207
- config_error
208+
AirbyteEstimateTraceMessage:
209+
type: object
210+
additionalProperties: true
211+
required:
212+
- name
213+
- type
214+
properties:
215+
name:
216+
description: The name of the stream
217+
type: string
218+
type:
219+
title: "estimate type" # this title is required to avoid python codegen conflicts with the "type" parameter in AirbyteMessage. See https://github.com/airbytehq/airbyte/pull/12581
220+
description: The type of estimate
221+
type: string
222+
enum:
223+
- STREAM
224+
- SYNC
225+
namespace:
226+
description: The namespace of the stream
227+
type: string
228+
row_estimate:
229+
description: The estimated number of rows to be emitted by this sync for this stream
230+
type: integer
231+
byte_estimate:
232+
description: The estimated number of bytes to be emitted by this sync for this stream
233+
type: integer
204234
AirbyteControlMessage:
205235
type: object
206236
additionalProperties: true

docs/understanding-airbyte/airbyte-protocol.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the
2828

2929
| Version | Date of Change | Pull Request(s) | Subject |
3030
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
31+
| `v0.3.2` | 2022-10-128 | [18875](https://github.com/airbytehq/airbyte/pull/18875) | `AirbyteEstimateTraceMessage` added |
3132
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added |
3233
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
3334
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
@@ -759,7 +760,7 @@ AirbyteLogMessage:
759760

760761
### AirbyteTraceMessage
761762

762-
The trace message allows an Actor to emit metadata about the runtime of the Actor. As currently implemented, it allows an Actor to surface information about errors. This message is designed to grow to handle other use cases, including progress and performance metrics.
763+
The trace message allows an Actor to emit metadata about the runtime of the Actor, such as errors or estimates. This message is designed to grow to handle other use cases, including additonal performance metrics.
763764

764765
```yaml
765766
AirbyteTraceMessage:
@@ -775,12 +776,16 @@ AirbyteTraceMessage:
775776
type: string
776777
enum:
777778
- ERROR
779+
- ESTIMATE
778780
emitted_at:
779781
description: "the time in ms that the message was emitted"
780782
type: number
781783
error:
782784
description: "error trace message: the error object"
783785
"$ref": "#/definitions/AirbyteErrorTraceMessage"
786+
estimate:
787+
description: "Estimate trace message: a guess at how much data will be produced in this sync"
788+
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
784789
AirbyteErrorTraceMessage:
785790
type: object
786791
additionalProperties: true
@@ -802,8 +807,47 @@ AirbyteErrorTraceMessage:
802807
enum:
803808
- system_error
804809
- config_error
810+
AirbyteEstimateTraceMessage:
811+
type: object
812+
additionalProperties: true
813+
required:
814+
- name
815+
- type
816+
properties:
817+
name:
818+
description: The name of the stream
819+
type: string
820+
type:
821+
description: The type of estimate
822+
type: string
823+
enum:
824+
- STREAM
825+
- SYNC
826+
namespace:
827+
description: The namespace of the stream
828+
type: string
829+
row_estimate:
830+
description: The estimated number of rows to be emitted by this sync for this stream
831+
type: integer
832+
byte_estimate:
833+
description: The estimated number of bytes to be emitted by this sync for this stream
834+
type: integer
805835
```
806836

837+
#### AirbyteErrorTraceMessage
838+
839+
Error Trace Messages are used when a sync is about to fail and the connector can provide meaningful information to the orhcestrator or user about what to do next.
840+
841+
Of note, an `internal_message` might be an exception code, but an `external_message` is meant to be user-facing, e.g. "Your API Key is invalid".
842+
843+
Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMessage` can be sent from a connector.
844+
845+
#### AirbyteEstimateTraceMessage
846+
847+
Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync. This ise useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a databse to provde a `COUNT (*) from {table_name} where updated_at > {state}` to provide an estimate of the rows to be sent in this sync.
848+
849+
`AirbyteEstimateTraceMessage` should be emitted early in the sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value.
850+
807851
### AirbyteControlMessage
808852

809853
An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.

0 commit comments

Comments
 (0)