Failed to decode basic Protobuf message example when publishing to Kafka #21361

Closed
renkenono opened this issue Apr 11, 2025 · 1 comment

@renkenono

Describe the bug

Hi,

I have a basic use case where I'd like to publish all the changes of a given materialized view to a Kafka topic in its Protobuf form. An almost complete list of steps to reproduce the issue is provided below.

I am familiar with the final error message, which most likely comes from prost. I believe the input data fed to prost comes from this file, but I could be wrong.

Error message/log

# Server-side

risingwave-standalone  | 2025-04-11T13:28:44.672317165Z  INFO             rw-acceptor pgwire::pg_server: accept connection peer_addr=127.0.0.1:57956
risingwave-standalone  | 2025-04-11T13:28:45.8768341Z ERROR      rw-standalone-meta bootstrap_recovery: risingwave_meta::rpc::ddl_controller: failed to create streaming job id=123 error=failed to validate sink: config error: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`: failed to decode file descriptor set: failed to decode Protobuf message: invalid wire type value: 7
risingwave-standalone  | 2025-04-11T13:28:45.881568146Z  WARN      rw-standalone-meta bootstrap_recovery: risingwave_meta::rpc::ddl_controller: aborted streaming job id=123
risingwave-standalone  | 2025-04-11T13:28:45.882136502Z ERROR  rw-standalone-frontend handle_query{mode="simple query" session_id=9 sql=CREATE SINK example_mv_kafka FROM example_mv_pb WITH (connector = 'kafka', properties.bootstrap.server = 'localhost:9092', topic = 'example') FORMAT PLAIN ENCODE PROTOBUF (message = 'proto.Example', schema.location = 'file:///opt/protocol/a.proto', force_append_only = 'true')}: pgwire::pg_protocol: error when process message error=Failed to run the query: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error: failed to validate sink: config error: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`: failed to decode file descriptor set: failed to decode Protobuf message: invalid wire type value: 7

# Client-side (psql)


DROP_MATERIALIZED_VIEW
CREATE_MATERIALIZED_VIEW
 column_name |     data_type     
-------------+-------------------
 id          | character varying
(1 row)

ERROR:  Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error
  2: failed to validate sink
  3: config error
  4: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`
  5: failed to decode file descriptor set
  6: failed to decode Protobuf message: invalid wire type value: 7
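
The innermost error is consistent with the text of a `.proto` source file being read as if it were a binary descriptor set. The following is a minimal Python sketch of a generic wire-format walker, not prost's actual code. The helper names (`read_varint`, `read_key`, `skip_field`, `first_error`) are hypothetical, and it assumes the file starts directly with `syntax = "proto3";` with no leading bytes. It treats every field as unknown and skips it, which is how a decoder would handle field numbers it does not recognize.

```python
# Treat the .proto *source text* as if it were a binary FileDescriptorSet.
SOURCE = b'syntax = "proto3";\npackage proto;\nmessage Example { string id = 1; }\n'

# The six wire types defined by the Protobuf encoding (values 0 through 5).
VARINT, FIXED64, LEN, SGROUP, EGROUP, FIXED32 = range(6)


def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, next_pos)."""
    value = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if byte < 0x80:
            return value, pos
        shift += 7


def read_key(buf, pos):
    """Decode a field key; wire types 6 and 7 do not exist, so reject them."""
    key, pos = read_varint(buf, pos)
    wire_type = key & 0x07
    if wire_type > FIXED32:
        raise ValueError(f"invalid wire type value: {wire_type}")
    return key >> 3, wire_type, pos


def skip_field(buf, pos, wire_type):
    """Skip one field's payload and return the position after it."""
    if wire_type == VARINT:
        return read_varint(buf, pos)[1]
    if wire_type == FIXED64:
        return pos + 8
    if wire_type == FIXED32:
        return pos + 4
    if wire_type == LEN:
        length, pos = read_varint(buf, pos)
        return pos + length
    if wire_type == SGROUP:
        # Skip nested fields until an end-group key is found.
        while True:
            _, inner_type, pos = read_key(buf, pos)
            if inner_type == EGROUP:
                return pos
            pos = skip_field(buf, pos, inner_type)
    raise ValueError("unexpected end group")


def first_error(buf):
    """Walk top-level fields and return the first decode error message, if any."""
    pos = 0
    try:
        while pos < len(buf):
            _, wire_type, pos = read_key(buf, pos)
            pos = skip_field(buf, pos, wire_type)
    except (ValueError, IndexError) as exc:
        return str(exc)
    return None


print(first_error(SOURCE))  # invalid wire type value: 7
```

In this sketch, `s` (0x73) decodes as a start-group key for field 14, `y` (0x79) as a 64-bit field that swallows the next eight bytes, `p` (0x70) as a varint field whose value is `r`, and `o` (0x6f) has low bits `111`, which is the nonexistent wire type 7.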

To Reproduce

  1. Populate $PWD/protocol/a.proto with the following content.
syntax = "proto3";
package proto;
message Example { string id = 1; }
  2. Execute the following SQL commands.
DROP MATERIALIZED VIEW IF EXISTS example_mv_pb;
DROP TABLE IF EXISTS example;

CREATE TABLE example ( id varchar );

CREATE MATERIALIZED VIEW example_mv_pb AS
SELECT id
FROM example;

SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'example_mv_pb';


CREATE SINK example_mv_kafka FROM example_mv_pb
WITH (
   connector='kafka',
   properties.bootstrap.server='localhost:9092',
   topic='example'
)
FORMAT PLAIN
ENCODE PROTOBUF (
   message = 'proto.Example',
   schema.location = 'file:///opt/protocol/a.proto',
   force_append_only='true'
);

Expected behavior

I expected this to happen:

  1. Successful generation of file descriptors.
  2. Successful Protobuf serialization of example_mv_pb rows.
  3. Successful publication of the latter to the Kafka topic example.

How did you deploy RisingWave?

---
x-image: &image
  image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.2.1}
services:
  risingwave-standalone:
    <<: *image
    command: "standalone --meta-opts=\" \
                    --listen-addr 0.0.0.0:5690 \
                    --advertise-addr 0.0.0.0:5690 \
                    --dashboard-host 0.0.0.0:5691 \
                    --prometheus-host 0.0.0.0:1250 \
                    --prometheus-endpoint http://prometheus-0:9500 \
                    --backend sqlite \
                    --sql-endpoint sqlite:///opt/risingwave/backend.db \
                    --state-store hummock+fs:///opt/risingwave \
                    --data-directory hummock_001 \
                    --config-path /risingwave.toml\" \
                 --compute-opts=\" \
                    --config-path /risingwave.toml \
                    --listen-addr 0.0.0.0:5688 \
                    --prometheus-listener-addr 0.0.0.0:1250 \
                    --advertise-addr 0.0.0.0:5688 \
                    --async-stack-trace verbose \
                    --parallelism 8 \
                    --total-memory-bytes 21474836480 \
                    --role both \
                    --meta-address http://0.0.0.0:5690 \
                    --memory-manager-target-bytes 22333829939 \" \
                 --frontend-opts=\" \
                   --config-path /risingwave.toml \
                   --listen-addr 0.0.0.0:4599 \
                   --advertise-addr 0.0.0.0:4599 \
                   --prometheus-listener-addr 0.0.0.0:1250 \
                   --health-check-listener-addr 0.0.0.0:6786 \
                   --meta-addr http://0.0.0.0:5690 \
                   --frontend-total-memory-bytes=4294967296\" \
                 --compactor-opts=\" \
                   --listen-addr 0.0.0.0:6660 \
                   --prometheus-listener-addr 0.0.0.0:1250 \
                   --advertise-addr 0.0.0.0:6660 \
                   --meta-address http://0.0.0.0:5690 \
                   --compactor-total-memory-bytes=4294967296\""
    expose:
      - "6660"
      - "4599"
      - "5688"
      - "5690"
      - "1250"
      - "5691"
    ports:
      - "4599:4599"
      - "5690:5690"
      - "5691:5691"
      - "1250:1250"
    network_mode: "host"
    volumes:
      - "./protocol:/opt/protocol"
      - "./data:/opt/risingwave"
      - "./risingwave.toml:/risingwave.toml"
    environment:
      RUST_BACKTRACE: "1"
      # If ENABLE_TELEMETRY is not set, telemetry will start by default
      ENABLE_TELEMETRY: ${ENABLE_TELEMETRY:-false}
      RW_TELEMETRY_TYPE: ${RW_TELEMETRY_TYPE:-"docker-compose"}
      RW_SECRET_STORE_PRIVATE_KEY_HEX: ${RW_SECRET_STORE_PRIVATE_KEY_HEX:-0123456789bbcdef0123456789abcdef}
      RW_LICENSE_KEY: ${RW_LICENSE_KEY:-}
    container_name: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/6660; exit $$?;'
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5688; exit $$?;'
        - bash -c '> /dev/tcp/127.0.0.1/4599; exit $$?;'
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5690; exit $$?;'
      interval: 1s
      timeout: 5s
    restart: always
    deploy:
      resources:
        limits:
          memory: 28G
        reservations:
          memory: 28G

The version of RisingWave

dev=> select version();
                                    version                                     
--------------------------------------------------------------------------------
 PostgreSQL 13.14.0-RisingWave-2.2.1 (3bbe7ed3142c758e6c07dffddebd8f7a3bd3318d)
(1 row)

dev=> 

Additional context

No response

@renkenono added the type/bug label Apr 11, 2025
@github-actions bot added this to the release-2.3 milestone Apr 11, 2025
@xiangjinwu self-assigned this Apr 25, 2025
@xiangjinwu modified the milestones: release-2.3, release-2.4 Apr 25, 2025
@xiangjinwu
Contributor

schema.location = 'file:///opt/protocol/a.proto'

The schema.location parameter expects a compiled descriptor file rather than a protobuf source file.

https://docs.risingwave.com/ingestion/getting-started/formats-and-encoding-options#protobuf-handling
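
A compiled descriptor file can be produced from the source with `protoc`. The sketch below is a small Python wrapper around that invocation, under a few assumptions: `protoc` is installed on the host, the source lives at `protocol/a.proto` as in the reproduction steps, and the output name `a.pb` is hypothetical. `protoc_command` is an illustrative helper, not part of RisingWave.

```python
import shutil
import subprocess
from pathlib import Path


def protoc_command(proto_file, out_file):
    """Build the protoc invocation that writes a binary FileDescriptorSet."""
    proto_path = Path(proto_file)
    return [
        "protoc",
        f"--proto_path={proto_path.parent}",
        "--include_imports",  # embed imported files so the set is self-contained
        f"--descriptor_set_out={out_file}",
        str(proto_path),
    ]


# Paths are relative to the directory holding the docker-compose file.
cmd = protoc_command("protocol/a.proto", "protocol/a.pb")
print(" ".join(cmd))

# Only run the compiler when it is installed and the source file is present.
if shutil.which("protoc") and Path("protocol/a.proto").exists():
    subprocess.run(cmd, check=True)
```

With the `./protocol:/opt/protocol` volume mount from the compose file, the sink definition would then point at the compiled file, for example `schema.location = 'file:///opt/protocol/a.pb'`, while `message = 'proto.Example'` stays unchanged.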

@xiangjinwu closed this as not planned Apr 25, 2025