Improve parallel speedup on pipelined benchmark #761

Open
wants to merge 4 commits into main
33 changes: 33 additions & 0 deletions README.md
@@ -356,6 +356,12 @@ The *disk cache policy* determines if lookup operations use the OS page
cache. Caching may improve the performance of lookups and updates if
database access follows certain patterns.

`confMergeBatchSize`
The merge batch size balances the maximum latency of individual update
operations against the latency of a sequence of update operations.
Bigger batches improve overall performance, but some updates will take
much longer than others. The default is to use a large batch size.

##### Fine-tuning: Merge Policy, Size Ratio, and Write Buffer Size <span id="fine_tuning_data_layout" class="anchor"></span>

The configuration parameters `confMergePolicy`, `confSizeRatio`, and
@@ -647,6 +653,33 @@ locality if it is likely to access entries that have nearby keys.
does not have good spatial or temporal locality. For instance, if the
access pattern is uniformly random.

##### Fine-tuning: Merge Batch Size <span id="fine_tuning_merge_batch_size" class="anchor"></span>

The *merge batch size* is a micro-tuning parameter, and in most cases
you do not need to think about it and can leave it at its default.

When using the `Incremental` merge schedule, merging is done in batches.
This is a trade-off: larger batches tend to mean better overall
performance, but the downside is that while most updates (inserts,
deletes, upserts) are fast, some are slower (when a batch of merging
work has to be done).

If you care most about the maximum latency of individual updates, then
use a small batch size. If you care only about the latency of the
overall sequence of operations, then use a large batch size. The
default is to use a large batch size, the same size as the write buffer
itself. The minimum batch size is 1. The maximum batch size is the size
of the write buffer, `confWriteBufferAlloc`.

Note that the actual batch size is the maximum of this configuration
parameter and the size of the batch of operations performed (e.g.
`inserts`). So if you consistently use large operation batches, you can
set a merge batch size of 1 and the effective merge batch size will
always be determined by the operation batch size.

A further reason a minimal batch size may be preferable is to get good
parallel work balance when using parallelism.
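
For illustration, here is a minimal sketch of opting into the smallest
batch size via the public API. It follows the style of the benchmark
changes in this PR; `lowLatencyConfig` is a hypothetical name, not part
of the diff.

```haskell
import qualified Database.LSMTree as LSM

-- A table configuration that favours low per-update latency over
-- throughput: merge work is done in the smallest possible batches.
lowLatencyConfig :: LSM.TableConfig
lowLatencyConfig =
  LSM.defaultTableConfig
    { LSM.confMergeBatchSize = LSM.MergeBatchSize 1 }
```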

### References

The implementation of LSM-trees in this package draws inspiration from:
26 changes: 19 additions & 7 deletions bench/macro/lsm-tree-bench-wp8.hs
@@ -180,13 +180,23 @@ mkTableConfigSetup GlobalOpts{diskCachePolicy} SetupOpts{bloomFilterAlloc} conf
, LSM.confBloomFilterAlloc = bloomFilterAlloc
}

mkTableConfigRun :: GlobalOpts -> LSM.TableConfig -> LSM.TableConfig
mkTableConfigRun GlobalOpts{diskCachePolicy} conf = conf {
LSM.confDiskCachePolicy = diskCachePolicy
mkTableConfigRun :: GlobalOpts -> RunOpts -> LSM.TableConfig -> LSM.TableConfig
mkTableConfigRun GlobalOpts{diskCachePolicy} RunOpts {pipelined} conf =
conf {
LSM.confDiskCachePolicy = diskCachePolicy,
LSM.confMergeBatchSize = if pipelined
then LSM.MergeBatchSize 1
else LSM.confMergeBatchSize conf
}

mkOverrideDiskCachePolicy :: GlobalOpts -> LSM.OverrideDiskCachePolicy
mkOverrideDiskCachePolicy GlobalOpts{diskCachePolicy} = LSM.OverrideDiskCachePolicy diskCachePolicy
mkTableConfigOverride :: GlobalOpts -> RunOpts -> LSM.TableConfigOverride
mkTableConfigOverride GlobalOpts{diskCachePolicy} RunOpts {pipelined} =
LSM.noTableConfigOverride {
LSM.overrideDiskCachePolicy = Just diskCachePolicy,
LSM.overrideMergeBatchSize = if pipelined
then Just (LSM.MergeBatchSize 1)
else Nothing
}

mkTracer :: GlobalOpts -> Tracer IO LSM.LSMTreeTrace
mkTracer gopts
@@ -582,8 +592,10 @@ doRun gopts opts = do
-- reference version starts with empty (as it's not practical or
-- necessary for testing to load the whole snapshot).
tbl <- if check opts
then LSM.newTableWith @IO @K @V @B (mkTableConfigRun gopts benchTableConfig) session
else LSM.openTableFromSnapshotWith @IO @K @V @B (mkOverrideDiskCachePolicy gopts) session name label
then let conf = mkTableConfigRun gopts opts benchTableConfig
in LSM.newTableWith @IO @K @V @B conf session
else let conf = mkTableConfigOverride gopts opts
in LSM.openTableFromSnapshotWith @IO @K @V @B conf session name label

-- In checking mode, compare each output against a pure reference.
checkvar <- newIORef $ pureReference
31 changes: 31 additions & 0 deletions lsm-tree.cabal
@@ -183,6 +183,12 @@ description:
The /disk cache policy/ determines if lookup operations use the OS page cache.
Caching may improve the performance of lookups and updates if database access follows certain patterns.

[@confMergeBatchSize@]
The merge batch size balances the maximum latency of individual update
operations against the latency of a sequence of update operations. Bigger
batches improve overall performance, but some updates will take much
longer than others. The default is to use a large batch size.

==== Fine-tuning: Merge Policy, Size Ratio, and Write Buffer Size #fine_tuning_data_layout#

The configuration parameters @confMergePolicy@, @confSizeRatio@, and @confWriteBufferAlloc@ affect how the table organises its data.
@@ -429,6 +435,31 @@ description:
* Use the @DiskCacheNone@ policy if the database's access pattern does not have good spatial or temporal locality.
For instance, if the access pattern is uniformly random.

==== Fine-tuning: Merge Batch Size #fine_tuning_merge_batch_size#

The /merge batch size/ is a micro-tuning parameter, and in most cases you do
not need to think about it and can leave it at its default.

When using the 'Incremental' merge schedule, merging is done in batches. This
is a trade-off: larger batches tend to mean better overall performance, but the
downside is that while most updates (inserts, deletes, upserts) are fast, some
are slower (when a batch of merging work has to be done).

If you care most about the maximum latency of individual updates, then use a
small batch size. If you care only about the latency of the overall sequence
of operations, then use a large batch size. The default is to use a large
batch size, the same size as the write buffer itself. The minimum batch size
is 1. The maximum batch size is the size of the write buffer
'confWriteBufferAlloc'.

Note that the actual batch size is the maximum of this configuration
parameter and the size of the batch of operations performed (e.g. 'inserts').
So if you consistently use large operation batches, you can set a merge batch
size of 1 and the effective merge batch size will always be determined by the
operation batch size.

A further reason a minimal batch size may be preferable is to get good
parallel work balance when using parallelism.

== References

The implementation of LSM-trees in this package draws inspiration from:
3 changes: 3 additions & 0 deletions src-extras/Database/LSMTree/Extras/NoThunks.hs
@@ -659,6 +659,9 @@ deriving anyclass instance NoThunks DiskCachePolicy
deriving stock instance Generic MergeSchedule
deriving anyclass instance NoThunks MergeSchedule

deriving stock instance Generic MergeBatchSize
deriving anyclass instance NoThunks MergeBatchSize

{-------------------------------------------------------------------------------
RWVar
-------------------------------------------------------------------------------}
26 changes: 15 additions & 11 deletions src/Database/LSMTree.hs
@@ -109,7 +109,8 @@ module Database.LSMTree (
confBloomFilterAlloc,
confFencePointerIndex,
confDiskCachePolicy,
confMergeSchedule
confMergeSchedule,
confMergeBatchSize
),
defaultTableConfig,
MergePolicy (LazyLevelling),
@@ -119,9 +120,11 @@
BloomFilterAlloc (AllocFixed, AllocRequestFPR),
FencePointerIndexType (OrdinaryIndex, CompactIndex),
DiskCachePolicy (..),
MergeBatchSize (..),

-- ** Table Configuration Overrides #table_configuration_overrides#
OverrideDiskCachePolicy (..),
TableConfigOverride (..),
noTableConfigOverride,

-- * Ranges #ranges#
Range (..),
@@ -214,11 +217,12 @@ import qualified Database.LSMTree.Internal.BlobRef as Internal
import Database.LSMTree.Internal.Config
(BloomFilterAlloc (AllocFixed, AllocRequestFPR),
DiskCachePolicy (..), FencePointerIndexType (..),
LevelNo (..), MergePolicy (..), MergeSchedule (..),
SizeRatio (..), TableConfig (..), WriteBufferAlloc (..),
defaultTableConfig, serialiseKeyMinimalSize)
LevelNo (..), MergeBatchSize (..), MergePolicy (..),
MergeSchedule (..), SizeRatio (..), TableConfig (..),
WriteBufferAlloc (..), defaultTableConfig,
serialiseKeyMinimalSize)
import Database.LSMTree.Internal.Config.Override
(OverrideDiskCachePolicy (..))
(TableConfigOverride (..), noTableConfigOverride)
import Database.LSMTree.Internal.Entry (NumEntries (..))
import qualified Database.LSMTree.Internal.Entry as Entry
import Database.LSMTree.Internal.Merge (LevelMergeType (..))
@@ -2400,7 +2404,7 @@ Variant of 'withTableFromSnapshot' that accepts [table configuration overrides](
withTableFromSnapshotWith ::
forall k v b a.
(ResolveValue v) =>
OverrideDiskCachePolicy ->
TableConfigOverride ->
Session IO ->
SnapshotName ->
SnapshotLabel ->
@@ -2411,7 +2415,7 @@
forall m k v b a.
(IOLike m) =>
(ResolveValue v) =>
OverrideDiskCachePolicy ->
TableConfigOverride ->
Session m ->
SnapshotName ->
SnapshotLabel ->
@@ -2475,7 +2479,7 @@ openTableFromSnapshot ::
SnapshotLabel ->
m (Table m k v b)
openTableFromSnapshot session snapName snapLabel =
openTableFromSnapshotWith NoOverrideDiskCachePolicy session snapName snapLabel
openTableFromSnapshotWith noTableConfigOverride session snapName snapLabel

{- |
Variant of 'openTableFromSnapshot' that accepts [table configuration overrides](#g:table_configuration_overrides).
@@ -2484,7 +2488,7 @@
openTableFromSnapshotWith ::
forall k v b.
(ResolveValue v) =>
OverrideDiskCachePolicy ->
TableConfigOverride ->
Session IO ->
SnapshotName ->
SnapshotLabel ->
@@ -2494,7 +2498,7 @@
forall m k v b.
(IOLike m) =>
(ResolveValue v) =>
OverrideDiskCachePolicy ->
TableConfigOverride ->
Session m ->
SnapshotName ->
SnapshotLabel ->
68 changes: 66 additions & 2 deletions src/Database/LSMTree/Internal/Config.hs
@@ -26,12 +26,16 @@ module Database.LSMTree.Internal.Config (
, diskCachePolicyForLevel
-- * Merge schedule
, MergeSchedule (..)
-- * Merge batch size
, MergeBatchSize (..)
, creditThresholdForLevel
) where

import Control.DeepSeq (NFData (..))
import Database.LSMTree.Internal.Index (IndexType)
import qualified Database.LSMTree.Internal.Index as Index
(IndexType (Compact, Ordinary))
import qualified Database.LSMTree.Internal.MergingRun as MR
import qualified Database.LSMTree.Internal.RawBytes as RB
import Database.LSMTree.Internal.Run (RunDataCaching (..))
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..))
@@ -90,6 +94,12 @@ For a detailed discussion of fine-tuning the table configuration, see [Fine-tuni
[@confDiskCachePolicy :: t'DiskCachePolicy'@]
The /disk cache policy/ supports caching lookup operations using the OS page cache.
Caching may improve the performance of lookups and updates if database access follows certain patterns.

[@confMergeBatchSize :: t'MergeBatchSize'@]
The merge batch size balances the maximum latency of individual update
operations against the latency of a sequence of update operations. Bigger
batches improve overall performance, but some updates will take much
longer than others. The default is to use a large batch size.
-}
data TableConfig = TableConfig {
confMergePolicy :: !MergePolicy
@@ -99,12 +109,14 @@ data TableConfig = TableConfig {
, confBloomFilterAlloc :: !BloomFilterAlloc
, confFencePointerIndex :: !FencePointerIndexType
, confDiskCachePolicy :: !DiskCachePolicy
, confMergeBatchSize :: !MergeBatchSize
}
deriving stock (Show, Eq)

instance NFData TableConfig where
rnf (TableConfig a b c d e f g) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f `seq` rnf g
rnf (TableConfig a b c d e f g h) =
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq`
rnf e `seq` rnf f `seq` rnf g `seq` rnf h

-- | The 'defaultTableConfig' defines reasonable defaults for all 'TableConfig' parameters.
--
@@ -122,6 +134,8 @@ instance NFData TableConfig where
-- OrdinaryIndex
-- >>> confDiskCachePolicy defaultTableConfig
-- DiskCacheAll
-- >>> confMergeBatchSize defaultTableConfig
-- MergeBatchSize 20000
--
defaultTableConfig :: TableConfig
defaultTableConfig =
@@ -133,6 +147,7 @@
, confBloomFilterAlloc = AllocRequestFPR 1.0e-3
, confFencePointerIndex = OrdinaryIndex
, confDiskCachePolicy = DiskCacheAll
, confMergeBatchSize = MergeBatchSize 20_000 -- same as write buffer
}

data RunLevelNo = RegularLevel LevelNo | UnionLevel
@@ -238,6 +253,8 @@ data MergeSchedule =
The 'Incremental' merge schedule spreads out the merging work over time.
This is less efficient than the 'OneShot' merge schedule, but has a consistent workload.
Using the 'Incremental' merge schedule, the worst-case disk I\/O complexity of the update operations is /logarithmic/ in the size of the table.
The 'Incremental' merge schedule still uses batching to improve performance.
The batch size can be controlled using the t'MergeBatchSize' parameter.
-}
| Incremental
deriving stock (Eq, Show)
@@ -385,3 +402,50 @@ diskCachePolicyForLevel policy levelNo =
RegularLevel l | l <= LevelNo n -> CacheRunData
| otherwise -> NoCacheRunData
UnionLevel -> NoCacheRunData

{-------------------------------------------------------------------------------
Merge batch size
-------------------------------------------------------------------------------}

{- |
The /merge batch size/ is a micro-tuning parameter, and in most cases you do
not need to think about it and can leave it at its default.

When using the 'Incremental' merge schedule, merging is done in batches. This
is a trade-off: larger batches tend to mean better overall performance, but the
downside is that while most updates (inserts, deletes, upserts) are fast, some
are slower (when a batch of merging work has to be done).

If you care most about the maximum latency of individual updates, then use a
small batch size. If you care only about the latency of the overall sequence
of operations, then use a large batch size. The default is to use a large
batch size, the same size as the write buffer itself. The minimum batch size
is 1. The maximum batch size is the size of the write buffer
'confWriteBufferAlloc'.

Note that the actual batch size is the maximum of this configuration
parameter and the size of the batch of operations performed (e.g. 'inserts').
So if you consistently use large operation batches, you can set a merge batch
size of 1 and the effective merge batch size will always be determined by the
operation batch size.

A further reason a minimal batch size may be preferable is to get good
parallel work balance when using parallelism.
-}
newtype MergeBatchSize = MergeBatchSize Int
deriving stock (Show, Eq, Ord)
deriving newtype (NFData)

-- TODO: the thresholds for doing merge work should be different for each level,
-- and ideally all-pairs co-prime.
Comment on lines +438 to +439

Collaborator

Should the thresholds also have some relationship with the size of update batches? Even if the thresholds are co-prime, if the update batch is large enough then we could hit all thresholds at the same time.

Collaborator Author

That's true of course. But the update batch size is only known dynamically and it can change.

Collaborator Author

Yes, ideally doing a big batch of updates would not re-synchronise the counters relative to their thresholds.
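
(A toy illustration of the concern, with hypothetical numbers: if two levels had co-prime thresholds of 3 and 5 credits, a single update batch supplying 15 credits would cross both thresholds at once, so both levels would do merge work in the same operation despite the co-primality.)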

creditThresholdForLevel :: TableConfig -> LevelNo -> MR.CreditThreshold
creditThresholdForLevel TableConfig {
confMergeBatchSize = MergeBatchSize mergeBatchSz,
confWriteBufferAlloc = AllocNumEntries writeBufferSz
}
(LevelNo _i) =
MR.CreditThreshold
. MR.UnspentCredits
. MR.MergeCredits
. max 1
. min writeBufferSz
$ mergeBatchSz
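
As a sanity check on the clamping above, here are hypothetical inputs and the
thresholds they produce, assuming the default write buffer size of 20000
entries (these examples are illustrative, not part of the diff):

```haskell
-- threshold = max 1 (min writeBufferSz mergeBatchSz), with writeBufferSz = 20000:
--
--   mergeBatchSz = 1      ==>  threshold 1      (smallest batches, lowest latency)
--   mergeBatchSz = 20000  ==>  threshold 20000  (the default: one write buffer)
--   mergeBatchSz = 50000  ==>  threshold 20000  (clamped down to the write buffer size)
--   mergeBatchSz = 0      ==>  threshold 1      (clamped up to the minimum of 1)
```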