
[WIP] [Issue 456] feat: support chunked msg #717


Open · wants to merge 2 commits into base: master
59 changes: 59 additions & 0 deletions pulsar/internal/batch_builder.go
@@ -51,6 +51,14 @@ type BatchBuilder interface {
callback interface{}, replicateTo []string, deliverAt time.Time,
) bool

// AddMessageMetaData adds a message to the batch;
// currently it is only used when sending chunked messages.
AddMessageMetaData(
metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
) bool

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error)

@@ -207,6 +215,57 @@ func (bc *batchContainer) Add(
return true
}

func (bc *batchContainer) AddMessageMetaData(
Contributor

I could not find any code that calls this function. Who is supposed to call it?

metadata *pb.MessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
) bool {
if replicateTo != nil && bc.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
// then we need to force the current batch to flush and send the message individually
return false
} else if bc.msgMetadata.ReplicateTo != nil {
// There's already a message with a cluster replication list. Need to flush before the next
// message can be sent
return false
} else if !bc.hasSpace(payload) {
// The current batch is full. Producer has to call Flush() before adding more messages
return false
}

if bc.numMessages == 0 {
var sequenceID uint64
if metadata.SequenceId != nil {
sequenceID = *metadata.SequenceId
} else {
sequenceID = GetAndAdd(sequenceIDGenerator, 1)
}
bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
bc.msgMetadata.PartitionKey = metadata.PartitionKey
bc.msgMetadata.Properties = metadata.Properties

// Special fields for chunks
bc.msgMetadata.Uuid = metadata.Uuid
bc.msgMetadata.NumChunksFromMsg = metadata.NumChunksFromMsg
bc.msgMetadata.TotalChunkMsgSize = metadata.TotalChunkMsgSize
bc.msgMetadata.ChunkId = metadata.ChunkId

if deliverAt.UnixNano() > 0 {
bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
}

bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
}
addMessageToBatch(bc.buffer, metadata, payload)

bc.numMessages++
bc.callbacks = append(bc.callbacks, callback)
return true
}

func (bc *batchContainer) reset() {
bc.numMessages = 0
bc.buffer.Clear()
28 changes: 28 additions & 0 deletions pulsar/internal/commands.go
@@ -218,6 +218,34 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
wb.Write(payload)
}

func addMessageToBatch(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
metadataSize := uint32(mm.Size())
wb.WriteUint32(metadataSize)

wb.ResizeIfNeeded(metadataSize)
_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
if err != nil {
panic(fmt.Sprintf("Protobuf serialization error: %v", err))
}

wb.WrittenBytes(metadataSize)
wb.Write(payload)
}

func ConstructBufferFromMessage(wb Buffer, mm *pb.MessageMetadata, payload []byte) {
metadataSize := uint32(mm.Size())
wb.WriteUint32(metadataSize)

wb.ResizeIfNeeded(metadataSize)
_, err := mm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
if err != nil {
panic(fmt.Sprintf("Protobuf serialization error: %v", err))
}

wb.WrittenBytes(metadataSize)
wb.Write(payload)
}
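
Both helpers write the same frame: a 4-byte metadata length, the serialized MessageMetadata, then the raw payload. A self-contained sketch of reading such a frame back with only the standard library, assuming the Buffer writes the length big-endian as the Pulsar binary protocol does (parseMetadataFrame is illustrative and not part of this PR):

import "encoding/binary"

// parseMetadataFrame splits a [4-byte size | metadata | payload] frame,
// as written by addMessageToBatch / ConstructBufferFromMessage above,
// back into its metadata bytes and payload. Bounds checks are omitted for brevity.
func parseMetadataFrame(frame []byte) (metadata, payload []byte) {
	size := binary.BigEndian.Uint32(frame[:4])
	return frame[4 : 4+size], frame[4+size:]
}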

func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
4 changes: 4 additions & 0 deletions pulsar/producer.go
@@ -168,6 +168,10 @@ type ProducerOptions struct {

// Encryption specifies the fields required to encrypt a message
Encryption *ProducerEncryptionInfo

// ChunkingEnabled specifies whether the producer will split the original message into chunks and publish
// them with chunked metadata when the message payload size exceeds the broker's maximum message size.
ChunkingEnabled bool
}

// Producer is used to publish messages on a topic
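As a usage sketch, assuming the option lands under this name; the client variable, topic, and error handling here are illustrative only and not part of this PR:

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "persistent://public/default/large-messages",
	// Split payloads larger than the broker's max message size into chunks
	// instead of failing the send with errMessageTooLarge.
	ChunkingEnabled: true,
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()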
86 changes: 84 additions & 2 deletions pulsar/producer_partition.go
@@ -19,6 +19,7 @@ package pulsar

import (
"context"
uuidGen "github.com/google/uuid"
"strings"
"sync"
"sync/atomic"
@@ -411,8 +412,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
payload = schemaPayload
}

// if msg is too large
if len(payload) > int(p._getConn().GetMaxMessageSize()) {
// if msg is too large and chunking is not enabled
if len(payload) > int(p._getConn().GetMaxMessageSize()) && !p.options.ChunkingEnabled {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
@@ -423,6 +424,16 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
return
}

// if msg is too large and chunking is enabled, send it as chunked messages
if len(payload) > int(p._getConn().GetMaxMessageSize()) && p.options.ChunkingEnabled {
p.log.
WithField("size", len(payload)).
WithField("MaxMessageSize", int(p._getConn().GetMaxMessageSize())).
Info("Size exceeds limit, sending in chunks")
p.internalSendWithTrunks(request, payload)
return
}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
@@ -496,6 +507,77 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}

func (p *partitionProducer) internalSendWithTrunks(request *sendRequest, payload []byte) {
chunkSize := int(p._getConn().GetMaxMessageSize())
totalChunks := (len(payload)+1)/chunkSize + 1
Contributor

The trailing +1 may not be accurate. The Java implementation calculates the number of chunks more precisely:

 int totalChunks = canAddToBatch(msg) ? 1
                : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
                        + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
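
A Go sketch of that calculation, written against the variables already used in internalSendWithTrunks (payload, chunkSize) and ignoring the canAddToBatch branch, since this path is only taken when the payload exceeds the max message size:

// ceil(max(1, len(payload)) / chunkSize), mirroring the Java client's formula.
size := len(payload)
if size < 1 {
	size = 1
}
totalChunks := size / chunkSize
if size%chunkSize != 0 {
	totalChunks++
}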

uuid := uuidGen.New().String()

for chunkId := 0; chunkId < totalChunks; chunkId++ {
left := chunkId * chunkSize
right := left + chunkSize
if right > len(payload) {
right = len(payload)
}
}
// [left, right)
Contributor

Can we make this a separate function that returns a slice of ranges, e.g. [ {chunk 0 left,right}, {chunk 1 left,right}, ... ]? That way we can write a unit test verifying that no byte is missed by the splitting. WDYT?
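
One possible shape for such a helper, with names that are illustrative rather than taken from this PR:

// chunkRange describes one chunk as a half-open [Left, Right) slice of the payload.
type chunkRange struct {
	Left, Right int
}

// splitToChunks returns the ranges covering a payload of totalSize bytes,
// with at most chunkSize bytes per range.
func splitToChunks(totalSize, chunkSize int) []chunkRange {
	if totalSize <= 0 || chunkSize <= 0 {
		return nil
	}
	ranges := make([]chunkRange, 0, (totalSize+chunkSize-1)/chunkSize)
	for left := 0; left < totalSize; left += chunkSize {
		right := left + chunkSize
		if right > totalSize {
			right = totalSize
		}
		ranges = append(ranges, chunkRange{Left: left, Right: right})
	}
	return ranges
}

A table-driven test can then assert that consecutive ranges are contiguous and that the last one ends exactly at totalSize.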

p.internalSendSingleChunk(request, payload[left:right], uuid, totalChunks, len(payload), chunkId)
}
}

func (p *partitionProducer) internalSendSingleChunk(request *sendRequest, payload []byte,
uuid string, totalChunks int, totalSize int, chunkId int) {

msg := request.msg
mm := &pb.MessageMetadata{}

deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
mm.DeliverAtTime = proto.Int64(int64(internal.TimestampMillis(deliverAt)))
}

if msg.EventTime.UnixNano() != 0 {
mm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime))
}

if msg.Key != "" {
mm.PartitionKey = proto.String(msg.Key)
}

if len(msg.OrderingKey) != 0 {
mm.OrderingKey = []byte(msg.OrderingKey)
}

if msg.Properties != nil {
mm.Properties = internal.ConvertFromStringMap(msg.Properties)
}

if msg.SequenceID != nil {
sequenceID := uint64(*msg.SequenceID)
mm.SequenceId = proto.Uint64(sequenceID)
}

// Fields required for chunked data
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int(totalChunks)
mm.TotalChunkMsgSize = proto.Int(totalSize)
mm.ChunkId = proto.Int(chunkId)

// Directly construct a buffer and put it into the pending queue
newBuffer := p.GetBuffer()
internal.ConstructBufferFromMessage(newBuffer, mm, payload)

callbacks := make([]interface{}, 1)
callbacks[0] = request.callback

p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
batchData: newBuffer,
sequenceID: uint64(*msg.SequenceID),
sendRequests: callbacks,
})
p._getConn().WriteData(newBuffer)
Contributor

When batching is enabled, how does this write interact with batching?
One scenario: messages are already sitting in a batch, waiting to be flushed once the batch requirements are fulfilled, and then a large message that requires chunking is added. Is this logic going to send the chunked message ahead of the previously batched messages?
So do you need to flush the batch first, before writing the individual chunks? Probably add logic like this before flushing a chunk:

batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData != nil {
	p.pendingQueue.Put(&pendingItem{
		sentAt:       time.Now(),
		batchData:    batchData,
		sequenceID:   sequenceID,
		sendRequests: callbacks,
	})
	p._getConn().WriteData(batchData)
}

}

type pendingItem struct {
sync.Mutex
batchData internal.Buffer