Skip to content

Commit

Permalink
Feat/dynamic shipping (#385)
Browse files Browse the repository at this point in the history
* fix(sentry): Override beacon url before validating config

* fix: Set default shipping method if not specified

* style: Update shipping method variable value
  • Loading branch information
samcm authored Oct 3, 2024
1 parent 547f06c commit c24535c
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 7 deletions.
4 changes: 3 additions & 1 deletion deploy/local/docker-compose/xatu-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ services:
outputs:
- name: general
type: http
shippingMethod: async
config:
address: http://xatu-vector-http-kafka:9005
maxQueueSize: 102400
Expand All @@ -80,4 +81,5 @@ services:
maxExportBatchSize: 64
compression: zstd
keepAlive: true
workers: 50
workers: 50

8 changes: 7 additions & 1 deletion pkg/cannon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodSync

out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(out.Name,
out.SinkType,
out.Config,
log,
out.FilterConfig,
processor.ShippingMethodSync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion pkg/clmimicry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodAsync
out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(
out.Name,
out.SinkType,
out.Config,
log,
out.FilterConfig,
processor.ShippingMethodAsync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion pkg/mimicry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodAsync
out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(
out.Name,
out.SinkType,
out.Config,
log,
out.FilterConfig,
processor.ShippingMethodAsync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/output/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Config struct {
Config *RawMessage `yaml:"config"`

FilterConfig pxatu.EventFilterConfig `yaml:"filter"`

ShippingMethod *processor.ShippingMethod `yaml:"shippingMethod"`
}

func (c *Config) Validate() error {
Expand Down
7 changes: 6 additions & 1 deletion pkg/relaymonitor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodAsync
out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(
out.Name,
out.SinkType,
out.Config,
log,
out.FilterConfig,
processor.ShippingMethodAsync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion pkg/sentry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,17 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) {
sinks := make([]output.Sink, len(c.Outputs))

for i, out := range c.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodAsync
out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(out.Name,
out.SinkType,
out.Config,
log,
out.FilterConfig,
processor.ShippingMethodAsync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down
7 changes: 6 additions & 1 deletion pkg/server/service/event-ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,17 @@ func (e *Ingester) CreateSinks() ([]output.Sink, error) {
sinks := make([]output.Sink, len(e.config.Outputs))

for i, out := range e.config.Outputs {
if out.ShippingMethod == nil {
shippingMethod := processor.ShippingMethodSync
out.ShippingMethod = &shippingMethod
}

sink, err := output.NewSink(out.Name,
out.SinkType,
out.Config,
e.log,
out.FilterConfig,
processor.ShippingMethodSync,
*out.ShippingMethod,
)
if err != nil {
return nil, err
Expand Down

0 comments on commit c24535c

Please sign in to comment.