
Commit 35658c2

Merge pull request #42 from ing-bank/feature/events-emitter
Add possibility to emit AWS-like events based on user requests
WalkerTR committed Feb 18, 2019
2 parents 5802d9a + ee5c006 commit 35658c2
Showing 18 changed files with 346 additions and 33 deletions.
17 changes: 17 additions & 0 deletions README.md
@@ -185,6 +185,23 @@ Lineage is done according following model
To check the lineage that has been created, log in to the Atlas web UI ([default url](http://localhost:21000)) with the
admin user and password

# Event Notifications

Airlock can send event notifications to a message queue based on user requests, in the [AWS S3 notification format](https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).

Currently, two event types are emitted:

- s3:ObjectCreated:*
- s3:ObjectRemoved:*

To enable this, update the `kafka.producer` section of `application.conf`, mainly:

```
enabled = ${?AIRLOCK_KAFKA_ENABLED}
bootstrapServers = ${?AIRLOCK_KAFKA_BOOTSTRAP_SERVERS}
createTopic = ${?AIRLOCK_KAFKA_CREATE_TOPIC}
deleteTopic = ${?AIRLOCK_KAFKA_DELETE_TOPIC}
```
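
A minimal sketch of the resolved `kafka.producer` block for a local broker (the values below are illustrative; the defaults ship in `reference.conf`) could look like:

```
kafka.producer {
  enabled = true
  bootstrapServers = "localhost:9092"
  createTopic = "create_events"
  deleteTopic = "delete_events"
}
```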

# Setting Up AWS CLI

7 changes: 4 additions & 3 deletions build.sbt
@@ -3,7 +3,7 @@ import com.typesafe.sbt.packager.docker.ExecCmd
import scalariform.formatter.preferences._

name := "airlock"
version := "0.1.6"
version := "0.1.7"

scalaVersion := "2.12.8"

@@ -34,8 +34,9 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-spray-json" % akkaVersion,
"com.typesafe.akka" %% "akka-http-xml" % akkaVersion,
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.437",
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.20",
"org.apache.ranger" % "ranger-plugins-common" % "1.1.0",
"org.apache.kafka" % "kafka-clients" % "2.0.0",
"net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % IntegrationTest,
"org.apache.ranger" % "ranger-plugins-common" % "1.1.0" exclude("org.apache.kafka", "kafka_2.11"),
"io.github.twonote" % "radosgw-admin4j" % "1.0.2",
"com.typesafe.akka" %% "akka-http-testkit" % akkaVersion % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % "it,test",
AirlockS3ProxyItTest.scala
@@ -7,10 +7,10 @@ import akka.stream.ActorMaterializer
import com.amazonaws.auth.BasicSessionCredentials
import com.amazonaws.services.securitytoken.AWSSecurityTokenService
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest
import com.ing.wbaa.airlock.proxy.config.{AtlasSettings, HttpSettings, StorageS3Settings, StsSettings}
import com.ing.wbaa.airlock.proxy.config._
import com.ing.wbaa.airlock.proxy.data._
import com.ing.wbaa.airlock.proxy.handler.RequestHandlerS3
import com.ing.wbaa.airlock.proxy.provider.{AuthenticationProviderSTS, LineageProviderAtlas, SignatureProviderAws}
import com.ing.wbaa.airlock.proxy.provider.{AuthenticationProviderSTS, LineageProviderAtlas, MessageProviderKafka, SignatureProviderAws}
import com.ing.wbaa.testkit.AirlockFixtures
import com.ing.wbaa.testkit.awssdk.{S3SdkHelpers, StsSdkHelpers}
import com.ing.wbaa.testkit.oauth.OAuth2TokenRequest
@@ -54,12 +54,13 @@ class AirlockS3ProxyItTest extends AsyncWordSpec with DiagrammedAssertions
* @return Future[Assertion]
*/
def withSdkToMockProxy(testCode: (AWSSecurityTokenService, Authority) => Future[Assertion]): Future[Assertion] = {
val proxy: AirlockS3Proxy = new AirlockS3Proxy with RequestHandlerS3 with AuthenticationProviderSTS with LineageProviderAtlas with SignatureProviderAws {
val proxy: AirlockS3Proxy = new AirlockS3Proxy with RequestHandlerS3 with AuthenticationProviderSTS with LineageProviderAtlas with SignatureProviderAws with MessageProviderKafka {
override implicit lazy val system: ActorSystem = testSystem
override val httpSettings: HttpSettings = airlockHttpSettings
override val storageS3Settings: StorageS3Settings = StorageS3Settings(testSystem)
override val stsSettings: StsSettings = StsSettings(testSystem)
override val atlasSettings: AtlasSettings = new AtlasSettings(testSystem.settings.config)
override val atlasSettings: AtlasSettings = AtlasSettings(testSystem)
override val kafkaSettings: KafkaSettings = KafkaSettings(testSystem)

override def isUserAuthorizedForRequest(request: S3Request, user: User, clientIPAddress: RemoteAddress, headerIPs: HeaderIPs): Boolean = true
}
RequestHandlerS3ItTest.scala
@@ -8,10 +8,10 @@ import akka.http.scaladsl.model.Uri.{Authority, Host}
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{CopyObjectRequest, ObjectMetadata}
import com.ing.wbaa.airlock.proxy.AirlockS3Proxy
import com.ing.wbaa.airlock.proxy.config.{AtlasSettings, HttpSettings, StorageS3Settings}
import com.ing.wbaa.airlock.proxy.config.{AtlasSettings, HttpSettings, KafkaSettings, StorageS3Settings}
import com.ing.wbaa.airlock.proxy.data._
import com.ing.wbaa.airlock.proxy.provider.LineageProviderAtlas.LineageProviderAtlasException
import com.ing.wbaa.airlock.proxy.provider.SignatureProviderAws
import com.ing.wbaa.airlock.proxy.provider.{MessageProviderKafka, SignatureProviderAws}
import com.ing.wbaa.testkit.AirlockFixtures
import org.scalatest._

@@ -37,12 +37,13 @@ class RequestHandlerS3ItTest extends AsyncWordSpec with DiagrammedAssertions wit
* @return Assertion
*/
def withS3SdkToMockProxy(awsSignerType: String)(testCode: AmazonS3 => Assertion): Future[Assertion] = {
val proxy: AirlockS3Proxy = new AirlockS3Proxy with RequestHandlerS3 with SignatureProviderAws {
val proxy: AirlockS3Proxy = new AirlockS3Proxy with RequestHandlerS3 with SignatureProviderAws with MessageProviderKafka {
override implicit lazy val system: ActorSystem = testSystem
override val httpSettings: HttpSettings = airlockHttpSettings
override def isUserAuthorizedForRequest(request: S3Request, user: User, clientIPAddress: RemoteAddress, headerIPs: HeaderIPs): Boolean = true
override val storageS3Settings: StorageS3Settings = StorageS3Settings(testSystem)
override val atlasSettings: AtlasSettings = new AtlasSettings(system.settings.config)
override val atlasSettings: AtlasSettings = AtlasSettings(testSystem)
override val kafkaSettings: KafkaSettings = KafkaSettings(testSystem)

override def areCredentialsActive(awsRequestCredential: AwsRequestCredential): Future[Option[User]] =
Future(Some(User(UserRawJson("userId", Set("group"), "accesskey", "secretkey"))))
MessageProviderKafkaItTest.scala
@@ -0,0 +1,60 @@
package com.ing.wbaa.airlock.proxy.provider

import java.net.InetAddress

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpMethods, RemoteAddress}
import akka.stream.ActorMaterializer
import com.ing.wbaa.airlock.proxy.config.KafkaSettings
import com.ing.wbaa.airlock.proxy.data.{AwsAccessKey, AwsRequestCredential, Read, S3Request}
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{DiagrammedAssertions, WordSpecLike}
import org.scalatest.RecoverMethods._

import scala.concurrent.ExecutionContext

class MessageProviderKafkaItTest extends WordSpecLike with DiagrammedAssertions with EmbeddedKafka with MessageProviderKafka {

implicit val testSystem: ActorSystem = ActorSystem("kafkaTest")

override implicit val kafkaSettings: KafkaSettings = KafkaSettings(testSystem)

override implicit val materializer: ActorMaterializer = ActorMaterializer()

override implicit val executionContext: ExecutionContext = testSystem.dispatcher

val s3Request = S3Request(AwsRequestCredential(AwsAccessKey("a"), None), Some("demobucket"), Some("s3object"), Read)
val remoteClientIP = RemoteAddress(InetAddress.getByName("127.0.0.1"))

"KafkaMessageProvider" should {
"Send message to correct topic with Put or Post" in {
implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092)

withRunningKafka {
val createEventsTopic = "create_events"
createCustomTopic(createEventsTopic)

emitEvent(s3Request, HttpMethods.PUT, "testUser", remoteClientIP)
val result = consumeFirstStringMessageFrom(createEventsTopic)
assert(result.contains("s3:ObjectCreated:PUT"))
}
}

"Send message to correct topic with Delete" in {
implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092)

withRunningKafka {
val deleteEventsTopic = "delete_events"
createCustomTopic(deleteEventsTopic)

emitEvent(s3Request, HttpMethods.DELETE, "testUser", remoteClientIP)
assert(consumeFirstStringMessageFrom(deleteEventsTopic).contains("s3:ObjectRemoved:DELETE"))
}
}

"fail on incomplete data" in {
recoverToSucceededIf[Exception](emitEvent(s3Request.copy(s3Object = None), HttpMethods.PUT, "testUser", remoteClientIP))
}
}

}
11 changes: 11 additions & 0 deletions src/main/resources/application.conf
@@ -37,3 +37,14 @@ airlock {
principal = ${?AIRLOCK_KERBEROS_PRINCIPAL}
}
}

kafka.producer {
enabled = ${?AIRLOCK_KAFKA_ENABLED}
bootstrapServers = ${?AIRLOCK_KAFKA_BOOTSTRAP_SERVERS}
createTopic = ${?AIRLOCK_KAFKA_CREATE_TOPIC}
deleteTopic = ${?AIRLOCK_KAFKA_DELETE_TOPIC}
retries = ${?AIRLOCK_KAFKA_PRODUCER_RETRIES}
backoff = ${?AIRLOCK_KAFKA_PRODUCER_BACKOFF}
backoffMax = ${?AIRLOCK_KAFKA_PRODUCER_BACKOFFMAX}
}

11 changes: 11 additions & 0 deletions src/main/resources/reference.conf
@@ -68,5 +68,16 @@ akka {
client.parsing.modeled-header-parsing = off
client.user-agent-header = ""
}
}

kafka.producer {
enabled = false
# A comma-separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster
bootstrapServers = "127.0.0.1:9092"
createTopic = "create_events"
deleteTopic = "delete_events"
retries = 3
backoff = 3000
backoffMax = 3000
}

AirlockS3Proxy.scala
@@ -5,14 +5,14 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.ing.wbaa.airlock.proxy.api.{ HealthService, ProxyServiceWithListAllBuckets }
import com.ing.wbaa.airlock.proxy.api.{ HealthService, PostRequestActions, ProxyServiceWithListAllBuckets }
import com.ing.wbaa.airlock.proxy.config.HttpSettings
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

trait AirlockS3Proxy extends LazyLogging with ProxyServiceWithListAllBuckets with HealthService {
trait AirlockS3Proxy extends LazyLogging with ProxyServiceWithListAllBuckets with PostRequestActions with HealthService {

protected[this] implicit def system: ActorSystem
protected[this] implicit lazy val materializer: ActorMaterializer = ActorMaterializer()(system)
3 changes: 2 additions & 1 deletion src/main/scala/com/ing/wbaa/airlock/proxy/Server.scala
@@ -7,7 +7,7 @@ import com.ing.wbaa.airlock.proxy.provider._

object Server extends App {

new AirlockS3Proxy with AuthorizationProviderRanger with RequestHandlerS3 with AuthenticationProviderSTS with LineageProviderAtlas with SignatureProviderAws with KerberosLoginProvider {
new AirlockS3Proxy with AuthorizationProviderRanger with RequestHandlerS3 with AuthenticationProviderSTS with LineageProviderAtlas with SignatureProviderAws with KerberosLoginProvider with MessageProviderKafka {
override implicit lazy val system: ActorSystem = ActorSystem.create("airlock")

override def kerberosSettings: KerberosSettings = KerberosSettings(system)
@@ -17,6 +17,7 @@ object Server extends App {
override val storageS3Settings: StorageS3Settings = StorageS3Settings(system)
override val stsSettings: StsSettings = StsSettings(system)
override val atlasSettings: AtlasSettings = AtlasSettings(system)
override val kafkaSettings: KafkaSettings = KafkaSettings(system)

// Force Ranger plugin to initialise on startup
rangerPluginForceInit
PostRequestActions.scala
@@ -0,0 +1,37 @@
package com.ing.wbaa.airlock.proxy.api

import akka.Done
import akka.http.scaladsl.model._
import com.ing.wbaa.airlock.proxy.config.{ AtlasSettings, KafkaSettings }
import com.ing.wbaa.airlock.proxy.data.{ LineageResponse, S3Request, User }

import scala.concurrent.Future

trait PostRequestActions {
protected[this] def atlasSettings: AtlasSettings

protected[this] def kafkaSettings: KafkaSettings

protected[this] def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User, clientIPAddress: RemoteAddress): Future[LineageResponse]

protected[this] def emitEvent(s3Request: S3Request, method: HttpMethod, principalId: String, clientIPAddress: RemoteAddress): Future[Done]

def createAtlasLineage(response: HttpResponse, httpRequest: HttpRequest, userSTS: User, clientIPAddress: RemoteAddress): Unit =
if (atlasSettings.atlasEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent))
// a successful DELETE on AWS S3 returns 204 NoContent, so treat it as success too
createLineageFromRequest(httpRequest, userSTS, clientIPAddress)

protected[this] def createBucketNotification(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request,
userSTS: User, clientIPAddress: RemoteAddress): Future[Done] =
httpRequest.method match {
case HttpMethods.POST | HttpMethods.PUT | HttpMethods.DELETE if kafkaSettings.kafkaEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent) =>
emitEvent(s3Request, httpRequest.method, userSTS.userName.value, clientIPAddress)
case _ => Future.successful(Done)
}

protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User, clientIPAddress: RemoteAddress): Unit = {
createAtlasLineage(response, httpRequest, userSTS, clientIPAddress)
createBucketNotification(response, httpRequest, s3Request, userSTS, clientIPAddress)
}

}
12 changes: 3 additions & 9 deletions src/main/scala/com/ing/wbaa/airlock/proxy/api/ProxyService.scala
@@ -2,10 +2,9 @@ package com.ing.wbaa.airlock.proxy.api

import akka.actor.ActorSystem
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport._
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, RemoteAddress, StatusCodes }
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import com.ing.wbaa.airlock.proxy.api.directive.ProxyDirectives
import com.ing.wbaa.airlock.proxy.config.AtlasSettings
import com.ing.wbaa.airlock.proxy.data._
import com.ing.wbaa.airlock.proxy.provider.aws.AwsErrorCodes
import com.typesafe.scalalogging.LazyLogging
@@ -36,10 +35,7 @@ trait ProxyService extends LazyLogging {
// Authorization methods
protected[this] def isUserAuthorizedForRequest(request: S3Request, user: User, clientIPAddress: RemoteAddress, headerIPs: HeaderIPs): Boolean

// Atlas Lineage
protected[this] def atlasSettings: AtlasSettings

protected[this] def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User, clientIPAddress: RemoteAddress): Future[LineageResponse]
protected[this] def handlePostRequestActions(response: HttpResponse, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User, clientIPAddress: RemoteAddress): Unit

val proxyServiceRoute: Route =
withoutSizeLimit {
@@ -70,9 +66,7 @@ trait ProxyService extends LazyLogging {
updateHeadersForRequest { newHttpRequest =>
val httpResponse = executeRequest(newHttpRequest, userSTS).andThen {
case Success(response: HttpResponse) =>
if (atlasSettings.atlasEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent))
// delete on AWS response 204
createLineageFromRequest(httpRequest, userSTS, clientIPAddress)
handlePostRequestActions(response, httpRequest, s3Request, userSTS, clientIPAddress)
}
complete(httpResponse)
}
KafkaSettings.scala
@@ -0,0 +1,20 @@
package com.ing.wbaa.airlock.proxy.config

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import com.typesafe.config.Config

class KafkaSettings(config: Config) extends Extension {
val kafkaEnabled: Boolean = config.getBoolean("kafka.producer.enabled")
val bootstrapServers: String = config.getString("kafka.producer.bootstrapServers")
val createEventsTopic: String = config.getString("kafka.producer.createTopic")
val deleteEventsTopic: String = config.getString("kafka.producer.deleteTopic")
val retries: String = config.getString("kafka.producer.retries")
val retriesBackOff: String = config.getString("kafka.producer.backoff")
val retriesBackOffMax: String = config.getString("kafka.producer.backoffMax")
val kafkaConfig = config.getConfig("kafka.producer")
}

object KafkaSettings extends ExtensionId[KafkaSettings] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): KafkaSettings = new KafkaSettings(system.settings.config)
override def lookup(): ExtensionId[KafkaSettings] = KafkaSettings
}
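
The `MessageProviderKafka` implementation itself is collapsed in this view. As a rough, illustrative sketch only (not the provider's actual code), these settings could feed a `kafka-clients` producer along these lines:

```scala
// placed in the same package only so KafkaSettings (above) resolves
package com.ing.wbaa.airlock.proxy.config

import java.util.Properties

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.kafka.common.serialization.StringSerializer

object KafkaProducerSketch {
  // Build a producer from KafkaSettings; retries/backoff stay strings because
  // kafka-clients parses string-valued configs itself.
  def producer(settings: KafkaSettings): KafkaProducer[String, String] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.bootstrapServers)
    props.put(ProducerConfig.RETRIES_CONFIG, settings.retries)
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, settings.retriesBackOff)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    new KafkaProducer[String, String](props)
  }

  // PUT/POST events would go to createEventsTopic and DELETE events to deleteEventsTopic, e.g.:
  // producer(settings).send(new ProducerRecord[String, String](settings.createEventsTopic, eventJson))
}
```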
(remaining changed files not shown)
