Buffer some requests in order to reduce "over capacity" errors without also killing Elasticsearch #2481

Closed
Logic-32 opened this issue Apr 2, 2019 · 26 comments · Fixed by #2502

Comments

@Logic-32
Contributor

Logic-32 commented Apr 2, 2019

Feature:
Update feature from #1760 to allow for a bounded-queue implementation that can buffer some requests so that a spike in requests doesn't cause the majority to drop but also still doesn't cause OutOfMemory errors.

Rationale
From #2023:

High level plan is to replace the existing Semaphore in HttpCall with something that allows for a queue of some kind.

My original plan was to update the OkHttpClient.Builder in ZipkinElasticsearchOkHttpAutoConfiguration to have a Dispatcher with an ExecutorService that allowed for a fixed-length queue. Then I realized that the Dispatcher maintains its own unbounded queue (which is a real problem and should be fixed/configurable in OkHttp itself, but I digress) so the ExecutorService doesn't help.

Now my general idea is to just inspect the Dispatcher queue length to see if we exceed a certain threshold (this is where I want an autoconfigure property but can't easily get one) and drop requests if we do. Given that ZipkinElasticsearchOkHttpAutoConfiguration exists I assumed it'd be ok to have a setting from it accessible in HttpCall. But since I'm not seeing any obvious way to do the plumbing I suppose it's better to not. However, if I don't, then I'm not quite sure where else to put the threshold check.

I could always wrap everything up in an ExecutorService somewhere (like ElasticsearchSpanConsumer) so that things are easy to control. But I'm not sure how I'd properly transition all existing configurations over to using that without some unexpected behavior.

My main concern here is the usage of the HttpBulkIndexer in ElasticsearchSpanConsumer. The reason that is my primary concern is because of the issue that brought about #1760. The fix for that was, perhaps unfortunately, too broad and limited any interaction with ES. The change I'm proposing is more focused on the actual collection of spans which can cause back ups and OOM errors.

To emphasize one point: the purpose of the queue is to help work through spike-load and semi-sluggish flushing to ES. OkHttp's unbounded queue seems like the real source of the problem in #1760 and what I think the Semaphore was trying to work around and therefore what I want to as well but in a less aggressive way.

Example Scenario
200 different servers all decide to report a reasonable number of spans to Zipkin in the same instant. If ES_MAX_REQUESTS is left at its default value, then only 64 of the requests will succeed. If ES_MAX_REQUESTS is configured higher, then Elasticsearch could reject the bulk indexing requests as a result of its own queue filling up (especially if multiple zipkin-server instances are handling the load). This creates an awkward balancing act that can be solved by having a moderately sized, bounded queue in Zipkin, which allows ES_MAX_REQUESTS to be set to something that won't kill ES, won't drop 70% of the requests, and won't cause OOM errors.
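
To make the arithmetic in this scenario concrete, here is a minimal sketch in plain Java. `BurstModel` and its parameters are hypothetical names, not Zipkin code, and it assumes every request arrives before any of them completes, so no permit is released during the burst.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical model of a burst of simultaneous requests; not Zipkin code. */
final class BurstModel {
  /** Returns how many of {@code burst} simultaneous requests are accepted. */
  static int accepted(int burst, int maxRequests, int queueSize) {
    Semaphore inFlight = new Semaphore(maxRequests);
    // ArrayBlockingQueue rejects a capacity of 0, so "no queue" is modeled as null.
    BlockingQueue<Integer> backlog =
        queueSize > 0 ? new ArrayBlockingQueue<Integer>(queueSize) : null;
    int accepted = 0;
    for (int i = 0; i < burst; i++) {
      if (inFlight.tryAcquire()) {
        accepted++; // would run immediately
      } else if (backlog != null && backlog.offer(i)) {
        accepted++; // buffered until a permit frees up
      }
      // otherwise the request is dropped as "over capacity"
    }
    return accepted;
  }
}
```

Under those assumptions, `accepted(200, 64, 0)` models the Semaphore-only behavior, and a queue of 136 is the smallest one that absorbs the whole burst of 200.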

@codefromthecrypt
Member

I think you mentioned earlier a separate issue, which was a flaw in the semaphore as it was implemented. I still think this should be fixed independently and at the call abstraction.

One interesting bit of food for thought: if this includes some concerns about the OkHttp dispatcher queue, it is similar to ones we had in Brave (where traces get lost on backlog). To solve this, we intercept and use Call.Factory as opposed to OkHttpClient. This technique might be irrelevant to the issue. See openzipkin/brave#292

@codefromthecrypt
Member

On the direct issue you are discussing here, it sounds like you want a buffer because there is some absolute number of in-flight requests, and a surge will go over that, but you think that this surge is short term and ES will quiesce. Put another way, you think we can do a better job backlogging than Elasticsearch can (because the queue that is overloaded is a configurable length in Elasticsearch). Is that correct? In your experience, how big does this queue need to be to be useful (outlast the capacity deficit) in terms of spans, messages, and bytes? You can look at collector metrics for an idea of how much you are dropping, as that would likely tell you.

@Logic-32
Contributor Author

Logic-32 commented Apr 3, 2019

Follow up from conversation in #2023

... this type of issue you are working with seems a
half implementation of our asynchronous reporter and likely going to cause
problems that the reporter has solved already.

After further comments from you I realize you're talking about AsyncReporter which is in Zipkin, yes, but used in client-side instrumentations for actually reporting to Zipkin. I just want to confirm that we are using this on the client-side but I am not sure how we'd actually make use of it "in Zipkin." Though I do see how it could resolve my issues if we were able to.

I think you mentioned earlier a separate issue, which was a flaw in the semaphore as it was implemented. I still think this should be fixed independently and at the call abstraction.

Yes. But rather than fixing it independently it is my intended focus of this case. And from what I've been experimenting with, a Call abstraction is definitely helping :)

Put another way, you think we can do a better job backlogging than Elasticsearch can (because the queue that is overloaded is a configurable length in Elasticsearch). Is that correct? In your experience, how big does this queue need to be to be useful (outlast the capacity deficit) in terms of spans, messages, and bytes? You can look at collector metrics for an idea of how much you are dropping, as that would likely tell you.

Yes, we do think the surge is short term. But even if it is an excessive surge or longer than anticipated we will still drop messages as needed to keep Zipkin from going OOM. We're not sure how big the queue will need to be at the moment. Unfortunately, Prometheus doesn't seem to track zipkin_collector_messages_dropped_total properly or we are only dropping spans and not messages. Initial guess from other metrics would be a queue of 100-ish would be ok but I'm starting my experiment with 500. Note: this is number of messages, not spans. I'm not sure what the byte limit on that would be due to the variety in number of spans per message. And since ByteBoundedQueue isn't a BlockingQueue I can't use it to further ensure no OOM errors happen :(

From Collector surge and error handling

It is possible that for non-buffered transports like http, we could have a similar async memory queue (possibly the same implementation as application) to coalesce storage requests into fixed size and rate outbound requests to storage. Clearing this queue could be done via an async SpanConsumer call, possibly itself controlled by a semaphore. This buffer can change the problem of http into one similar to how we address kafka, except that the queue is not durable or checkpointed etc.

I should further elaborate on my 200 server example: in our case, each of those 200 servers will actually be starting/stopping various, independent JVM processes. Meaning that unlike a single, long-lived web server process, there is no central control point to rate-limit requests to storage. So, while we are using AsyncReporter, there are actually something more like 600 reporters doing the reporting.

Lots of chaos, not a lot of control :\ But zipkin-server, at least for us, does have some heap to spare. Since it is our central point, it seems like we can better do some flow-control there instead of having to stand up/support Kafka.

@codefromthecrypt
Member

Note that ByteBoundedQueue (the backend) is fully intended to stop OOM as it is bounded. We chose to drop instead of block, so the problem is less OOM and more if you want more than you allocated, but I'm not sure that doesn't defeat the point of a bounded q :P
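
As a rough illustration of "drop instead of block", here is a minimal sketch of a buffer bounded by total bytes. `ByteBoundedBuffer` is a hypothetical class written for this thread; it is not the actual ByteBoundedQueue, whose internals may differ.

```java
import java.util.ArrayDeque;

/** Hypothetical byte-bounded buffer that drops when full; not the real ByteBoundedQueue. */
final class ByteBoundedBuffer {
  private final ArrayDeque<byte[]> messages = new ArrayDeque<>();
  private final int maxBytes;
  private int sizeInBytes;
  private int dropped;

  ByteBoundedBuffer(int maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Returns false and counts a drop, rather than blocking, when the message does not fit. */
  synchronized boolean offer(byte[] message) {
    if (sizeInBytes + message.length > maxBytes) {
      dropped++;
      return false;
    }
    messages.addLast(message);
    sizeInBytes += message.length;
    return true;
  }

  /** Removes the oldest message, or returns null when the buffer is empty. */
  synchronized byte[] poll() {
    byte[] next = messages.pollFirst();
    if (next != null) sizeInBytes -= next.length;
    return next;
  }

  synchronized int sizeInBytes() {
    return sizeInBytes;
  }

  synchronized int dropped() {
    return dropped;
  }
}
```

Because the bound is on bytes rather than on message count, heap use stays capped no matter how many spans each message carries. The cost is that a caller who wants more than was allocated sees drops, never back pressure.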

@codefromthecrypt
Member

@Logic-32 One thing discussed offline is basically I think you want the advantage of a queue, but you don't want to run kafka, rabbit or another queue? The problem is this implies exactly the same complexity here and we are smaller than the Kafka, elasticsearch community etc. It isn't necessarily fair to try to make zipkin also a bounded queue implementor.

One alternative you could consider is that we have demand for activemq #2466 which is easy to run, and embeddable. It is possible that we could invert the flow (push to poll) without you needing a custom queue implementation. Can you think about this option?

@codefromthecrypt
Member

for a custom implementation here I think the closest match to what we can support is re-using what we do client-side somehow. The problem space is exactly the same as client side and it took years to get the async reporter correct (to the degree it is). However, the client reporter also drops spans on problems, so any other handling would need to be considered independently as already mentioned (evaluating which conditions one should push back a message, to avoid putting a poison one back on the queue)

@Logic-32
Contributor Author

Logic-32 commented Apr 4, 2019

One thing discussed offline is basically I think you want the advantage of a queue, but you don't want to run kafka, rabbit or another queue? The problem is this implies exactly the same complexity here and we are smaller than the Kafka, elasticsearch community etc. It isn't necessarily fair to try to make zipkin also a bounded queue implementor.

I feel as though we're on slightly different pages in some respect. To hopefully help clarify, yes, I think using Kafka (or RMQ, which our company does use already) would be a better solution overall. However, as #2023 showed, even pushing with Kafka right now can cause the "over capacity" message as a result of not being able to push back on the MQ and tell it to slow down. Even ignoring that case, there are probably others using RPC that have this issue as well.

My goal here is not to reinvent the MQ wheel but simply to allow for a little more tolerance in how quickly the "over capacity" error is thrown. How familiar are you with ExecutorServices? They actually have a very convenient feature for this behavior. Take ThreadPoolExecutor for instance; looking at the constructor arguments:

  • BlockingQueue (workQueue): this can be a "bounded queue" like LinkedBlockingQueue. In this sense, "bounded" means a hard limit (which can be passed in via AutoConfiguration) on the total number of Runnables (messages) in it. Realistically, there is probably no reason ByteBoundedQueue can't be updated to implement BlockingQueue so that we could use it in the ExecutorService instead, but that feels a little more like trying to reinvent an MQ protocol.
  • RejectedExecutionHandler (handler): when the ExecutorService is fully saturated and the queue is full, this determines how to handle additional Runnables that could not be enqueued. The default policy is AbortPolicy.

Given that review, I want to restate that my goal is not necessarily to have any kind of excessively-reliable queue but merely a moderate buffer to reduce issues from spike-load. "Over capacity" errors (or, RejectedExecutionExceptions in this case) will still get thrown but hopefully less frequently. No retries will be attempted (though, I believe this would give us a hook for retries if that is something we wanted to pursue later).
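
A minimal sketch of that configuration, using only JDK classes. `BoundedExecutorSketch`, `submitBurst`, and the task it runs are hypothetical names for illustration; the slow storage call is simulated with a latch so that every accepted task stays busy while the burst arrives.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of a bounded work queue plus AbortPolicy; not Zipkin code. */
final class BoundedExecutorSketch {
  /** Submits {@code burst} tasks that all stay busy; returns {accepted, rejected}. */
  static int[] submitBurst(int threads, int queueSize, int burst) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(queueSize), // hard limit on waiting tasks
        new ThreadPoolExecutor.AbortPolicy());        // throw once threads and queue are full
    CountDownLatch storageIsSlow = new CountDownLatch(1);
    Runnable slowBulkRequest = () -> {
      try {
        storageIsSlow.await(); // simulates a bulk request that has not finished yet
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    int accepted = 0;
    int rejected = 0;
    for (int i = 0; i < burst; i++) {
      try {
        executor.execute(slowBulkRequest);
        accepted++;
      } catch (RejectedExecutionException overCapacity) {
        rejected++; // the equivalent of today's "over capacity" error
      }
    }
    storageIsSlow.countDown(); // let the accepted tasks finish
    executor.shutdown();
    return new int[] {accepted, rejected};
  }
}
```

With 2 threads and a queue of 3, a burst of 10 should accept 5 tasks and reject 5: the rejections still happen, just later than they would with no queue at all.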

I also understand this may not be a feature the Zipkin team wants to maintain internally and am fully comfortable having a pull request rejected.

It is possible that we could invert the flow (push to poll) without you needing a custom queue implementation. Can you think about this option?

You mean have Zipkin poll for messages from something instead of directly issuing RPC requests to it? Zipkin already has support for various MQs so I would suggest simply switching from RPC to an AMQ protocol before going down that road. Or did I misunderstand the question?

Lastly, I'm definitely more than happy to discuss design options here as this discussion has already yielded a solution I'm personally more happy with. However, since I think communication sometimes works better in code, I would at least like to send you a patch/pull request for review before wholesale rejecting the idea.

@devinsba
Member

devinsba commented Apr 4, 2019

Based on previous discussion here and the issue you linked above (#2023), my opinion (and I think partially Adrian's) is that we should solve this at the library and configuration level, where it applies to all collectors that are pull based and all storage options. The HTTP collector is mostly an outlier here; most collectors are pull based.

The intent being that all storage engines can create a well known error that can then be taken as a signal at the collector level to both retry the request that was dropped (ie: don't advance our cursor) and slow down.
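
A minimal sketch of that idea for a pull-based collector. Everything here is hypothetical (`PullLoopSketch`, `drain`, and the use of RejectedExecutionException as the "well known error"); real collectors track offsets or acknowledgements in transport-specific ways.

```java
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

/** Hypothetical pull loop that only advances its cursor after a successful store. */
final class PullLoopSketch {
  /** Returns {cursor, rejections} after at most {@code maxAttempts} storage calls. */
  static int[] drain(List<String> messages, Consumer<String> storage, int maxAttempts) {
    int cursor = 0;
    int rejections = 0;
    for (int attempt = 0; attempt < maxAttempts && cursor < messages.size(); attempt++) {
      try {
        storage.accept(messages.get(cursor));
        cursor++; // stored, so it is safe to advance
      } catch (RejectedExecutionException overCapacity) {
        rejections++; // not stored: keep the cursor so the same message is retried
        // A real loop would also pause here before polling again ("slow down").
      }
    }
    return new int[] {cursor, rejections};
  }
}
```

The point of the sketch is only the ordering: the cursor moves after storage accepts the message, so an "over capacity" signal costs time, not data.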

@Logic-32
Contributor Author

Logic-32 commented Apr 4, 2019

So focus my changes on ZipkinHttpCollector instead of ElasticsearchSpanConsumer? That definitely seems like a plausible thing to attempt. I'm thinking just have it wrap whatever Collector it gets in one which uses the ExecutorService to throttle things still. Then put the AutoConfig properties for queue size/etc... under zipkin.collector.http? The only catch with doing that is, I'm not sure how I'd make sure the number of requests the Collector can attempt doesn't get out of sync with zipkin.elasticsearch.max-requests?

The intent being that all storage engines can create a well known error that can then be taken as a signal at the collector level to both retry the request that was dropped (ie: don't advance our cursor) and slow down.

I take that as: if we are "over capacity", make sure Callback.onError() sets a status of 429 (Too Many Requests) on the HttpServerExchange (instead of 500) or something similar. Does that sound reasonable?

@codefromthecrypt
Member

If we had a reliable signal, and we were acting on it in the server, and there was a smart way to flip the state to green, we could push it back yes.

Keep in mind, the POST endpoint is async: the response is sent prior to the storage consumer. So there would be some non-synchronous delay sending that back (ex an atomic state variable checked before consuming the request).
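
A minimal sketch of the "atomic state variable" idea. `CapacityGate` and its methods are hypothetical, and the 202 success status is an assumption made for illustration. The rule for flipping back to green is deliberately naive (the next storage result overwrites the flag); that rule is exactly the part that would need real design.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical gate shared by the request thread and the async storage callback. */
final class CapacityGate {
  private final AtomicBoolean overCapacity = new AtomicBoolean(false);

  /** Called from the asynchronous storage callback, after the response was already sent. */
  void onStorageResult(boolean rejected) {
    overCapacity.set(rejected);
  }

  /** Called on the request thread before consuming the body of the next request. */
  int statusForNextRequest() {
    return overCapacity.get() ? 429 : 202;
  }
}
```

Note the delay this implies: the request that triggered the rejection has already been answered, so only later requests see the 429.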

@codefromthecrypt
Member

@Logic-32 any chance you can help us spend some synchronous time (pun intended) at our upcoming workshop to review any work you've done here? Even though the focus is UI, we can carve out time (just grab a slot that works for you). https://cwiki.apache.org/confluence/display/ZIPKIN/2019-04-17+UX+workshop+and+Lens+GA+planning+at+LINE+Fukuoka

@codefromthecrypt
Member

codefromthecrypt commented Apr 5, 2019 via email

@codefromthecrypt
Member

codefromthecrypt commented Apr 5, 2019 via email

@Logic-32
Contributor Author

Logic-32 commented Apr 6, 2019

Keep in mind, the POST endpoint is async: the response is sent prior to the storage consumer. So there would be some non-synchronous delay sending that back (ex an atomic state variable checked before consuming the request).

True, it is async. But the determination of whether we're over capacity (at least according to the current Semaphore implementation) is made before things actually go async. So sending a 429 is still possible.
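
A minimal sketch of that ordering, assuming a Semaphore-style limit. `AdmitThenStore` and `accept` are hypothetical names, and the 202 success status is an assumption for illustration. The capacity check runs on the calling thread, and only admitted work is handed to the executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

/** Hypothetical handler: decide the status synchronously, store asynchronously. */
final class AdmitThenStore {
  private final Semaphore permits;
  private final Executor executor;

  AdmitThenStore(int maxRequests, Executor executor) {
    this.permits = new Semaphore(maxRequests);
    this.executor = executor;
  }

  /** Returns the HTTP status to send; the storage work itself runs later on the executor. */
  int accept(Runnable storeSpans) {
    if (!permits.tryAcquire()) {
      return 429; // over capacity is known before anything goes async
    }
    CompletableFuture.runAsync(storeSpans, executor)
        .whenComplete((ignored, error) -> permits.release()); // release on success or failure
    return 202;
  }
}
```

The permit is released in `whenComplete` so that a failed storage call does not leak capacity.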

... any chance you can help us spend some synchronous time (pun intended) at our upcoming workshop ...

I'm UTC-6 and I see you have 10:30 picked out for this? 15:30 would be the earliest I could do and hope that work doesn't get in my way. So basically, I'd like to help but would need to be awake to do so ;) (Edit: 10:30 am = 7:30 pm, was looking at wrong time conversion earlier; no promise on making this time but I'll try)

for here we have a lot of unfinished work to complete, and you can see
changes related to what you discuss in our PR queue, for example the Netflix
rate limiter PR. Many changes like this become abandoned, which is why I am
trying to make a wiki this time to properly inventory things, because sometimes
people forget common problems and explaining across N issues takes our time
away.

Thank you for reminding me to go have a look at those! I didn't realize work was already being done on this since nothing was called out in the other issue. Both #2169 and #2166 are definitely "stepping on my toes" (or me theirs, but the point remains). I left a comment on 3ba76c2#r272783016, hopefully we can work to come up with a mutual solution.

the main concern is again exactly the same as the asynchronous reporter.
you have a bundling concern (which translates to how many spans per bulk
request). above that you have a concern of how many simultaneous bulk
requests (that the semaphore should control).

you also want some other things like error handling to not drop spans etc.
I would recommend splitting problems into parts. this avoids a lot of
rehashing, as you notice the issue is just like many others and some of this
takes longer to rehash than to code.

If it helps ease your concerns, I could always have the Queue default to a size of 0. Then, the ExecutorService would act exactly like the Semaphore, except that it would live not in HttpCall but in a higher layer that is probably more appropriate. Outside of that, I definitely agree there are potentially many issues here. Hopefully collaborating on one of the above mentioned pull requests will result in something more palatable. Either way, I will try to help with the wiki page you put together for this and the workshop as best I can. But I'm afraid I don't have the capacity to assist with much outside of the spike load issue (meaning no push back to try and throttle things on the Reporter side), other than assisting in integration with the other existing pull requests.
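
One detail a "size of 0" default would have to handle: the JDK's LinkedBlockingQueue and ArrayBlockingQueue reject a capacity of 0, while SynchronousQueue is the standard zero-capacity handoff. A minimal sketch, where `ThrottleQueues.forSize` is a hypothetical helper:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

/** Hypothetical factory for the executor's work queue; not Zipkin code. */
final class ThrottleQueues {
  /** Size 0 means "no buffering": a task is accepted only if a thread can take it right away. */
  static BlockingQueue<Runnable> forSize(int queueSize) {
    if (queueSize < 0) throw new IllegalArgumentException("queueSize < 0");
    if (queueSize == 0) return new SynchronousQueue<Runnable>();
    return new LinkedBlockingQueue<Runnable>(queueSize);
  }
}
```

With the size-0 queue, a ThreadPoolExecutor rejects as soon as all of its threads are busy, which is the Semaphore-like behavior described above.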

@codefromthecrypt
Member

codefromthecrypt commented Apr 6, 2019 via email

@Logic-32
Contributor Author

Logic-32 commented Apr 9, 2019

@adriancole, I just signed up as "logic32" :)

@codefromthecrypt
Member

codefromthecrypt commented Apr 10, 2019 via email

@codefromthecrypt
Member

@Logic-32 PS your account is somewhat.. anonymous :)

As this repository will shortly move to the Apache Software Foundation (ASF) and I suspect you will end up with non-trivial code to merge, you should probably prepare by sending a contributor agreement to the ASF. If you have already done this in the past, there is no work to do. This is a once-in-a-lifetime thing.

If you haven't already, the easiest way is to download the ICLA template. After filling in icla.pdf with your personal information, print, sign, scan, and send it by email as an attachment to the secretary ([email protected])

@Logic-32
Contributor Author

This is the internet, not a job interview :P No offense of course; if there is strong motivation to change I can :)

I've seen the plans for moving to ASF but haven't been following it. Would it be acceptable to open/review a pull request before filling that out? Hate to go through the effort of signing up for that if my work doesn't make the cut ;)

Also, while I'm here, one issue that will come up in the review is that of overriding other settings. Is there any precedent for, say, having a "global" max-concurrency setting which would override zipkin.storage.elasticsearch.max-requests for instance? I see there are other "max-active" and like settings. If we're going to have something that controls how many items get access to storage at a given instant then it makes sense that the settings for them remain in sync. Though, there is certainly no obligation to do so. Thoughts/opinions?

@codefromthecrypt
Member

codefromthecrypt commented Apr 11, 2019 via email

@Logic-32
Contributor Author

Attaching some very early results of running my changes in production (wanted to make sure these were up in time for the LINE workshop).

Before:
[image: before]

After:
[image: after]

Results are 24 hours apart. New metrics:

  • Throttle concurrency/limit = how many Threads Netflix thinks we can run
  • Throttle in flight requests = concurrency + queue size; should be the same most of the time but is mostly just a check to make sure we're resolving all our LimitListeners and not leaking anything.

You can see the Drop Rate spikes are not only fewer and farther apart but also thinner, with no appreciable impact on Response Times or Heap.

@codefromthecrypt
Member

codefromthecrypt commented Apr 17, 2019 via email

Logic-32 added a commit to Logic-32/zipkin that referenced this issue Apr 17, 2019
Adding storage-throttle module/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is enabled.
Making sure RejectedExecutionExceptions are "first class" citizens since they are used to reduce the throttle.
Removing HttpCall's Semaphore in favor of the throttle (same purpose, different implementations).
Inspired by work done on openzipkin#2169.
@Logic-32
Contributor Author

Just a heads-up, I've got the ICLA signed but am waiting on feedback from my company's legal department to make sure I don't violate condition 4.

@codefromthecrypt
Member

codefromthecrypt commented Apr 20, 2019 via email

@Logic-32
Contributor Author

ICLA signed and emailed :)

Logic-32 added a commit to Logic-32/zipkin that referenced this issue Apr 24, 2019
Adding storage-throttle module/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is enabled.
Inspired by work done on openzipkin#2169.
Logic-32 added a commit to Logic-32/zipkin that referenced this issue May 3, 2019
Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is enabled.
Inspired by work done on openzipkin#2169.
codefromthecrypt pushed a commit to Logic-32/zipkin that referenced this issue May 8, 2019
Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is enabled.
Inspired by work done on openzipkin#2169.
@codefromthecrypt
Member

#2502 now includes test instructions; please give it a try.

codefromthecrypt pushed a commit to Logic-32/zipkin that referenced this issue May 10, 2019
Adding ThrottledStorageComponent/etc. to contain logic for wrapping other storage implementations and limiting the number of requests that can go through to them at a given time.
Elasticsearch storage's maxRequests can be overridden by throttle properties if the throttle is enabled.
Inspired by work done on openzipkin#2169.