Skip to content

Commit

Permalink
Improve resiliency and add more logging (#15)
Browse files Browse the repository at this point in the history
* Improve resiliency

* Add some more logging in http executor

* Make the connection retries non-blocking

* Add more logging

* Add more configurable retrier

* Improve retrier
  • Loading branch information
radito3 authored May 13, 2024
1 parent daa200c commit 76b74c2
Show file tree
Hide file tree
Showing 21 changed files with 252 additions and 63 deletions.
21 changes: 7 additions & 14 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
"github.com/SAP/remote-work-processor/internal/opt"
"github.com/SAP/remote-work-processor/internal/utils"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -81,23 +82,23 @@ func main() {

connAttemptChan := make(chan struct{}, 1)
connAttemptChan <- struct{}{}
var connAttempts uint = 0
retryConfig := utils.CreateRetryConfig(opts.RetryInterval, opts.RetryStrategy.Unmarshall(), opts.MaxConnRetries, connAttemptChan)

Loop:
for connAttempts < opts.MaxConnRetries {
for retryConfig.CanRetry() {
select {
case <-rootCtx.Done():
log.Println("Received cancellation signal. Stopping Remote Work Processor...")
break Loop
case <-connAttemptChan:
err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID())
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
}
default:
operation, err := grpcClient.ReceiveMsg()
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
continue
}
if operation == nil {
Expand All @@ -116,15 +117,15 @@ Loop:

msg, err := processor.Process(rootCtx)
if err != nil {
signalRetry(&connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err))
utils.Retry(rootCtx, retryConfig, fmt.Errorf("error processing operation: %v", err))
continue
}
if msg == nil {
continue
}

if err = grpcClient.Send(msg); err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
utils.Retry(rootCtx, retryConfig, err)
}
}
}
Expand Down Expand Up @@ -154,11 +155,3 @@ func getKubeConfig() *rest.Config {
}
return config
}

func signalRetry(attempts *uint, retryChan chan<- struct{}, err error) {
if err != nil {
log.Println(err)
}
retryChan <- struct{}{}
*attempts++
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/SAP/remote-work-processor
go 1.20

require (
github.com/google/uuid v1.3.0
github.com/itchyny/gojq v0.12.12
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.33.0
Expand All @@ -29,7 +30,6 @@ require (
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/itchyny/timefmt-go v0.1.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
14 changes: 8 additions & 6 deletions internal/executors/executor_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ type ExecutorResult struct {
type ExecutorResultOption func(*ExecutorResult)

func NewExecutorResult(opts ...ExecutorResultOption) *ExecutorResult {
r := &ExecutorResult{}
r := &ExecutorResult{
Output: make(map[string]string),
}

for _, opt := range opts {
opt(r)
Expand All @@ -22,7 +24,9 @@ func NewExecutorResult(opts ...ExecutorResultOption) *ExecutorResult {

func Output(m map[string]string) ExecutorResultOption {
return func(er *ExecutorResult) {
er.Output = m
for key, val := range m {
er.Output[key] = val
}
}
}

Expand All @@ -34,11 +38,9 @@ func Status(s pb.TaskExecutionResponseMessage_TaskState) ExecutorResultOption {

func Error(err error) ExecutorResultOption {
return func(er *ExecutorResult) {
if err == nil {
return
if err != nil {
er.Error = err.Error()
}

er.Error = err.Error()
}
}

Expand Down
19 changes: 15 additions & 4 deletions internal/executors/factory/executor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ import (
"github.com/SAP/remote-work-processor/internal/executors/void"
)

func CreateExecutor(t pb.TaskType) (executors.Executor, error) {
type errorExecutor func() error

func CreateExecutor(t pb.TaskType) executors.Executor {
switch t {
case pb.TaskType_TASK_TYPE_VOID:
return void.VoidExecutor{}, nil
return void.VoidExecutor{}
case pb.TaskType_TASK_TYPE_HTTP:
return http.NewDefaultHttpRequestExecutor(), nil
return http.NewDefaultHttpRequestExecutor()
default:
return nil, fmt.Errorf("cannot create executor of type %q", t)
return errorExecutor(func() error {
return fmt.Errorf("cannot create executor of type %q: unsupported task type", t)
})
}
}

func (exec errorExecutor) Execute(_ executors.Context) *executors.ExecutorResult {
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_CHARGEABLE),
executors.Error(exec()),
)
}
7 changes: 7 additions & 0 deletions internal/executors/http/authorization_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"github.com/SAP/remote-work-processor/internal/executors"
"log"
"regexp"
)

Expand All @@ -16,6 +17,7 @@ var iasTokenUrlRegex = regexp.MustCompile(IasTokenUrlPattern)
// OAuth 2.0 will be added later

func CreateAuthorizationHeader(params *HttpRequestParameters) (string, error) {
log.Println("HTTP Client: creating authorization header...")
authHeader := params.GetAuthorizationHeader()

if authHeader != "" {
Expand All @@ -28,19 +30,24 @@ func CreateAuthorizationHeader(params *HttpRequestParameters) (string, error) {

if tokenUrl != "" {
if user != "" && iasTokenUrlRegex.Match([]byte(tokenUrl)) {
log.Println("HTTP Client: using IAS Authorization Header...")
return NewIasAuthorizationHeader(tokenUrl, user, params.GetCertificateAuthentication().GetClientCertificate()).Generate()
}
log.Println("HTTP Client: using OAuth Authorization Header...")
return NewOAuthHeaderGenerator(params).GenerateWithCacheAside()
}

if user != "" {
log.Println("HTTP Client: using basic auth...")
return NewBasicAuthorizationHeader(user, pass).Generate()
}

if noAuthorizationRequired(params) {
log.Println("HTTP Client: not using authorization...")
return "", nil
}

log.Println("HTTP Client: failed to determine auth header...")
return "", executors.NewNonRetryableError("Input values for the authentication-related keys " +
"(user, password & authorizationHeader) are not combined properly.")
}
Expand Down
2 changes: 2 additions & 0 deletions internal/executors/http/basic_authorization_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"encoding/base64"
"fmt"
"log"
)

type basicAuthorizationHeader struct {
Expand All @@ -18,6 +19,7 @@ func NewBasicAuthorizationHeader(u string, p string) AuthorizationHeaderGenerato
}

func (h *basicAuthorizationHeader) Generate() (string, error) {
log.Println("Basic Authorization Header: generating auth header...")
encoded := base64.StdEncoding.EncodeToString(
fmt.Appendf(nil, "%s:%s", h.username, h.password),
)
Expand Down
5 changes: 5 additions & 0 deletions internal/executors/http/csrf_token_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"fmt"
"github.com/SAP/remote-work-processor/internal/utils"
"log"
"net/http"
)

Expand All @@ -27,10 +28,12 @@ func NewCsrfTokenFetcher(p *HttpRequestParameters, authHeader string) TokenFetch
}

func (f *csrfTokenFetcher) Fetch() (string, error) {
log.Println("CSRF token fetcher: fetching new CSRF token from:", f.csrfUrl)
params, _ := f.createRequestParameters()

resp, err := f.HttpExecutor.ExecuteWithParameters(params)
if err != nil {
log.Println("CSRF token fetcher: failed to fetch CSRF token:", err)
return "", err
}

Expand All @@ -39,6 +42,8 @@ func (f *csrfTokenFetcher) Fetch() (string, error) {
return value, nil
}
}

log.Println("CSRF token fetcher: CSRF token header not found in response")
return "", fmt.Errorf("no csrf header present in response from %s", f.csrfUrl)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/executors/http/http_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"log"
"net/http"
"time"

Expand All @@ -12,10 +13,12 @@ const (
)

func CreateHttpClient(timeoutInS uint64, certAuth *tls.CertificateAuthentication) (*http.Client, error) {
log.Println("HTTP Client: creating HTTP Client...")
var tp http.RoundTripper
if certAuth != nil {
var err error

log.Println("HTTP Client: creating TLS transport...")
tp, err = tls.NewTLSConfigurationProvider(certAuth).CreateTransport()
if err != nil {
return nil, err
Expand All @@ -32,6 +35,7 @@ func CreateHttpClient(timeoutInS uint64, certAuth *tls.CertificateAuthentication
} else {
c.Timeout = time.Duration(timeoutInS) * time.Second
}
log.Println("HTTP Client: using timeout:", c.Timeout.String())

return c, nil
}
Expand Down
21 changes: 18 additions & 3 deletions internal/executors/http/http_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (e *HttpRequestExecutor) Execute(ctx executors.Context) *executors.Executor
log.Println("Executing HttpRequest command...")
params, err := NewHttpRequestParametersFromContext(ctx)
if err != nil {
log.Println("Could not create HTTP request params: returning Task state Failed Non-Retryable Error with:", err)
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.Error(err),
Expand All @@ -42,25 +43,29 @@ func (e *HttpRequestExecutor) Execute(ctx executors.Context) *executors.Executor

switch typedErr := err.(type) {
case *executors.RetryableError:
log.Println("Returning Task state Failed Retryable Error...")
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_RETRYABLE),
executors.Error(typedErr),
)
case *executors.NonRetryableError:
log.Println("Returning Task state Failed Non-Retryable Error...")
return executors.NewExecutorResult(
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_NON_RETRYABLE),
executors.Error(typedErr),
)
default:
m := resp.ToMap()
if !resp.successful {
log.Println("Returning Task state Failed Retryable Error from HTTP response...")
return executors.NewExecutorResult(
executors.Output(m),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_FAILED_RETRYABLE),
executors.ErrorString(buildHttpError(resp)),
)
}

log.Println("Returning Task state Completed...")
return executors.NewExecutorResult(
executors.Output(m),
executors.Status(pb.TaskExecutionResponseMessage_TASK_STATE_COMPLETED),
Expand Down Expand Up @@ -104,26 +109,34 @@ func execute(c *http.Client, p *HttpRequestParameters, authHeader string) (*Http
return nil, executors.NewNonRetryableError("could not create http request: %v", err).WithCause(err)
}

log.Printf("Executing request %s %s...\n", p.method, p.url)
log.Printf("HTTP Client: executing request %s %s...\n", p.method, p.url)
resp, err := c.Do(req)
if requestTimedOut(err) {
log.Println("HTTP Client: request timed out after", p.timeout, "seconds")
if p.succeedOnTimeout {
log.Println("HTTP Client: SucceedOnTimeout has been configured. Returning successful response...")
return newTimedOutHttpResponse(req, resp)
}

return nil, executors.NewRetryableError("HTTP request timed out after %d seconds", p.timeout).WithCause(err)
}

if err != nil {
return nil, executors.NewNonRetryableError("Error occurred while trying to execute actual HTTP request: %v", err).WithCause(err)
log.Println("HTTP Client: error occurred while executing request:", err)
return nil, executors.NewNonRetryableError("Error occurred while executing HTTP request: %v", err).WithCause(err)
}
defer resp.Body.Close()

log.Println("HTTP Client: received response:", resp.Status)

log.Println("HTTP Client: reading response body...")
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Println("HTTP Client: error reading response body:", err)
return nil, executors.NewNonRetryableError("Error occurred while trying to read HTTP response body: %v", err).WithCause(err)
}

log.Println("HTTP Client: building response object...")
r, err := NewHttpResponse(
Url(req.URL.String()),
Method(req.Method),
Expand All @@ -135,9 +148,9 @@ func execute(c *http.Client, p *HttpRequestParameters, authHeader string) (*Http
Time(<-timeCh),
)
if err != nil {
log.Println("HTTP Client: could not build response object:", err)
return nil, executors.NewNonRetryableError("Error occurred while trying to build HTTP response: %v", err).WithCause(err)
}

return r, nil
}

Expand All @@ -147,10 +160,12 @@ func requestTimedOut(err error) bool {
}

func createRequest(method string, url string, headers map[string]string, body, authHeader string) (*http.Request, <-chan int64, error) {
log.Println("HTTP Client: creating request:", method, url)
timeCh := make(chan int64, 1)

req, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
log.Println("HTTP Client: error creating request:", err)
return nil, nil, err
}
addHeaders(req, headers, authHeader)
Expand Down
4 changes: 4 additions & 0 deletions internal/executors/http/ias_authorization_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"fmt"
"github.com/SAP/remote-work-processor/internal/utils"
"log"
)

const PASSCODE string = "passcode"
Expand All @@ -27,13 +28,16 @@ func (h *iasAuthorizationHeader) Generate() (string, error) {

parsed := make(map[string]any)
if err = utils.FromJson(raw, &parsed); err != nil {
log.Println("IAS authorization header: failed to parse IAS token response:", err)
return "", fmt.Errorf("failed to parse IAS token response: %v", err)
}

pass, prs := parsed[PASSCODE]
if !prs {
log.Println("IAS authorization header: passcode does not exist in the HTTP response")
return "", fmt.Errorf("passcode does not exist in the HTTP response")
}

log.Println("IAS authorization header: using basic auth with passcode...")
return NewBasicAuthorizationHeader(h.user, pass.(string)).Generate()
}
Loading

0 comments on commit 76b74c2

Please sign in to comment.