Skip to content

Commit

Permalink
Merge pull request #6 from tpgillam/tg/optimise_fixed
Browse files Browse the repository at this point in the history
Speed improvements
  • Loading branch information
tpgillam authored Aug 19, 2021
2 parents f61cf47 + 8dfbfbe commit 995f69b
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ jobs:
using Pkg
Pkg.develop(PackageSpec(path=pwd()))
Pkg.instantiate()'
- run: julia --project=docs docs/make.jl
- run: julia --project=docs --color=yes docs/make.jl
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }}
GKSwstype: "100" # https://discourse.julialang.org/t/generation-of-documentation-fails-qt-qpa-xcb-could-not-connect-to-display/60988
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
name = "AssociativeWindowAggregation"
uuid = "444271a7-5434-4a02-b82b-0e30a9223c60"
authors = ["Thomas Gillam <[email protected]>"]
version = "0.1.1"
version = "0.2.0"

[deps]
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"

[compat]
DataStructures = "0.18"
julia = "1.5, 1.6"
julia = "1.5"

[extras]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,3 @@ Accumulate result of applying binary associative operators on rolling windows.
## Notes
- API is very preliminary and likely to change a lot.
- No attempt to optimise or otherwise benchmark the algorithms has been made (yet)!

## TODO
- Fixed window for simple types would benefit from statically allocated buffers that could be reused.
2 changes: 1 addition & 1 deletion docs/Project.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[deps]
AssociativeWindowAggregation = "444271a7-5434-4a02-b82b-0e30a9223c60"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
Plots = "91a5bcdd-55d7-5caf-9e0b-520d859cae80"
AssociativeWindowAggregation = "444271a7-5434-4a02-b82b-0e30a9223c60"
8 changes: 5 additions & 3 deletions docs/src/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ Here we show a computation of the rolling mean over fixed windows. We use a
length, only including values where the window has filled.

```@example mean
using Plots
using AssociativeWindowAggregation
using Plots
x = range(1, 10; length=100)
y = sin.(x) + 0.5 * rand(length(x))
Expand All @@ -16,7 +16,7 @@ plot(x, y; label="raw", title="Rolling means")
for window in [5, 10, 20]
# Use this to keep track of a windowed sum.
state = FixedWindowAssociativeOp{Float64}(+, window)
state = FixedWindowAssociativeOp{Float64, +}(window)
z = []
for value in y
Expand All @@ -30,5 +30,7 @@ for window in [5, 10, 20]
plot!(x, z; label="mean $window", lw=2)
end
current()
savefig("mean-plot.svg"); nothing # hide
```

![](mean-plot.svg)
6 changes: 2 additions & 4 deletions examples/benchmark.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
using AssociativeWindowAggregation
using BenchmarkTools


function run_example(window::Integer, num_points::Integer)
state = FixedWindowAssociativeOp{Int}(+, window)
state = FixedWindowAssociativeOp{Int,+}(window)
for _ in 1:num_points
update_state!(state, 1)
end
return window_value(state)
end


@benchmark run_example(10, 100) samples=100
@benchmark run_example(100, 10000)
27 changes: 10 additions & 17 deletions src/fixed_window_associative_op.jl
Original file line number Diff line number Diff line change
@@ -1,51 +1,44 @@
"""
FixedWindowAssociativeOp{T}
FixedWindowAssociativeOp{T,Op}
State necessary for accumulation over a rolling window of fixed size.
# Fields
- `window_state::WindowedAssociativeOp{T}`: The underlying general-window state.
- `window_state::WindowedAssociativeOp{T,Op}`: The underlying general-window state.
- `remaining_window::Int`: How much of the window remains to be filled. Initially this will
be set to the window size, and will then reduce for every value added until it reaches
zero.
"""
mutable struct FixedWindowAssociativeOp{T}
window_state::WindowedAssociativeOp{T}
mutable struct FixedWindowAssociativeOp{T,Op}
window_state::WindowedAssociativeOp{T,Op}
remaining_window::Int

"""
FixedWindowAssociativeOp{T}
FixedWindowAssociativeOp{T,Op}
Construct a new empty instance of `FixedWindowAssociativeOp`.
# Arguments
- `op::Function`: Any binary, associative, function.
- `window::Integer`: The fixed window size.
"""
function FixedWindowAssociativeOp{T}(op::Function, window::Integer) where T
function FixedWindowAssociativeOp{T,Op}(window::Integer) where {T,Op}
if window < 1
throw(ArgumentError("Got window $window, but it must be positive."))
end
return new(WindowedAssociativeOp{T}(op), window)
return new(WindowedAssociativeOp{T,Op}(), window)
end
end

"""
update_state!(
state::FixedWindowAssociativeOp{T},
value
)::FixedWindowAssociativeOp{T} where T
update_state!(state::FixedWindowAssociativeOp, value)
Add the specified `value` to the `state`. Drop a value from the window iff the window is
full.
# Returns
- `::FixedWindowAssociativeOp{T}`: The instance `state` that was passed in.
- `::FixedWindowAssociativeOp`: The instance `state` that was passed in.
"""
function update_state!(
state::FixedWindowAssociativeOp{T},
value
)::FixedWindowAssociativeOp{T} where T
function update_state!(state::FixedWindowAssociativeOp, value)
num_dropped_from_window = if state.remaining_window > 0
state.remaining_window -= 1
0
Expand Down
17 changes: 6 additions & 11 deletions src/time_window_associative_op.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ That is, at time t' this window represents the open-closed time interval (t' - w
- `window_full::Bool`: For internal use - will be set to true once a point has dropped out
of the window.
"""
mutable struct TimeWindowAssociativeOp{Value,Time,TimeDiff}
window_state::WindowedAssociativeOp{Value}
mutable struct TimeWindowAssociativeOp{Value,Op,Time,TimeDiff}
window_state::WindowedAssociativeOp{Value,Op}
window::TimeDiff
times::Deque{Time}
window_full::Bool

function TimeWindowAssociativeOp{Value,Time,TimeDiff}(
op::Function,
function TimeWindowAssociativeOp{Value,Op,Time,TimeDiff}(
window::TimeDiff
) where {Value,Time,TimeDiff}
) where {Value,Op,Time,TimeDiff}
if window <= zero(TimeDiff)
throw(ArgumentError("Got window $window, but it must be positive."))
end
return new(
WindowedAssociativeOp{Value}(op),
WindowedAssociativeOp{Value,Op}(),
window,
Deque{Time}(),
false
Expand All @@ -60,11 +59,7 @@ are no longer in the time window.
# Returns
- `::TimeWindowAssociativeOp{Value,Op,Time,TimeDiff}`: `state`, which has been mutated.
"""
function update_state!(
state::TimeWindowAssociativeOp{Value,Time,TimeDiff},
time,
value
)::TimeWindowAssociativeOp{Value,Time,TimeDiff} where {Value,Time,TimeDiff}
function update_state!(state::TimeWindowAssociativeOp, time, value)
if !isempty(state.times) && time <= last(state.times)
throw(ArgumentError(
"Got out-of-order time $time. Previous time was $(last(state.times))"
Expand Down
87 changes: 53 additions & 34 deletions src/windowed_associative_op.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
WindowedAssociativeOp{T}
WindowedAssociativeOp{T,Op}
State associated with a windowed aggregation of a binary associative operator,
in a numerically accurate fashion.
Expand Down Expand Up @@ -32,56 +32,59 @@ cumulative sums. We create a new, empty, `B`.
`O(1)` amortized runtime complexity, and `O(L)` space complexity, where `L` is the typical
window length.
# Type parameters
- `T`: The type of the values of the array.
- `Op`: Any binary, associative, function.
# Fields
- `op::Function`: Any binary, associative, function.
- `previous_cumsum::Array{T, 1}`: Corresponds to array `A` above.
- `previous_cumsum::Vector{T}`: Corresponds to array `A` above.
- `ri_previous_cumsum::Int`: A reverse index into `previous_cumsum`, once it contains
values. It should be subtracted from `end` in order to obtain the appropriate index.
- `values::Array{T, 1}`: Corresponds to array `B` above.
- `values::Vector{T}`: Corresponds to array `B` above.
- `sum::Union{Nothing, T}`: The sum of the elements in values.
"""
mutable struct WindowedAssociativeOp{T}
op::Function
previous_cumsum::Array{T,1}
mutable struct WindowedAssociativeOp{T,Op}
previous_cumsum::Vector{T}
ri_previous_cumsum::Int
values::Array{T,1}
values::Vector{T}
sum::Union{Nothing,T}

"""
WindowedAssociativeOp{T}
WindowedAssociativeOp{T,Op}
Create a new, empty, instance of WindowedAssociativeOp.
# Arguments
- `op::Function`: Any binary, associative, function.
# Type parameters
- `T`: The type of the values of the array.
- `Op`: Any binary, associative, function.
"""
WindowedAssociativeOp{T}(op::Function) where T = new(op, T[], 0, T[], nothing)
WindowedAssociativeOp{T,Op}() where {T,Op} = new{T,Op}(T[], 0, T[], nothing)
end

"""
update_state!(
state::WindowedAssociativeOp{T},
state::WindowedAssociativeOp{T,Op},
value,
num_dropped_from_window::Integer
)::WindowedAssociativeOp{T} where T
)::WindowedAssociativeOp{T,Op} where {T,Op}
Add the specified value to the state, drop some number of elements from the start of the
window, and return `state` (which will have been mutated).
# Arguments
- `state::WindowedAssociativeOp{T}`: The state to update (will be mutated).
- `state::WindowedAssociativeOp{T,Op}`: The state to update (will be mutated).
- `value`: The value to add to the end of the window - must be convertible to a `T`.
- `num_dropped_from_window::Integer`: The number of elements to remove from the front of
the window.
# Returns
- `::WindowedAssociativeOp{T}`: The instance `state` that was passed in.
- `::WindowedAssociativeOp{T,Op}`: The instance `state` that was passed in.
"""
function update_state!(
state::WindowedAssociativeOp{T},
state::WindowedAssociativeOp{T,Op},
value,
num_dropped_from_window::Integer
)::WindowedAssociativeOp{T} where T
)::WindowedAssociativeOp{T,Op} where {T,Op}
# Our index into previous_cumsum is advanced by the number of values we drop from the
# window.
state.ri_previous_cumsum += num_dropped_from_window
Expand All @@ -102,9 +105,6 @@ function update_state!(
))
end

# TODO: Is there a copy here that we could avoid?
trimmed_reversed_values = state.values[end:-1:1 + num_values_to_remove]

# We now generate the partial sum, and set our index back to zero. values is also
# emptied, since its information is now reflected in previous_cumsum.
# NOTE: We need to take care here in the case of non-commutation. In accumulate, we
Expand All @@ -115,58 +115,77 @@ function update_state!(
# but we actually want:
#
# (x0, op(x1, x0), op(x2, op(x1, x0)), ...)
state.previous_cumsum = accumulate(
(x, y) -> state.op(y, x),
trimmed_reversed_values
)

# Conceptually the following code is equivalent to the following, however avoids
# unnecessary allocations:
# trimmed_reversed_values = state.values[end:-1:1 + num_values_to_remove]
# state.previous_cumsum = accumulate(
# (x, y) -> Op(y, x),
# trimmed_reversed_values
# )

empty!(state.previous_cumsum)
upper = length(state.values)
lower = 1 + num_values_to_remove
if (upper - lower) >= 0 # i.e. we have a non-zero range
i = upper
accumulation = @inbounds(state.values[i])
while true
push!(state.previous_cumsum, accumulation)
i -= 1
i >= lower || break
accumulation = Op(@inbounds(state.values[i]), accumulation)
end
end

state.ri_previous_cumsum = 0
state.values = T[]
empty!(state.values)
# state.sum is now garbage, but we are not going to use it before we recompute it.
end

# Include the new value in sum and values.
state.sum = length(state.values) == 0 ? value : state.op(state.sum, value)
state.sum = length(state.values) == 0 ? value : Op(state.sum, value)
push!(state.values, value)

return state
end


"""
window_value(state::WindowedAssociativeOp{T})::T where T
window_value(state::WindowedAssociativeOp{T,Op})::T where {T,Op}
Get the value currently represented by the state.
# Arguments:
- `state::WindowedAssociativeOp{T}`: The state to query.
- `state::WindowedAssociativeOp{T,Op}`: The state to query.
# Returns:
- `T`: The result of aggregating over the values in the window.
"""
function window_value(state::WindowedAssociativeOp{T})::T where T
function window_value(state::WindowedAssociativeOp{T,Op})::T where {T,Op}
return if length(state.previous_cumsum) == 0
# The A buffer is empty, so we need only worry about the 'B' buffer.
state.sum
else
# Include contributions both from A and B buffers.
# Remember that we are indexing from the back.
index = length(state.previous_cumsum) - state.ri_previous_cumsum
state.op(state.previous_cumsum[index], state.sum)
Op(@inbounds(state.previous_cumsum[index]), state.sum)
end
end

"""
function window_size(state::WindowedAssociativeOp{T})::Int where T
function window_size(state::WindowedAssociativeOp)::Int
Get the current size of the window in `state`.
# Arguments:
- `state::WindowedAssociativeOp{T}`: The state to query.
- `state::WindowedAssociativeOp`: The state to query.
# Returns:
- `Int`: The current size of the window.
"""
function window_size(state::WindowedAssociativeOp{T})::Int where T
function window_size(state::WindowedAssociativeOp)::Int
return if length(state.previous_cumsum) == 0
# The A buffer is empty, so we need only worry about the 'B' buffer.
length(state.values)
Expand Down
6 changes: 3 additions & 3 deletions test/fixed_window_associative_op.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
function test_fixed_window(values, op; approximate_equality::Bool=false)
T = typeof(first(values))
@test_throws ArgumentError FixedWindowAssociativeOp{T}(op, 0)
@test_throws ArgumentError FixedWindowAssociativeOp{T,op}(0)

for window in 1:2 * length(values)
state = FixedWindowAssociativeOp{T}(op, window)
state = FixedWindowAssociativeOp{T,op}(window)
for (i, value) in enumerate(values)
@test update_state!(state, value) == state

Expand Down Expand Up @@ -34,7 +34,7 @@ end
T = Int64
op = +
window = 2
state = FixedWindowAssociativeOp{T}(op, window)
state = FixedWindowAssociativeOp{T,op}(window)

@test update_state!(state, 3) == state
@test !window_full(state)
Expand Down
Loading

2 comments on commit 995f69b

@tpgillam
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/43165

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v0.2.0 -m "<description of version>" 995f69baa23463fec8414a697d7e01051a8dac35
git push origin v0.2.0

Please sign in to comment.