Skip to content

Recent stable dependency versions & n1ql authorization introduction #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions driver/src/main/scala/org/reactivecouchbase/Couchbase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ class CouchbaseBucket( private[reactivecouchbase] val cbDriver: ReactiveCouchbas
* Configuration for HTTP client
*/
private[reactivecouchbase] val config: AsyncHttpClientConfig = new AsyncHttpClientConfig.Builder()
.setAllowPoolingConnection(cbDriver.configuration.getBoolean("couchbase.http.pool").getOrElse(true))
.setCompressionEnabled(cbDriver.configuration.getBoolean("couchbase.http.compression").getOrElse(true))
.setRequestTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.reqtimeout").getOrElse(60000))
.setIdleConnectionInPoolTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.idlepool").getOrElse(60000))
.setIdleConnectionTimeoutInMs(cbDriver.configuration.getInt("couchbase.http.idleconnection").getOrElse(60000))
.setMaximumConnectionsTotal(cbDriver.configuration.getInt("couchbase.http.maxTotalConnections").getOrElse(-1))
.setMaximumConnectionsPerHost(cbDriver.configuration.getInt("couchbase.http.maxConnectionsPerHost").getOrElse(-1))
.setAllowPoolingConnections(cbDriver.configuration.getBoolean("couchbase.http.pool").getOrElse(true))
.setCompressionEnforced(cbDriver.configuration.getBoolean("couchbase.http.compression").getOrElse(true))
.setRequestTimeout(cbDriver.configuration.getInt("couchbase.http.reqtimeout").getOrElse(60000))
.setPooledConnectionIdleTimeout(cbDriver.configuration.getInt("couchbase.http.idlepool").getOrElse(60000))
.setConnectTimeout(cbDriver.configuration.getInt("couchbase.http.idleconnection").getOrElse(60000))
.setMaxConnections(cbDriver.configuration.getInt("couchbase.http.maxTotalConnections").getOrElse(-1))
.setMaxConnectionsPerHost(cbDriver.configuration.getInt("couchbase.http.maxConnectionsPerHost").getOrElse(-1))
.build()

/**
Expand Down Expand Up @@ -290,8 +290,8 @@ class ReactiveCouchbaseDriver(as: ActorSystem, config: Configuration, log: Logge
* @param query the actual query written in N1QL query language
* @return a new N1QL query
*/
def N1QL(query: String): N1QLQuery = {
CouchbaseN1QL.N1QL(query)(this.buckets.head._2)
def N1QL(query: String, authorizationFields: Option[AuthorizationFields]): N1QLQuery = {
CouchbaseN1QL.N1QL(query, authorizationFields)(this.buckets.head._2)
}

/**
Expand Down
36 changes: 29 additions & 7 deletions driver/src/main/scala/org/reactivecouchbase/N1QL.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package org.reactivecouchbase

import com.ning.http.client.Realm.AuthScheme

import scala.concurrent._
import play.api.libs.iteratee._
import play.api.libs.json._
import com.ning.http.client.{Response, AsyncCompletionHandler}
import com.ning.http.client.{Realm, AsyncHttpClient, Response, AsyncCompletionHandler}
import org.reactivecouchbase.client.ReactiveCouchbaseException

case class AuthorizationFields(user: String, password: String)

/**
* Container to run N1QL query against a Couchbase Server. Uses it's own HTTP client from CouchbaseBucket
*
Expand All @@ -14,7 +18,7 @@ import org.reactivecouchbase.client.ReactiveCouchbaseException
* @param host the host on which N1QL server is running
* @param port the port of the N1QL server
*/
class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: String) {
class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: String, authorizationFields: Option[AuthorizationFields]) {
/**
* The N1QL URL for querying
*/
Expand Down Expand Up @@ -74,6 +78,22 @@ class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: Stri
//Concurrent.unicast[T](onStart = c => enumerate[T](r, ec).map(_(Iteratee.foreach[T](c.push).map(_ => c.eofAndEnd()))))
}

/**
* Create the Realm for authorization
*
* @param authorizationFields
* @return the Realm
*/

def createRealm(authorizationFields: Option[AuthorizationFields]): Realm = {
new Realm.RealmBuilder()
.setPrincipal(authorizationFields.get.user)
.setPassword(authorizationFields.get.password)
.setUsePreemptiveAuth(true)
.setScheme(AuthScheme.BASIC)
.build()
}

/**
*
* Transform the query result as a JsArray
Expand All @@ -83,7 +103,9 @@ class N1QLQuery(bucket: CouchbaseBucket, query: String, host: String, port: Stri
*/
def toJsArray(implicit ec: ExecutionContext): Future[JsArray] = {
val result = Promise[JsValue]()
bucket.httpClient.preparePost(url).addQueryParameter("q", query).execute(new AsyncCompletionHandler[Response]() {
val builder = bucket.httpClient.preparePost(url).addQueryParam("statement", query)
val authorizedHttpBuilder = Option(authorizationFields).map(name => builder.setRealm(createRealm(name))).getOrElse(builder)
authorizedHttpBuilder.execute(new AsyncCompletionHandler[Response]() {
override def onCompleted(response: Response) = {
result.success(Json.parse(response.getResponseBody))
response
Expand Down Expand Up @@ -139,8 +161,8 @@ object CouchbaseN1QL {
* @param port the port of the N1QL server
* @return the created N1QLQuery instance
*/
def N1QL(bucket: CouchbaseBucket, query: String, host: String, port: String): N1QLQuery = {
new N1QLQuery(bucket, query, host, port)
def N1QL(bucket: CouchbaseBucket, query: String, host: String, port: String, authorizationFields: Option[AuthorizationFields]): N1QLQuery = {
new N1QLQuery(bucket, query, host, port, authorizationFields)
}

/**
Expand Down Expand Up @@ -169,10 +191,10 @@ object CouchbaseN1QL {
* @param bucket the targeted bucket
* @return the created N1QLQuery instance
*/
def N1QL(query: String)(implicit bucket: CouchbaseBucket): N1QLQuery = {
def N1QL(query: String, authorizationFields: Option[AuthorizationFields] )(implicit bucket: CouchbaseBucket): N1QLQuery = {
val host = bucket.N1QLHost.getOrElse(throw new ReactiveCouchbaseException("Cannot find N1QL host", "Cannot find N1QL host in couchbase.n1ql conf."))
val port = bucket.N1QLPort.getOrElse(throw new ReactiveCouchbaseException("Cannot find N1QL port", "Cannot find N1QL port in couchbase.n1ql conf."))
new N1QLQuery(bucket, query, host, port)
new N1QLQuery(bucket, query, host, port, authorizationFields)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.reactivecouchbase.client.{ReactiveCouchbaseException, RawRow, QueryEn
import akka.actor.{Props, ActorRef, ActorLogging, Actor}
import akka.pattern._
import java.util.concurrent.atomic.AtomicReference
import akka.routing.RoundRobinRouter
import akka.routing.{RoundRobinPool}
import akka.util.Timeout
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -87,7 +87,7 @@ object Views {
private[experimental] def pass(view: View, query: Query, bucket: CouchbaseBucket, ec: ExecutionContext): Future[JsArray] = {
implicit val timeout = Timeout(bucket.ecTimeout, TimeUnit.MILLISECONDS)
val ref: ActorRef = workers.get.getOrElse({
workers.set(Some(bucket.cbDriver.system().actorOf(Props[QueryWorker].withRouter(RoundRobinRouter(bucket.workersNbr)), "queries-dispatcher")))
workers.set(Some(bucket.cbDriver.system().actorOf(Props[QueryWorker].withRouter(RoundRobinPool(bucket.workersNbr)), "queries-dispatcher")))
workers.get().get
})
(ref ? SendQuery(view, query, bucket, ec)).mapTo[JsArray]
Expand Down
14 changes: 7 additions & 7 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Keys._
object ApplicationBuild extends Build {

val appName = "ReactiveCouchbase-core"
val appVersion = "0.5-SNAPSHOT"
val appVersion = "0.6-SNAPSHOT"
val appScalaVersion = "2.11.7"
//val appScalaBinaryVersion = "2.10"
val appScalaCrossVersions = Seq("2.11.1", "2.10.4")
Expand Down Expand Up @@ -45,12 +45,12 @@ object ApplicationBuild extends Build {
.settings(
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
libraryDependencies += "com.couchbase.client" % "couchbase-client" % "1.4.11",
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.6" cross CrossVersion.binary,
libraryDependencies += "com.typesafe.play" %% "play-iteratees" % "2.3.10" cross CrossVersion.binary,
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.3.10" cross CrossVersion.binary,
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.1",
libraryDependencies += "com.ning" % "async-http-client" % "1.8.14",
libraryDependencies += "com.typesafe" % "config" % "1.2.1",
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.1" cross CrossVersion.binary,
libraryDependencies += "com.typesafe.play" %% "play-iteratees" % "2.4.3" cross CrossVersion.binary,
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.4.3" cross CrossVersion.binary,
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3",
libraryDependencies += "com.ning" % "async-http-client" % "1.9.33",
libraryDependencies += "com.typesafe" % "config" % "1.3.0",
libraryDependencies += "org.specs2" %% "specs2" % "2.3.12" % "test" cross CrossVersion.binary,
libraryDependencies += "com.codahale.metrics" % "metrics-core" % "3.0.1",
organization := "org.reactivecouchbase",
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.6
sbt.version=0.13.8