diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9d18f26ca6..a516af5dbc 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -51,6 +51,14 @@ type BatchBuilder interface {
 		callback interface{}, replicateTo []string, deliverAt time.Time,
 	) bool
 
+	// AddMessageMetaData adds a message to the batch; currently it is only
+	// used when sending chunked messages.
+	AddMessageMetaData(
+		metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
+		payload []byte,
+		callback interface{}, replicateTo []string, deliverAt time.Time,
+	) bool
+
 	// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
 	Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error)
 
@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
 	return true
 }
 
+func (bc *batchContainer) AddMessageMetaData(
+	metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
+	payload []byte,
+	callback interface{}, replicateTo []string, deliverAt time.Time,
+) bool {
+	if replicateTo != nil && bc.numMessages != 0 {
+		// If the current batch is not empty and we're trying to set the replication clusters,
+		// then we need to force the current batch to flush and send the message individually
+		return false
+	} else if bc.msgMetadata.ReplicateTo != nil {
+		// There's already a message with a cluster replication list. We need to flush before
+		// the next message can be sent
+		return false
+	} else if !bc.hasSpace(payload) {
+		// The current batch is full. The producer has to call Flush() to empty it first.
+		return false
+	}
+
+	if bc.numMessages == 0 {
+		var sequenceID uint64
+		if metadata.SequenceId != nil {
+			sequenceID = *metadata.SequenceId
+		} else {
+			sequenceID = GetAndAdd(sequenceIDGenerator, 1)
+		}
+		bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
+		bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
+		bc.msgMetadata.ProducerName = &bc.producerName
+		bc.msgMetadata.ReplicateTo = replicateTo
+		bc.msgMetadata.PartitionKey = metadata.PartitionKey
+		bc.msgMetadata.Properties = metadata.Properties
+
+		// Special fields for chunks
+		bc.msgMetadata.Uuid = metadata.Uuid
+		bc.msgMetadata.NumChunksFromMsg = metadata.NumChunksFromMsg
+		bc.msgMetadata.TotalChunkMsgSize = metadata.TotalChunkMsgSize
+		bc.msgMetadata.ChunkId = metadata.ChunkId
+
+		if deliverAt.UnixNano() > 0 {
+			bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
+		}
+
+		bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
+	}
+	addMessageToBatch(bc.buffer, metadata, payload)
+
+	bc.numMessages++
+	bc.callbacks = append(bc.callbacks, callback)
+	return true
+}
+
 func (bc *batchContainer) reset() {
 	bc.numMessages = 0
 	bc.buffer.Clear()
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index b91c0b6341..acdb437f39 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -218,6 +218,34 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
 	wb.Write(payload)
 }
 
+func addMessageToBatch(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
+	metadataSize := uint32(mm.Size())
+	wb.WriteUint32(metadataSize)
+
+	wb.ResizeIfNeeded(metadataSize)
+	_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf serialization error: %v", err))
+	}
+
+	wb.WrittenBytes(metadataSize)
+	wb.Write(payload)
+}
+
+func ConstructBufferFromMessage(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
+	metadataSize := uint32(mm.Size())
+	wb.WriteUint32(metadataSize)
+
+	wb.ResizeIfNeeded(metadataSize)
+	_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
+	if err != nil {
+		panic(fmt.Sprintf("Protobuf serialization error: %v", err))
+	}
+
+	wb.WrittenBytes(metadataSize)
+	wb.Write(payload)
+}
+
 func serializeBatch(wb Buffer,
 	cmdSend *pb.BaseCommand,
 	msgMetadata *pb.MessageMetadata,
diff --git a/pulsar/producer.go b/pulsar/producer.go
index d9b2307a33..489396fdb1 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -168,6 +168,10 @@ type ProducerOptions struct {
 
 	// Encryption specifies the fields required to encrypt a message
 	Encryption *ProducerEncryptionInfo
+
+	// ChunkingEnabled specifies whether the producer will split the original message into chunks and publish
+	// them with chunked metadata when the message payload size is larger than the broker can support.
+	ChunkingEnabled bool
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3f1e54b6a5..dea07e62af 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"context"
+	uuidGen "github.com/google/uuid"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -411,8 +412,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		payload = schemaPayload
 	}
 
-	// if msg is too large
-	if len(payload) > int(p._getConn().GetMaxMessageSize()) {
+	// if msg is too large and chunking is not enabled
+	if len(payload) > int(p._getConn().GetMaxMessageSize()) && !p.options.ChunkingEnabled {
 		p.publishSemaphore.Release()
 		request.callback(nil, request.msg, errMessageTooLarge)
 		p.log.WithError(errMessageTooLarge).
@@ -423,6 +424,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		return
 	}
 
+	// if msg is too large and chunking is enabled, send the message in chunks
+	if len(payload) > int(p._getConn().GetMaxMessageSize()) && p.options.ChunkingEnabled {
+		p.log.
+			WithField("size", len(payload)).
+			WithField("MaxMessageSize", int(p._getConn().GetMaxMessageSize())).
+ Info("Size exceed limit, send with chunks") + p.internalSendWithTrunks(request, payload) + return + } + deliverAt := msg.DeliverAt if msg.DeliverAfter.Nanoseconds() > 0 { deliverAt = time.Now().Add(msg.DeliverAfter) @@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } } +func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) { + chunkSize := int(p._getConn().GetMaxMessageSize()) + totalChunks := (len(payload)+1)/chunkSize + 1 + uuid := uuidGen.New().String() + + for chunkId := 0; chunkId < chunkSize; chunkId++ { + left := chunkId * chunkSize + right := left + chunkSize + if right > len(payload)-1 { + right = len(payload) - 1 + } + // [left, right) + p.internalSendSingleChunk(request, payload[left:right], uuid, totalChunks, len(payload), chunkId) + } +} + +func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payload []byte, + uuid string, totalChunks int, totalSize int, chunkId int) { + + msg := request.msg + mm := &pb.MessageMetadata{} + + deliverAt := msg.DeliverAt + if msg.DeliverAfter.Nanoseconds() > 0 { + deliverAt = time.Now().Add(msg.DeliverAfter) + mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt))) + } + + if msg.EventTime.UnixNano() != 0 { + mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime)) + } + + if msg.Key != "" { + mm.PartitionKey = proto.String(msg.Key) + } + + if len(msg.OrderingKey) != 0 { + mm.OrderingKey = []byte(msg.OrderingKey) + } + + if msg.Properties != nil { + mm.Properties = internal.ConvertFromStringMap(msg.Properties) + } + + if msg.SequenceID != nil { + sequenceID := uint64(*msg.SequenceID) + mm.SequenceId = proto.Uint64(sequenceID) + } + + // Fields required for chunked data + mm.Uuid = proto.String(uuid) + mm.NumChunksFromMsg = proto.Int(totalChunks) + mm.TotalChunkMsgSize = proto.Int(totalSize) + mm.ChunkId = proto.Int(chunkId) + + // Directly construct a buffer and put it to the pending queue + newBuffer := p.GetBuffer() + internal.ConstructBufferFromMessage(newBuffer, mm, payload) + + callbacks := make([]interface{}, 1) + callbacks[0] = request.callback + + p.pendingQueue.Put(&pendingItem{ + sentAt: time.Now(), + batchData: newBuffer, + sequenceID: uint64(*msg.SequenceID), + sendRequests: callbacks, + }) + p._getConn().WriteData(newBuffer) +} + type pendingItem struct { sync.Mutex batchData internal.Buffer