mirror of
https://github.com/linka-cloud/grpc.git
synced 2025-06-22 01:02:29 +00:00
add otel module based on uptrace-go
Signed-off-by: Adphi <philippe.adrien.nousse@gmail.com>
This commit is contained in:
24
otel/LICENSE
Normal file
24
otel/LICENSE
Normal file
@ -0,0 +1,24 @@
|
||||
Copyright (c) 2020 github.com/uptrace/uptrace-go Authors. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are
|
||||
met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above
|
||||
copyright notice, this list of conditions and the following disclaimer
|
||||
in the documentation and/or other materials provided with the
|
||||
distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
120
otel/client.go
Normal file
120
otel/client.go
Normal file
@ -0,0 +1,120 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
sdklog "go.opentelemetry.io/otel/sdk/log"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const dummySpanName = "__dummy__"
|
||||
|
||||
type client struct {
|
||||
dsn *DSN
|
||||
tracer trace.Tracer
|
||||
|
||||
tp *sdktrace.TracerProvider
|
||||
mp *sdkmetric.MeterProvider
|
||||
lp *sdklog.LoggerProvider
|
||||
}
|
||||
|
||||
func newClient(dsn *DSN) *client {
|
||||
return &client{
|
||||
dsn: dsn,
|
||||
tracer: otel.Tracer("otel-go"),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) Shutdown(ctx context.Context) (lastErr error) {
|
||||
if c.tp != nil {
|
||||
if err := c.tp.Shutdown(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
c.tp = nil
|
||||
}
|
||||
if c.mp != nil {
|
||||
if err := c.mp.Shutdown(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
c.mp = nil
|
||||
}
|
||||
if c.lp != nil {
|
||||
if err := c.lp.Shutdown(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
c.lp = nil
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (c *client) ForceFlush(ctx context.Context) (lastErr error) {
|
||||
if c.tp != nil {
|
||||
if err := c.tp.ForceFlush(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
if c.mp != nil {
|
||||
if err := c.mp.ForceFlush(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
if c.lp != nil {
|
||||
if err := c.lp.ForceFlush(ctx); err != nil {
|
||||
lastErr = err
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// TraceURL returns the trace URL for the span.
|
||||
func (c *client) TraceURL(span trace.Span) string {
|
||||
sctx := span.SpanContext()
|
||||
return fmt.Sprintf("%s/traces/%s?span_id=%s",
|
||||
c.dsn.SiteURL(), sctx.TraceID(), sctx.SpanID().String())
|
||||
}
|
||||
|
||||
// ReportError reports an error as a span event creating a dummy span if necessary.
|
||||
func (c *client) ReportError(ctx context.Context, err error, opts ...trace.EventOption) {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
if !span.IsRecording() {
|
||||
_, span = c.tracer.Start(ctx, dummySpanName)
|
||||
defer span.End()
|
||||
}
|
||||
|
||||
span.RecordError(err, opts...)
|
||||
}
|
||||
|
||||
// ReportPanic is used with defer to report panics.
|
||||
func (c *client) ReportPanic(ctx context.Context, val any) {
|
||||
c.reportPanic(ctx, val)
|
||||
// Force flush since we are about to exit on panic.
|
||||
if c.tp != nil {
|
||||
_ = c.tp.ForceFlush(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) reportPanic(ctx context.Context, val interface{}) {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
if !span.IsRecording() {
|
||||
_, span = c.tracer.Start(ctx, dummySpanName)
|
||||
defer span.End()
|
||||
}
|
||||
|
||||
stackTrace := make([]byte, 2048)
|
||||
n := runtime.Stack(stackTrace, false)
|
||||
|
||||
span.AddEvent(
|
||||
"log",
|
||||
trace.WithAttributes(
|
||||
attribute.String("log.severity", "panic"),
|
||||
attribute.String("log.message", fmt.Sprint(val)),
|
||||
attribute.String("exception.stackstrace", string(stackTrace[:n])),
|
||||
),
|
||||
)
|
||||
}
|
336
otel/config.go
Normal file
336
otel/config.go
Normal file
@ -0,0 +1,336 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"os"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
dsn []string
|
||||
|
||||
// Common options
|
||||
|
||||
resourceAttributes []attribute.KeyValue
|
||||
resourceDetectors []resource.Detector
|
||||
resource *resource.Resource
|
||||
|
||||
tlsConf *tls.Config
|
||||
|
||||
// Tracing options
|
||||
tracingEnabled bool
|
||||
textMapPropagator propagation.TextMapPropagator
|
||||
tracerProvider *sdktrace.TracerProvider
|
||||
traceSampler sdktrace.Sampler
|
||||
spanProcessors []sdktrace.SpanProcessor
|
||||
prettyPrint bool
|
||||
bspOptions []sdktrace.BatchSpanProcessorOption
|
||||
|
||||
// Metrics options
|
||||
metricsEnabled bool
|
||||
metricOptions []metric.Option
|
||||
metricPrometheusBridge bool
|
||||
|
||||
// Logging options
|
||||
loggingEnabled bool
|
||||
// loggerProvider *sdklog.LoggerProvider
|
||||
}
|
||||
|
||||
func newConfig(opts []Option) *config {
|
||||
conf := &config{
|
||||
tracingEnabled: true,
|
||||
metricsEnabled: true,
|
||||
loggingEnabled: true,
|
||||
}
|
||||
|
||||
if dsn, ok := os.LookupEnv("OTEL_DSN"); ok {
|
||||
conf.dsn = []string{dsn}
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.apply(conf)
|
||||
}
|
||||
|
||||
return conf
|
||||
}
|
||||
|
||||
func (conf *config) newResource() *resource.Resource {
|
||||
if conf.resource != nil {
|
||||
if len(conf.resourceAttributes) > 0 {
|
||||
log.Warnf("WithResource overrides WithResourceAttributes (discarding %v)",
|
||||
conf.resourceAttributes)
|
||||
}
|
||||
if len(conf.resourceDetectors) > 0 {
|
||||
log.Warnf("WithResource overrides WithResourceDetectors (discarding %v)",
|
||||
conf.resourceDetectors)
|
||||
}
|
||||
return conf.resource
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
res, err := resource.New(ctx,
|
||||
resource.WithFromEnv(),
|
||||
resource.WithTelemetrySDK(),
|
||||
resource.WithHost(),
|
||||
resource.WithDetectors(conf.resourceDetectors...),
|
||||
resource.WithAttributes(conf.resourceAttributes...))
|
||||
if err != nil {
|
||||
otel.Handle(err)
|
||||
return resource.Environment()
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
type Option interface {
|
||||
apply(conf *config)
|
||||
}
|
||||
|
||||
type option func(conf *config)
|
||||
|
||||
func (fn option) apply(conf *config) {
|
||||
fn(conf)
|
||||
}
|
||||
|
||||
// WithDSN configures a data source name that is used to connect to an open-telemetry collector, for example,
|
||||
// `https://<token>@otel-collector.example.org/<project_id>`.
|
||||
//
|
||||
// The default is to use OTEL_DSN environment variable.
|
||||
func WithDSN(dsn ...string) Option {
|
||||
return option(func(conf *config) {
|
||||
conf.dsn = dsn
|
||||
})
|
||||
}
|
||||
|
||||
// WithServiceName configures `service.name` resource attribute.
|
||||
func WithServiceName(serviceName string) Option {
|
||||
return option(func(conf *config) {
|
||||
attr := semconv.ServiceNameKey.String(serviceName)
|
||||
conf.resourceAttributes = append(conf.resourceAttributes, attr)
|
||||
})
|
||||
}
|
||||
|
||||
// WithServiceVersion configures `service.version` resource attribute, for example, `1.0.0`.
|
||||
func WithServiceVersion(serviceVersion string) Option {
|
||||
return option(func(conf *config) {
|
||||
attr := semconv.ServiceVersionKey.String(serviceVersion)
|
||||
conf.resourceAttributes = append(conf.resourceAttributes, attr)
|
||||
})
|
||||
}
|
||||
|
||||
// WithDeploymentEnvironment configures `deployment.environment` resource attribute,
|
||||
// for example, `production`.
|
||||
func WithDeploymentEnvironment(env string) Option {
|
||||
return option(func(conf *config) {
|
||||
attr := semconv.DeploymentEnvironmentKey.String(env)
|
||||
conf.resourceAttributes = append(conf.resourceAttributes, attr)
|
||||
})
|
||||
}
|
||||
|
||||
// WithResourceAttributes configures resource attributes that describe an entity that produces
|
||||
// telemetry, for example, such attributes as host.name, service.name, etc.
|
||||
//
|
||||
// The default is to use `OTEL_RESOURCE_ATTRIBUTES` env var, for example,
|
||||
// `OTEL_RESOURCE_ATTRIBUTES=service.name=myservice,service.version=1.0.0`.
|
||||
func WithResourceAttributes(attrs ...attribute.KeyValue) Option {
|
||||
return option(func(conf *config) {
|
||||
conf.resourceAttributes = append(conf.resourceAttributes, attrs...)
|
||||
})
|
||||
}
|
||||
|
||||
// WithResourceDetectors adds detectors to be evaluated for the configured resource.
|
||||
func WithResourceDetectors(detectors ...resource.Detector) Option {
|
||||
return option(func(conf *config) {
|
||||
conf.resourceDetectors = append(conf.resourceDetectors, detectors...)
|
||||
})
|
||||
}
|
||||
|
||||
// WithResource configures a resource that describes an entity that produces telemetry,
|
||||
// for example, such attributes as host.name and service.name. All produced spans and metrics
|
||||
// will have these attributes.
|
||||
//
|
||||
// WithResource overrides and replaces any other resource attributes.
|
||||
func WithResource(resource *resource.Resource) Option {
|
||||
return option(func(conf *config) {
|
||||
conf.resource = resource
|
||||
})
|
||||
}
|
||||
|
||||
func WithTLSConfig(tlsConf *tls.Config) Option {
|
||||
return option(func(conf *config) {
|
||||
conf.tlsConf = tlsConf
|
||||
})
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
type TracingOption interface {
|
||||
Option
|
||||
tracing()
|
||||
}
|
||||
|
||||
type tracingOption func(conf *config)
|
||||
|
||||
var _ TracingOption = (*tracingOption)(nil)
|
||||
|
||||
func (fn tracingOption) apply(conf *config) {
|
||||
fn(conf)
|
||||
}
|
||||
|
||||
func (fn tracingOption) tracing() {}
|
||||
|
||||
// WithTracingEnabled can be used to enable/disable tracing.
|
||||
func WithTracingEnabled(on bool) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.tracingEnabled = on
|
||||
})
|
||||
}
|
||||
|
||||
// WithTracingDisabled disables tracing.
|
||||
func WithTracingDisabled() TracingOption {
|
||||
return WithTracingEnabled(false)
|
||||
}
|
||||
|
||||
// WithTracerProvider overwrites the default otel tracer provider.
|
||||
// You can use it to configure otel to use OTLP exporter.
|
||||
//
|
||||
// When this option is used, you might need to call otel.SetTracerProvider
|
||||
// to register the provider as the global trace provider.
|
||||
func WithTracerProvider(provider *sdktrace.TracerProvider) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.tracerProvider = provider
|
||||
})
|
||||
}
|
||||
|
||||
// WithTraceSampler configures a span sampler.
|
||||
func WithTraceSampler(sampler sdktrace.Sampler) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.traceSampler = sampler
|
||||
})
|
||||
}
|
||||
|
||||
// WithSpanProcessor configures an additional span processor.
|
||||
func WithSpanProcessor(sp sdktrace.SpanProcessor) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.spanProcessors = append(conf.spanProcessors, sp)
|
||||
})
|
||||
}
|
||||
|
||||
// WithPropagator sets the global TextMapPropagator used by OpenTelemetry.
|
||||
// The default is propagation.TraceContext and propagation.Baggage.
|
||||
func WithPropagator(propagator propagation.TextMapPropagator) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.textMapPropagator = propagator
|
||||
})
|
||||
}
|
||||
|
||||
// WithTextMapPropagator is an alias for WithPropagator.
|
||||
func WithTextMapPropagator(propagator propagation.TextMapPropagator) TracingOption {
|
||||
return WithPropagator(propagator)
|
||||
}
|
||||
|
||||
// WithPrettyPrintSpanExporter adds a span exproter that prints spans to stdout.
|
||||
// It is useful for debugging or demonstration purposes.
|
||||
func WithPrettyPrintSpanExporter() TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.prettyPrint = true
|
||||
})
|
||||
}
|
||||
|
||||
// WithBatchSpanProcessorOption specifies options used to created BatchSpanProcessor.
|
||||
func WithBatchSpanProcessorOption(opts ...sdktrace.BatchSpanProcessorOption) TracingOption {
|
||||
return tracingOption(func(conf *config) {
|
||||
conf.bspOptions = append(conf.bspOptions, opts...)
|
||||
})
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
type MetricsOption interface {
|
||||
Option
|
||||
metrics()
|
||||
}
|
||||
|
||||
type metricsOption func(conf *config)
|
||||
|
||||
var _ MetricsOption = (*metricsOption)(nil)
|
||||
|
||||
func (fn metricsOption) apply(conf *config) {
|
||||
fn(conf)
|
||||
}
|
||||
|
||||
func (fn metricsOption) metrics() {}
|
||||
|
||||
// WithMetricsEnabled can be used to enable/disable metrics.
|
||||
func WithMetricsEnabled(on bool) MetricsOption {
|
||||
return metricsOption(func(conf *config) {
|
||||
conf.metricsEnabled = on
|
||||
})
|
||||
}
|
||||
|
||||
// WithMetricsDisabled disables metrics.
|
||||
func WithMetricsDisabled() MetricsOption {
|
||||
return WithMetricsEnabled(false)
|
||||
}
|
||||
|
||||
func WithMetricOption(options ...metric.Option) MetricsOption {
|
||||
return metricsOption(func(conf *config) {
|
||||
conf.metricOptions = append(conf.metricOptions, options...)
|
||||
})
|
||||
}
|
||||
|
||||
func WithMetricPrometheusBridge() MetricsOption {
|
||||
return metricsOption(func(conf *config) {
|
||||
conf.metricPrometheusBridge = true
|
||||
})
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
type LoggingOption interface {
|
||||
Option
|
||||
logging()
|
||||
}
|
||||
|
||||
type loggingOption func(conf *config)
|
||||
|
||||
var _ LoggingOption = (*loggingOption)(nil)
|
||||
|
||||
func (fn loggingOption) apply(conf *config) {
|
||||
fn(conf)
|
||||
}
|
||||
|
||||
func (fn loggingOption) logging() {}
|
||||
|
||||
// WithLoggingDisabled disables logging.
|
||||
func WithLoggingDisabled() LoggingOption {
|
||||
return WithLoggingEnabled(false)
|
||||
}
|
||||
|
||||
// WithLoggingEnabled can be used to enable/disable logging.
|
||||
func WithLoggingEnabled(on bool) LoggingOption {
|
||||
return loggingOption(func(conf *config) {
|
||||
conf.loggingEnabled = on
|
||||
})
|
||||
}
|
||||
|
||||
// WithLoggerProvider overwrites the default otel logger provider.
|
||||
// You can use it to configure otel to use OTLP exporter.
|
||||
//
|
||||
// When this option is used, you might need to call otel.SetLoggerProvider
|
||||
// to register the provider as the global trace provider.
|
||||
// func WithLoggerProvider(provider *sdklog.LoggerProvider) LoggingOption {
|
||||
// return loggingOption(func(conf *config) {
|
||||
// conf.loggerProvider = provider
|
||||
// })
|
||||
// }
|
103
otel/dsn.go
Normal file
103
otel/dsn.go
Normal file
@ -0,0 +1,103 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type DSN struct {
|
||||
original string
|
||||
|
||||
Scheme string
|
||||
Host string
|
||||
HTTPPort string
|
||||
User string
|
||||
Password string
|
||||
Token string
|
||||
}
|
||||
|
||||
func (dsn *DSN) String() string {
|
||||
return dsn.original
|
||||
}
|
||||
|
||||
func (dsn *DSN) SiteURL() string {
|
||||
return dsn.Scheme + "://" + joinHostPort(dsn.Host, dsn.HTTPPort)
|
||||
}
|
||||
|
||||
func (dsn *DSN) OTLPHttpEndpoint() string {
|
||||
return joinHostPort(dsn.Host, dsn.HTTPPort)
|
||||
}
|
||||
|
||||
func (dsn *DSN) Headers() map[string]string {
|
||||
if dsn.Token != "" {
|
||||
return map[string]string{
|
||||
"Authorization": "Bearer " + dsn.Token,
|
||||
}
|
||||
}
|
||||
if dsn.User != "" && dsn.Password != "" {
|
||||
return map[string]string{
|
||||
"Authorization": "Basic " + base64.StdEncoding.EncodeToString([]byte(dsn.User+":"+dsn.Password)),
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func ParseDSN(dsnStr string) (*DSN, error) {
|
||||
if dsnStr == "" {
|
||||
return nil, fmt.Errorf("DSN is empty (use WithDSN or OTEL_DSN env var)")
|
||||
}
|
||||
|
||||
u, err := url.Parse(dsnStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't parse DSN=%q: %s", dsnStr, err)
|
||||
}
|
||||
|
||||
switch u.Scheme {
|
||||
case "http", "https":
|
||||
case "":
|
||||
return nil, fmt.Errorf("DSN=%q does not have a scheme", dsnStr)
|
||||
default:
|
||||
return nil, fmt.Errorf("DSN=%q has unsupported scheme %q", dsnStr, u.Scheme)
|
||||
}
|
||||
|
||||
if u.Host == "" {
|
||||
return nil, fmt.Errorf("DSN=%q does not have a host", dsnStr)
|
||||
}
|
||||
|
||||
dsn := DSN{
|
||||
original: dsnStr,
|
||||
Scheme: u.Scheme,
|
||||
Host: u.Host,
|
||||
}
|
||||
if p, ok := u.User.Password(); ok {
|
||||
dsn.User = u.User.Username()
|
||||
dsn.Password = p
|
||||
} else {
|
||||
dsn.Token = u.User.Username()
|
||||
}
|
||||
|
||||
if host, port, err := net.SplitHostPort(u.Host); err == nil {
|
||||
dsn.Host = host
|
||||
dsn.HTTPPort = port
|
||||
}
|
||||
|
||||
if dsn.HTTPPort == "" {
|
||||
switch dsn.Scheme {
|
||||
case "http":
|
||||
dsn.HTTPPort = "80"
|
||||
case "https":
|
||||
dsn.HTTPPort = "443"
|
||||
}
|
||||
}
|
||||
|
||||
return &dsn, nil
|
||||
}
|
||||
|
||||
func joinHostPort(host, port string) string {
|
||||
if port == "" {
|
||||
return host
|
||||
}
|
||||
return net.JoinHostPort(host, port)
|
||||
}
|
28
otel/internal_test.go
Normal file
28
otel/internal_test.go
Normal file
@ -0,0 +1,28 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIDGenerator(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
gen := newIDGenerator()
|
||||
|
||||
traceID1, spanID1 := gen.NewIDs(ctx)
|
||||
traceID2, spanID2 := gen.NewIDs(ctx)
|
||||
require.NotEqual(t, traceID1, traceID2)
|
||||
require.NotEqual(t, spanID1, spanID2)
|
||||
|
||||
spanID3 := gen.NewSpanID(ctx, traceID1)
|
||||
require.NotEqual(t, spanID1, spanID3)
|
||||
}
|
||||
|
||||
func TestSpanPrecision(t *testing.T) {
|
||||
dur := time.Duration(math.MaxUint32) * time.Duration(spanIDPrec)
|
||||
require.Equal(t, "1193h2m47.295s", dur.String())
|
||||
}
|
64
otel/logging.go
Normal file
64
otel/logging.go
Normal file
@ -0,0 +1,64 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
|
||||
"go.opentelemetry.io/otel/log/global"
|
||||
sdklog "go.opentelemetry.io/otel/sdk/log"
|
||||
)
|
||||
|
||||
func configureLogging(ctx context.Context, conf *config) *sdklog.LoggerProvider {
|
||||
var opts []sdklog.LoggerProviderOption
|
||||
if res := conf.newResource(); res != nil {
|
||||
opts = append(opts, sdklog.WithResource(res))
|
||||
}
|
||||
|
||||
for _, dsn := range conf.dsn {
|
||||
dsn, err := ParseDSN(dsn)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("ParseDSN failed")
|
||||
continue
|
||||
}
|
||||
|
||||
exp, err := newOtlpLogExporter(ctx, conf, dsn)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("otlploghttp.New failed")
|
||||
continue
|
||||
}
|
||||
|
||||
queueSize := queueSize()
|
||||
bspOptions := []sdklog.BatchProcessorOption{
|
||||
sdklog.WithMaxQueueSize(queueSize),
|
||||
sdklog.WithExportMaxBatchSize(queueSize),
|
||||
sdklog.WithExportInterval(10 * time.Second),
|
||||
sdklog.WithExportTimeout(10 * time.Second),
|
||||
}
|
||||
bsp := sdklog.NewBatchProcessor(exp, bspOptions...)
|
||||
opts = append(opts, sdklog.WithProcessor(bsp))
|
||||
}
|
||||
|
||||
provider := sdklog.NewLoggerProvider(opts...)
|
||||
global.SetLoggerProvider(provider)
|
||||
|
||||
return provider
|
||||
}
|
||||
|
||||
func newOtlpLogExporter(
|
||||
ctx context.Context, conf *config, dsn *DSN,
|
||||
) (*otlploghttp.Exporter, error) {
|
||||
options := []otlploghttp.Option{
|
||||
otlploghttp.WithEndpoint(dsn.OTLPHttpEndpoint()),
|
||||
otlploghttp.WithHeaders(dsn.Headers()),
|
||||
otlploghttp.WithCompression(otlploghttp.GzipCompression),
|
||||
}
|
||||
|
||||
if conf.tlsConf != nil {
|
||||
options = append(options, otlploghttp.WithTLSClientConfig(conf.tlsConf))
|
||||
} else if dsn.Scheme == "http" {
|
||||
options = append(options, otlploghttp.WithInsecure())
|
||||
}
|
||||
|
||||
return otlploghttp.New(ctx, options...)
|
||||
}
|
77
otel/metrics.go
Normal file
77
otel/metrics.go
Normal file
@ -0,0 +1,77 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/contrib/bridges/prometheus"
|
||||
runtimemetrics "go.opentelemetry.io/contrib/instrumentation/runtime"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func configureMetrics(ctx context.Context, conf *config) *sdkmetric.MeterProvider {
|
||||
opts := conf.metricOptions
|
||||
if res := conf.newResource(); res != nil {
|
||||
opts = append(opts, sdkmetric.WithResource(res))
|
||||
}
|
||||
|
||||
for _, dsn := range conf.dsn {
|
||||
dsn, err := ParseDSN(dsn)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("ParseDSN failed")
|
||||
continue
|
||||
}
|
||||
|
||||
exp, err := otlpmetricClient(ctx, conf, dsn)
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("otlpmetricClient")
|
||||
continue
|
||||
}
|
||||
|
||||
ropts := []sdkmetric.PeriodicReaderOption{sdkmetric.WithInterval(5 * time.Second)}
|
||||
if conf.metricPrometheusBridge {
|
||||
ropts = append(ropts, sdkmetric.WithProducer(prometheus.NewMetricProducer()))
|
||||
}
|
||||
opts = append(opts, sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exp, ropts...)))
|
||||
}
|
||||
|
||||
provider := sdkmetric.NewMeterProvider(opts...)
|
||||
otel.SetMeterProvider(provider)
|
||||
|
||||
if err := runtimemetrics.Start(); err != nil {
|
||||
log.WithError(err).Error("runtimemetrics.Start failed")
|
||||
}
|
||||
|
||||
return provider
|
||||
}
|
||||
|
||||
func otlpmetricClient(ctx context.Context, conf *config, dsn *DSN) (sdkmetric.Exporter, error) {
|
||||
options := []otlpmetrichttp.Option{
|
||||
otlpmetrichttp.WithEndpoint(dsn.OTLPHttpEndpoint()),
|
||||
otlpmetrichttp.WithHeaders(dsn.Headers()),
|
||||
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
|
||||
otlpmetrichttp.WithTemporalitySelector(preferDeltaTemporalitySelector),
|
||||
}
|
||||
|
||||
if conf.tlsConf != nil {
|
||||
options = append(options, otlpmetrichttp.WithTLSClientConfig(conf.tlsConf))
|
||||
} else if dsn.Scheme == "http" {
|
||||
options = append(options, otlpmetrichttp.WithInsecure())
|
||||
}
|
||||
|
||||
return otlpmetrichttp.New(ctx, options...)
|
||||
}
|
||||
|
||||
func preferDeltaTemporalitySelector(kind sdkmetric.InstrumentKind) metricdata.Temporality {
|
||||
switch kind {
|
||||
case sdkmetric.InstrumentKindCounter,
|
||||
sdkmetric.InstrumentKindObservableCounter,
|
||||
sdkmetric.InstrumentKindHistogram:
|
||||
return metricdata.DeltaTemporality
|
||||
default:
|
||||
return metricdata.CumulativeTemporality
|
||||
}
|
||||
}
|
122
otel/otel.go
Normal file
122
otel/otel.go
Normal file
@ -0,0 +1,122 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"go.linka.cloud/grpc-toolkit/logger"
|
||||
)
|
||||
|
||||
var log = logger.StandardLogger().WithField("name", "otel")
|
||||
|
||||
// Configure configures OpenTelemetry.
|
||||
// By default, it:
|
||||
// - creates tracer provider;
|
||||
// - registers span exporter;
|
||||
// - sets tracecontext + baggage composite context propagator.
|
||||
//
|
||||
// You can use OTEL_DISABLED env var to completely skip otel configuration.
|
||||
func Configure(opts ...Option) {
|
||||
if _, ok := os.LookupEnv("OTEL_DISABLED"); ok {
|
||||
return
|
||||
}
|
||||
|
||||
ctx := context.TODO()
|
||||
conf := newConfig(opts)
|
||||
|
||||
if !conf.tracingEnabled && !conf.metricsEnabled && !conf.loggingEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
if len(conf.dsn) == 0 {
|
||||
log.Warn("no DSN provided (otel-go is disabled)")
|
||||
return
|
||||
}
|
||||
|
||||
dsn, err := ParseDSN(conf.dsn[0])
|
||||
if err != nil {
|
||||
log.Warnf("invalid DSN: %s (otel is disabled)", err)
|
||||
return
|
||||
}
|
||||
|
||||
if strings.HasSuffix(dsn.Host, ":4318") {
|
||||
log.Warnf("otel-go uses OTLP/gRPC exporter, but got host %q", dsn.Host)
|
||||
}
|
||||
|
||||
client := newClient(dsn)
|
||||
|
||||
configurePropagator(conf)
|
||||
if conf.tracingEnabled {
|
||||
client.tp = configureTracing(ctx, conf)
|
||||
}
|
||||
if conf.metricsEnabled {
|
||||
client.mp = configureMetrics(ctx, conf)
|
||||
}
|
||||
if conf.loggingEnabled {
|
||||
client.lp = configureLogging(ctx, conf)
|
||||
}
|
||||
|
||||
atomicClient.Store(client)
|
||||
}
|
||||
|
||||
func configurePropagator(conf *config) {
|
||||
textMapPropagator := conf.textMapPropagator
|
||||
if textMapPropagator == nil {
|
||||
textMapPropagator = propagation.NewCompositeTextMapPropagator(
|
||||
propagation.TraceContext{},
|
||||
propagation.Baggage{},
|
||||
)
|
||||
}
|
||||
otel.SetTextMapPropagator(textMapPropagator)
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
var (
|
||||
fallbackClient = newClient(&DSN{})
|
||||
atomicClient atomic.Value
|
||||
)
|
||||
|
||||
func activeClient() *client {
|
||||
v := atomicClient.Load()
|
||||
if v == nil {
|
||||
return fallbackClient
|
||||
}
|
||||
return v.(*client)
|
||||
}
|
||||
|
||||
func TraceURL(span trace.Span) string {
|
||||
return activeClient().TraceURL(span)
|
||||
}
|
||||
|
||||
func ReportError(ctx context.Context, err error, opts ...trace.EventOption) {
|
||||
activeClient().ReportError(ctx, err, opts...)
|
||||
}
|
||||
|
||||
func ReportPanic(ctx context.Context, val any) {
|
||||
activeClient().ReportPanic(ctx, val)
|
||||
}
|
||||
|
||||
func Shutdown(ctx context.Context) error {
|
||||
return activeClient().Shutdown(ctx)
|
||||
}
|
||||
|
||||
func ForceFlush(ctx context.Context) error {
|
||||
return activeClient().ForceFlush(ctx)
|
||||
}
|
||||
|
||||
func TracerProvider() *sdktrace.TracerProvider {
|
||||
return activeClient().tp
|
||||
}
|
||||
|
||||
// SetLogger sets the logger to the given one.
|
||||
func SetLogger(logger logger.Logger) {
|
||||
log = logger
|
||||
}
|
159
otel/tracing.go
Normal file
159
otel/tracing.go
Normal file
@ -0,0 +1,159 @@
|
||||
package otel
|
||||
|
||||
import (
|
||||
"context"
|
||||
cryptorand "crypto/rand"
|
||||
"encoding/binary"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func configureTracing(ctx context.Context, conf *config) *sdktrace.TracerProvider {
|
||||
provider := conf.tracerProvider
|
||||
if provider == nil {
|
||||
var opts []sdktrace.TracerProviderOption
|
||||
|
||||
opts = append(opts, sdktrace.WithIDGenerator(newIDGenerator()))
|
||||
if res := conf.newResource(); res != nil {
|
||||
opts = append(opts, sdktrace.WithResource(res))
|
||||
}
|
||||
if conf.traceSampler != nil {
|
||||
opts = append(opts, sdktrace.WithSampler(conf.traceSampler))
|
||||
}
|
||||
|
||||
provider = sdktrace.NewTracerProvider(opts...)
|
||||
otel.SetTracerProvider(provider)
|
||||
}
|
||||
|
||||
for _, dsn := range conf.dsn {
|
||||
dsn, err := ParseDSN(dsn)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("ParseDSN failed")
|
||||
continue
|
||||
}
|
||||
|
||||
exp, err := otlptrace.New(ctx, otlpTraceClient(conf, dsn))
|
||||
if err != nil {
|
||||
log.WithError(err).Error("otlptrace.New failed")
|
||||
continue
|
||||
}
|
||||
|
||||
queueSize := queueSize()
|
||||
bspOptions := []sdktrace.BatchSpanProcessorOption{
|
||||
sdktrace.WithMaxQueueSize(queueSize),
|
||||
sdktrace.WithMaxExportBatchSize(queueSize),
|
||||
sdktrace.WithBatchTimeout(10 * time.Second),
|
||||
sdktrace.WithExportTimeout(10 * time.Second),
|
||||
}
|
||||
bspOptions = append(bspOptions, conf.bspOptions...)
|
||||
|
||||
bsp := sdktrace.NewBatchSpanProcessor(exp, bspOptions...)
|
||||
provider.RegisterSpanProcessor(bsp)
|
||||
}
|
||||
|
||||
// Register additional span processors.
|
||||
for _, sp := range conf.spanProcessors {
|
||||
provider.RegisterSpanProcessor(sp)
|
||||
}
|
||||
|
||||
if conf.prettyPrint {
|
||||
exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
|
||||
if err != nil {
|
||||
log.WithError(err).Error("stdouttrace.New failed")
|
||||
} else {
|
||||
provider.RegisterSpanProcessor(sdktrace.NewSimpleSpanProcessor(exporter))
|
||||
}
|
||||
}
|
||||
|
||||
return provider
|
||||
}
|
||||
|
||||
func otlpTraceClient(conf *config, dsn *DSN) otlptrace.Client {
|
||||
options := []otlptracehttp.Option{
|
||||
otlptracehttp.WithEndpoint(dsn.OTLPHttpEndpoint()),
|
||||
otlptracehttp.WithHeaders(dsn.Headers()),
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
}
|
||||
|
||||
if conf.tlsConf != nil {
|
||||
options = append(options, otlptracehttp.WithTLSClientConfig(conf.tlsConf))
|
||||
} else if dsn.Scheme == "http" {
|
||||
options = append(options, otlptracehttp.WithInsecure())
|
||||
}
|
||||
|
||||
return otlptracehttp.NewClient(options...)
|
||||
}
|
||||
|
||||
func queueSize() int {
|
||||
const min = 1000
|
||||
const max = 16000
|
||||
|
||||
n := (runtime.GOMAXPROCS(0) / 2) * 1000
|
||||
if n < min {
|
||||
return min
|
||||
}
|
||||
if n > max {
|
||||
return max
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------------------
|
||||
|
||||
const spanIDPrec = int64(time.Millisecond)
|
||||
|
||||
type idGenerator struct {
|
||||
sync.Mutex
|
||||
randSource *rand.Rand
|
||||
}
|
||||
|
||||
func newIDGenerator() *idGenerator {
|
||||
gen := &idGenerator{}
|
||||
var rngSeed int64
|
||||
_ = binary.Read(cryptorand.Reader, binary.LittleEndian, &rngSeed)
|
||||
gen.randSource = rand.New(rand.NewSource(rngSeed))
|
||||
return gen
|
||||
}
|
||||
|
||||
var _ sdktrace.IDGenerator = (*idGenerator)(nil)
|
||||
|
||||
// NewIDs returns a new trace and span ID.
|
||||
func (gen *idGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) {
|
||||
unixNano := time.Now().UnixNano()
|
||||
|
||||
gen.Lock()
|
||||
defer gen.Unlock()
|
||||
|
||||
tid := trace.TraceID{}
|
||||
binary.BigEndian.PutUint64(tid[:8], uint64(unixNano))
|
||||
_, _ = gen.randSource.Read(tid[8:])
|
||||
|
||||
sid := trace.SpanID{}
|
||||
binary.BigEndian.PutUint32(sid[:4], uint32(unixNano/spanIDPrec))
|
||||
_, _ = gen.randSource.Read(sid[4:])
|
||||
|
||||
return tid, sid
|
||||
}
|
||||
|
||||
// NewSpanID returns a ID for a new span in the trace with traceID.
|
||||
func (gen *idGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID {
|
||||
unixNano := time.Now().UnixNano()
|
||||
|
||||
gen.Lock()
|
||||
defer gen.Unlock()
|
||||
|
||||
sid := trace.SpanID{}
|
||||
binary.BigEndian.PutUint32(sid[:4], uint32(unixNano/spanIDPrec))
|
||||
_, _ = gen.randSource.Read(sid[4:])
|
||||
|
||||
return sid
|
||||
}
|
Reference in New Issue
Block a user