Skip to content

Commit 54a51b1

Browse files
authored
enhance: Support dynamic config for opentelemetry trace (milvus-io#32169)
relate: milvus-io#31940 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
1 parent ca0cf9b commit 54a51b1

10 files changed

Lines changed: 205 additions & 23 deletions

File tree

cmd/roles/roles.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ import (
3737
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
3838
"github.com/milvus-io/milvus/internal/util/dependency"
3939
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
40+
"github.com/milvus-io/milvus/internal/util/initcore"
4041
internalmetrics "github.com/milvus-io/milvus/internal/util/metrics"
42+
"github.com/milvus-io/milvus/pkg/config"
4143
"github.com/milvus-io/milvus/pkg/log"
4244
"github.com/milvus-io/milvus/pkg/metrics"
4345
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/nmq"
@@ -392,6 +394,30 @@ func (mr *MilvusRoles) Run() {
392394

393395
mr.setupLogger()
394396
tracer.Init()
397+
paramtable.Get().WatchKeyPrefix("trace", config.NewHandler("tracing handler", func(e *config.Event) {
398+
params := paramtable.Get()
399+
400+
exp, err := tracer.CreateTracerExporter(params)
401+
if err != nil {
402+
log.Warn("Init tracer faield", zap.Error(err))
403+
return
404+
}
405+
406+
// close old provider
407+
err = tracer.CloseTracerProvider(context.Background())
408+
if err != nil {
409+
log.Warn("Close old provider failed, stop reset", zap.Error(err))
410+
return
411+
}
412+
413+
tracer.SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
414+
log.Info("Reset tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
415+
416+
if paramtable.GetRole() == typeutil.QueryNodeRole || paramtable.GetRole() == typeutil.StandaloneRole {
417+
initcore.InitTraceConfig(params)
418+
log.Info("Reset segcore tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
419+
}
420+
}))
395421

396422
paramtable.SetCreateTime(time.Now())
397423
paramtable.SetUpdateTime(time.Now())

configs/milvus.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -790,8 +790,8 @@ quotaAndLimits:
790790

791791
trace:
792792
# trace exporter type, default is stdout,
793-
# optional values: ['stdout', 'jaeger', 'otlp']
794-
exporter: stdout
793+
# optional values: ['noop','stdout', 'jaeger', 'otlp']
794+
exporter: noop
795795
# fraction of traceID based sampler,
796796
# optional values: [0, 1]
797797
# Fractions >= 1 will always sample. Fractions < 0 are treated as zero.

internal/core/src/common/init_c.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,14 @@ InitTrace(CTraceConfig* config) {
9393
},
9494
traceConfig);
9595
}
96+
97+
void
98+
SetTrace(CTraceConfig* config) {
99+
auto traceConfig = milvus::tracer::TraceConfig{config->exporter,
100+
config->sampleFraction,
101+
config->jaegerURL,
102+
config->otlpEndpoint,
103+
config->oltpSecure,
104+
config->nodeID};
105+
milvus::tracer::initTelemetry(traceConfig);
106+
}

internal/core/src/common/init_c.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ InitCpuNum(const int);
4545
void
4646
InitTrace(CTraceConfig* config);
4747

48+
void
49+
SetTrace(CTraceConfig* config);
50+
4851
#ifdef __cplusplus
4952
};
5053
#endif

internal/util/initcore/init_core.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@ func InitTraceConfig(params *paramtable.ComponentParam) {
6464
C.InitTrace(&config)
6565
}
6666

67+
func ResetTraceConfig(params *paramtable.ComponentParam) {
68+
sampleFraction := C.float(params.TraceCfg.SampleFraction.GetAsFloat())
69+
nodeID := C.int(paramtable.GetNodeID())
70+
exporter := C.CString(params.TraceCfg.Exporter.GetValue())
71+
jaegerURL := C.CString(params.TraceCfg.JaegerURL.GetValue())
72+
endpoint := C.CString(params.TraceCfg.OtlpEndpoint.GetValue())
73+
defer C.free(unsafe.Pointer(exporter))
74+
defer C.free(unsafe.Pointer(jaegerURL))
75+
defer C.free(unsafe.Pointer(endpoint))
76+
77+
config := C.CTraceConfig{
78+
exporter: exporter,
79+
sampleFraction: sampleFraction,
80+
jaegerURL: jaegerURL,
81+
otlpEndpoint: endpoint,
82+
nodeID: nodeID,
83+
}
84+
C.SetTrace(&config)
85+
}
86+
6787
func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
6888
cAddress := C.CString(params.MinioCfg.Address.GetValue())
6989
cBucketName := C.CString(params.MinioCfg.BucketName.GetValue())
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package initcore
18+
19+
import (
20+
"testing"
21+
22+
"github.com/milvus-io/milvus/pkg/util/paramtable"
23+
)
24+
25+
func TestTracer(t *testing.T) {
26+
paramtable.Init()
27+
InitTraceConfig(paramtable.Get())
28+
29+
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "stdout")
30+
ResetTraceConfig(paramtable.Get())
31+
}

pkg/tracer/tracer.go

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,63 @@ import (
2929
"go.opentelemetry.io/otel/sdk/resource"
3030
sdk "go.opentelemetry.io/otel/sdk/trace"
3131
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
32+
"go.opentelemetry.io/otel/trace"
3233
"go.uber.org/zap"
3334

3435
"github.com/milvus-io/milvus/pkg/log"
3536
"github.com/milvus-io/milvus/pkg/util/paramtable"
3637
)
3738

38-
func Init() {
39+
func Init() error {
3940
params := paramtable.Get()
4041

42+
exp, err := CreateTracerExporter(params)
43+
if err != nil {
44+
log.Warn("Init tracer faield", zap.Error(err))
45+
return err
46+
}
47+
48+
SetTracerProvider(exp, params.TraceCfg.SampleFraction.GetAsFloat())
49+
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
50+
log.Info("Init tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
51+
return nil
52+
}
53+
54+
func CloseTracerProvider(ctx context.Context) error {
55+
provider, ok := otel.GetTracerProvider().(*sdk.TracerProvider)
56+
if ok {
57+
err := provider.Shutdown(ctx)
58+
if err != nil {
59+
return err
60+
}
61+
}
62+
return nil
63+
}
64+
65+
func SetTracerProvider(exp sdk.SpanExporter, traceIDRatio float64) {
66+
if exp == nil {
67+
otel.SetTracerProvider(trace.NewNoopTracerProvider())
68+
return
69+
}
70+
71+
tp := sdk.NewTracerProvider(
72+
sdk.WithBatcher(exp),
73+
sdk.WithResource(resource.NewWithAttributes(
74+
semconv.SchemaURL,
75+
semconv.ServiceNameKey.String(paramtable.GetRole()),
76+
attribute.Int64("NodeID", paramtable.GetNodeID()),
77+
)),
78+
sdk.WithSampler(sdk.ParentBased(
79+
sdk.TraceIDRatioBased(traceIDRatio),
80+
)),
81+
)
82+
otel.SetTracerProvider(tp)
83+
}
84+
85+
func CreateTracerExporter(params *paramtable.ComponentParam) (sdk.SpanExporter, error) {
4186
var exp sdk.SpanExporter
4287
var err error
88+
4389
switch params.TraceCfg.Exporter.GetValue() {
4490
case "jaeger":
4591
exp, err = jaeger.New(jaeger.WithCollectorEndpoint(
@@ -55,25 +101,11 @@ func Init() {
55101
exp, err = otlptracegrpc.New(context.Background(), opts...)
56102
case "stdout":
57103
exp, err = stdout.New()
104+
case "noop":
105+
return nil, nil
58106
default:
59107
err = errors.New("Empty Trace")
60108
}
61-
if err != nil {
62-
log.Warn("Init tracer faield", zap.Error(err))
63-
return
64-
}
65-
tp := sdk.NewTracerProvider(
66-
sdk.WithBatcher(exp),
67-
sdk.WithResource(resource.NewWithAttributes(
68-
semconv.SchemaURL,
69-
semconv.ServiceNameKey.String(paramtable.GetRole()),
70-
attribute.Int64("NodeID", paramtable.GetNodeID()),
71-
)),
72-
sdk.WithSampler(sdk.ParentBased(
73-
sdk.TraceIDRatioBased(params.TraceCfg.SampleFraction.GetAsFloat()),
74-
)),
75-
)
76-
otel.SetTracerProvider(tp)
77-
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
78-
log.Info("Init tracer finished", zap.String("Exporter", params.TraceCfg.Exporter.GetValue()))
109+
110+
return exp, err
79111
}

pkg/tracer/tracer_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package tracer
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
25+
"github.com/milvus-io/milvus/pkg/util/paramtable"
26+
)
27+
28+
func TestTracer_Init(t *testing.T) {
29+
paramtable.Init()
30+
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "Unknown")
31+
// init failed with unknown exporter
32+
err := Init()
33+
assert.Error(t, err)
34+
35+
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "stdout")
36+
// init with stdout exporter
37+
err = Init()
38+
assert.NoError(t, err)
39+
40+
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "noop")
41+
// init with noop exporter
42+
err = Init()
43+
assert.NoError(t, err)
44+
}
45+
46+
func TestTracer_CloseProviderFailed(t *testing.T) {
47+
paramtable.Init()
48+
paramtable.Get().Save(paramtable.Get().TraceCfg.Exporter.Key, "stdout")
49+
// init with stdout exporter
50+
err := Init()
51+
assert.NoError(t, err)
52+
53+
ctx, cancel := context.WithCancel(context.Background())
54+
cancel()
55+
56+
err = CloseTracerProvider(ctx)
57+
assert.Error(t, err)
58+
}

pkg/util/paramtable/component_param.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -770,8 +770,8 @@ func (t *traceConfig) init(base *BaseTable) {
770770
Key: "trace.exporter",
771771
Version: "2.3.0",
772772
Doc: `trace exporter type, default is stdout,
773-
optional values: ['stdout', 'jaeger', 'otlp']`,
774-
DefaultValue: "stdout",
773+
optional values: ['noop','stdout', 'jaeger', 'otlp']`,
774+
DefaultValue: "noop",
775775
Export: true,
776776
}
777777
t.Exporter.Init(base.mgr)

scripts/run_go_unittest.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ go test -race -cover -tags dynamic "${MILVUS_DIR}/util/sessionutil/..." -failfas
106106
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/typeutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
107107
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/importutilv2/..." -failfast -count=1 -ldflags="-r ${RPATH}"
108108
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/proxyutil/..." -failfast -count=1 -ldflags="-r ${RPATH}"
109+
go test -race -cover -tags dynamic "${MILVUS_DIR}/util/initcore/..." -failfast -count=1 -ldflags="-r ${RPATH}"
109110
}
110111

111112
function test_pkg()

0 commit comments

Comments
 (0)