|
| 1 | +package googlecloudloggingexporter |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + |
| 7 | + "cloud.google.com/go/logging" |
| 8 | + "github.com/google/uuid" |
| 9 | + "go.opentelemetry.io/collector/component" |
| 10 | + "go.opentelemetry.io/collector/consumer" |
| 11 | + "go.opentelemetry.io/collector/exporter/exporterhelper" |
| 12 | + "go.opentelemetry.io/collector/model/pdata" |
| 13 | + "go.uber.org/zap" |
| 14 | +) |
| 15 | + |
| 16 | +type exporter struct { |
| 17 | + Config *Config |
| 18 | + logger *zap.Logger |
| 19 | + collectorID string |
| 20 | + cloudLogger *logging.Logger |
| 21 | +} |
| 22 | + |
| 23 | +func newCloudLoggingExporter(config *Config, params component.ExporterCreateSettings) (component.LogsExporter, error) { |
| 24 | + loggingExporter, err := newCloudLoggingExporter(config, params) |
| 25 | + if err != nil { |
| 26 | + return nil, err |
| 27 | + } |
| 28 | + return exporterhelper.NewLogsExporter( |
| 29 | + config, |
| 30 | + params, |
| 31 | + loggingExporter.ConsumeLogs, |
| 32 | + exporterhelper.WithQueue(config.enforcedQueueSettings()), |
| 33 | + exporterhelper.WithRetry(config.RetrySettings)) |
| 34 | +} |
| 35 | + |
| 36 | +func newCloudLoggingLogExporter(config *Config, params component.ExporterCreateSettings) (component.LogsExporter, error) { |
| 37 | + // Validate the passed config. |
| 38 | + if err := config.Validate(); err != nil { |
| 39 | + return nil, err |
| 40 | + } |
| 41 | + |
| 42 | + // Generate a Collector ID. |
| 43 | + collectorIdentifier, err := uuid.NewRandom() |
| 44 | + if err != nil { |
| 45 | + return nil, err |
| 46 | + } |
| 47 | + |
| 48 | + // Read project ID from Metadata if not specified by config. |
| 49 | + if config.ProjectID == "" { |
| 50 | + projectId, err := readProjectIdMetadata() |
| 51 | + if err != nil { |
| 52 | + return nil, fmt.Errorf("failed to read Google Cloud project ID: %v", err) |
| 53 | + } |
| 54 | + config.ProjectID = projectId |
| 55 | + } |
| 56 | + |
| 57 | + // Create Cloud Logging logger with project ID. |
| 58 | + client, err := logging.NewClient(context.Background(), config.ProjectID) |
| 59 | + if err != nil { |
| 60 | + return nil, fmt.Errorf("failed to create Google Cloud Logging client: %v", err) |
| 61 | + } |
| 62 | + defer client.Close() |
| 63 | + logger := client.Logger(config.LogName) |
| 64 | + |
| 65 | + // Create the logging exporter. |
| 66 | + loggingExporter := &exporter{ |
| 67 | + Config: config, |
| 68 | + logger: params.Logger, |
| 69 | + collectorID: collectorIdentifier.String(), |
| 70 | + cloudLogger: logger, |
| 71 | + } |
| 72 | + return loggingExporter, nil |
| 73 | +} |
| 74 | + |
| 75 | +func (e *exporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { |
| 76 | + logEntries, dropped := logsToEntries(e.Config, e.logger, ld) |
| 77 | + if len(logEntries) == 0 { |
| 78 | + return nil |
| 79 | + } |
| 80 | + if dropped > 0 { |
| 81 | + e.logger.Debug("Dropped logs", zap.Any("logsDropped", dropped)) |
| 82 | + } |
| 83 | + |
| 84 | + for _, logEntry := range logEntries { |
| 85 | + e.logger.Debug("Adding log entry", zap.Any("entry", logEntry)) |
| 86 | + e.cloudLogger.Log(logEntry) |
| 87 | + } |
| 88 | + e.logger.Debug("Log entries successfully buffered") |
| 89 | + err := e.cloudLogger.Flush() |
| 90 | + if err != nil { |
| 91 | + e.logger.Error("error force flushing logs. Skipping to next logPusher.", zap.Error(err)) |
| 92 | + } |
| 93 | + return nil |
| 94 | +} |
| 95 | + |
| 96 | +func (e *exporter) Capabilities() consumer.Capabilities { |
| 97 | + return consumer.Capabilities{MutatesData: false} |
| 98 | +} |
| 99 | + |
| 100 | +func (e *exporter) Shutdown(ctx context.Context) error { |
| 101 | + // Flush the remaining logs before shutting down the exporter. |
| 102 | + if e.cloudLogger != nil { |
| 103 | + err := e.cloudLogger.Flush() |
| 104 | + if err != nil { |
| 105 | + return err |
| 106 | + } |
| 107 | + } |
| 108 | + return nil |
| 109 | +} |
| 110 | + |
| 111 | +func (e *exporter) Start(ctx context.Context, host component.Host) error { |
| 112 | + return nil |
| 113 | +} |
| 114 | + |
| 115 | +func logsToEntries(config *Config, logger *zap.Logger, ld pdata.Logs) ([]logging.Entry, int) { |
| 116 | + entries := []logging.Entry{} |
| 117 | + dropped := 0 |
| 118 | + rls := ld.ResourceLogs() |
| 119 | + for i := 0; i < rls.Len(); i++ { |
| 120 | + rl := rls.At(i) |
| 121 | + resourceAttrs := attrsValue(rl.Resource().Attributes()) |
| 122 | + ills := rl.InstrumentationLibraryLogs() |
| 123 | + for j := 0; j < ills.Len(); j++ { |
| 124 | + ils := ills.At(j) |
| 125 | + logs := ils.LogRecords() |
| 126 | + for k := 0; k < logs.Len(); k++ { |
| 127 | + log := logs.At(k) |
| 128 | + entry, err := logToEntry(config, resourceAttrs, log) |
| 129 | + if err != nil { |
| 130 | + logger.Debug("Failed to convert to Cloud Logging Entry", zap.Error(err)) |
| 131 | + dropped++ |
| 132 | + } else { |
| 133 | + entries = append(entries, entry) |
| 134 | + } |
| 135 | + } |
| 136 | + } |
| 137 | + } |
| 138 | + return entries, dropped |
| 139 | +} |
| 140 | + |
| 141 | +type entryPayload struct { |
| 142 | + Message string `json:"message"` |
| 143 | +} |
| 144 | + |
| 145 | +func logToEntry(config *Config, attributes map[string]interface{}, log pdata.LogRecord) (logging.Entry, error) { |
| 146 | + payload := entryPayload{ |
| 147 | + Message: log.Body().AsString(), |
| 148 | + } |
| 149 | + return logging.Entry{ |
| 150 | + Payload: payload, |
| 151 | + Timestamp: log.Timestamp().AsTime(), |
| 152 | + Severity: logging.Severity(log.SeverityNumber()), |
| 153 | + LogName: config.LogName, |
| 154 | + Trace: log.TraceID().HexString(), |
| 155 | + SpanID: log.SpanID().HexString(), |
| 156 | + }, nil |
| 157 | +} |
| 158 | + |
| 159 | +func attrsValue(attrs pdata.AttributeMap) map[string]interface{} { |
| 160 | + if attrs.Len() == 0 { |
| 161 | + return nil |
| 162 | + } |
| 163 | + out := make(map[string]interface{}, attrs.Len()) |
| 164 | + attrs.Range(func(k string, v pdata.AttributeValue) bool { |
| 165 | + out[k] = attrValue(v) |
| 166 | + return true |
| 167 | + }) |
| 168 | + return out |
| 169 | +} |
| 170 | + |
| 171 | +func attrValue(value pdata.AttributeValue) interface{} { |
| 172 | + switch value.Type() { |
| 173 | + case pdata.AttributeValueTypeInt: |
| 174 | + return value.IntVal() |
| 175 | + case pdata.AttributeValueTypeBool: |
| 176 | + return value.BoolVal() |
| 177 | + case pdata.AttributeValueTypeDouble: |
| 178 | + return value.DoubleVal() |
| 179 | + case pdata.AttributeValueTypeString: |
| 180 | + return value.StringVal() |
| 181 | + case pdata.AttributeValueTypeMap: |
| 182 | + values := map[string]interface{}{} |
| 183 | + value.MapVal().Range(func(k string, v pdata.AttributeValue) bool { |
| 184 | + values[k] = attrValue(v) |
| 185 | + return true |
| 186 | + }) |
| 187 | + return values |
| 188 | + case pdata.AttributeValueTypeArray: |
| 189 | + arrayVal := value.SliceVal() |
| 190 | + values := make([]interface{}, arrayVal.Len()) |
| 191 | + for i := 0; i < arrayVal.Len(); i++ { |
| 192 | + values[i] = attrValue(arrayVal.At(i)) |
| 193 | + } |
| 194 | + return values |
| 195 | + case pdata.AttributeValueTypeEmpty: |
| 196 | + return nil |
| 197 | + default: |
| 198 | + return nil |
| 199 | + } |
| 200 | +} |
0 commit comments