Skip to content

Commit

Permalink
Merge pull request #58 from ing-bank/feature/add-request-id-to-log
Browse files Browse the repository at this point in the history
add a request id to logs
  • Loading branch information
nielsdenissen committed Mar 18, 2019
2 parents 4bdcdb5 + 6a0e28e commit 163316f
Show file tree
Hide file tree
Showing 28 changed files with 183 additions and 103 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import com.typesafe.sbt.packager.docker.ExecCmd
import scalariform.formatter.preferences._

name := "airlock"
version := "0.1.18"
version := "0.1.19"

scalaVersion := "2.12.8"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.amazonaws.services.s3.model.{GroupGrantee, Permission}
import com.amazonaws.services.securitytoken.AWSSecurityTokenService
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest
import com.ing.wbaa.airlock.proxy.config._
import com.ing.wbaa.airlock.proxy.data.{S3Request, User}
import com.ing.wbaa.airlock.proxy.data.{RequestId, S3Request, User}
import com.ing.wbaa.airlock.proxy.handler.{FilterRecursiveListBucketHandler, RequestHandlerS3}
import com.ing.wbaa.airlock.proxy.provider._
import com.ing.wbaa.airlock.proxy.provider.aws.S3Client
Expand Down Expand Up @@ -82,7 +82,7 @@ class AirlockS3ProxyItTest extends AsyncWordSpec with DiagrammedAssertions

override protected def rangerSettings: RangerSettings = RangerSettings(testSystem)

override def isUserAuthorizedForRequest(request: S3Request, user: User): Boolean = {
override def isUserAuthorizedForRequest(request: S3Request, user: User)(implicit id: RequestId): Boolean = {
user match {
case User(userName, _, _, _) if userName.value == "testuser" => true
case _ => super.isUserAuthorizedForRequest(request, user)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.ing.wbaa.airlock.proxy.handler

import akka.actor.ActorSystem
import com.ing.wbaa.airlock.proxy.config.StorageS3Settings
import com.ing.wbaa.airlock.proxy.data.{User, UserRawJson}
import com.ing.wbaa.airlock.proxy.data.{RequestId, User, UserRawJson}
import com.ing.wbaa.airlock.proxy.handler.radosgw.RadosGatewayHandler
import com.ing.wbaa.airlock.proxy.provider.aws.S3Client
import org.scalatest.{DiagrammedAssertions, WordSpec}
Expand All @@ -16,6 +16,8 @@ class RadosGatewayHandlerItTest extends WordSpec with DiagrammedAssertions with

override protected[this] def storageS3Settings: StorageS3Settings = StorageS3Settings(system)

implicit val requestId: RequestId = RequestId("test")

private[this] val rgwAdmin: RgwAdmin = new RgwAdminBuilder()
.accessKey(storageS3Settings.storageS3AdminAccesskey)
.secretKey(storageS3Settings.storageS3AdminSecretkey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class RequestHandlerS3ItTest extends AsyncWordSpec with DiagrammedAssertions wit
with FilterRecursiveListBucketHandler with MessageProviderKafka {
override implicit lazy val system: ActorSystem = testSystem
override val httpSettings: HttpSettings = airlockHttpSettings
override def isUserAuthorizedForRequest(request: S3Request, user: User): Boolean = true
override def isUserAuthorizedForRequest(request: S3Request, user: User)(implicit id: RequestId): Boolean = true
override val storageS3Settings: StorageS3Settings = StorageS3Settings(testSystem)
override val atlasSettings: AtlasSettings = AtlasSettings(testSystem)
override val kafkaSettings: KafkaSettings = KafkaSettings(testSystem)

override def areCredentialsActive(awsRequestCredential: AwsRequestCredential): Future[Option[User]] =
override def areCredentialsActive(awsRequestCredential: AwsRequestCredential)(implicit id: RequestId): Future[Option[User]] =
Future(Some(User(UserRawJson("userId", Set("group"), "accesskey", "secretkey"))))

def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User, clientIPAddress: RemoteAddress): Future[LineagePostGuidResponse] = Future.failed(LineageProviderAtlasException("Create lineage failed"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class AuthenticationProviderSTSItTest extends AsyncWordSpec with DiagrammedAsser

override val stsSettings: StsSettings = StsSettings(testSystem)

implicit val requestId: RequestId = RequestId("test")

private val validKeycloakCredentials = Map(
"grant_type" -> "password",
"username" -> "testuser",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.concurrent.Future
class AuthorizationProviderRangerItTest extends AsyncWordSpec with DiagrammedAssertions {
final implicit val testSystem: ActorSystem = ActorSystem.create("test-system")

implicit val requestId: RequestId = RequestId("test")

val s3Request = S3Request(
AwsRequestCredential(AwsAccessKey("accesskey"), Some(AwsSessionToken("sessiontoken"))),
Some("/demobucket"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpMethods, RemoteAddress}
import akka.stream.ActorMaterializer
import com.ing.wbaa.airlock.proxy.config.KafkaSettings
import com.ing.wbaa.airlock.proxy.data.{AwsAccessKey, AwsRequestCredential, Read, S3Request}
import com.ing.wbaa.airlock.proxy.data._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{DiagrammedAssertions, WordSpecLike}
import org.scalatest.RecoverMethods._
Expand All @@ -23,6 +23,8 @@ class MessageProviderKafkaItTest extends WordSpecLike with DiagrammedAssertions

override implicit val executionContext: ExecutionContext = testSystem.dispatcher

implicit val requestId: RequestId = RequestId("test")

val s3Request = S3Request(AwsRequestCredential(AwsAccessKey("a"), None), Some("demobucket"), Some("s3object"), Read)
.copy(clientIPAddress = RemoteAddress(InetAddress.getByName("127.0.0.1")))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@ package com.ing.wbaa.airlock.proxy.api
import akka.Done
import akka.http.scaladsl.model._
import com.ing.wbaa.airlock.proxy.config.{ AtlasSettings, KafkaSettings }
import com.ing.wbaa.airlock.proxy.data.{ LineageResponse, S3Request, User }
import com.typesafe.scalalogging.LazyLogging
import com.ing.wbaa.airlock.proxy.data.{ LineageResponse, RequestId, S3Request, User }
import com.ing.wbaa.airlock.proxy.handler.LoggerHandlerWithId

import scala.concurrent.Future
import scala.util.Failure
import scala.util.matching.Regex

trait PostRequestActions extends LazyLogging {
trait PostRequestActions {
import PostRequestActions._
import scala.concurrent.ExecutionContext.Implicits.global

private val logger = new LoggerHandlerWithId

protected[this] def atlasSettings: AtlasSettings

protected[this] def kafkaSettings: KafkaSettings

protected[this] def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User, clientIPAddress: RemoteAddress): Future[LineageResponse]

protected[this] def emitEvent(s3Request: S3Request, method: HttpMethod, principalId: String): Future[Done]
protected[this] def emitEvent(s3Request: S3Request, method: HttpMethod, principalId: String)(implicit id: RequestId): Future[Done]

protected[this] def setDefaultBucketAcl(bucketName: String): Future[Unit]

Expand All @@ -33,7 +35,7 @@ trait PostRequestActions extends LazyLogging {
}

private[this] def createBucketNotification(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request,
userSTS: User): Future[Done] =
userSTS: User)(implicit id: RequestId): Future[Done] =
httpRequest.method match {
case HttpMethods.POST | HttpMethods.PUT | HttpMethods.DELETE if kafkaSettings.kafkaEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent) =>
emitEvent(s3Request, httpRequest.method, userSTS.userName.value)
Expand All @@ -50,7 +52,7 @@ trait PostRequestActions extends LazyLogging {
}
}

protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Unit = {
protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User)(implicit id: RequestId): Unit = {
val lineage = createAtlasLineage(response, httpRequest, userSTS, s3Request.clientIPAddress)
val notification = createBucketNotification(response, httpRequest, s3Request, userSTS)
val permissions = updateBucketPermissions(httpRequest, s3Request)
Expand All @@ -66,7 +68,6 @@ trait PostRequestActions extends LazyLogging {
case Failure(err) => logger.error(s"Error while setting bucket permissions: ${err}")
})
}

}

object PostRequestActions {
Expand Down
24 changes: 15 additions & 9 deletions src/main/scala/com/ing/wbaa/airlock/proxy/api/ProxyService.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
package com.ing.wbaa.airlock.proxy.api

import java.util.UUID

import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import com.ing.wbaa.airlock.proxy.api.directive.ProxyDirectives
import com.ing.wbaa.airlock.proxy.data._
import com.ing.wbaa.airlock.proxy.handler.LoggerHandlerWithId
import com.ing.wbaa.airlock.proxy.provider.aws.AwsErrorCodes
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

trait ProxyService extends LazyLogging {
trait ProxyService {

private val logger = new LoggerHandlerWithId

// no validation of request currently
// once we get comfortable with get/put/del we can add permCheck
Expand All @@ -24,21 +28,23 @@ trait ProxyService extends LazyLogging {
protected[this] implicit def executionContext: ExecutionContext

// Request Handler methods
protected[this] def executeRequest(request: HttpRequest, userSTS: User, s3request: S3Request): Future[HttpResponse]
protected[this] def executeRequest(request: HttpRequest, userSTS: User, s3request: S3Request)(implicit id: RequestId): Future[HttpResponse]

// Authentication methods
protected[this] def areCredentialsActive(awsRequestCredential: AwsRequestCredential): Future[Option[User]]
protected[this] def areCredentialsActive(awsRequestCredential: AwsRequestCredential)(implicit id: RequestId): Future[Option[User]]

// AWS Signature methods
protected[this] def isUserAuthenticated(httpRequest: HttpRequest, awsSecretKey: AwsSecretKey): Boolean
protected[this] def isUserAuthenticated(httpRequest: HttpRequest, awsSecretKey: AwsSecretKey)(implicit id: RequestId): Boolean

// Authorization methods
protected[this] def isUserAuthorizedForRequest(request: S3Request, user: User): Boolean
protected[this] def isUserAuthorizedForRequest(request: S3Request, user: User)(implicit id: RequestId): Boolean

protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Unit
protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User)(implicit id: RequestId): Unit

val proxyServiceRoute: Route =

metricDuration {
implicit val requestId: RequestId = RequestId(UUID.randomUUID().toString)
withoutSizeLimit {
extractRequest { httpRequest =>
extracts3Request { s3Request =>
Expand All @@ -59,7 +65,7 @@ trait ProxyService extends LazyLogging {
}
}

protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Route = {
protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User)(implicit id: RequestId): Route = {
updateHeadersForRequest { newHttpRequest =>
val httpResponse = executeRequest(newHttpRequest, userSTS, s3Request).andThen {
case Success(response: HttpResponse) =>
Expand All @@ -69,7 +75,7 @@ trait ProxyService extends LazyLogging {
}
}

private def processRequestForValidUser(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User) = {
private def processRequestForValidUser(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User)(implicit id: RequestId) = {
if (isUserAuthenticated(httpRequest, userSTS.secretKey)) {
logger.debug(s"Request authenticated: $httpRequest")

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.ing.wbaa.airlock.proxy.api

import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ HttpRequest }
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import com.ing.wbaa.airlock.proxy.data.{ Read, S3Request, User }
import com.ing.wbaa.airlock.proxy.data.{ Read, RequestId, S3Request, User }

import scala.xml.NodeSeq

Expand All @@ -17,7 +17,7 @@ trait ProxyServiceWithListAllBuckets extends ProxyService with ScalaXmlSupport {

protected[this] def listAllBuckets: Seq[String]

override protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Route = {
override protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User)(implicit id: RequestId): Route = {
s3Request match {
//only when list buckets is requested we show all buckets
case S3Request(_, None, None, accessType, _, _) if accessType == Read =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.ing.wbaa.airlock.proxy.data

case class RequestId(value: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import akka.stream.alpakka.xml.scaladsl.{ XmlParsing, XmlWriting }
import akka.stream.alpakka.xml.{ EndElement, ParseEvent, StartElement, TextEvent }
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import com.ing.wbaa.airlock.proxy.data.{ Read, S3Request, User }
import com.typesafe.scalalogging.LazyLogging
import com.ing.wbaa.airlock.proxy.data.{ Read, RequestId, S3Request, User }

import scala.collection.immutable
import scala.collection.mutable.ListBuffer
Expand All @@ -20,9 +19,9 @@ import scala.collection.mutable.ListBuffer
* If there is a ranger policy only for "read" the bucket (non recursively) we need to check all subdirs of the bucket
* in ranger as well
*/
trait FilterRecursiveListBucketHandler extends LazyLogging {
trait FilterRecursiveListBucketHandler {

protected[this] def isUserAuthorizedForRequest(request: S3Request, user: User): Boolean
protected[this] def isUserAuthorizedForRequest(request: S3Request, user: User)(implicit id: RequestId): Boolean

/**
* for list objects in bucket we need filter response when --recursive is set
Expand All @@ -34,7 +33,7 @@ trait FilterRecursiveListBucketHandler extends LazyLogging {
* @param response
* @return for recursive request it returns filtered response for others the original response
*/
protected[this] def filterResponse(request: HttpRequest, userSTS: User, s3request: S3Request, response: HttpResponse): HttpResponse = {
protected[this] def filterResponse(request: HttpRequest, userSTS: User, s3request: S3Request, response: HttpResponse)(implicit id: RequestId): HttpResponse = {
val noDelimiterWithReadAndNoObject =
!request.uri.rawQueryString.getOrElse("").contains("delimiter") && s3request.accessType == Read && s3request.s3Object.isEmpty
if (noDelimiterWithReadAndNoObject) {
Expand All @@ -51,7 +50,7 @@ trait FilterRecursiveListBucketHandler extends LazyLogging {
* @param requestS3
* @return xml as stream of bytes with only authorised resources
*/
protected[this] def filterRecursiveListObjects(user: User, requestS3: S3Request): Flow[ByteString, ByteString, NotUsed] = {
protected[this] def filterRecursiveListObjects(user: User, requestS3: S3Request)(implicit id: RequestId): Flow[ByteString, ByteString, NotUsed] = {
def elementResult(allContentsElements: ListBuffer[ParseEvent], isContentsTag: Boolean, element: ParseEvent): immutable.Seq[ParseEvent] = {
if (isContentsTag) {
allContentsElements += element
Expand All @@ -61,10 +60,9 @@ trait FilterRecursiveListBucketHandler extends LazyLogging {
}
}

def isPathOkInRangerPolicy(path: String): Boolean = {
def isPathOkInRangerPolicy(path: String)(implicit id: RequestId): Boolean = {
val pathToCheck = normalizePath(path)
val isUserAuthorized = isUserAuthorizedForRequest(requestS3.copy(s3BucketPath = Some(pathToCheck)), user)
logger.debug("user {} isUserAuthorized for path {} = {}", user.userName, pathToCheck, isUserAuthorized)
isUserAuthorized
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.ing.wbaa.airlock.proxy.handler

import com.ing.wbaa.airlock.proxy.data.RequestId
import com.typesafe.scalalogging.Logger
import org.slf4j.{ LoggerFactory, MDC }

class LoggerHandlerWithId {

@transient
private lazy val log: Logger =
Logger(LoggerFactory.getLogger(getClass.getName))

private val requestIdKey = "request.id"

def debug(message: String, args: Any*)(implicit id: RequestId): Unit = {
MDC.put(requestIdKey, id.value)
log.debug(message, args)
MDC.remove(requestIdKey)
}

def info(message: String, args: Any*)(implicit id: RequestId): Unit = {
MDC.put(requestIdKey, id.value)
log.info(message, args)
MDC.remove(requestIdKey)
}

def warn(message: String, args: Any*)(implicit id: RequestId): Unit = {
MDC.put(requestIdKey, id.value)
log.warn(message, args)
MDC.remove(requestIdKey)
}

def error(message: String, args: Any*)(implicit id: RequestId): Unit = {
MDC.put(requestIdKey, id.value)
log.error(message, args)
MDC.remove(requestIdKey)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,32 @@ import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.RawHeader
import com.ing.wbaa.airlock.proxy.config.StorageS3Settings
import com.ing.wbaa.airlock.proxy.data.{ S3Request, User }
import com.ing.wbaa.airlock.proxy.data.{ RequestId, S3Request, User }
import com.ing.wbaa.airlock.proxy.handler.radosgw.RadosGatewayHandler
import com.ing.wbaa.airlock.proxy.provider.aws.S3Client
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Success

trait RequestHandlerS3 extends LazyLogging with RadosGatewayHandler with S3Client {
trait RequestHandlerS3 extends RadosGatewayHandler with S3Client {

private val logger = new LoggerHandlerWithId

protected[this] implicit def system: ActorSystem

protected[this] implicit def executionContext: ExecutionContext

protected[this] def storageS3Settings: StorageS3Settings

protected[this] def filterResponse(request: HttpRequest, userSTS: User, s3request: S3Request, response: HttpResponse): HttpResponse
protected[this] def filterResponse(request: HttpRequest, userSTS: User, s3request: S3Request, response: HttpResponse)(implicit id: RequestId): HttpResponse

/**
* Updates the URI for S3 and sends the request to S3.
*
* If we get back a Forbidden code, we can try to check if there's new credentials for Ceph first.
* If so, we can retry the request.
*/
protected[this] def executeRequest(request: HttpRequest, userSTS: User, s3request: S3Request): Future[HttpResponse] = {
protected[this] def executeRequest(request: HttpRequest, userSTS: User, s3request: S3Request)(implicit id: RequestId): Future[HttpResponse] = {
val userAgent = request.getHeader("User-Agent").orElse(RawHeader("User-Agent", "unknown")).value()
val newRequest = request
.withUri(request.uri.withAuthority(storageS3Settings.storageS3Authority))
Expand All @@ -51,7 +52,7 @@ trait RequestHandlerS3 extends LazyLogging with RadosGatewayHandler with S3Clien
* @param request request to fire to S3
* @return response from S3
*/
protected[this] def fireRequestToS3(request: HttpRequest): Future[HttpResponse] = {
protected[this] def fireRequestToS3(request: HttpRequest)(implicit id: RequestId): Future[HttpResponse] = {
logger.debug(s"Request to send to Ceph: $request")
Http()
.singleRequest(request)
Expand Down
Loading

0 comments on commit 163316f

Please sign in to comment.