Skip to content

Commit

Permalink
[finagle-core]: Pass through configured deadline params rather than u…
Browse files Browse the repository at this point in the history
…se default

Problem

We can set the `TimeoutFilter.PropagateDeadlines` and
`TimeoutFilter.PreferDeadlineOverTimeout` to configure behavior on the Client.
When the MethodBuilder interface is used, the `perRequestModule` does only look
at the `Default` values of these configurations, preventing custom configuration.

Solution

Pass the `TimeoutFilter.PropagateDeadlines` and
`TimeoutFilter.PreferDeadlineOverTimeout` as Params to the `perRequestModule` to
instantiate the `TimeoutFilter` with correct configuration.

Result

The MethodBuilder will pick up configured `TimeoutFilter.PropagateDeadlines`
and `TimeoutFilter.PreferDeadlineOverTimeout` parameters.

Closes #937

compile

Differential Revision: https://phabricator.twitter.biz/D1107765
  • Loading branch information
DieBauer authored and jenkins committed Nov 1, 2023
1 parent 610a21c commit aa73d41
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Bug Fixes
* finagle: Deposit budget once in MethodBuilder ``PHAB_ID=D1107653``

* finagle: Pass through configured deadline params rather than use default ``PHAB_ID=D1107765``


22.12.0
-------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,38 @@ object DynamicTimeout {
* @see [[LatencyCompensation]]
*/
def perRequestModule[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module3[
TimeoutFilter.Param,
param.Timer,
LatencyCompensation.Compensation,
new Stack.ModuleParams[
ServiceFactory[Req, Rep]
] {
val role: Stack.Role = TimeoutFilter.role
val description: String =
"Apply a dynamic timeout-derived deadline to request"

def make(
defaultTimeout: TimeoutFilter.Param,
timer: param.Timer,
compensation: LatencyCompensation.Compensation,
val parameters = Seq(
implicitly[Stack.Param[TimeoutFilter.Param]],
implicitly[Stack.Param[param.Timer]],
implicitly[Stack.Param[LatencyCompensation.Compensation]],
implicitly[Stack.Param[TimeoutFilter.PropagateDeadlines]],
implicitly[Stack.Param[TimeoutFilter.PreferDeadlineOverTimeout]]
)

override def make(
params: Stack.Params,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {

val filter = new TimeoutFilter[Req, Rep](
timeoutFn(
PerRequestKey,
defaultTimeout.tunableTimeout,
params[TimeoutFilter.Param].tunableTimeout,
TimeoutFilter.Param.Default, // tunableTimeout() should always produce a value,
compensation.howlong // but we fall back on the default if not
params[
LatencyCompensation.Compensation].howlong // but we fall back on the default if not
),
duration => new IndividualRequestTimeoutException(duration),
timer.timer
params[param.Timer].timer,
params[TimeoutFilter.PropagateDeadlines].enabled,
params[TimeoutFilter.PreferDeadlineOverTimeout].enabled
)
filter.andThen(next)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@ import com.twitter.finagle.server.StackServer
import com.twitter.finagle.util.DefaultTimer
import com.twitter.finagle.mux
import com.twitter.finagle._
import com.twitter.finagle.service.TimeoutFilter.PropagateDeadlines
import com.twitter.util.Await
import com.twitter.util.Future

import java.net.InetSocketAddress
import org.scalatest.funsuite.AnyFunSuite

import java.util.concurrent.atomic.AtomicBoolean

class MethodBuilderTest extends AnyFunSuite {

private def await[T](f: Future[T]): T = Await.result(f, 15.seconds)
Expand Down Expand Up @@ -153,4 +157,38 @@ class MethodBuilderTest extends AnyFunSuite {
mux.Request.empty,
mux.Response.empty
)

test("Methodbuilder client does not propagate Deadlines") {
val deadlinePresent = new AtomicBoolean(true)
val service = Service.mk { request: http.Request =>
deadlinePresent.set(
request.headerMap.get("Finagle-Ctx-com.twitter.finagle.Deadline").isDefined)
Future.value(http.Response())
}

val server = Http.server
.serve("localhost:*", service)
val addr = server.boundAddress.asInstanceOf[InetSocketAddress]

val noPropagationClient = Http.client
.withLabel("backend-noprop")
.configured(PropagateDeadlines(false).mk())
.methodBuilder(s"${addr.getHostName}:${addr.getPort}")
.newService

val defaultClient = Http.client
.withLabel("backend")
.methodBuilder(s"${addr.getHostName}:${addr.getPort}")
.newService

await(noPropagationClient(http.Request("/")))
assert(!deadlinePresent.get())
await(defaultClient(http.Request("/")))
assert(deadlinePresent.get())

await(server.close())
await(noPropagationClient.close())
await(defaultClient.close())
}

}

0 comments on commit aa73d41

Please sign in to comment.