Skip to content

Commit 3a395b0

Browse files
committed
Track more state when reading file to avoid unnecessary IO.
1 parent 8fc3708 commit 3a395b0

8 files changed

Lines changed: 89 additions & 41 deletions

File tree

src/DataFrame/IO/CSV.hs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import qualified Data.Vector.Mutable as VM
2323
import qualified Data.Vector.Unboxed.Mutable as VUM
2424

2525
import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many)
26-
import Control.Monad (forM_, zipWithM_, unless, void, replicateM_)
26+
import Control.Monad (forM_, zipWithM_, unless, when, void, replicateM_)
2727
import Data.Attoparsec.Text
2828
import Data.Char
2929
import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength)
@@ -49,33 +49,39 @@ data ReadOptions = ReadOptions {
4949
hasHeader :: Bool,
5050
inferTypes :: Bool,
5151
safeRead :: Bool,
52-
rowRange :: Maybe (Int, Int), -- (start, length)
53-
seekPos :: Maybe Integer
52+
rowRange :: !(Maybe (Int, Int)), -- (start, length)
53+
seekPos :: !(Maybe Integer),
54+
totalRows :: !(Maybe Int),
55+
leftOver :: !T.Text,
56+
rowsRead :: !Int
5457
}
5558

5659
-- | By default we assume the file has a header, we infer the types on read
5760
-- and we convert any rows with nullish objects into Maybe (safeRead).
5861
defaultOptions :: ReadOptions
59-
defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing }
62+
defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing, totalRows = Nothing, leftOver = "", rowsRead = 0 }
6063

6164
-- | Reads a CSV file from the given path.
6265
-- Note this file stores intermediate temporary files
6366
-- while converting the CSV from a row to a columnar format.
6467
readCsv :: String -> IO DataFrame
65-
readCsv = readSeparated ',' defaultOptions
68+
readCsv path = fst <$> readSeparated ',' defaultOptions path
6669

6770
-- | Reads a tab separated file from the given path.
6871
-- Note this file stores intermediate temporary files
6972
-- while converting the CSV from a row to a columnar format.
7073
readTsv :: String -> IO DataFrame
71-
readTsv = readSeparated '\t' defaultOptions
74+
readTsv path = fst <$> readSeparated '\t' defaultOptions path
7275

7376
-- | Reads a character separated file into a dataframe using mutable vectors.
74-
readSeparated :: Char -> ReadOptions -> String -> IO DataFrame
77+
readSeparated :: Char -> ReadOptions -> String -> IO (DataFrame, (Integer, T.Text, Int))
7578
readSeparated c opts path = do
76-
(begin, len) <- case rowRange opts of
77-
Nothing -> countRows c path >>= \totalRows -> return (0, if hasHeader opts then totalRows - 1 else totalRows)
78-
Just (start, len) -> return (start, len)
79+
totalRows <- case totalRows opts of
80+
Nothing -> countRows c path >>= \total -> if hasHeader opts then return (total - 1) else return total
81+
Just n -> if hasHeader opts then return (n - 1) else return n
82+
let (begin, len) = case rowRange opts of
83+
Nothing -> (0, totalRows)
84+
Just (start, len) -> (start, min len (totalRows - rowsRead opts))
7985
withFile path ReadMode $ \handle -> do
8086
firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle
8187
let columnNames = if hasHeader opts
@@ -84,17 +90,18 @@ readSeparated c opts path = do
8490
-- If there was no header rewind the file cursor.
8591
unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0
8692

87-
-- skip columns till `begin`
88-
_ <- replicateM_ begin (TIO.hGetLine handle >> return () )
93+
currPos <- hTell handle
94+
when (isJust $ seekPos opts) $ hSeek handle AbsoluteSeek (fromMaybe currPos (seekPos opts))
8995

9096
-- Initialize mutable vectors for each column
9197
let numColumns = length columnNames
92-
let numRows = len
98+
let numRows = len
9399
-- Use this row to infer the types of the rest of the column.
94100
-- TODO: this isn't robust but in so far as this is a guess anyway
95101
-- it's probably fine. But we should probably sample n rows and pick
96102
-- the most likely type from the sample.
97-
dataRow <- map T.strip . parseSep c <$> TIO.hGetLine handle
103+
-- dataRow <- map T.strip . parseSep c . (<>) (leftOver opts) <$> TIO.hGetLine handle
104+
(!dataRow, !remainder) <- readSingleLine c (leftOver opts) handle
98105

99106
-- This array will track the indices of all null values for each column.
100107
-- If any exist then the column will be an optional type.
@@ -104,18 +111,19 @@ readSeparated c opts path = do
104111
getInitialDataVectors numRows mutableCols dataRow
105112

106113
-- Read rows into the mutable vectors
107-
fillColumns numRows c mutableCols nullIndices handle
114+
(!unconsumed, !r) <- fillColumns numRows c mutableCols nullIndices remainder handle
108115

109116
-- Freeze the mutable vectors into immutable ones
110117
nulls' <- V.unsafeFreeze nullIndices
111118
cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id)
119+
pos <- hTell handle
112120

113-
return $ DataFrame {
121+
return (DataFrame {
114122
columns = cols,
115123
freeIndices = [],
116124
columnIndices = M.fromList (zip columnNames [0..]),
117125
dataframeDimensions = (maybe 0 columnLength (cols V.! 0), V.length cols)
118-
}
126+
}, (pos, unconsumed, r + 1))
119127
{-# INLINE readSeparated #-}
120128

121129
getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO ()
@@ -138,10 +146,22 @@ inferValueType s = let
138146
Nothing -> "Other"
139147
{-# INLINE inferValueType #-}
140148

149+
readSingleLine :: Char -> T.Text -> Handle -> IO ([T.Text], T.Text)
150+
readSingleLine c unused handle = parseWith (TIO.hGetChunk handle) (parseRow c) unused >>= \case
151+
Fail unconsumed ctx er -> do
152+
erpos <- hTell handle
153+
fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: "
154+
<> show er <> "; context: " <> show ctx
155+
Partial c -> do
156+
fail "Partial handler is called"
157+
Done (unconsumed :: T.Text) (row :: [T.Text]) -> do
158+
return (row, unconsumed)
159+
141160
-- | Reads rows from the handle and stores values in mutable vectors.
142-
fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO ()
143-
fillColumns n c mutableCols nullIndices handle = do
144-
input <- newIORef (mempty :: T.Text)
161+
fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> T.Text -> Handle -> IO (T.Text, Int)
162+
fillColumns n c mutableCols nullIndices unused handle = do
163+
input <- newIORef unused
164+
rowsRead <- newIORef (0 :: Int)
145165
forM_ [1..(n - 1)] $ \i -> do
146166
isEOF <- hIsEOF handle
147167
input' <- readIORef input
@@ -155,7 +175,11 @@ fillColumns n c mutableCols nullIndices handle = do
155175
fail "Partial handler is called"
156176
Done (unconsumed :: T.Text) (row :: [T.Text]) -> do
157177
writeIORef input unconsumed
178+
modifyIORef rowsRead (+1)
158179
zipWithM_ (writeValue mutableCols nullIndices i) [0..] row
180+
l <- readIORef input
181+
r <- readIORef rowsRead
182+
pure (l, r)
159183
{-# INLINE fillColumns #-}
160184

161185
-- | Writes a value into the appropriate column, resizing the vector if necessary.

src/DataFrame/Internal/Column.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
{-# LANGUAGE OverloadedStrings #-}
55
{-# LANGUAGE RankNTypes #-}
66
{-# LANGUAGE ScopedTypeVariables #-}
7-
{-# LANGUAGE StrictData #-}
7+
{-# LANGUAGE Strict #-}
88
{-# LANGUAGE TypeApplications #-}
99
{-# LANGUAGE FlexibleContexts #-}
1010
{-# LANGUAGE FlexibleInstances #-}

src/DataFrame/Internal/DataFrame.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
{-# LANGUAGE ScopedTypeVariables #-}
55
{-# LANGUAGE TypeApplications #-}
66
{-# LANGUAGE GADTs #-}
7-
{-# LANGUAGE StrictData #-}
7+
{-# LANGUAGE Strict #-}
88
{-# LANGUAGE FlexibleContexts #-}
99
module DataFrame.Internal.DataFrame where
1010

src/DataFrame/Lazy/Internal/DataFrame.hs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,24 @@
33
{-# LANGUAGE InstanceSigs #-}
44
{-# LANGUAGE ExistentialQuantification #-}
55
{-# LANGUAGE AllowAmbiguousTypes #-}
6+
{-# LANGUAGE Strict #-}
7+
{-# LANGUAGE BangPatterns #-}
8+
{-# LANGUAGE OverloadedStrings #-}
69
{-# LANGUAGE NumericUnderscores #-}
710
module DataFrame.Lazy.Internal.DataFrame where
811

9-
import Control.Monad (forM_)
12+
import Control.Monad (forM, foldM)
1013
import Data.IORef
1114
import Data.Kind
15+
import qualified Data.List as L
1216
import qualified Data.Map as M
1317
import qualified Data.Text as T
1418
import qualified Data.Vector as V
1519
import qualified DataFrame.Internal.DataFrame as D
1620
import qualified DataFrame.Internal.Column as C
1721
import qualified DataFrame.Internal.Expression as E
1822
import qualified DataFrame.Operations.Core as D
23+
import DataFrame.Operations.Merge
1924
import qualified DataFrame.Operations.Subset as D
2025
import qualified DataFrame.Operations.Transformations as D
2126
import qualified DataFrame.IO.CSV as D
@@ -37,7 +42,7 @@ data InputType = ICSV deriving Show
3742
data LazyDataFrame = LazyDataFrame
3843
{ inputPath :: FilePath
3944
, inputType :: InputType
40-
, operations :: [LazyOperation]
45+
, operations :: [LazyOperation]
4146
, batchSize :: Int
4247
} deriving Show
4348

@@ -49,27 +54,33 @@ eval (Filter expr) = D.filterWhere expr
4954
runDataFrame :: forall a . (C.Columnable a) => LazyDataFrame -> IO D.DataFrame
5055
runDataFrame df = do
5156
let path = inputPath df
52-
-- totalRows <- D.countRows ',' path
53-
let batches = batchRanges 1000000 (batchSize df)
54-
_ <- forM_ batches $ \ (start, end) -> do
55-
-- TODO: implement specific read operations for batching that returns a seek instead of re-reading everything.
56-
sdf <- D.readSeparated ',' (D.defaultOptions { D.rowRange = Just (start, (batchSize df)) }) path
57-
let rdf = foldl' (\d op -> eval op d) sdf (operations df)
58-
if fst (D.dimensions rdf) == 0 then return () else print rdf
59-
return (D.empty)
57+
totalRows <- D.countRows ',' path
58+
let batches = batchRanges totalRows (batchSize df)
59+
(df', _) <- foldM (\(!accDf, (!pos, !unused, !r)) (!start, !end) -> do
60+
mapM_ putStr ["Scanning: ", show start, " to ", show end, " rows out of ", show totalRows, "\n"]
61+
62+
(!sdf, (!pos', !unconsumed, !rowsRead)) <- D.readSeparated ',' (
63+
D.defaultOptions { D.rowRange = Just (start, batchSize df)
64+
, D.totalRows = Just totalRows
65+
, D.seekPos = pos
66+
, D.rowsRead = r
67+
, D.leftOver = unused}) path
68+
let !rdf = L.foldl' (flip eval) sdf (operations df)
69+
return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r)) ) (D.empty, (Nothing, "", 0)) batches
70+
return df'
6071

6172
batchRanges :: Int -> Int -> [(Int, Int)]
6273
batchRanges n inc = go n [0,inc..n]
63-
where
74+
where
6475
go _ [] = []
6576
go n [x] = [(x, n)]
6677
go n (f:s:rest) =(f, s) : go n (s:rest)
6778

6879
scanCsv :: T.Text -> LazyDataFrame
69-
scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 1024
80+
scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 512_000
7081

7182
addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame
72-
addOperation op df = df { operations = (operations df) ++ [op] }
83+
addOperation op df = df { operations = operations df ++ [op] }
7384

7485
derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame
7586
derive name expr = addOperation (Derive name expr)

src/DataFrame/Operations/Core.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
{-# LANGUAGE ScopedTypeVariables #-}
77
{-# LANGUAGE TypeApplications #-}
88
{-# LANGUAGE BangPatterns #-}
9+
{-# LANGUAGE Strict #-}
910
module DataFrame.Operations.Core where
1011

1112
import qualified Data.List as L

src/DataFrame/Operations/Merge.hs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE InstanceSigs #-}
2+
{-# LANGUAGE Strict #-}
23
module DataFrame.Operations.Merge where
34

45
import qualified Data.List as L
@@ -11,9 +12,13 @@ import qualified DataFrame.Operations.Core as D
1112
instance Semigroup D.DataFrame where
1213
(<>) :: D.DataFrame -> D.DataFrame -> D.DataFrame
1314
(<>) a b = let
14-
columnsInBOnly = filter (\c -> not (c `elem` (D.columnNames b))) (D.columnNames b)
15+
columnsInBOnly = filter (\c -> c `notElem` D.columnNames b) (D.columnNames b)
1516
columnsInA = D.columnNames a
16-
addColumns a' b' df name = let
17+
addColumns a' b' df name
18+
| fst (D.dimensions a') == 0 && fst (D.dimensions b') == 0 = df
19+
| fst (D.dimensions a') == 0 = D.insertColumn' name (D.getColumn name b') df
20+
| fst (D.dimensions b') == 0 = D.insertColumn' name (D.getColumn name a') df
21+
| otherwise = let
1722
numColumnsA = (fst $ D.dimensions a')
1823
numColumnsB = (fst $ D.dimensions b')
1924
numColumns = max numColumnsA numColumnsB
@@ -26,4 +31,9 @@ instance Semigroup D.DataFrame where
2631
Just b'' -> case optA of
2732
Nothing -> D.insertColumn' name (Just (D.leftExpandColumn numColumnsA b'')) df
2833
Just a'' -> D.insertColumn' name (D.concatColumns a'' b'') df
29-
in foldl' (addColumns a b) D.empty (L.union (D.columnNames a) (D.columnNames b))
34+
in L.foldl' (addColumns a b) D.empty (D.columnNames a `L.union` D.columnNames b)
35+
36+
instance Monoid D.DataFrame where
37+
mempty = D.empty
38+
39+

src/DataFrame/Operations/Subset.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ filterBy = flip filter
9999
filterWhere :: Expr Bool -> DataFrame -> DataFrame
100100
filterWhere expr df = let
101101
(TColumn col) = interpret @Bool df expr
102-
(Just indexes) = ifoldlColumn (\s i satisfied -> if satisfied then S.insert i s else s) S.empty col
102+
(Just indexes) = VU.convert . V.map (fromMaybe 0) . V.filter isJust . toVector @(Maybe Int) <$> itransform (\i satisfied -> if satisfied then Just i else Nothing) col
103103
c' = snd $ dataframeDimensions df
104-
pick idxs col = atIndices idxs <$> col
105-
in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (S.size indexes, c')}
104+
pick idxs col = atIndicesStable idxs <$> col
105+
in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (VU.length indexes, c')}
106106

107107

108108
-- | O(k) removes all rows with `Nothing` in a given column from the dataframe.

src/DataFrame/Operations/Transformations.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
{-# LANGUAGE ScopedTypeVariables #-}
44
{-# LANGUAGE TypeApplications #-}
55
{-# LANGUAGE FlexibleContexts #-}
6+
{-# LANGUAGE Strict #-}
7+
{-# LANGUAGE StrictData #-}
68
module DataFrame.Operations.Transformations where
79

810
import qualified Data.List as L

0 commit comments

Comments
 (0)