Skip to content

Commit

Permalink
Set the exit span before sending the payload to the target. (#45)
Browse files Browse the repository at this point in the history
Signed-off-by: laminar <[email protected]>
  • Loading branch information
tpiperatgod authored Mar 23, 2022
1 parent 465c554 commit 5e32151
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 48 deletions.
48 changes: 46 additions & 2 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"sync"
"time"

"github.com/SkyAPM/go2sky"
cloudevents "github.com/cloudevents/sdk-go/v2"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"k8s.io/klog/v2"

dapr "github.com/dapr/go-sdk/client"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

var (
Expand Down Expand Up @@ -54,6 +55,7 @@ const (
SelfHostMode = "self-host"
TestModeOn = "on"
innerEventTypePrefix = "io.openfunction.function"
tracingProviderSkywalking = "skywalking"
)

type Runtime string
Expand Down Expand Up @@ -154,6 +156,9 @@ type RuntimeContext interface {

// GetPluginsTracingCfg returns the TracingConfig interface.
GetPluginsTracingCfg() TracingConfig

// HasPluginsTracingCfg returns nil if there is no TracingConfig.
HasPluginsTracingCfg() bool
}

type Context interface {
Expand Down Expand Up @@ -361,6 +366,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
ie := NewInnerEvent(ctx)
ie.MergeMetadata(ctx.GetInnerEvent())
ie.SetUserData(data)

// Set the exit span for tracing
if err := setExitSpan(ctx, ie, outputName); err != nil {
klog.Warningf("failed to set exit span: %v", err)
}

payload = ie.GetCloudEventJSON()
}

Expand Down Expand Up @@ -573,6 +584,10 @@ func (ctx *FunctionContext) GetPluginsTracingCfg() TracingConfig {
return ctx.PluginsTracing
}

func (ctx *FunctionContext) HasPluginsTracingCfg() bool {
return ctx.PluginsTracing != nil
}

func (ctx *FunctionContext) WithOut(out *FunctionOut) RuntimeContext {
ctx.mu.Lock()
defer ctx.mu.Unlock()
Expand Down Expand Up @@ -824,6 +839,35 @@ func getBuildingBlockType(componentType string) (ResourceType, error) {
return "", errors.New("invalid component type")
}

func setExitSpan(ctx *FunctionContext, innerEvent InnerEvent, target string) error {
if !ctx.HasPluginsTracingCfg() || !ctx.GetPluginsTracingCfg().IsEnabled() {
return nil
}

switch ctx.GetPluginsTracingCfg().ProviderName() {
case tracingProviderSkywalking:
tracer := go2sky.GetGlobalTracer()
if tracer == nil {
return errors.New("skywalking is not enabled")
}

span, err := tracer.CreateExitSpan(ctx.GetNativeContext(), ctx.GetName(), target, func(headerKey, headerValue string) error {
innerEvent.SetMetadata(headerKey, headerValue)
return nil
})
if err != nil {
return err
}
defer span.End()

span.SetSpanLayer(agentv3.SpanLayer_FAAS)
span.SetComponent(5013)
return nil
default:
return nil
}
}

func ConvertUserDataToBytes(data interface{}) []byte {
if d, ok := data.([]byte); ok {
return d
Expand Down
2 changes: 1 addition & 1 deletion plugin/skywalking/test/binding-event/expected.data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ segmentItems:
- segmentId: {{ notEmpty .segmentId }}
spans:
{{- contains .spans }}
- operationName: sample-topic
- operationName: provider
parentSpanId: 0
spanId: 1
spanLayer: FAAS
Expand Down
25 changes: 3 additions & 22 deletions plugin/skywalking/test/binding-event/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,17 @@ import (
"context"
"time"

"k8s.io/klog/v2"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/framework"
"github.com/OpenFunction/functions-framework-go/plugin"
"github.com/OpenFunction/functions-framework-go/plugin/skywalking"
"github.com/SkyAPM/go2sky"
"k8s.io/klog/v2"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

func bindingsFunction(ofCtx ofctx.Context, in []byte) (ofctx.Out, error) {
tracer := go2sky.GetGlobalTracer()
if tracer == nil {
klog.Warning("go2sky is not enabled")
return ofCtx.ReturnOnInternalError().WithData([]byte("go2sky is not enabled")), nil
}

span, err := tracer.CreateExitSpan(ofCtx.GetNativeContext(), "sample-topic", "sample-topic", func(headerKey, headerValue string) error {
ofCtx.GetInnerEvent().SetMetadata(headerKey, headerValue)
return nil
})
if err != nil {
klog.Error(err)
return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err
}
defer span.End()

span.SetSpanLayer(agentv3.SpanLayer_FAAS)
span.SetComponent(5013)

_, err = ofCtx.Send("sample-topic", []byte(time.Now().String()))
_, err := ofCtx.Send("sample-topic", []byte(time.Now().String()))
if err != nil {
klog.Error(err)
return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err
Expand Down
2 changes: 1 addition & 1 deletion plugin/skywalking/test/topic-event/expected.data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ segmentItems:
- segmentId: {{ notEmpty .segmentId }}
spans:
{{- contains .spans }}
- operationName: publish-topic
- operationName: publish
parentSpanId: 0
spanId: 1
spanLayer: FAAS
Expand Down
25 changes: 3 additions & 22 deletions plugin/skywalking/test/topic-event/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,18 @@ import (
"context"
"time"

"k8s.io/klog/v2"

ofctx "github.com/OpenFunction/functions-framework-go/context"
"github.com/OpenFunction/functions-framework-go/framework"
"github.com/OpenFunction/functions-framework-go/plugin"
"github.com/OpenFunction/functions-framework-go/plugin/skywalking"
"github.com/SkyAPM/go2sky"
"k8s.io/klog/v2"
agentv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)

func pubsubFunction(ofCtx ofctx.Context, in []byte) (ofctx.Out, error) {
tracer := go2sky.GetGlobalTracer()
if tracer == nil {
klog.Warningf("go2sky is not enabled")
return ofCtx.ReturnOnInternalError().WithData([]byte("go2sky is not enabled")), nil
}

span, err := tracer.CreateExitSpan(ofCtx.GetNativeContext(), "publish-topic", "publish-topic", func(headerKey, headerValue string) error {
ofCtx.GetInnerEvent().SetMetadata(headerKey, headerValue)
return nil
})
if err != nil {
klog.Error(err)
return ofCtx.ReturnOnInternalError().WithData([]byte(err.Error())), err
}
defer span.End()

span.SetSpanLayer(agentv3.SpanLayer_FAAS)
span.SetComponent(5013)

// topic
_, err = ofCtx.Send("publish-topic", []byte(time.Now().String()))
_, err := ofCtx.Send("publish-topic", []byte(time.Now().String()))

if err != nil {
klog.Error(err)
Expand Down

0 comments on commit 5e32151

Please sign in to comment.