Pipeline Optimization Strategies #1105
As part of follow-on cleanup for e-mission/e-mission-server#1014, I would like to see:
In #1098,

However, it's not a bottleneck when running the pipeline locally, so I added local instrumentation that just measures the number of DB calls instead of measuring the time it takes to execute. At the same time, I have been working on implementing
Yes, this would be a good refactor.
Before looking at pandas versus polars, I think we need to look at pandas in the first place. IIRC, a pretty big chunk of the code in trip segmentation iterates over the points in the trajectory one by one to check for the distance from the previous points. Switching to a vectorized operation (such as pandas.diff, https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.diff.html) should make it much faster. I think that would be more low-hanging fruit than switching from pandas to polars. It will likely not affect DB calls, but should be faster computationally.
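To illustrate the kind of vectorization this refers to, here is a minimal sketch (my own illustration, not the server's code): it assumes a location dataframe with `latitude`/`longitude` columns and computes every point's distance from its predecessor in one pass with `shift()`, instead of looping row by row.

```python
import numpy as np
import pandas as pd

def haversine_m(lat1, lon1, lat2, lon2):
    # vectorized great-circle distance in meters
    lat1, lon1, lat2, lon2 = map(np.radians, (lat1, lon1, lat2, lon2))
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 6371000 * 2 * np.arcsin(np.sqrt(a))

def add_dist_from_prev(loc_df):
    # shift(1) lines every point up against its predecessor, so the whole
    # column of distances is computed without a Python-level loop
    prev = loc_df[["latitude", "longitude"]].shift(1)
    loc_df["dist_from_prev"] = haversine_m(prev["latitude"], prev["longitude"],
                                           loc_df["latitude"], loc_df["longitude"])
    return loc_df

# toy trajectory: two nearby points followed by a jump
loc_df = add_dist_from_prev(pd.DataFrame({
    "latitude":  [37.3900, 37.3901, 37.4100],
    "longitude": [-122.0800, -122.0801, -122.1000],
}))
print(loc_df)
```

`DataFrame.diff()` plays the same role if the coordinates are first projected to a planar system; either way, the per-row Python loop goes away.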
I'll explore this further to see if I can implement the current behavior with a vectorized operation. However, I am not sure if it will be possible to do that without significant changes to the segmentation logic. I am new to this and learning, but from what I've read it does seem that the reliance on row-wise iteration is likely the reason switching to
Right, this is the reason why I haven't worked on it yet. I do think it is not that complicated, but it is not as simple as changing the arguments to a function and pre-loading data.
I will take a look at this as well and do my own analysis with pandas.
The current segmentation logic is as follows:
I am struggling with the fact that
If not for this, we could just perform an upfront computation of the max distances for all locations in the last

My next thought is that we can use recursion or a while loop. It would precompute distances but identify only one segment at a time. Then it would do it again, excluding the rows that were already segmented.
Was this faster than the current point-by-point iteration? If so, by how much? I think that would be a good back-of-the-envelope check before we handle all the corner cases.
I assume this is
We should discuss this further, but let me outline how I think this would work at a high level. With the restructure, I think we would work in three stages:
With the approach above, if we have
I can verify that after merging e-mission/e-mission-server#1014 I don't see any errors in the AWS logs. The only

Although... it looks like we have no incoming data on stage (there is no match for
How bad is it if the segmentation behavior is slightly different? What if it identifies the end of some trips one point earlier or later? For example, in DwellSegmentationTimeFilter, one thing I noticed is that when determining which point gets marked as the last point of the trip (https://github.com/e-mission/e-mission-server/blob/aaedcd3caf7551efa8af0088c8fa56158e1c9725/emission/analysis/intake/segmentation/trip_segmentation_methods/dwell_segmentation_time_filter.py#L314-L316),

But there is a discrepancy because

Usually
This could work for false positives, but what about false negatives? On

With the precomputed distances approach, we get 28, 30, 31, 49, 50, 51, 138, 139, 140, 197, 198, 208, 209, 240, 241, 298, 299, 300, 326

Why wasn't a trip end detected at 207? The reason for the discrepancy is that point 197 was excluded from the
So maybe the move is to compute all the distances ahead of time, but not the

This is not as optimal as computing all of it upfront, including the

And although we still have to iterate, we should only need to iterate per segment rather than per point.
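To make the "iterate per segment" idea concrete, here is a rough sketch under simplifying assumptions of my own (not the actual filter logic): `dist_from_prev` is a precomputed distance-from-previous-point column (like the one sketched earlier), a fixed-size rolling window stands in for the real time-based dwell window, and the heavier checks (time gaps, motion activity, transitions) are just a placeholder comment.

```python
import numpy as np

def find_trip_ends(loc_df, dist_threshold_m=100, window=5):
    # max distance moved over the trailing `window` points, computed once for
    # the whole dataframe instead of inside a per-point loop
    max_recent = loc_df["dist_from_prev"].rolling(window).max().to_numpy()
    dwell = max_recent < dist_threshold_m   # candidate trip-end points

    trip_ends = []
    start = 0
    while start < len(dwell):
        candidates = np.flatnonzero(dwell[start:])
        if len(candidates) == 0:
            break                           # no more dwells in the remaining points
        end = start + candidates[0]
        # the heavier per-segment checks would run here, looking only at
        # loc_df.iloc[start:end + 1]
        trip_ends.append(end)
        start = end + 1                     # exclude rows that were already segmented
    return trip_ends
```

The loop then runs roughly once per detected segment rather than once per point, with the bulk of the distance work done up front in vectorized form.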
I think it would be fine if segmentation behavior is slightly different, as long as we have explored the differences and are convinced that they are small. We will need to regenerate the ground truth for the unit tests, but I have some code to do that from the time we switched to the gis branch as the master. As an aside, the current hacks and rules for trip segmentation are from my own data that I collected in the 2016-2017 timeframe. I have noticed that the segmentation rules don't work quite as well with more modern phones; I have a whole collection of "bad segmentation" trips from the pixel phone for example. So I think that we might actually want to have a deeper dive into that (maybe with a summer intern!) but I would like to do that separately from this scalability improvement.
I would be fine with that, as long as
Yup! You can compute the distances and the diffs, then use the diffs to segment through iteration, excluding previous points and doing the more heavyweight checks with the motion activity and transitions, etc. Note that to compute the max, you wouldn't need to iterate over all the points; you can use the dataframe directly (something like

And as you can also see, we end up with ~8 segments for ~350 points, so switching from
TRIP_SEGMENTATION is much improved after: |
…tabase

We have made several changes to cache summary information in the user profile. This summary information can be used to improve the scalability of the admin dashboard (e-mission/op-admin-dashboard#145) but will also be used to determine dormant deployments and potentially in a future OpenPATH-wide dashboard.

These changes started by making a single call to cache both trip and call stats (e-mission#1005). This resulted in all the composite trips being read every hour, so we split the stats into pipeline-dependent and pipeline-independent stats in 88bb35a (part of e-mission#1005). The idea was that, since the composite object query was slow because the composite trips were large, we could run only the queries for the server API stats every hour, and read the composite trips only when they were updated.

However, after the improvements to the trip segmentation pipeline (e-mission#1014, results in e-mission/e-mission-docs#1105 (comment)), reading the server stats is now the bottleneck. Checking the computation time on the big deployments (e.g. ccebikes): although the time taken has significantly improved as the database load has gone down, even in the past two days we see a median of ~10 seconds and a max of over two minutes.

And we don't really need to query this data to generate the call summary statistics. Instead of computing them on every run, we can compute them only when _they_ change, which happens when we receive calls to the API. So, in the API's after_request hook, in addition to adding a new stat, we can also just update the `last_call_ts` in the profile. This potentially makes every API call a teeny bit slower since we are adding one more DB write, but it significantly lowers the DB load, so it should make the system as a whole faster.

Testing done:
- the modified `TestUserStat` test for the `last_call_ts` passes
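For illustration, the extra per-call write described above might look roughly like this; apart from `last_call_ts`, the collection and helper names are hypothetical stand-ins for the server's actual profile accessor, and the hook wiring is omitted.

```python
import time
from pymongo import MongoClient

# hypothetical names purely for illustration; the server has its own
# profile-collection accessor
profile_coll = MongoClient()["openpath_db"]["user_profiles"]

def mark_last_call(user_id):
    # one small additional write per API call (from the after_request hook),
    # instead of re-deriving the call summary from the stats timeseries on
    # every hourly pipeline run
    profile_coll.update_one({"user_id": user_id},
                            {"$set": {"last_call_ts": time.time()}})
```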
I have a few ideas:
Caveat to (1): This will not work if
(2) is possible by doing something like this:
The returned list of results will be separated by "count" entries, allowing us to distinguish which results were for which locations (see the sketch at the end of this comment).

Initially, I tested this by just querying the start and end locations together (2 points per query), and I saw a modest improvement (~25% faster).

I also looked into (3) but did not find much opportunity for optimization. I checked the API responses and found that "relations", and their children "members", make up the vast majority of the results.
I checked the code to see where we use them, thinking that we may be querying more than we need.
I did find that we only use relation members that have a
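Looping back to (2): a hypothetical sketch of what such a batched query could look like, assuming the Overpass API is the service being queried here (the endpoint, radius, coordinates, and tag filters below are made-up placeholders). Each location's block ends with `out count;`, which is what produces the separator entries described above.

```python
import requests

OVERPASS_URL = "https://overpass-api.de/api/interpreter"  # placeholder endpoint

def build_batched_query(locations, radius_m=400):
    # one block per location, each followed by `out count;` so the combined
    # response can be split back into per-location groups of results
    blocks = []
    for lat, lon in locations:
        blocks.append(
            f'(node(around:{radius_m},{lat},{lon})["public_transport"];'
            f'rel(around:{radius_m},{lat},{lon})["route"];);'
            "out;out count;"
        )
    return "[out:json];" + "".join(blocks)

def split_by_count(elements):
    # walk the flat element list; every "count" element closes one location's group
    groups, current = [], []
    for elem in elements:
        if elem.get("type") == "count":
            groups.append(current)
            current = []
        else:
            current.append(elem)
    return groups

# e.g. query a trip's start and end locations in a single round trip
query = build_batched_query([(37.3900, -122.0800), (37.4100, -122.1000)])
response = requests.post(OVERPASS_URL, data={"data": query}).json()
per_location_results = split_by_count(response["elements"])
```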
@JGreenlee I am not sure why this is not showing up in your instrumentation, but when I run the first round of trip segmentation fixes (pre-reading all data from the DB), the next slow task is section segmentation. I reset the pipelines for ccebikes and started up the intake pipeline with:
From the logs, the timings are:
In fact, two out of three processes are stuck in
I think that the next step, even before MODE_INFERENCE, would be to rewrite section segmentation also to read all entries from the database upfront, instead of reading on a point-by-point basis. I would also suggest that you instrument running the pipeline against a bigger dataset (e.g. ccebikes) with constrained docker resources (e.g. 2 CPU/2 GB RAM) to understand where the production bottlenecks are likely to be.
Quick check after a few hours; the first process has now moved to
Quick check after a few more hours, all three processes are still stuck on the same
Ah, the third has now moved on to JUMP_SMOOTHING. But it took from 14:43 to 20:49, so around 6 hours
While the
The user being processed in
After waiting overnight, the
The other two have finished the user they were on, and have gotten stuck in the next
I see. My instrumentation has been based on

In hindsight, I should have looked closer at the logs before giving up on the large dumps. I didn't realize it was getting stuck on particular users (although that does make sense); I thought there was just too much data to get through in a reasonable time. Would you be able to send me some of the opcodes / UUIDs that got stuck so I can reproduce this myself (without having to run it overnight)?
@JGreenlee wait a minute - it looks like this behavior is specific to mongo 8. If I reset the pipeline and then run the same code on mongo 4, all three of the users that were stuck earlier are done in under 15 mins.
There's a huge investigation into query plans in #1109, and I think we finally have a workaround. Regardless, given our current DB characteristics, I think that reading entries upfront is generally a good idea to avoid multiple small DB calls.
Previously, we just recorded the number of calls to

```python
# record a stat every time the DB is queried by monitoring the MongoDB client
import inspect
import time

from pymongo.monitoring import register, CommandListener

import emission.storage.decorations.stats_queries as esds


class QueryMonitor(CommandListener):
    def started(self, event):
        event_cmd = str(event.command)
        # only track data commands, and skip the stats writes generated by
        # this listener itself to avoid recursive instrumentation
        if (
            event.command_name in {"find", "aggregate", "insert", "update", "delete"}
            and 'stats/pipeline_time' not in event_cmd
        ):
            # keep a slice of the call stack so we can tell which pipeline
            # stage / function triggered the DB call
            call_stack = [f.function for f in inspect.stack()][12:17]
            esds.store_pipeline_time(None,
                                     f'db_call/{event.command_name}',
                                     time.time(),
                                     str(call_stack))

    def succeeded(self, _): pass

    def failed(self, _): pass


register(QueryMonitor())
```

This counts every DB call but also allows us to identify

We think it would be beneficial to check this into the repo somewhere, but I am not sure of the correct location for it.
Follow-on to the investigation in e-mission/e-mission-server#1032 (comment):
I have had

Two users are stuck in MODE_INFERENCE for the last 1 and 2 hours respectively.
For
I am considering stopping the run because it would probably take all weekend and I will not be home to keep an eye on it.

After those are merged, it may become more practical to do side-by-side comparisons on large dumps. As for e-mission/e-mission-server#1032, it does drastically reduce the number of DB operations, which significantly improves
It could also be that the logs have rolled over. In that case, I would expect to see an
We can do a before-and-after at the end of next week, after e-mission/e-mission-server#1017, e-mission/e-mission-server#1026, and e-mission/e-mission-server#1032 are merged.
While working on e-mission/e-mission-server#1032 I found that

However, removing them significantly reduced the local execution time for

The duplicate
e-mission/e-mission-server#1017 (comment)
e-mission/e-mission-server#1017 (comment)
reducing DB queries (e-mission/e-mission-server#1014)
The low-hanging fruit is to:
The changes implemented (so far as of 1/30) drastically reduce the number of DB queries during trip segmentation.
For a typical day of travel for a user (e.g. shankari_2015-07-22, shankari_2016-07-27) I measured about 4000 calls to `_get_entries_for_timeseries` during trip segmentation alone! After the changes, this is cut down to about 50. Local pipeline runs are consistently, but only slightly, faster than before (about 10%).
I am hopeful that we will see more significant effects on stage / prod where DB calls are more of a bottleneck
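As a sketch of the "read everything upfront" pattern (using the timeseries helpers as I understand them; treat the exact calls as illustrative rather than the actual diff in #1014):

```python
import emission.storage.timeseries.abstract_timeseries as esta
import emission.storage.timeseries.timequery as estt

def load_locations_upfront(user_id, start_ts, end_ts):
    # one range query for the whole time window being processed, instead of a
    # separate _get_entries_for_timeseries call for every point examined
    ts = esta.TimeSeries.get_time_series(user_id)
    tq = estt.TimeQuery("data.ts", start_ts, end_ts)
    return ts.get_data_df("background/filtered_location", tq)

# downstream checks then slice the in-memory dataframe, e.g.
# loc_df[(loc_df.ts >= seg_start) & (loc_df.ts <= seg_end)]
```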
polars

We (@TeachMeTW and I) suspect that another cause of slowness (which is more noticeable locally, where DB queries are not a bottleneck) is from `pandas`. `polars` is much faster, but `pandas` is used extensively in the pipeline, so it is not practical to change all usages at once. Switching to `polars` would need to happen gradually or just in a few key places.

I wanted to run some tests to see if the performance benefit is worth the effort. I extended `get_data_df` to support either `pandas` or `polars`, thinking that we can support both for now and differentiate them by different variable names (`example_df` vs `example_pldf`):

`builtin_timeseries.py`

However, when I tested this on actual OpenPATH data, I ran into countless errors like this:

When types differ within a column, `pandas` automatically casts, but `polars` does not, in the name of performance. I searched, but cannot find a flag to enable this behavior in polars, so there is no way to use this without patching the incongruent types in the data.

For now I have this hack in `builtin_timeseries.py`, which uses MongoDB `aggregate` to correct the data types for a bunch of fields such that `polars` will accept them.

It's not a permanent solution but it's enough for me to continue experimenting with `polars`.
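For context, the kind of type-coercion stage involved might look roughly like this (the field list is a made-up example, not the actual set of patched fields): MongoDB's `$addFields`/`$toDouble` operators rewrite the mixed-type values on the server before the rows ever reach `polars`.

```python
# hypothetical sketch of coercing known-numeric fields inside the aggregation
# itself, so that every column arrives with a single consistent dtype
coerce_types_stage = {
    "$addFields": {
        "data.ts": {"$toDouble": "$data.ts"},
        "data.altitude": {"$toDouble": "$data.altitude"},
        "data.heading": {"$toDouble": "$data.heading"},
    }
}

def find_with_consistent_types(timeseries_coll, match_query):
    # equivalent of a find(), but with the extra coercion stage appended
    return list(timeseries_coll.aggregate([
        {"$match": match_query},
        coerce_types_stage,
    ]))
```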