diff --git a/deploy/local/docker-compose/xatu-server.yaml b/deploy/local/docker-compose/xatu-server.yaml index 7894d835..331f901f 100644 --- a/deploy/local/docker-compose/xatu-server.yaml +++ b/deploy/local/docker-compose/xatu-server.yaml @@ -72,6 +72,7 @@ services: outputs: - name: general type: http + shippingMethod: async config: address: http://xatu-vector-http-kafka:9005 maxQueueSize: 102400 @@ -80,4 +81,5 @@ services: maxExportBatchSize: 64 compression: zstd keepAlive: true - workers: 50 \ No newline at end of file + workers: 50 + \ No newline at end of file diff --git a/pkg/cannon/config.go b/pkg/cannon/config.go index 8382c101..bfaeb214 100644 --- a/pkg/cannon/config.go +++ b/pkg/cannon/config.go @@ -77,12 +77,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { sinks := make([]output.Sink, len(c.Outputs)) for i, out := range c.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodSync + + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink(out.Name, out.SinkType, out.Config, log, out.FilterConfig, - processor.ShippingMethodSync, + *out.ShippingMethod, ) if err != nil { return nil, err diff --git a/pkg/clmimicry/config.go b/pkg/clmimicry/config.go index b588959f..66c0a058 100644 --- a/pkg/clmimicry/config.go +++ b/pkg/clmimicry/config.go @@ -67,13 +67,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { sinks := make([]output.Sink, len(c.Outputs)) for i, out := range c.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodAsync + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink( out.Name, out.SinkType, out.Config, log, out.FilterConfig, - processor.ShippingMethodAsync, + *out.ShippingMethod, ) if err != nil { return nil, err diff --git a/pkg/mimicry/config.go b/pkg/mimicry/config.go index b0eade13..8279fdc0 100644 --- a/pkg/mimicry/config.go +++ b/pkg/mimicry/config.go @@ -58,13 +58,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { sinks := make([]output.Sink, len(c.Outputs)) for i, out := range c.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodAsync + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink( out.Name, out.SinkType, out.Config, log, out.FilterConfig, - processor.ShippingMethodAsync, + *out.ShippingMethod, ) if err != nil { return nil, err diff --git a/pkg/output/config.go b/pkg/output/config.go index 246ac488..ebe8266b 100644 --- a/pkg/output/config.go +++ b/pkg/output/config.go @@ -21,6 +21,8 @@ type Config struct { Config *RawMessage `yaml:"config"` FilterConfig pxatu.EventFilterConfig `yaml:"filter"` + + ShippingMethod *processor.ShippingMethod `yaml:"shippingMethod"` } func (c *Config) Validate() error { diff --git a/pkg/relaymonitor/config.go b/pkg/relaymonitor/config.go index 2e40b613..0c999c9f 100644 --- a/pkg/relaymonitor/config.go +++ b/pkg/relaymonitor/config.go @@ -85,13 +85,18 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { sinks := make([]output.Sink, len(c.Outputs)) for i, out := range c.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodAsync + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink( out.Name, out.SinkType, out.Config, log, out.FilterConfig, - processor.ShippingMethodAsync, + *out.ShippingMethod, ) if err != nil { return nil, err diff --git a/pkg/sentry/config.go b/pkg/sentry/config.go index c357536c..ce0f57dd 100644 --- a/pkg/sentry/config.go +++ b/pkg/sentry/config.go @@ -78,12 +78,17 @@ func (c *Config) CreateSinks(log logrus.FieldLogger) ([]output.Sink, error) { sinks := make([]output.Sink, len(c.Outputs)) for i, out := range c.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodAsync + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink(out.Name, out.SinkType, out.Config, log, out.FilterConfig, - processor.ShippingMethodAsync, + *out.ShippingMethod, ) if err != nil { return nil, err diff --git a/pkg/server/service/event-ingester/ingester.go b/pkg/server/service/event-ingester/ingester.go index 9e002e81..e85e0cb6 100644 --- a/pkg/server/service/event-ingester/ingester.go +++ b/pkg/server/service/event-ingester/ingester.go @@ -138,12 +138,17 @@ func (e *Ingester) CreateSinks() ([]output.Sink, error) { sinks := make([]output.Sink, len(e.config.Outputs)) for i, out := range e.config.Outputs { + if out.ShippingMethod == nil { + shippingMethod := processor.ShippingMethodSync + out.ShippingMethod = &shippingMethod + } + sink, err := output.NewSink(out.Name, out.SinkType, out.Config, e.log, out.FilterConfig, - processor.ShippingMethodSync, + *out.ShippingMethod, ) if err != nil { return nil, err