It doesn’t seem to output anything when I run it, but so far as I can see it should print the tuples.
I’ve fixed it now. The problem was that parseMeasurements returned M.empty if it couldn’t parse any more lines, but that should of course have been the accumulated m.
Very impressive – it’s at about 2s on my machine at just 25 LoC. It’d finish 1B rows in about 3m20s single-threaded; I suspect that on 8 cores at 75% efficiency it would finish in around 30 seconds (200s / (8 × 0.75) ≈ 33s).
Impressive! I appreciate the return $! / pure $!.
Yeah, I think it would be great if flatparse provided a “value strict” Parser.
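In the meantime it can be approximated with a tiny helper on top of the existing API; a minimal sketch (the name pure' is hypothetical, not part of flatparse):

{-# LANGUAGE BangPatterns #-}

import FlatParse.Basic (Parser)

-- Hypothetical helper: like pure, but forces its argument to WHNF
-- first, so that pure' x behaves like pure $! x.
pure' :: a -> Parser e a
pure' !a = pure a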
I have now also implemented a little adapter to make flatparse work with lazy bytestrings:
-- needs to be top-level now
f :: T -> T -> T
f (T a b c d) (T a' b' c' d') = T (min a a') (max b b') (c+c') (d+d')

runParserLazy :: (a -> a -> a) -> a -> Parser e a -> B.ByteString -> Result e a
runParserLazy k z p = B.foldlChunks step (OK z mempty) where
  step (OK x bs) bs' = k x <$> runParser p (bs <> bs')
  step x _ = x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  OK x _ <- pure $ runParserLazy (M.unionWith f) M.empty parseMeasurements str
  M.foldrWithKey (\k (T a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) x
That doesn’t decrease the time on my machine by much; it is still 1.15 seconds. But it reduces the maximum residency from 138,019,888 bytes to 207,456 bytes.
Sounds like a good idea to make pure/return strict by default. For some reason I thought that this was already the case, but apparently not.
The runParserLazy function also looks very useful! However, if I’m correct, it only works if the bytestring chunks are the same as the chunks accepted by the parser. I think it should be possible to do a version which works with arbitrary chunking.
It doesn’t require the “parser chunks” to be exactly the same as the “bytestring chunks”.
The main assumption is that the parser is parsing some repetitive structure, and it requires that you can provide an a -> a -> a function to combine the parser chunks.
The way it works is: when the parser stops before the end of a bytestring chunk with an OK result, runParserLazy simply appends the remainder of the chunk onto the beginning of the next chunk (this is technically a slow O(n) operation, but in practice it seems fast enough).
It must be noted that the parser chunks should be significantly smaller than the bytestring chunks; otherwise I think this approach will be slower. And it won’t work at all if the parser chunks are larger than the bytestring chunks.
So, I’m not sure how generally applicable this approach is. I guess it works well for parsing logs like this with many small entries, but I don’t think it would work well for arbitrary JSON or things like that.
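To make the leftover-carrying step concrete, here is a standalone toy analogue without flatparse (all names hypothetical): a line counter that stitches lazy chunks together the same way runParserLazy does.

import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as B

-- Count complete newline-terminated lines in one strict chunk,
-- returning the unconsumed tail after the last '\n'.
countChunk :: BS.ByteString -> (Int, BS.ByteString)
countChunk bs = case BS.elemIndexEnd '\n' bs of
  Nothing -> (0, bs)
  Just i  -> (BS.count '\n' (BS.take (i + 1) bs), BS.drop (i + 1) bs)

-- Fold over the lazy chunks, prepending each leftover onto the
-- next chunk, just like runParserLazy does with the remainder
-- that the parser did not consume.
countLines :: B.ByteString -> Int
countLines = fst . B.foldlChunks step (0, BS.empty)
  where
    step (n, rest) chunk =
      let (m, rest') = countChunk (rest <> chunk)
      in (n + m, rest')

main :: IO ()
main = print (countLines (B.fromChunks (map BS.pack ["a;1.0\nb;", "2.0\nc;3.0\n"])))  -- 3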
OK, I was missing that. But as you say it doesn’t work with arbitrary chunking. I remember now that I considered generic lazy bytestring parsing before, and it would basically require duplicating the current codebase to support suspension and resumption for the parsing thread… Not too difficult technically but rather annoying to implement. Your version is interesting in that it does not require generic resumption.
Your output isn’t sorted, the station names are not properly encoded, and it uses a different format.
This needs a bit more than 5 minutes for the “real” data on my machine, but, more importantly, 30GB of RAM.
So a bit more than double the time (2min 20s on my computer) of
Fair enough; for now I thought we were dropping those constraints while exploring the simple solutions. I don’t think implementing those features will have a big impact on the running time, because the output size is much smaller than the input.
Then you’re probably not using the runParserLazy function that I added in a later post: One Billion Row challenge in Hs - #66 by jaror
It’s not about the time needed, but the possibility to check the output.
No, because I’d need to do more than just cut and paste and change the import to use the lazy bytestring.
It’s really just a matter of copy-pasting, removing the old main function, adding an import qualified Data.ByteString.Lazy as B, and optionally removing the local f binding from parseMeasurements that I’ve now floated to the top level.
But here's the full file if you just want to copy and paste one thing:
import qualified Data.HashMap.Strict as M
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as B
import FlatParse.Basic hiding (char, isDigit)
import Data.Char

data T = T !Float !Float !Float !Float
data S = S !BS.ByteString !Float

anyFloat :: Parser () Float
anyFloat = withOption (satisfyAscii (== '-')) (\_ -> negate <$> anyPositiveFloat) anyPositiveFloat where
  anyPositiveFloat = do
    x <- anyAsciiDecimalInt <* skipSatisfyAscii (== '.')
    c <- satisfyAscii isDigit
    pure $! fromIntegral x + fromIntegral (ord c - ord '0') * 0.1

parseLine :: Parser () S
parseLine = do
  name <- byteStringOf (skipSome (satisfyAscii (/= ';'))) <* skipAnyAsciiChar
  f <- anyFloat <* skipSatisfyAscii (== '\n')
  return $! S name f

parseMeasurements :: Parser () (M.HashMap BS.ByteString T)
parseMeasurements = go M.empty where
  go !m = withOption parseLine (\(S k v) -> go (M.insertWith f k (T v v v 1) m)) (pure m)

f :: T -> T -> T
f (T a b c d) (T a' b' c' d') = T (min a a') (max b b') (c+c') (d+d')

runParserLazy :: (a -> a -> a) -> a -> Parser e a -> B.ByteString -> Result e a
runParserLazy k z p = B.foldlChunks step (OK z mempty) where
  step (OK x bs) bs' = k x <$> runParser p (bs <> bs')
  step x _ = x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  OK x _ <- pure $ runParserLazy (M.unionWith f) M.empty parseMeasurements str
  M.foldrWithKey (\k (T a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) x
No, that didn’t work because of the type of name. I had just been too lazy to look into getting it to compile ;).
This version now needs only 64MB of RAM and 23 threads(?) (although only a single one of these does any work), but takes 5:43.67 to complete, about 30s more than the original version.
Those results are very strange. On my machine the concurrent implementation by @vaibhavsagar takes 83 seconds (which is now the fastest I’ve tested) and my single-threaded implementation takes about 110 seconds.
Are you using any special compilation options? Which GHC version are you using?
I’m using cabal like this:
cabal run -O2 exes -- +RTS -s
With GHC 9.4.8.
My single-threaded statistics:
   775,835,964,136 bytes allocated in the heap
    12,633,241,992 bytes copied during GC
           207,784 bytes maximum residency (2 sample(s))
            32,920 bytes maximum slop
                 7 MiB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0    181936 colls,     0 par    5.041s   5.112s     0.0000s    0.0001s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time  105.478s  (105.368s elapsed)
  GC      time    5.042s  (  5.113s elapsed)
  EXIT    time    0.000s  (  0.009s elapsed)
  Total   time  110.520s  (110.490s elapsed)

  %GC     time       0.0%  (0.0% elapsed)

  Alloc rate    7,355,418,908 bytes per MUT second

  Productivity  95.4% of total user, 95.4% of total elapsed
Statistics of the other concurrent implementation:
   475,839,555,232 bytes allocated in the heap
     2,203,512,336 bytes copied during GC
    13,795,569,976 bytes maximum residency (7 sample(s))
           815,080 bytes maximum slop
             13219 MiB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0    111308 colls, 111308 par   13.931s   3.891s     0.0000s    0.0010s
  Gen  1         7 colls,      6 par    0.004s   0.454s     0.0649s    0.4529s

  Parallel GC work balance: 0.03% (serial 0%, perfect 100%)

  TASKS: 26 (1 bound, 25 peak workers (25 total), using -N12)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.003s  (  0.002s elapsed)
  MUT     time   88.960s  ( 78.875s elapsed)
  GC      time   13.935s  (  4.345s elapsed)
  EXIT    time    0.455s  (  0.009s elapsed)
  Total   time  103.354s  ( 83.230s elapsed)

  Alloc rate    5,348,887,140 bytes per MUT second

  Productivity  86.1% of total user, 94.8% of total elapsed
Hmm, 9.6.4 (ARM64), and -O2 -threaded +RTS -N; all the other flags are just warnings, plus GHC2021.
Running under the same config my program does indeed slow down a bit, but only to 130 seconds. I think that is caused by the -N option; if you use -N1 instead it should already be a bit faster, but disabling threading altogether should shave off a few more seconds. Perhaps the remaining difference is just the difference between AMD64 and ARM64?
You could also try -fllvm; perhaps that produces better code for ARM than GHC’s native code gen.
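For anyone reproducing these numbers, here is a sketch of the relevant cabal stanza; the executable name exes matches the earlier command, but the dependency list is an assumption to adjust for the variant being run:

executable exes
  main-is:          Main.hs
  -- assumed dependency list; adjust to the variant you are running
  build-depends:    base, bytestring, flatparse, unordered-containers
  default-language: GHC2021
  -- -rtsopts allows +RTS flags such as -s and -N1 at run time;
  -- drop -threaded for the non-threaded runtime, or add -fllvm
  -- to try the LLVM backend
  ghc-options:      -O2 -threaded -rtsopts

Then cabal run exes -- +RTS -N1 -s pins the run to a single capability.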
I’ve now tried out using vector-hashtables for mutable hashtables, and it improves the performance further; the full 1 billion rows now take less than 80 seconds:
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as B
import FlatParse.Basic hiding (char, isDigit)
import Data.Char
import Control.Monad.IO.Class
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed.Mutable as UM
import Data.Vector.Hashtables
import Linear.V4

type Hashtable k v = Dictionary (PrimState IO) VM.MVector k UM.MVector v

data S = S !BS.ByteString !Float

anyFloat :: ParserT st () Float
anyFloat = withOption (satisfyAscii (== '-')) (\_ -> negate <$> anyPositiveFloat) anyPositiveFloat where
  anyPositiveFloat = do
    x <- anyAsciiDecimalInt <* skipSatisfyAscii (== '.')
    c <- satisfyAscii isDigit
    pure $! fromIntegral x + fromIntegral (ord c - ord '0') * 0.1

parseLine :: ParserT st () S
parseLine = do
  name <- byteStringOf (skipSome (satisfyAscii (/= ';'))) <* skipAnyAsciiChar
  temp <- anyFloat <* skipSatisfyAscii (== '\n')
  return $! S name temp

parseMeasurements :: Hashtable BS.ByteString (V4 Float) -> ParserIO () ()
parseMeasurements !m = go where
  go = withOption parseLine
    (\(S k v) -> do
      liftIO $ alter m (\x -> Just $ case x of Nothing -> (V4 v v v 1); Just (V4 a b c d) -> V4 (min a v) (max b v) (c+v) (d+1)) k
      go)
    (pure ())

runParserLazy :: ParserIO e () -> B.ByteString -> IO ()
runParserLazy p bs0 = B.foldrChunks step (\_ -> pure ()) bs0 (OK () mempty) where
  step bs' go (OK () bs) = go =<< runParserIO p (bs <> bs')
  step _ go x = go x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  ht <- initialize 10000 :: IO (Hashtable BS.ByteString (V4 Float))
  _ <- runParserLazy (parseMeasurements ht) str
  ls <- toList ht
  foldr (\(k, V4 a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) ls
This also requires the linear package, which gives us the ability to store the 4 floats next to each other in one big unboxed array, improving memory locality.
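As a small standalone illustration of what that buys (a hypothetical demo, not part of the solution above):

import qualified Data.Vector.Unboxed.Mutable as UM
import Linear.V4

-- linear's Unbox instance lays out a vector of V4 Float as one
-- flat array of Floats, four adjacent components per entry, so a
-- read-modify-write touches a single contiguous slot.
main :: IO ()
main = do
  v <- UM.replicate 3 (V4 0 0 0 0 :: V4 Float)
  UM.modify v (\(V4 a b c d) -> V4 (min a 5) (max b 5) (c + 5) (d + 1)) 0
  print =<< UM.read v 0  -- V4 0.0 5.0 5.0 1.0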
I think now it’s time to try to parallelize this.
So, now finally my solution. I’m sorry that I didn’t have more time to add the correct output.
This single threaded version now takes about 100s and 15GB of RAM on my computer, which is about twice as long as the equivalent Go solution.
{-# LANGUAGE StrictData #-}
{-# LANGUAGE ViewPatterns #-}

module Main (main) where

import Data.Array.IO qualified as A
import Data.ByteString.Char8 qualified as BS
import Data.Char (ord)
import Data.Foldable (traverse_)
import Data.HashMap.Strict qualified as HM
import Data.Word (Word64)

data TemperatureData = TemperatureData (A.IOUArray Word64 Word64) (A.IOUArray Word64 Int) (A.IOUArray Word64 Int) (A.IOUArray Word64 Int)

parse :: Word64 -> BS.ByteString -> (HM.HashMap BS.ByteString Word64, TemperatureData) -> IO (HM.HashMap BS.ByteString Word64, TemperatureData)
parse _ (BS.null -> True) acc = pure acc
parse idx content (accMap, TemperatureData countsT sumT minT maxT) = do
  let (stationName, rest') = BS.break (== ';') content
  let rest2 = BS.drop 1 rest'
  let (temp, rest) = case BS.uncons rest2 of
        Nothing -> (0, rest2)
        Just ('-', rest2') -> case BS.uncons rest2' of
          Nothing -> (0, rest2')
          Just (ch1, rest3) -> case BS.uncons rest3 of
            Nothing -> (0, rest3)
            Just ('.', rest4) -> case BS.uncons rest4 of
              Nothing -> (0, rest4)
              Just (ch2, rest5) -> ((-1) * (10 * ord ch1 + ord ch2 - 11 * ord '0'), rest5)
            Just (ch2, rest4) -> case BS.uncons rest4 of
              Nothing -> (0, rest4)
              -- Must be '.'
              Just (_, rest5) -> case BS.uncons rest5 of
                Nothing -> (0, rest5)
                Just (ch3, rest6) -> ((-1) * (100 * ord ch1 + 10 * ord ch2 + ord ch3 - 111 * ord '0'), rest6)
        Just (ch1, rest3) -> case BS.uncons rest3 of
          Nothing -> (0, rest3)
          Just ('.', rest4) -> case BS.uncons rest4 of
            Nothing -> (0, rest4)
            Just (ch2, rest5) -> (10 * ord ch1 + ord ch2 - 11 * ord '0', rest5)
          Just (ch2, rest4) -> case BS.uncons rest4 of
            Nothing -> (0, rest4)
            -- Must be '.'
            Just (_, rest5) -> case BS.uncons rest5 of
              Nothing -> (0, rest5)
              Just (ch3, rest6) -> (100 * ord ch1 + 10 * ord ch2 + ord ch3 - 111 * ord '0', rest6)
  (newAcc, newIdx) <- case HM.lookup stationName accMap of
    Nothing -> do
      A.writeArray countsT idx 1
      A.writeArray sumT idx temp
      A.writeArray minT idx temp
      A.writeArray maxT idx temp
      pure ((HM.insert stationName idx accMap, TemperatureData countsT sumT minT maxT), idx + 1)
    Just idxM -> do
      A.modifyArray' countsT idxM (+ 1)
      A.modifyArray' sumT idxM (+ temp)
      A.modifyArray' minT idxM (min temp)
      A.modifyArray' maxT idxM (max temp)
      pure ((accMap, TemperatureData countsT sumT minT maxT), idx)
  parse newIdx (BS.drop 1 rest) newAcc

main :: IO ()
main = do
  content <- BS.readFile "../measurements_big.txt"
  c <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  s <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  m <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  n <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  (ma, TemperatureData rC rS rM rN) <- parse 0 content (HM.empty, TemperatureData c s m n)
  let keys = HM.keys ma
  traverse_
    ( \k -> do
        let i = HM.findWithDefault 0 k ma
        count <- A.readArray rC i
        sum <- A.readArray rS i
        tMin <- A.readArray rM i
        tMax <- A.readArray rN i
        print (k, i, count, sum, tMin, tMax)
    )
    keys
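One detail worth spelling out: the nested uncons cases in parse decode the temperature directly into integer tenths, subtracting the ord '0' offsets of all digits in one go. A quick standalone check of the single-digit-before-the-dot branch:

import Data.Char (ord)

-- For input "3.4": ch1 = '3', ch2 = '4', so the expression
-- 10 * ord '3' + ord '4' - 11 * ord '0' = 510 + 52 - 528 = 34,
-- i.e. 3.4 degrees stored as the fixed-point integer 34, with no
-- floating-point parsing at all.
main :: IO ()
main = print (10 * ord '3' + ord '4' - 11 * ord '0')  -- 34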
This takes more than a minute less on my computer, and is now at 4 minutes. What I really like about your solution is the memory footprint, still only 60MB!
You folks are creating some extremely useful data about best practices in this thread! Keep it up!