Skip to content

Use internal/otelarrow in receiver/otelarrowreceiver #34138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jul 19, 2024
1 change: 0 additions & 1 deletion cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.105.0 // indirect
github.com/open-telemetry/otel-arrow v0.24.0 // indirect
github.com/open-telemetry/otel-arrow/collector v0.24.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.13 // indirect
Expand Down
2 changes: 0 additions & 2 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions exporter/otelarrowexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,5 @@ require (
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow => ../../internal/otelarrow

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver => ../../receiver/otelarrowreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent
4 changes: 3 additions & 1 deletion internal/otelarrow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/otel-arrow/collector v0.24.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.105.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
Expand Down Expand Up @@ -101,3 +101,5 @@ require (
replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver => ../../receiver/otelarrowreceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../sharedcomponent
2 changes: 0 additions & 2 deletions internal/otelarrow/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion receiver/otelarrowreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
import (
"fmt"

"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
)

// Protocols is the configuration for the supported protocols.
Expand Down
38 changes: 23 additions & 15 deletions receiver/otelarrowreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package otelarrowreceiver // import "github.com/open-telemetry/opentelemetry-col
import (
"context"

"github.com/open-telemetry/otel-arrow/collector/sharedcomponent"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

Expand Down Expand Up @@ -63,15 +63,18 @@ func createTraces(
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
var err error
recv := receivers.GetOrAdd(oCfg, func() component.Component {
var recv *otelArrowReceiver
recv, err = newOTelArrowReceiver(oCfg, set)
return recv
})
if err != nil {
return nil, err
}

r.Unwrap().registerTraceConsumer(nextConsumer)
return r, nil
recv.Unwrap().(*otelArrowReceiver).registerTraceConsumer(nextConsumer)
return recv, nil
}

// createMetrics creates a metrics receiver based on provided config.
Expand All @@ -82,15 +85,17 @@ func createMetrics(
consumer consumer.Metrics,
) (receiver.Metrics, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
var err error
recv := receivers.GetOrAdd(oCfg, func() component.Component {
var recv *otelArrowReceiver
recv, err = newOTelArrowReceiver(oCfg, set)
return recv
})
if err != nil {
return nil, err
}

r.Unwrap().registerMetricsConsumer(consumer)
return r, nil
recv.Unwrap().(*otelArrowReceiver).registerMetricsConsumer(consumer)
return recv, nil
}

// createLog creates a log receiver based on provided config.
Expand All @@ -101,15 +106,18 @@ func createLog(
consumer consumer.Logs,
) (receiver.Logs, error) {
oCfg := cfg.(*Config)
r, err := receivers.GetOrAdd(oCfg, func() (*otelArrowReceiver, error) {
return newOTelArrowReceiver(oCfg, set)
var err error
recv := receivers.GetOrAdd(oCfg, func() component.Component {
var recv *otelArrowReceiver
recv, err = newOTelArrowReceiver(oCfg, set)
return recv
})
if err != nil {
return nil, err
}

r.Unwrap().registerLogsConsumer(consumer)
return r, nil
recv.Unwrap().(*otelArrowReceiver).registerLogsConsumer(consumer)
return recv, nil
}

// This is the map of already created OTel-Arrow receivers for particular configurations.
Expand All @@ -118,4 +126,4 @@ func createLog(
// create separate objects, they must use one otelArrowReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = sharedcomponent.NewSharedComponents[*Config, *otelArrowReceiver]()
var receivers = sharedcomponent.NewSharedComponents()
3 changes: 2 additions & 1 deletion receiver/otelarrowreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"testing"

"github.com/open-telemetry/otel-arrow/collector/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -16,6 +15,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testutil"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down
11 changes: 9 additions & 2 deletions receiver/otelarrowreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelar
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.105.0
github.com/open-telemetry/otel-arrow v0.24.0
github.com/open-telemetry/otel-arrow/collector v0.24.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.105.1-0.20240717163034-43ed6184f9fe
go.opentelemetry.io/collector/component v0.105.1-0.20240717163034-43ed6184f9fe
Expand Down Expand Up @@ -53,7 +54,7 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
Expand Down Expand Up @@ -94,3 +95,9 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow => ../../internal/otelarrow

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter => ../../exporter/otelarrowexporter
6 changes: 2 additions & 4 deletions receiver/otelarrowreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"sync/atomic"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/collector/admission"
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
Expand All @@ -43,6 +41,8 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)

Expand Down
6 changes: 3 additions & 3 deletions receiver/otelarrowreceiver/internal/arrow/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
arrowCollectorMock "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1/mock"
"github.com/open-telemetry/otel-arrow/collector/admission"
"github.com/open-telemetry/otel-arrow/collector/netstats"
"github.com/open-telemetry/otel-arrow/collector/testdata"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
arrowRecordMock "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record/mock"
otelAssert "github.com/open-telemetry/otel-arrow/pkg/otel/assert"
Expand All @@ -46,6 +43,9 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock"
)

Expand Down
3 changes: 2 additions & 1 deletion receiver/otelarrowreceiver/internal/logs/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"testing"

"github.com/open-telemetry/otel-arrow/collector/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand All @@ -20,6 +19,8 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
)

func TestExport(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion receiver/otelarrowreceiver/internal/metrics/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"testing"

"github.com/open-telemetry/otel-arrow/collector/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand All @@ -20,6 +19,8 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
)

func TestExport(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion receiver/otelarrowreceiver/internal/trace/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"testing"

"github.com/open-telemetry/otel-arrow/collector/testdata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
Expand All @@ -20,6 +19,8 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
)

func TestExport(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions receiver/otelarrowreceiver/otelarrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ import (
"sync"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/collector/admission"
"github.com/open-telemetry/otel-arrow/collector/compression/zstd"
"github.com/open-telemetry/otel-arrow/collector/netstats"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
Expand All @@ -25,6 +22,9 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/logs"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metrics"
Expand Down
4 changes: 2 additions & 2 deletions receiver/otelarrowreceiver/otelarrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/collector/testdata"
"github.com/open-telemetry/otel-arrow/collector/testutil"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,6 +45,8 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock"
componentMetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata"
)
Expand Down