Failed to decode basic Protobuf message example when publishing to Kafka #21361

Closed as not planned
@renkenono

Describe the bug

Hi,

I have a basic use case where I'd like to publish all the changes of a given materialized view to a Kafka topic in its Protobuf form. An almost complete list of steps to reproduce the issue is provided below.

I am familiar with the final error message, which most likely comes from prost. I believe the input data fed to prost comes from this file, but I could be wrong.

Error message/log

# Server-side

risingwave-standalone  | 2025-04-11T13:28:44.672317165Z  INFO             rw-acceptor pgwire::pg_server: accept connection peer_addr=127.0.0.1:57956
risingwave-standalone  | 2025-04-11T13:28:45.8768341Z ERROR      rw-standalone-meta bootstrap_recovery: risingwave_meta::rpc::ddl_controller: failed to create streaming job id=123 error=failed to validate sink: config error: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`: failed to decode file descriptor set: failed to decode Protobuf message: invalid wire type value: 7
risingwave-standalone  | 2025-04-11T13:28:45.881568146Z  WARN      rw-standalone-meta bootstrap_recovery: risingwave_meta::rpc::ddl_controller: aborted streaming job id=123
risingwave-standalone  | 2025-04-11T13:28:45.882136502Z ERROR  rw-standalone-frontend handle_query{mode="simple query" session_id=9 sql=CREATE SINK example_mv_kafka FROM example_mv_pb WITH (connector = 'kafka', properties.bootstrap.server = 'localhost:9092', topic = 'example') FORMAT PLAIN ENCODE PROTOBUF (message = 'proto.Example', schema.location = 'file:///opt/protocol/a.proto', force_append_only = 'true')}: pgwire::pg_protocol: error when process message error=Failed to run the query: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error: failed to validate sink: config error: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`: failed to decode file descriptor set: failed to decode Protobuf message: invalid wire type value: 7

# Client-side (psql)


DROP_MATERIALIZED_VIEW
CREATE_MATERIALIZED_VIEW
 column_name |     data_type     
-------------+-------------------
 id          | character varying
(1 row)

ERROR:  Failed to run the query

Caused by these errors (recent errors listed first):
  1: gRPC request to meta service (call `/ddl_service.DdlService/CreateSink`) failed: Internal error
  2: failed to validate sink
  3: config error
  4: cannot build descriptor pool from schema `file:///opt/protocol/a.proto`
  5: failed to decode file descriptor set
  6: failed to decode Protobuf message: invalid wire type value: 7
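
For illustration, the final error can be reproduced outside RisingWave by treating the text of a.proto as if it were binary Protobuf data. The sketch below is a simplified walker over the Protobuf wire format, not prost's actual code; the function names are made up for this example. It assumes the decoder was handed the raw .proto text rather than a serialized file descriptor set, which is an inference from the error chain and not something confirmed here.

```python
# Simplified Protobuf wire-format walker (illustrative only, not prost's code).
# Assumption: the bytes being decoded are the plain text of a.proto.

PROTO_TEXT = b'syntax = "proto3";\npackage proto;\nmessage Example { string id = 1; }\n'


def read_varint(buf: bytes, pos: int):
    """Decode one base-128 varint at pos; return (value, next_pos)."""
    value = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("buffer ended inside a varint")
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if byte < 0x80:
            return value, pos
        shift += 7


def read_key(buf: bytes, pos: int):
    """Decode a field key; return (field_number, wire_type, next_pos)."""
    key, pos = read_varint(buf, pos)
    wire_type = key & 0x07
    if wire_type > 5:  # only wire types 0..5 are defined
        raise ValueError(f"invalid wire type value: {wire_type}")
    return key >> 3, wire_type, pos


def skip_field(buf: bytes, pos: int, field: int, wire_type: int) -> int:
    """Skip the payload of one field and return the next position."""
    if wire_type == 0:  # varint
        _, pos = read_varint(buf, pos)
    elif wire_type == 1:  # fixed 64-bit
        pos += 8
    elif wire_type == 2:  # length-delimited
        length, pos = read_varint(buf, pos)
        pos += length
    elif wire_type == 3:  # start group: skip nested fields until end group
        while True:
            inner_field, inner_wire, pos = read_key(buf, pos)
            if inner_wire == 4:
                if inner_field != field:
                    raise ValueError("unexpected end group tag")
                break
            pos = skip_field(buf, pos, inner_field, inner_wire)
    elif wire_type == 4:  # end group without a matching start
        raise ValueError("unexpected end group tag")
    else:  # wire type 5, fixed 32-bit
        pos += 4
    if pos > len(buf):
        raise ValueError("buffer underflow")
    return pos


def describe(buf: bytes) -> str:
    """Walk every top-level field; return 'ok' or the first decode error."""
    pos = 0
    try:
        while pos < len(buf):
            field, wire_type, pos = read_key(buf, pos)
            pos = skip_field(buf, pos, field, wire_type)
    except ValueError as err:
        return str(err)
    return "ok"


print(describe(PROTO_TEXT))  # prints "invalid wire type value: 7"
```

In this walk, `s` (0x73) is read as a start-group key, `y` (0x79) as a fixed 64-bit field that swallows the next eight bytes, `p` (0x70) as a varint key whose value is `r`, and `o` (0x6F) finally yields wire type 7. The match with the reported message is consistent with the .proto source itself being decoded as a file descriptor set.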

To Reproduce

  1. Populate $PWD/protocol/a.proto with the following content.
syntax = "proto3";
package proto;
message Example { string id = 1; }
  2. Execute the following SQL commands.
DROP MATERIALIZED VIEW IF EXISTS example_mv_pb;
DROP TABLE IF EXISTS example;

CREATE TABLE example ( id varchar );

CREATE MATERIALIZED VIEW example_mv_pb AS
SELECT id
FROM example;

SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'example_mv_pb';


CREATE SINK example_mv_kafka FROM example_mv_pb
WITH (
   connector='kafka',
   properties.bootstrap.server='localhost:9092',
   topic='example'
)
FORMAT PLAIN
ENCODE PROTOBUF (
   message = 'proto.Example',
   schema.location = 'file:///opt/protocol/a.proto',
   force_append_only='true'
);

Expected behavior

I expected this to happen:

  1. Successful generation of file descriptors.
  2. Successful Protobuf serialization of example_mv_pb rows.
  3. Successful publication of the latter to the Kafka topic example.
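
Regarding step 1, one way to generate a file descriptor set by hand is protoc's `--descriptor_set_out` option. The sketch below only builds and, if possible, runs that command. Whether RisingWave expects such a precompiled file at schema.location is an assumption drawn from the "failed to decode file descriptor set" error, not something verified here. The helper name and the output path `protocol/a.pb` are hypothetical.

```python
import shutil
import subprocess
from pathlib import Path


def build_protoc_command(proto_file: str, descriptor_out: str) -> list:
    """Return a protoc argv that writes a serialized FileDescriptorSet.

    --include_imports bundles imported .proto files into the same set.
    """
    proto = Path(proto_file)
    return [
        "protoc",
        f"--proto_path={proto.parent}",
        "--include_imports",
        f"--descriptor_set_out={descriptor_out}",
        proto.name,
    ]


if __name__ == "__main__":
    # Hypothetical output path next to the a.proto from the reproduction steps.
    cmd = build_protoc_command("protocol/a.proto", "protocol/a.pb")
    if shutil.which("protoc") and Path("protocol/a.proto").exists():
        subprocess.run(cmd, check=True)
    else:
        print("protoc or protocol/a.proto not found; would run:", " ".join(cmd))
```

If the assumption holds, the sink would then reference the generated file (for example `file:///opt/protocol/a.pb` under the volume mapping used in this report) instead of the .proto source.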

How did you deploy RisingWave?

---
x-image: &image
  image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.2.1}
services:
  risingwave-standalone:
    <<: *image
    command: "standalone --meta-opts=\" \
                    --listen-addr 0.0.0.0:5690 \
                    --advertise-addr 0.0.0.0:5690 \
                    --dashboard-host 0.0.0.0:5691 \
                    --prometheus-host 0.0.0.0:1250 \
                    --prometheus-endpoint http://prometheus-0:9500 \
                    --backend sqlite \
                    --sql-endpoint sqlite:///opt/risingwave/backend.db \
                    --state-store hummock+fs:///opt/risingwave \
                    --data-directory hummock_001 \
                    --config-path /risingwave.toml\" \
                 --compute-opts=\" \
                    --config-path /risingwave.toml \
                    --listen-addr 0.0.0.0:5688 \
                    --prometheus-listener-addr 0.0.0.0:1250 \
                    --advertise-addr 0.0.0.0:5688 \
                    --async-stack-trace verbose \
                    --parallelism 8 \
                    --total-memory-bytes 21474836480 \
                    --role both \
                    --meta-address http://0.0.0.0:5690 \
                    --memory-manager-target-bytes 22333829939 \" \
                 --frontend-opts=\" \
                   --config-path /risingwave.toml \
                   --listen-addr 0.0.0.0:4599 \
                   --advertise-addr 0.0.0.0:4599 \
                   --prometheus-listener-addr 0.0.0.0:1250 \
                   --health-check-listener-addr 0.0.0.0:6786 \
                   --meta-addr http://0.0.0.0:5690 \
                   --frontend-total-memory-bytes=4294967296\" \
                 --compactor-opts=\" \
                   --listen-addr 0.0.0.0:6660 \
                   --prometheus-listener-addr 0.0.0.0:1250 \
                   --advertise-addr 0.0.0.0:6660 \
                   --meta-address http://0.0.0.0:5690 \
                   --compactor-total-memory-bytes=4294967296\""
    expose:
      - "6660"
      - "4599"
      - "5688"
      - "5690"
      - "1250"
      - "5691"
    ports:
      - "4599:4599"
      - "5690:5690"
      - "5691:5691"
      - "1250:1250"
    network_mode: "host"
    volumes:
      - "./protocol:/opt/protocol"
      - "./data:/opt/risingwave"
      - "./risingwave.toml:/risingwave.toml"
    environment:
      RUST_BACKTRACE: "1"
      # If ENABLE_TELEMETRY is not set, telemetry will start by default
      ENABLE_TELEMETRY: ${ENABLE_TELEMETRY:-false}
      RW_TELEMETRY_TYPE: ${RW_TELEMETRY_TYPE:-"docker-compose"}
      RW_SECRET_STORE_PRIVATE_KEY_HEX: ${RW_SECRET_STORE_PRIVATE_KEY_HEX:-0123456789bbcdef0123456789abcdef}
      RW_LICENSE_KEY: ${RW_LICENSE_KEY:-}
    container_name: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/6660; exit $$?;'
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5688; exit $$?;'
        - bash -c '> /dev/tcp/127.0.0.1/4599; exit $$?;'
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/5690; exit $$?;'
      interval: 1s
      timeout: 5s
    restart: always
    deploy:
      resources:
        limits:
          memory: 28G
        reservations:
          memory: 28G

The version of RisingWave

dev=> select version();
                                    version                                     
--------------------------------------------------------------------------------
 PostgreSQL 13.14.0-RisingWave-2.2.1 (3bbe7ed3142c758e6c07dffddebd8f7a3bd3318d)
(1 row)

dev=> 

Additional context

No response
