diff --git a/driver/src/main/scala/org/reactivecouchbase/Couchbase.scala b/driver/src/main/scala/org/reactivecouchbase/Couchbase.scala index 4ec1ecc..9ea004a 100644 --- a/driver/src/main/scala/org/reactivecouchbase/Couchbase.scala +++ b/driver/src/main/scala/org/reactivecouchbase/Couchbase.scala @@ -161,13 +161,13 @@ class CouchbaseBucket( private[reactivecouchbase] val cbDriver: ReactiveCouchbas * Configuration for HTTP client */ private[reactivecouchbase] val config: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder() - .setAllowPoolingConnection(cbDriver.configuration.getBoolean("couchbase.http.pool").getOrElse(true)) - .setCompressionEnabled(cbDriver.configuration.getBoolean("couchbase.http.compression").getOrElse(true)) - .setRequestTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.reqtimeout").getOrElse(60000)) - .setIdleConnectionInPoolTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.idlepool").getOrElse(60000)) - .setIdleConnectionTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.idleconnection").getOrElse(60000)) - .setMaximumConnectionsTotal(cbDriver.configuration.getInt("couchbase.http.maxTotalConnections").getOrElse(-1)) - .setMaximumConnectionsPerHost(cbDriver.configuration.getInt("couchbase.http.maxConnectionsPerHost").getOrElse(-1)) + .setAllowPoolingConnections(cbDriver.configuration.getBoolean("couchbase.http.pool").getOrElse(true)) + .setCompressionEnforced(cbDriver.configuration.getBoolean("couchbase.http.compression").getOrElse(true)) + .setRequestTimeout(cbDriver.configuration.getInt("couchbase.http.reqtimeout").getOrElse(60000)) + .setPooledConnectionIdleTimeout(cbDriver.configuration.getInt("couchbase.http.idlepool").getOrElse(60000)) + .setConnectTimeout(cbDriver.configuration.getInt("couchbase.http.idleconnection").getOrElse(60000)) + .setMaxConnections(cbDriver.configuration.getInt("couchbase.http.maxTotalConnections").getOrElse(-1)) + .setMaxConnectionsPerHost(cbDriver.configuration.getInt("couchbase.http.maxConnectionsPerHost").getOrElse(-1)) .build() /** @@ -290,8 +290,8 @@ class ReactiveCouchbaseDriver(as: ActorSystem, config: Configuration, log: Logge * @param query the actual query written in N1QL query language * @return a new N1QL query */ - def N1QL(query: String): N1QLQuery = { - CouchbaseN1QL.N1QL(query)(this.buckets.head._2) + def N1QL(query: String, authorizationFields: Option[AuthorizationFields]): N1QLQuery = { + CouchbaseN1QL.N1QL(query, authorizationFields)(this.buckets.head._2) } /** diff --git a/driver/src/main/scala/org/reactivecouchbase/N1QL.scala b/driver/src/main/scala/org/reactivecouchbase/N1QL.scala index ea67068..f7b1fd4 100644 --- a/driver/src/main/scala/org/reactivecouchbase/N1QL.scala +++ b/driver/src/main/scala/org/reactivecouchbase/N1QL.scala @@ -1,11 +1,15 @@ package org.reactivecouchbase +import com.ning.http.client.Realm.AuthScheme + import scala.concurrent._ import play.api.libs.iteratee._ import play.api.libs.json._ -import com.ning.http.client.{Response, AsyncCompletionHandler} +import com.ning.http.client.{Realm, AsyncHttpClient, Response, AsyncCompletionHandler} import org.reactivecouchbase.client.ReactiveCouchbaseException +case class AuthorizationFields(user: String, password: String) + /** * Container to run N1QL query against a Couchbase Server. Uses it's own HTTP client from CouchbaseBucket * @@ -14,7 +18,7 @@ import org.reactivecouchbase.client.ReactiveCouchbaseException * @param host the host on which N1QL server is running * @param port the port of the N1QL server */ -class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: String) { +class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: String, authorizationFields: Option[AuthorizationFields]) { /** * The N1QL URL for querying */ @@ -74,6 +78,22 @@ class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: Stri //Concurrent.unicast[T](onStart = c => enumerate[T](r, ec).map(_(Iteratee.foreach[T](c.push).map(_ => c.eofAndEnd())))) } + /** + * Create the Realm for authorization + * + * @param authorizationFields + * @return the Realm + */ + + def createRealm(authorizationFields: Option[AuthorizationFields]): Realm = { + new Realm.RealmBuilder() + .setPrincipal(authorizationFields.get.user) + .setPassword(authorizationFields.get.password) + .setUsePreemptiveAuth(true) + .setScheme(AuthScheme.BASIC) + .build() + } + /** * * Transform the query result as a JsArray @@ -83,7 +103,9 @@ class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: Stri */ def toJsArray(implicit ec: ExecutionContext): Future[JsArray] = { val result = Promise[JsValue]() - bucket.httpClient.preparePost(url).addQueryParameter("q", query).execute(new AsyncCompletionHandler[Response]() { + val builder = bucket.httpClient.preparePost(url).addQueryParam("statement", query) + val authorizedHttpBuilder = Option(authorizationFields).map(name => builder.setRealm(createRealm(name))).getOrElse(builder) + authorizedHttpBuilder.execute(new AsyncCompletionHandler[Response]() { override def onCompleted(response: Response) = { result.success(Json.parse(response.getResponseBody)) response @@ -139,8 +161,8 @@ object CouchbaseN1QL { * @param port the port of the N1QL server * @return the created N1QLQuery instance */ - def N1QL(bucket: CouchbaseBucket, query: String, host: String, port: String): N1QLQuery = { - new N1QLQuery(bucket, query, host, port) + def N1QL(bucket: CouchbaseBucket, query: String, host: String, port: String, authorizationFields: Option[AuthorizationFields]): N1QLQuery = { + new N1QLQuery(bucket, query, host, port, authorizationFields) } /** @@ -169,10 +191,10 @@ object CouchbaseN1QL { * @param bucket the targeted bucket * @return the created N1QLQuery instance */ - def N1QL(query: String)(implicit bucket: CouchbaseBucket): N1QLQuery = { + def N1QL(query: String, authorizationFields: Option[AuthorizationFields] )(implicit bucket: CouchbaseBucket): N1QLQuery = { val host = bucket.N1QLHost.getOrElse(throw new ReactiveCouchbaseException("Cannot find N1QL host", "Cannot find N1QL host in couchbase.n1ql conf.")) val port = bucket.N1QLPort.getOrElse(throw new ReactiveCouchbaseException("Cannot find N1QL port", "Cannot find N1QL port in couchbase.n1ql conf.")) - new N1QLQuery(bucket, query, host, port) + new N1QLQuery(bucket, query, host, port, authorizationFields) } } diff --git a/driver/src/main/scala/org/reactivecouchbase/experimental/Views.scala b/driver/src/main/scala/org/reactivecouchbase/experimental/Views.scala index 46c9e32..438a1a4 100644 --- a/driver/src/main/scala/org/reactivecouchbase/experimental/Views.scala +++ b/driver/src/main/scala/org/reactivecouchbase/experimental/Views.scala @@ -10,7 +10,7 @@ import org.reactivecouchbase.client.{ReactiveCouchbaseException, RawRow, QueryEn import akka.actor.{Props, ActorRef, ActorLogging, Actor} import akka.pattern._ import java.util.concurrent.atomic.AtomicReference -import akka.routing.RoundRobinRouter +import akka.routing.{RoundRobinPool} import akka.util.Timeout import java.util.concurrent.TimeUnit @@ -87,7 +87,7 @@ object Views { private[experimental] def pass(view: View, query: Query, bucket: CouchbaseBucket, ec: ExecutionContext): Future[JsArray] = { implicit val timeout = Timeout(bucket.ecTimeout, TimeUnit.MILLISECONDS) val ref: ActorRef = workers.get.getOrElse({ - workers.set(Some(bucket.cbDriver.system().actorOf(Props[QueryWorker].withRouter(RoundRobinRouter(bucket.workersNbr)), "queries-dispatcher"))) + workers.set(Some(bucket.cbDriver.system().actorOf(Props[QueryWorker].withRouter(RoundRobinPool(bucket.workersNbr)), "queries-dispatcher"))) workers.get().get }) (ref ? SendQuery(view, query, bucket, ec)).mapTo[JsArray] diff --git a/project/Build.scala b/project/Build.scala index abaadcd..2798b74 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -4,7 +4,7 @@ import Keys._ object ApplicationBuild extends Build { val appName = "ReactiveCouchbase-core" - val appVersion = "0.5-SNAPSHOT" + val appVersion = "0.6-SNAPSHOT" val appScalaVersion = "2.11.7" //val appScalaBinaryVersion = "2.10" val appScalaCrossVersions = Seq("2.11.1", "2.10.4") @@ -45,12 +45,12 @@ object ApplicationBuild extends Build { .settings( resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/", libraryDependencies += "com.couchbase.client" % "couchbase-client" % "1.4.11", - libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.6" cross CrossVersion.binary, - libraryDependencies += "com.typesafe.play" %% "play-iteratees" % "2.3.10" cross CrossVersion.binary, - libraryDependencies += "com.typesafe.play" %% "play-json" % "2.3.10" cross CrossVersion.binary, - libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.1", - libraryDependencies += "com.ning" % "async-http-client" % "1.8.14", - libraryDependencies += "com.typesafe" % "config" % "1.2.1", + libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.1" cross CrossVersion.binary, + libraryDependencies += "com.typesafe.play" %% "play-iteratees" % "2.4.3" cross CrossVersion.binary, + libraryDependencies += "com.typesafe.play" %% "play-json" % "2.4.3" cross CrossVersion.binary, + libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3", + libraryDependencies += "com.ning" % "async-http-client" % "1.9.33", + libraryDependencies += "com.typesafe" % "config" % "1.3.0", libraryDependencies += "org.specs2" %% "specs2" % "2.3.12" % "test" cross CrossVersion.binary, libraryDependencies += "com.codahale.metrics" % "metrics-core" % "3.0.1", organization := "org.reactivecouchbase", diff --git a/project/build.properties b/project/build.properties index 64abd37..a6e117b 100755 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=0.13.6 +sbt.version=0.13.8