Skip to content

Commit

Permalink
Merge pull request #11 from ing-bank/feature/list-all-buckets
Browse files Browse the repository at this point in the history
Feature/list all buckets
  • Loading branch information
arempter authored Oct 29, 2018
2 parents 7291335 + 3a0f12f commit ece95a7
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
language: scala

scala:
- 2.12.6
- 2.12.7

env:
global:
Expand Down
13 changes: 7 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import com.typesafe.sbt.packager.docker.ExecCmd
import scalariform.formatter.preferences._

name := "airlock"
version := "0.1"
version := "0.1.1"

scalaVersion := "2.12.6"
scalaVersion := "2.12.7"

scalacOptions += "-unchecked"
scalacOptions += "-deprecation"
Expand All @@ -18,22 +18,23 @@ scalacOptions += "-Xfatal-warnings"
// Experimental: improved update resolution.
updateOptions := updateOptions.value.withCachedResolution(cachedResoluton = true)

val akkaVersion = "10.1.3"
val akkaStreamVersion = "2.5.14"
val akkaVersion = "10.1.5"
val akkaStreamVersion = "2.5.17"

libraryDependencies ++= Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"ch.qos.logback" % "logback-classic" % "1.2.3" % Runtime,
"com.typesafe.akka" %% "akka-http" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % akkaVersion,
"com.typesafe.akka" %% "akka-http-xml" % akkaVersion,
"com.typesafe.akka" %% "akka-http-testkit" % akkaVersion,
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.372",
"com.amazonaws" % "aws-java-sdk-s3" % "1.11.437",
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.20",
"org.apache.ranger" % "ranger-plugins-common" % "1.1.0",
"io.github.twonote" % "radosgw-admin4j" % "1.0.2",
"org.scalatest" %% "scalatest" % "3.0.5" % "it,test",
"com.amazonaws" % "aws-java-sdk-sts" % "1.11.372" % IntegrationTest
"com.amazonaws" % "aws-java-sdk-sts" % "1.11.437" % IntegrationTest
)

// Fix logging dependencies:
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.1.6
sbt.version = 1.2.6
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.util.Random
class RadosGatewayHandlerItTest extends WordSpec with DiagrammedAssertions with RadosGatewayHandler {

override protected[this] implicit def system: ActorSystem = ActorSystem("test-system")

override protected[this] def storageS3Settings: StorageS3Settings = StorageS3Settings(system)

private[this] val rgwAdmin: RgwAdmin = new RgwAdminBuilder()
Expand Down Expand Up @@ -94,5 +95,9 @@ class RadosGatewayHandlerItTest extends WordSpec with DiagrammedAssertions with
assert(!isUpdated)
}
}

"list all buckets" in {
assert(listAllBuckets.head == "demobucket")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class AuthorizationProviderRangerItTest extends AsyncWordSpec with DiagrammedAss
assert(apr.isUserAuthorizedForRequest(s3Request.copy(bucketObjectRoot = Some("object")), user, clientIPAddress))
}

"doesn't authorize for requests without bucket" in withAuthorizationProviderRanger() { apr =>
assert(!apr.isUserAuthorizedForRequest(s3Request.copy(bucket = None), user, clientIPAddress))
"authorize for requests without bucket" in withAuthorizationProviderRanger() { apr =>
assert(apr.isUserAuthorizedForRequest(s3Request.copy(bucket = None), user, clientIPAddress))
}

"doesn't authorize for requests that are not supposed to be (Write)" in withAuthorizationProviderRanger() { apr =>
Expand All @@ -72,8 +72,8 @@ class AuthorizationProviderRangerItTest extends AsyncWordSpec with DiagrammedAss
assert(apr.isUserAuthorizedForRequest(s3Request, user.copy(userAssumedGroup = Some(UserAssumedGroup("unauthorized"))), clientIPAddress))
}

"doesn't authorize allow-list-buckets with default settings" in withAuthorizationProviderRanger() { apr =>
assert(!apr.isUserAuthorizedForRequest(s3Request.copy(bucket = None, bucketObjectRoot = None, accessType = Read), user, clientIPAddress))
"authorize allow-list-buckets with default settings" in withAuthorizationProviderRanger() { apr =>
assert(apr.isUserAuthorizedForRequest(s3Request.copy(bucket = None, bucketObjectRoot = None, accessType = Read), user, clientIPAddress))
}

"does authorize allow-list-buckets set to true" in withAuthorizationProviderRanger(new RangerSettings(testSystem.settings.config) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ airlock {
# ranger-s3-security.xml
service_type = "s3"
app_id = "testservice"
allow-list-buckets = false
allow-list-buckets = true
allow-create-buckets = false
user-domain-postfix = ""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import com.ing.wbaa.airlock.proxy.api.{ HealthService, ProxyService }
import com.ing.wbaa.airlock.proxy.api.{ HealthService, ProxyServiceWithListAllBuckets }
import com.ing.wbaa.airlock.proxy.config.HttpSettings
import com.typesafe.scalalogging.LazyLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }

trait AirlockS3Proxy extends LazyLogging with ProxyService {
trait AirlockS3Proxy extends LazyLogging with ProxyServiceWithListAllBuckets {

protected[this] implicit def system: ActorSystem
protected[this] implicit lazy val materializer: ActorMaterializer = ActorMaterializer()(system)
Expand Down
62 changes: 34 additions & 28 deletions src/main/scala/com/ing/wbaa/airlock/proxy/api/ProxyService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ trait ProxyService extends LazyLogging {
import akka.http.scaladsl.server.Directives._

protected[this] implicit def system: ActorSystem

protected[this] implicit def executionContext: ExecutionContext

// Request Handler methods
Expand All @@ -35,6 +36,7 @@ trait ProxyService extends LazyLogging {

// Atlas Lineage
protected[this] def atlasSettings: AtlasSettings

protected[this] def createLineageFromRequest(httpRequest: HttpRequest, userSTS: User): Future[LineageResponse]

val proxyServiceRoute: Route =
Expand All @@ -44,41 +46,14 @@ trait ProxyService extends LazyLogging {
s"${clientIPAddress.toOption.map(_.getHostAddress).getOrElse("unknown")}")
extractRequest { httpRequest =>
extracts3Request { s3Request =>

onComplete(areCredentialsActive(s3Request.credential)) {
case Success(Some(userSTS: User)) =>
logger.debug(s"Credentials active for request, user retrieved: $userSTS")

if (isUserAuthenticated(httpRequest, userSTS.secretKey)) {
logger.debug(s"Request authenticated: $httpRequest")

if (isUserAuthorizedForRequest(s3Request, userSTS, clientIPAddress)) {
logger.info(s"User (${userSTS.userName}) successfully authorized for request: $s3Request")

updateHeadersForRequest { newHttpRequest =>
val httpResponse = executeRequest(newHttpRequest, userSTS).andThen {
case Success(response: HttpResponse) =>
if (atlasSettings.atlasEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent))
// delete on AWS response 204
createLineageFromRequest(httpRequest, userSTS)
}
complete(httpResponse)
}

} else {
logger.warn(s"User (${userSTS.userName}) not authorized for request: $s3Request")
reject(AuthorizationFailedRejection)
}
} else {
logger.warn(s"Request not authenticated: $httpRequest")
complete(StatusCodes.Forbidden)
}

processRequestForValidUser(clientIPAddress, httpRequest, s3Request, userSTS)
case Success(None) =>
val msg = s"Request not authenticated: $s3Request"
logger.warn(msg)
complete((StatusCodes.Forbidden, msg))

case Failure(exception) =>
logger.error(s"An error occurred checking authentication with STS service", exception)
complete(StatusCodes.InternalServerError)
Expand All @@ -87,4 +62,35 @@ trait ProxyService extends LazyLogging {
}
}
}

protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Route = {
updateHeadersForRequest { newHttpRequest =>
val httpResponse = executeRequest(newHttpRequest, userSTS).andThen {
case Success(response: HttpResponse) =>
if (atlasSettings.atlasEnabled && (response.status == StatusCodes.OK || response.status == StatusCodes.NoContent))
// delete on AWS response 204
createLineageFromRequest(httpRequest, userSTS)
}
complete(httpResponse)
}
}

private def processRequestForValidUser(clientIPAddress: RemoteAddress, httpRequest: HttpRequest, s3Request: S3Request, userSTS: User) = {
if (isUserAuthenticated(httpRequest, userSTS.secretKey)) {
logger.debug(s"Request authenticated: $httpRequest")

if (isUserAuthorizedForRequest(s3Request, userSTS, clientIPAddress)) {
logger.info(s"User (${userSTS.userName}) successfully authorized for request: $s3Request")

processAuthorizedRequest(httpRequest, s3Request, userSTS)

} else {
logger.warn(s"User (${userSTS.userName}) not authorized for request: $s3Request")
reject(AuthorizationFailedRejection)
}
} else {
logger.warn(s"Request not authenticated: $httpRequest")
complete(StatusCodes.Forbidden)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.ing.wbaa.airlock.proxy.api

import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.server.Route
import com.ing.wbaa.airlock.proxy.data.{ Read, S3Request, User }
import akka.http.scaladsl.server.Directives._

import scala.xml.NodeSeq

/**
* Because aws api allows only list own buckets and we want user to see all bucket in our system
* there is a hack to list bucket by radosgw-admin request
* The standard aws functionality are in the ProxyService trait
*/
trait ProxyServiceWithListAllBuckets extends ProxyService with ScalaXmlSupport {

protected[this] def listAllBuckets: Seq[String]

override protected[this] def processAuthorizedRequest(httpRequest: HttpRequest, s3Request: S3Request, userSTS: User): Route = {
s3Request match {
//only when list buckets is requested we show all buckets
case S3Request(_, None, None, accessType) if accessType == Read =>
complete(getListAllMyBucketsXml())
case _ => super.processAuthorizedRequest(httpRequest, s3Request, userSTS)
}
}

private def getListAllMyBucketsXml(user: String = "npa", createDate: String = "2018-01-01T00:00:00.000Z"): NodeSeq = {
<ListAllMyBucketsResult>
<Owner>
<ID>{ user }</ID>
<DisplayName>{ user }</DisplayName>
</Owner>
<Buckets>
{ for (bucketName <- listAllBuckets) yield <Bucket><Name>{ bucketName }</Name><CreationDate>{ createDate }</CreationDate></Bucket> }
</Buckets>
</ListAllMyBucketsResult>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.ing.wbaa.airlock.proxy.config.StorageS3Settings
import com.ing.wbaa.airlock.proxy.data.{ AwsAccessKey, AwsSecretKey, User, UserName }
import com.typesafe.scalalogging.LazyLogging
import org.twonote.rgwadmin4j.{ RgwAdmin, RgwAdminBuilder }
import scala.collection.JavaConverters._

import scala.util.{ Failure, Success, Try }

Expand All @@ -15,6 +16,7 @@ trait RadosGatewayHandler extends LazyLogging {
protected[this] def storageS3Settings: StorageS3Settings

private[this] case class CredentialsOnCeph(awsAccessKey: AwsAccessKey, awsSecretKey: AwsSecretKey)

private[this] case class UserOnCeph(userName: UserName, credentials: List[CredentialsOnCeph])

private[this] lazy val rgwAdmin: RgwAdmin = new RgwAdminBuilder()
Expand All @@ -24,7 +26,6 @@ trait RadosGatewayHandler extends LazyLogging {
.build

private[this] def createCredentialsOnCeph(userName: UserName, awsAccessKey: AwsAccessKey, awsSecretKey: AwsSecretKey): Boolean = {
import scala.collection.JavaConverters._

Try {
rgwAdmin.createUser(
Expand Down Expand Up @@ -69,7 +70,6 @@ trait RadosGatewayHandler extends LazyLogging {
}

private[this] def getUserOnCeph(userName: UserName): Option[UserOnCeph] = {
import scala.collection.JavaConverters._

Try(rgwAdmin.getUserInfo(userName.value)).toOption.flatMap(cuo =>
if (cuo.isPresent) {
Expand All @@ -88,11 +88,11 @@ trait RadosGatewayHandler extends LazyLogging {
* Checks how to handle the current inconsistent situation, these optional cases apply:
*
* 1. The user with accesskey/secretkey pair doesn't exist yet on S3
* solution: with the User information retrieved from the STS service we can create them
* solution: with the User information retrieved from the STS service we can create them
* 2. The user exists, but his accesskey/secretkey pair changed
* solution: update accesskey/secretkey
* solution: update accesskey/secretkey
* 3. Any other reason (e.g. invalid accesskey/secretkey used for this user)
* left as is
* left as is
*
* @param userSTS User as retrieved from STS
* @return True if a change was done on RadosGw
Expand Down Expand Up @@ -128,4 +128,12 @@ trait RadosGatewayHandler extends LazyLogging {
}
}
}

/**
* List all buckets - no matters who is the owner
* @return - list of all buckets
*/
protected[this] def listAllBuckets: Seq[String] = {
rgwAdmin.listBucket("").asScala
}
}
Loading

0 comments on commit ece95a7

Please sign in to comment.