Skip to content

Commit c411d01

Browse files
committed
Add prototype for functionality to decode zstd pages in parquet
1 parent 55b7b4b commit c411d01

2 files changed

Lines changed: 126 additions & 6 deletions

File tree

dataframe.cabal

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ library
5858
text >= 2.0 && <= 2.1.2,
5959
time >= 1.12 && <= 1.14,
6060
vector ^>= 0.13,
61-
vector-algorithms ^>= 0.9
61+
vector-algorithms ^>= 0.9,
62+
zstd >= 0.1.2.0 && <= 0.1.3.0
6263
hs-source-dirs: src
6364
default-language: Haskell2010
6465

@@ -96,7 +97,8 @@ executable dataframe
9697
text >= 2.0 && <= 2.1.2,
9798
time >= 1.12 && <= 1.14,
9899
vector ^>= 0.13,
99-
vector-algorithms ^>= 0.9
100+
vector-algorithms ^>= 0.9,
101+
zstd >= 0.1.2.0 && <= 0.1.3.0
100102
hs-source-dirs: app,
101103
src
102104
default-language: Haskell2010

src/DataFrame/IO/Parquet.hs

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
{-# LANGUAGE RankNTypes #-}
55
{-# LANGUAGE ScopedTypeVariables #-}
66
{-# LANGUAGE TypeApplications #-}
7+
{-# LANGUAGE RecordWildCards #-}
78

89
module DataFrame.IO.Parquet where
910

11+
import Codec.Compression.Zstd.Streaming
1012
import Control.Monad
1113
import qualified Data.ByteString as BSO
1214
import qualified Data.ByteString.Char8 as BS
@@ -282,7 +284,7 @@ data FileMetadata = FileMetaData
282284
encryptionAlgorithm :: EncryptionAlgorithm,
283285
footerSigningKeyMetadata :: [Word8]
284286
}
285-
deriving (Show)
287+
deriving (Show, Eq)
286288

287289
defaultMetadata :: FileMetadata
288290
defaultMetadata =
@@ -307,19 +309,103 @@ readParquet path = withBinaryFile path ReadMode $ \handle -> do
307309
-- print metadata
308310
forM_ (rowGroups metadata) $ \r -> do
309311
forM_ (rowGroupColumns r) $ \c -> do
310-
-- print c
311312
let metadata = columnMetaData c
312313
let colDataPageOffset = columnDataPageOffset metadata
313314
let colDictionaryPageOffset = columnDictionaryPageOffset metadata
314315
let colStart = if colDictionaryPageOffset > 0 && colDataPageOffset > colDictionaryPageOffset
315316
then colDictionaryPageOffset
316317
else colDataPageOffset
317318
let colLength = columnTotalCompressedSize metadata
318-
-- print (colStart, colLength)
319319
columnBytes <-readBytes handle colStart colLength
320-
print $ columnBytes
320+
let (hdr, rem) = readPageHeader emptyPageHeader columnBytes 0
321+
let compressed = take (fromIntegral $ compressedPageSize hdr) rem
322+
323+
-- Weird roundabout way to decompress zstd files compressed using the
324+
-- streaming API
325+
Consume dFunc <- decompress
326+
Consume dFunc' <- dFunc (BSO.pack compressed)
327+
Done res <- dFunc' BSO.empty
328+
print $ BSO.length res
329+
print $ hdr
330+
putStrLn ""
331+
321332
return DI.empty
322333

334+
-- | Header record describing a single page of a Parquet column chunk.
--
-- Values of this type are built up field-by-field by 'readPageHeader',
-- starting from 'emptyPageHeader'.
data PageHeader = PageHeader
    { pageHeaderPageType :: PageType
    -- ^ Kind of page this header belongs to.
    , uncompressedPageSize :: Int32
    -- ^ Page size before compression, as recorded in the header.
    , compressedPageSize :: Int32
    -- ^ Page size after compression, as recorded in the header.
    , pageHeaderCrcChecksum :: Int32
    -- ^ Checksum slot of the header.
    , pageTypeHeader :: PageTypeHeader
    -- ^ Page-type-specific sub-header.
    }
    deriving (Show, Eq)
340+
341+
342+
-- | Starting value for page-header decoding: all sizes and the checksum are
-- zero, and both the page type and the type-specific header are the
-- @UNKNOWN@ placeholders.
emptyPageHeader :: PageHeader
emptyPageHeader =
    PageHeader
        { pageHeaderPageType = PAGE_TYPE_UNKNOWN
        , uncompressedPageSize = 0
        , compressedPageSize = 0
        , pageHeaderCrcChecksum = 0
        , pageTypeHeader = PAGE_TYPE_HEADER_UNKNOWN
        }
343+
344+
-- | The page-type-specific part of a 'PageHeader'.
--
-- One constructor per kind of page header, plus
-- 'PAGE_TYPE_HEADER_UNKNOWN' as the placeholder used before anything has
-- been decoded.
data PageTypeHeader
    = -- | Header of a (version 1) data page.
      DataPageHeader
        { dataPageHeaderNumValues :: Int32
        , dataPageHeaderEncoding :: ParquetEncoding
        , definitionLevelEncoding :: ParquetEncoding
        , repetitionLevelEncoding :: ParquetEncoding
        , dataPageHeaderStatistics :: ColumnStatistics
        }
    | -- | Header of a version 2 data page.
      DataPageHeaderV2
        { dataPageHeaderV2NumValues :: Int32
        , dataPageHeaderV2NumNulls :: Int32
        , dataPageHeaderV2NumRows :: Int32
        , dataPageHeaderV2Encoding :: ParquetEncoding
        , definitionLevelByteLength :: Int32
        , repetitionLevelByteLength :: Int32
        , dataPageHeaderV2IsCompressed :: Bool
        , dataPageHeaderV2Statistics :: ColumnStatistics
        }
    | -- | Header of a dictionary page.
      DictionaryPageHeader
        { dictionaryPageHeaderNumValues :: Int32
        , dictionaryPageHeaderEncoding :: ParquetEncoding
        , dictionaryPageIsSorted :: Bool
        }
    | -- | Header of an index page (carries no fields here).
      INDEX_PAGE_HEADER
    | -- | Placeholder: no type-specific header has been decoded.
      PAGE_TYPE_HEADER_UNKNOWN
    deriving (Show, Eq)
365+
366+
-- | Dictionary page header with zero values, unknown encoding and the
-- sorted flag cleared.
emptyDictionaryPageHeader :: PageTypeHeader
emptyDictionaryPageHeader =
    DictionaryPageHeader
        { dictionaryPageHeaderNumValues = 0
        , dictionaryPageHeaderEncoding = PARQUET_ENCODING_UNKNOWN
        , dictionaryPageIsSorted = False
        }

-- | Data page header with zero values, every encoding unknown and empty
-- statistics.
emptyDataPageHeader :: PageTypeHeader
emptyDataPageHeader =
    DataPageHeader
        { dataPageHeaderNumValues = 0
        , dataPageHeaderEncoding = PARQUET_ENCODING_UNKNOWN
        , definitionLevelEncoding = PARQUET_ENCODING_UNKNOWN
        , repetitionLevelEncoding = PARQUET_ENCODING_UNKNOWN
        , dataPageHeaderStatistics = emptyColumnStatistics
        }

-- | Version 2 data page header with all counters and lengths zero, unknown
-- encoding, the compressed flag cleared and empty statistics.
emptyDataPageHeaderV2 :: PageTypeHeader
emptyDataPageHeaderV2 =
    DataPageHeaderV2
        { dataPageHeaderV2NumValues = 0
        , dataPageHeaderV2NumNulls = 0
        , dataPageHeaderV2NumRows = 0
        , dataPageHeaderV2Encoding = PARQUET_ENCODING_UNKNOWN
        , definitionLevelByteLength = 0
        , repetitionLevelByteLength = 0
        , dataPageHeaderV2IsCompressed = False
        , dataPageHeaderV2Statistics = emptyColumnStatistics
        }
369+
370+
-- | Decode a Thrift compact-protocol @PageHeader@ struct from a byte list.
--
-- Arguments:
--
-- * the header accumulated so far (start with 'emptyPageHeader'),
-- * the remaining input bytes,
-- * the id of the previously decoded field (start with @0@); field ids are
--   delta-encoded against it by @readField'@.
--
-- Returns the completed header together with the bytes that follow the
-- struct. Decoding stops when the input is exhausted or when @readField'@
-- reports the end-of-struct marker, which is then dropped from the input.
--
-- Handled field ids: 1 (page type), 2 (uncompressed size), 3 (compressed
-- size), 4 (CRC checksum) and 7 (dictionary page header). Any other field
-- id raises an 'error' naming the offending id, because the nested structs
-- for the remaining fields cannot be skipped or decoded yet.
readPageHeader :: PageHeader -> [Word8] -> Int16 -> (PageHeader, [Word8])
readPageHeader hdr [] _ = (hdr, [])
readPageHeader hdr xs lastFieldId =
    case readField' xs lastFieldId of
        -- End of struct: xs is non-empty here, so this drops exactly the
        -- marker byte.
        Nothing -> (hdr, drop 1 xs)
        Just (rest, _elemType, identifier) -> case identifier of
            1 ->
                let (pType, rest') = readInt32FromBytes rest
                 in readPageHeader (hdr{pageHeaderPageType = pageTypeFromInt pType}) rest' identifier
            2 ->
                let (uncompressed, rest') = readInt32FromBytes rest
                 in readPageHeader (hdr{uncompressedPageSize = uncompressed}) rest' identifier
            3 ->
                let (compressed, rest') = readInt32FromBytes rest
                 in readPageHeader (hdr{compressedPageSize = compressed}) rest' identifier
            -- Optional CRC: an i32 like the sizes above.
            4 ->
                let (crc, rest') = readInt32FromBytes rest
                 in readPageHeader (hdr{pageHeaderCrcChecksum = crc}) rest' identifier
            7 ->
                -- Nested struct: its field ids restart from 0.
                let (dictionaryHeader, rest') = readPageTypeHeader emptyDictionaryPageHeader rest 0
                 in readPageHeader (hdr{pageTypeHeader = dictionaryHeader}) rest' identifier
            n -> error $ "readPageHeader: unsupported PageHeader field id " ++ show n
390+
391+
-- | Decode the fields of a page-type-specific header struct.
--
-- Arguments:
--
-- * the header accumulated so far; its constructor selects which struct is
--   being decoded (only 'DictionaryPageHeader' is supported),
-- * the remaining input bytes,
-- * the id of the previously decoded field (start with @0@).
--
-- Returns the completed header and the bytes following the struct. Decoding
-- stops on exhausted input or on the end-of-struct marker reported by
-- @readField'@, which is dropped from the input.
--
-- Raises an 'error' for an unknown field id, and for any constructor other
-- than 'DictionaryPageHeader' when input remains.
readPageTypeHeader :: PageTypeHeader -> [Word8] -> Int16 -> (PageTypeHeader, [Word8])
readPageTypeHeader hdr [] _ = (hdr, [])
readPageTypeHeader hdr@DictionaryPageHeader{} xs@(fieldHeader : _) lastFieldId =
    case readField' xs lastFieldId of
        -- End of struct: drop the marker byte.
        Nothing -> (hdr, drop 1 xs)
        Just (rest, _elemType, identifier) -> case identifier of
            1 ->
                let (numValues, rest') = readInt32FromBytes rest
                 in readPageTypeHeader (hdr{dictionaryPageHeaderNumValues = numValues}) rest' identifier
            2 ->
                let (enc, rest') = readInt32FromBytes rest
                 in readPageTypeHeader (hdr{dictionaryPageHeaderEncoding = parquetEncodingFromInt enc}) rest' identifier
            3 ->
                -- In the Thrift compact protocol a struct-level bool has no
                -- value byte: true/false is carried by the type nibble of
                -- the field header itself. Reading a byte here would eat
                -- the byte that follows (normally the struct's stop byte).
                -- NOTE(review): assumes compactBooleanTrue is the compact
                -- BOOLEAN_TRUE type code (0x01) -- confirm.
                let isSorted = (fieldHeader .&. 0x0f) == compactBooleanTrue
                 in readPageTypeHeader (hdr{dictionaryPageIsSorted = isSorted}) rest identifier
            n -> error $ "readPageTypeHeader: unsupported DictionaryPageHeader field id " ++ show n
readPageTypeHeader hdr _ _ =
    error $ "readPageTypeHeader: unsupported page type header " ++ show hdr
408+
323409
readBytes :: Handle -> Int64 -> Int64 -> IO [Word8]
324410
readBytes handle colStart colLen = do
325411
buf <- mallocBytes (fromIntegral colLen) :: IO (Ptr Word8)
@@ -997,6 +1083,16 @@ readField buf pos lastFieldId fieldStack = do
9971083
let elemType = toTType (t .&. 0x0f)
9981084
pure $ Just (elemType, identifier)
9991085

1086+
-- | Pure, list-based reader for one struct field header.
--
-- The low nibble of the first byte is the element type and the high nibble
-- is a delta against the previous field id. A zero delta means the field id
-- follows explicitly and is read with 'readIntFromBytes'.
--
-- Returns 'Nothing' on empty input or when the type nibble is zero;
-- otherwise the bytes after the header, the element type and the field id.
readField' :: [Word8] -> Int16 -> Maybe ([Word8], TType, Int16)
readField' bytes lastFieldId = case bytes of
    [] -> Nothing
    (header : body)
        | typeNibble == 0 -> Nothing
        | delta == 0 ->
            let (fieldId, afterId) = readIntFromBytes @Int16 body
             in Just (afterId, toTType typeNibble, fieldId)
        | otherwise -> Just (body, toTType typeNibble, lastFieldId + delta)
      where
        typeNibble = header .&. 0x0f
        delta = fromIntegral ((header .&. 0xf0) `shiftR` 4) :: Int16
1095+
10001096
readAndAdvance :: IORef Int -> Ptr b -> IO Word8
10011097
readAndAdvance bufferPos buffer = do
10021098
pos <- readIORef bufferPos
@@ -1093,12 +1189,25 @@ readIntFromBuffer buf bufferPos = do
10931189
let u = fromIntegral n :: Word32
10941190
return $ fromIntegral $ (fromIntegral (u `shiftR` 1) :: Int32) .^. (-(n .&. 1))
10951191

1192+
-- | Read a zigzag-encoded variable-length integer from the front of a byte
-- list.
--
-- The varint is decoded via 'readVarIntFromBytes' at 32-bit width,
-- un-zigzagged, and then converted to the requested integral type. Returns
-- the value and the unconsumed bytes.
readIntFromBytes :: (Integral a) => [Word8] -> (a, [Word8])
readIntFromBytes bs = (fromIntegral (magnitude .^. signMask), rest)
  where
    (raw, rest) = readVarIntFromBytes bs :: (Int32, [Word8])
    -- Shift on the unsigned view so the top bit is not sign-extended.
    unsigned = fromIntegral raw :: Word32
    magnitude = fromIntegral (unsigned `shiftR` 1) :: Int32
    -- All ones when the low (sign) bit is set, zero otherwise.
    signMask = negate (raw .&. 1)
1197+
10961198
-- | Read a zigzag-encoded 32-bit integer from a raw buffer.
--
-- The varint is fetched with 'readVarIntFromBuffer' at 64-bit width using
-- the position stored in the 'IORef', narrowed to 32 bits, and then
-- un-zigzagged.
readInt32FromBuffer :: Ptr b -> IORef Int -> IO Int32
readInt32FromBuffer buf bufferPos = do
    wide <- readVarIntFromBuffer @Int64 buf bufferPos
    let raw = fromIntegral wide :: Int32
        -- Shift on the unsigned view so the top bit is not sign-extended.
        unsigned = fromIntegral raw :: Word32
        magnitude = fromIntegral (unsigned `shiftR` 1) :: Int32
        signMask = negate (raw .&. 1)
    pure (magnitude .^. signMask)
11011203

1204+
-- | Read a zigzag-encoded 32-bit integer from the front of a byte list.
--
-- The varint is decoded with 'readVarIntFromBytes' at 64-bit width,
-- narrowed to 32 bits, and then un-zigzagged. Returns the value and the
-- unconsumed bytes.
readInt32FromBytes :: [Word8] -> (Int32, [Word8])
readInt32FromBytes bs = (magnitude .^. signMask, rest)
  where
    (wide, rest) = readVarIntFromBytes @Int64 bs
    raw = fromIntegral wide :: Int32
    -- Shift on the unsigned view so the top bit is not sign-extended.
    unsigned = fromIntegral raw :: Word32
    magnitude = fromIntegral (unsigned `shiftR` 1) :: Int32
    -- All ones when the low (sign) bit is set, zero otherwise.
    signMask = negate (raw .&. 1)
1210+
11021211
readVarIntFromBuffer :: (Integral a) => Ptr b -> IORef Int -> IO a
11031212
readVarIntFromBuffer buf bufferPos = do
11041213
start <- readIORef bufferPos
@@ -1109,3 +1218,12 @@ readVarIntFromBuffer buf bufferPos = do
11091218
then return res
11101219
else loop (i + 1) (shift + 7) res
11111220
fromIntegral <$> loop start 0 0
1221+
1222+
-- | Decode a variable-length unsigned integer from the front of a byte list.
--
-- Each byte contributes its low seven bits, least-significant group first;
-- a set high bit means another byte follows. The value is accumulated as an
-- 'Integer' and converted to the requested type at the end. If the input
-- runs out before a terminating byte is seen, whatever has been accumulated
-- so far is returned with an empty remainder.
readVarIntFromBytes :: (Integral a) => [Word8] -> (a, [Word8])
readVarIntFromBytes input = (fromIntegral value, leftover)
  where
    (value, leftover) = go (0 :: Int) (0 :: Integer) input

    go _ acc [] = (acc, [])
    go offset acc (byte : rest)
        | byte .&. 0x80 == 0x80 = go (offset + 7) acc' rest
        | otherwise = (acc', rest)
      where
        acc' = acc .|. (toInteger (byte .&. 0x7f) `shiftL` offset)

0 commit comments

Comments
 (0)