diff --git a/prototypes/ScheduledMerges.hs b/prototypes/ScheduledMerges.hs index 56cb319d2..a8ff2d435 100644 --- a/prototypes/ScheduledMerges.hs +++ b/prototypes/ScheduledMerges.hs @@ -1,7 +1,3 @@ -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE EmptyCase #-} -{-# LANGUAGE ScopedTypeVariables #-} - -- | A prototype of an LSM with explicitly scheduled incremental merges. -- -- The scheduled incremental merges is about ensuring that the merging @@ -46,6 +42,7 @@ module ScheduledMerges ( import Prelude hiding (lookup) import Data.Bits +import Data.Foldable (traverse_) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.STRef @@ -53,7 +50,7 @@ import Data.STRef import Control.Exception (assert) import Control.Monad.ST import Control.Tracer (Tracer, contramap, traceWith) -import GHC.Stack (HasCallStack) +import GHC.Stack (HasCallStack, callStack) import Database.LSMTree.Normal (LookupResult (..), Update (..)) @@ -118,6 +115,12 @@ type Debt = Int type Run = Map Key Op type Buffer = Map Key Op +runSize :: Run -> Int +runSize = Map.size + +bufferSize :: Buffer -> Int +bufferSize = Map.size + type Op = Update Value Blob type Key = Int @@ -138,18 +141,18 @@ levellingRunSize n = 4^(n+1) tieringRunSizeToLevel :: Run -> Int tieringRunSizeToLevel r - | s <= bufferSize = 1 -- level numbers start at 1 + | s <= maxBufferSize = 1 -- level numbers start at 1 | otherwise = 1 + (finiteBitSize s - countLeadingZeros (s-1) - 1) `div` 2 where - s = Map.size r + s = runSize r levellingRunSizeToLevel :: Run -> Int levellingRunSizeToLevel r = max 1 (tieringRunSizeToLevel r - 1) -- level numbers start at 1 -bufferSize :: Int -bufferSize = tieringRunSize 1 -- 4 +maxBufferSize :: Int +maxBufferSize = tieringRunSize 1 -- 4 mergePolicyForLevel :: Int -> [Level s] -> MergePolicy mergePolicyForLevel 1 [] = MergePolicyTiering @@ -163,11 +166,11 @@ mergeLastForLevel _ = MergeMidLevel -- | Note that the invariants rely on the fact that levelling is only used on -- the last 
level. -- -invariant :: forall s. Levels s -> ST s Bool +invariant :: forall s. Levels s -> ST s () invariant = go 1 where - go :: Int -> [Level s] -> ST s Bool - go !_ [] = return True + go :: Int -> [Level s] -> ST s () + go !_ [] = return () go !ln (Level mr rs : ls) = do @@ -175,20 +178,19 @@ invariant = go 1 SingleRun r -> return (CompletedMerge r) MergingRun _ _ ref -> readSTRef ref - assert (case mr of - SingleRun{} -> True - MergingRun mp ml _ -> mergePolicyForLevel ln ls == mp - && mergeLastForLevel ls == ml) - assert (length rs <= 3) $ - assert (expectedRunLengths ln rs ls) $ - assert (expectedMergingRunLengths ln mr mrs ls) $ - return () + assertST $ case mr of + SingleRun{} -> True + MergingRun mp ml _ -> mergePolicyForLevel ln ls == mp + && mergeLastForLevel ls == ml + assertST $ length rs <= 3 + expectedRunLengths ln rs ls + expectedMergingRunLengths ln mr mrs ls go (ln+1) ls -- All runs within a level "proper" (as opposed to the incoming runs -- being merged) should be of the correct size for the level. - expectedRunLengths :: Int -> [Run] -> [Level s] -> Bool + expectedRunLengths :: Int -> [Run] -> [Level s] -> ST s () expectedRunLengths ln rs ls = case mergePolicyForLevel ln ls of -- Levels using levelling have only one run, and that single run is @@ -196,68 +198,86 @@ invariant = go 1 -- other "normal" runs. The exception is when a levelling run becomes -- too large and is promoted, in that case initially there's no merge, -- but it is still represented as a 'MergingRun', using 'SingleRun'. - MergePolicyLevelling -> null rs - MergePolicyTiering -> all (\r -> tieringRunSizeToLevel r == ln) rs + MergePolicyLevelling -> assertST $ null rs + -- Runs in tiering levels usually fit that size, but they can be one + -- larger, if a run has been held back (creating a 5-way merge). + MergePolicyTiering -> assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln, ln+1]) rs + -- (This is actually still not really true, but will hold in practice. 
+ -- In the pathological case, all runs passed to the next level can be + -- factor (5/4) too large, and there the same holding back can lead to + -- factor (6/4) etc., until at level 12 a run is two levels too large.) -- Incoming runs being merged also need to be of the right size, but the -- conditions are more complicated. expectedMergingRunLengths :: Int -> MergingRun s -> MergingRunState - -> [Level s] -> Bool + -> [Level s] -> ST s () expectedMergingRunLengths ln mr mrs ls = case mergePolicyForLevel ln ls of MergePolicyLevelling -> + assert (mergeLastForLevel ls == MergeLastLevel) $ case (mr, mrs) of -- A single incoming run (which thus didn't need merging) must be -- of the expected size range already - (SingleRun r, CompletedMerge{}) -> - assert (levellingRunSizeToLevel r == ln) True + (SingleRun r, m) -> do + assertST $ case m of CompletedMerge{} -> True + OngoingMerge{} -> False + assertST $ levellingRunSizeToLevel r == ln -- A completed merge for levelling can be of almost any size at all! -- It can be smaller, due to deletions in the last level. But it -- can't be bigger than would fit into the next level. (_, CompletedMerge r) -> - assert (levellingRunSizeToLevel r <= ln+1) True + assertST $ levellingRunSizeToLevel r <= ln+1 -- An ongoing merge for levelling should have 4 incoming runs of - -- the right size for the level below, and 1 run from this level, + -- the right size for the level below (or slightly larger due to + -- holding back underfull runs), and 1 run from this level, -- but the run from this level can be of almost any size for the -- same reasons as above. Although if this is the first merge for -- a new level, it'll have only 4 runs.
- (_, OngoingMerge _ rs _) -> - assert (length rs == 4 || length rs == 5) True - && assert (all (\r -> tieringRunSizeToLevel r == ln-1) (take 4 rs)) True - && assert (all (\r -> levellingRunSizeToLevel r <= ln+1) (drop 4 rs)) True + (_, OngoingMerge _ rs _) -> do + assertST $ length rs `elem` [4, 5] + let incoming = take 4 rs + let resident = drop 4 rs + assertST $ all (\r -> tieringRunSizeToLevel r `elem` [ln-1, ln]) incoming + assertST $ all (\r -> levellingRunSizeToLevel r <= ln+1) resident MergePolicyTiering -> case (mr, mrs, mergeLastForLevel ls) of -- A single incoming run (which thus didn't need merging) must be -- of the expected size already - (SingleRun r, CompletedMerge{}, _) -> - tieringRunSizeToLevel r == ln + (SingleRun r, m, _) -> do + assertST $ case m of CompletedMerge{} -> True + OngoingMerge{} -> False + assertST $ tieringRunSizeToLevel r == ln -- A completed last level run can be of almost any smaller size due -- to deletions, but it can't be bigger than the next level down. -- Note that tiering on the last level only occurs when there is -- a single level only. - (_, CompletedMerge r, MergeLastLevel) -> - ln == 1 - && tieringRunSizeToLevel r <= ln+1 + (_, CompletedMerge r, MergeLastLevel) -> do + assertST $ ln == 1 + assertST $ tieringRunSizeToLevel r <= ln+1 -- A completed mid level run is usually of the size for the -- level it is entering, but can also be one smaller (in which case - -- it'll be held back and merged again). + -- it'll be held back and merged again) or one larger (because it + -- includes a run that has been held back before). (_, CompletedMerge r, MergeMidLevel) -> - rln == ln || rln == ln+1 - where - rln = tieringRunSizeToLevel r + assertST $ tieringRunSizeToLevel r `elem` [ln-1, ln, ln+1] -- An ongoing merge for tiering should have 4 incoming runs of -- the right size for the level below, and at most 1 run held back -- due to being too small (which would thus also be of the size of -- the level below). 
- (_, OngoingMerge _ rs _, _) -> - (length rs == 4 || length rs == 5) - && all (\r -> tieringRunSizeToLevel r == ln-1) rs + (_, OngoingMerge _ rs _, _) -> do + assertST $ length rs == 4 || length rs == 5 + assertST $ all (\r -> tieringRunSizeToLevel r == ln-1) rs + +-- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant +-- when compiling with debug assertions disabled. +assertST :: HasCallStack => Bool -> ST s () +assertST p = assert p $ return (const () callStack) ------------------------------------------------------------------------------- @@ -274,20 +294,24 @@ newMerge tr level mergepolicy mergelast rs = do mergeLast = mergelast, mergeDebt = debt, mergeCost = cost, - mergeRunsSize = map Map.size rs + mergeRunsSize = map runSize rs } - assert (let l = length rs in l >= 2 && l <= 5) $ - MergingRun mergepolicy mergelast <$> newSTRef (OngoingMerge debt rs r) + assert (length rs `elem` [4, 5]) $ + assert (mergeDebtLeft debt >= cost) $ + MergingRun mergepolicy mergelast <$> newSTRef (OngoingMerge debt rs r) where - cost = sum (map Map.size rs) + cost = sum (map runSize rs) -- How much we need to discharge before the merge can be guaranteed - -- complete. + -- complete. More precisely, this is the maximum amount a merge at this + -- level could need. While the real @cost@ of a merge would lead to merges + -- finishing early, the overestimation @debt@ means that in this prototype + -- merges will only complete at the last possible moment. -- Note that for levelling this is includes the single run in the current -- level. 
- debt = case mergepolicy of - MergePolicyLevelling -> newMergeDebt (4 * tieringRunSize (level-1) - + levellingRunSize level) - MergePolicyTiering -> newMergeDebt (4 * tieringRunSize (level-1)) + debt = newMergeDebt $ case mergepolicy of + MergePolicyLevelling -> 4 * tieringRunSize (level-1) + + levellingRunSize level + MergePolicyTiering -> length rs * tieringRunSize (level-1) -- deliberately lazy: r = case mergelast of MergeMidLevel -> (mergek rs) @@ -313,7 +337,7 @@ expectCompletedMerge tr (MergingRun mergepolicy mergelast ref) = do traceWith tr MergeCompletedEvent { mergePolicy = mergepolicy, mergeLast = mergelast, - mergeSize = Map.size r + mergeSize = runSize r } return r OngoingMerge d _ _ -> @@ -352,6 +376,10 @@ data MergeDebt = newMergeDebt :: Debt -> MergeDebt newMergeDebt d = MergeDebt 0 d +mergeDebtLeft :: MergeDebt -> Int +mergeDebtLeft (MergeDebt c d) = + assert (c < d) $ d - c + -- | As credits are paid, debt is reduced in batches when sufficient credits have accumulated. data MergeDebtPaydown = -- | This remaining merge debt is fully paid off with credits. 
@@ -412,10 +440,12 @@ update tr (LSMHandle scr lsmr) k op = do LSMContent wb ls <- readSTRef lsmr modifySTRef' scr (+1) supplyCredits 1 ls + invariant ls let wb' = Map.insert k op wb - if Map.size wb' >= bufferSize + if bufferSize wb' >= maxBufferSize then do ls' <- increment tr sc (bufferToRun wb') ls + invariant ls' writeSTRef lsmr (LSMContent Map.empty ls') else writeSTRef lsmr (LSMContent wb' ls) @@ -425,8 +455,7 @@ supply (LSMHandle scr lsmr) credits = do LSMContent _ ls <- readSTRef lsmr modifySTRef' scr (+1) supplyCredits credits ls - ok <- invariant ls - assert ok $ return () + invariant ls lookups :: LSM s -> [Key] -> ST s [(Key, LookupResult Value Blob)] lookups lsm = mapM (\k -> (k,) <$> lookup lsm k) @@ -448,22 +477,31 @@ bufferToRun :: Buffer -> Run bufferToRun = id supplyCredits :: Credit -> Levels s -> ST s () -supplyCredits n ls = - sequence_ - [ supplyMergeCredits (n * creditsForMerge mr) mr | Level mr _rs <- ls ] +supplyCredits n = + traverse_ $ \(Level mr _rs) -> do + cr <- creditsForMerge mr + supplyMergeCredits (ceiling (fromIntegral n * cr)) mr -- | The general case (and thus worst case) of how many merge credits we need -- for a level. This is based on the merging policy at the level. -- -creditsForMerge :: MergingRun s -> Credit -creditsForMerge SingleRun{} = 0 - --- A levelling merge is 5x the cost of a tiering merge. --- That's because for levelling one of the runs as an input to the merge --- is the one levelling run which is (up to) 4x bigger than the others put --- together, so it's 1 + 4. -creditsForMerge (MergingRun MergePolicyLevelling _ _) = 5 -creditsForMerge (MergingRun MergePolicyTiering _ _) = 1 +creditsForMerge :: MergingRun s -> ST s Rational +creditsForMerge SingleRun{} = return 0 + +-- A levelling merge has 1 input run and one resident run, which is (up to) 4x +-- bigger than the others. +-- It needs to be completed before another run comes in. 
+creditsForMerge (MergingRun MergePolicyLevelling _ _) = return $ (1 + 4) / 1 + +-- A tiering merge has 5 runs at most (one could be held back to merge again) +-- and must be completed before the level is full (once 4 more runs come in). +creditsForMerge (MergingRun MergePolicyTiering _ ref) = do + readSTRef ref >>= \case + CompletedMerge _ -> return 0 + OngoingMerge _ rs _ -> do + let numRuns = length rs + assertST $ numRuns `elem` [4, 5] + return $ fromIntegral numRuns / 4 type Event = EventAt EventDetail data EventAt e = EventAt { @@ -495,54 +533,53 @@ data EventDetail = increment :: forall s. Tracer (ST s) Event -> Counter -> Run -> Levels s -> ST s (Levels s) increment tr sc = \r ls -> do - ls' <- go 1 [r] ls - ok <- invariant ls' - assert ok (return ls') + go 1 [r] ls where go :: Int -> [Run] -> Levels s -> ST s (Levels s) - go !ln rs [] = do + go !ln incoming [] = do let mergepolicy = mergePolicyForLevel ln [] traceWith tr' AddLevelEvent - mr <- newMerge tr' ln mergepolicy MergeLastLevel rs + mr <- newMerge tr' ln mergepolicy MergeLastLevel incoming return (Level mr [] : []) where tr' = contramap (EventAt sc ln) tr - go !ln rs' (Level mr rs : ls) = do + go !ln incoming (Level mr rs : ls) = do r <- expectCompletedMerge tr' mr + let resident = r:rs case mergePolicyForLevel ln ls of -- If r is still too small for this level then keep it and merge again -- with the incoming runs. MergePolicyTiering | tieringRunSizeToLevel r < ln -> do let mergelast = mergeLastForLevel ls - mr' <- newMerge tr' ln MergePolicyTiering mergelast (rs' ++ [r]) + mr' <- newMerge tr' ln MergePolicyTiering mergelast (incoming ++ [r]) return (Level mr' rs : ls) -- This tiering level is now full. We take the completed merged run -- (the previous incoming runs), plus all the other runs on this level -- as a bundle and move them down to the level below. We start a merge -- for the new incoming runs. This level is otherwise empty.
- MergePolicyTiering | levelIsFull rs -> do - mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel rs' - ls' <- go (ln+1) (r:rs) ls + MergePolicyTiering | tieringLevelIsFull ln incoming resident -> do + mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming + ls' <- go (ln+1) resident ls return (Level mr' [] : ls') -- This tiering level is not yet full. We move the completed merged run -- into the level proper, and start the new merge for the incoming runs. MergePolicyTiering -> do let mergelast = mergeLastForLevel ls - mr' <- newMerge tr' ln MergePolicyTiering mergelast rs' - traceWith tr' (AddRunEvent (length (r:rs))) - return (Level mr' (r:rs) : ls) + mr' <- newMerge tr' ln MergePolicyTiering mergelast incoming + traceWith tr' (AddRunEvent (length resident)) + return (Level mr' resident : ls) -- The final level is using levelling. If the existing completed merge -- run is too large for this level, we promote the run to the next -- level and start merging the incoming runs into this (otherwise -- empty) level . - MergePolicyLevelling | levellingRunSizeToLevel r > ln -> do + MergePolicyLevelling | levellingLevelIsFull ln incoming r -> do assert (null rs && null ls) $ return () - mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel rs' + mr' <- newMerge tr' ln MergePolicyTiering MergeMidLevel incoming ls' <- go (ln+1) [r] [] return (Level mr' [] : ls') @@ -550,14 +587,20 @@ increment tr sc = \r ls -> do MergePolicyLevelling -> do assert (null rs && null ls) $ return () mr' <- newMerge tr' ln MergePolicyLevelling MergeLastLevel - (rs' ++ [r]) + (incoming ++ [r]) return (Level mr' [] : []) where tr' = contramap (EventAt sc ln) tr -levelIsFull :: [Run] -> Bool -levelIsFull rs = length rs + 1 >= 4 +-- | Only based on run count, not their sizes. 
+tieringLevelIsFull :: Int -> [Run] -> [Run] -> Bool +tieringLevelIsFull _ln _incoming resident = length resident >= 4 + +-- | The level is only considered full once the resident run is /too large/ for +-- the level. +levellingLevelIsFull :: Int -> [Run] -> Run -> Bool +levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln duplicate :: LSM s -> ST s (LSM s) duplicate (LSMHandle _scr lsmr) = do @@ -612,14 +655,18 @@ dumpLevel (Level (MergingRun mp ml mr) rs) = do mrs <- readSTRef mr return (Just (mp, ml, mrs), rs) +-- For each level: +-- 1. the runs involved in an ongoing merge +-- 2. the other runs (including completed merge) representationShape :: [(Maybe (MergePolicy, MergeLastLevel, MergingRunState), [Run])] - -> [(Maybe (MergePolicy, MergeLastLevel, Either Int [Int]), [Int])] + -> [([Int], [Int])] representationShape = map $ \(mmr, rs) -> - ( fmap (\(mp, ml, mrs) -> (mp, ml, summaryMRS mrs)) mmr - , map summaryRun rs) + let (ongoing, complete) = summaryMR mmr + in (ongoing, complete <> map summaryRun rs) where - summaryRun = Map.size - summaryMRS (CompletedMerge r) = Left (summaryRun r) - summaryMRS (OngoingMerge _ rs _) = Right (map summaryRun rs) - + summaryRun = runSize + summaryMR = \case + Nothing -> ([], []) + Just (_, _, CompletedMerge r) -> ([], [summaryRun r]) + Just (_, _, OngoingMerge _ rs _) -> (map summaryRun rs, []) diff --git a/prototypes/ScheduledMergesTestQLS.hs b/prototypes/ScheduledMergesTestQLS.hs index 22250f59b..f1f1708d2 100644 --- a/prototypes/ScheduledMergesTestQLS.hs +++ b/prototypes/ScheduledMergesTestQLS.hs @@ -1,11 +1,4 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE GADTs #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE NamedFieldPuns #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StandaloneDeriving #-} -{-# LANGUAGE TypeFamilies #-} +{-# LANGUAGE TypeFamilies #-} module ScheduledMergesTestQLS (tests) where @@ -15,10 +8,12 @@ import Data.Map.Strict 
(Map) import qualified Data.Map.Strict as Map import Data.Constraint (Dict (..)) +import Data.Foldable (traverse_) import Data.Proxy import Data.STRef import Control.Exception +import Control.Monad (replicateM_, when) import Control.Monad.ST import Control.Tracer (Tracer (Tracer), nullTracer) import qualified Control.Tracer as Tracer @@ -31,7 +26,7 @@ import Test.QuickCheck.StateModel.Lockstep hiding (ModelOp) import qualified Test.QuickCheck.StateModel.Lockstep.Defaults as Lockstep import qualified Test.QuickCheck.StateModel.Lockstep.Run as Lockstep import Test.Tasty -import Test.Tasty.HUnit (testCase) +import Test.Tasty.HUnit (HasCallStack, testCase) import Test.Tasty.QuickCheck (testProperty) @@ -41,8 +36,9 @@ import Test.Tasty.QuickCheck (testProperty) tests :: TestTree tests = testGroup "ScheduledMerges" [ - testProperty "ScheduledMerges vs model" prop_LSM + testProperty "ScheduledMerges vs model" $ mapSize (*10) prop_LSM -- still <10s , testCase "regression_empty_run" test_regression_empty_run + , testCase "merge_again_with_incoming" test_merge_again_with_incoming ] prop_LSM :: Actions (Lockstep Model) -> Property @@ -76,14 +72,100 @@ test_regression_empty_run = del 1 del 2 del 3 + + expectShape lsm + [ ([], [4,4,4,4]) + ] + -- run 5, results in last level merge of run 1-4 ins 0 ins 1 ins 2 ins 3 + + expectShape lsm + [ ([], [4]) + , ([4,4,4,4], []) + ] + -- finish merge LSM.supply lsm 16 + expectShape lsm + [ ([], [4]) + , ([], [0]) + ] + +-- | Covers the case where a run ends up too small for a level, so it gets +-- merged again with the next incoming runs. +-- That 5-way merge gets completed by supplying credits and then becomes +-- part of another merge.
+test_merge_again_with_incoming :: IO () +test_merge_again_with_incoming = + runWithTracer $ \tracer -> do + stToIO $ do + lsm <- LSM.new + let ins k = LSM.insert tracer lsm k 0 + -- get something to 3rd level (so 2nd level is not levelling) + -- (needs 5 runs to go to level 2 so the resulting run becomes too big) + traverse_ ins [101..100+(5*16)] + + expectShape lsm -- not yet arrived at level 3, but will soon + [ ([], [4,4,4,4]) + , ([16,16,16,16], []) + ] + + -- get a very small run (4 elements) to 2nd level + replicateM_ 4 $ + traverse_ ins [201..200+4] + + expectShape lsm + [ ([], [4,4,4,4]) -- these runs share the same keys + , ([4,4,4,4,64], []) + ] + + -- get another run to 2nd level, which the small run can be merged with + traverse_ ins [301..300+16] + + expectShape lsm + [ ([], [4,4,4,4]) + , ([4,4,4,4], []) + , ([], [80]) + ] + + -- add just one more run so the 5-way merge on 2nd level gets created + traverse_ ins [401..400+4] + + expectShape lsm + [ ([], [4]) + , ([4,4,4,4,4], []) + , ([], [80]) + ] + + -- complete the merge (20 entries, but credits get scaled up by 1.25) + LSM.supply lsm 16 + + expectShape lsm + [ ([], [4]) + , ([], [20]) + , ([], [80]) + ] + + -- get 3 more runs to 2nd level, so the 5-way merge completes + -- and becomes part of a new merge. + -- (actually 4, as runs only move once a fifth run arrives...) + traverse_ ins [501..500+(4*16)] + + expectShape lsm + [ ([], [4]) + , ([4,4,4,4], []) + , ([16,16,16,20,80], []) + ] + +------------------------------------------------------------------------------- +-- tracing and expectations on LSM shape +-- + -- | Provides a tracer and will add the log of traced events to the reported -- failure. 
runWithTracer :: (Tracer (ST RealWorld) Event -> IO a) -> IO a @@ -101,6 +183,15 @@ instance Exception TracedException where displayException (Traced e ev) = displayException e <> "\ntrace:\n" <> unlines (map show ev) +expectShape :: HasCallStack => LSM s -> [([Int], [Int])] -> ST s () +expectShape lsm expected = do + shape <- representationShape <$> dumpRepresentation lsm + when (shape == expected) $ + error $ unlines + [ "expected shape: " <> show expected + , "actual shape: " <> show shape + ] + ------------------------------------------------------------------------------- -- QLS infrastructure --