One Billion Row challenge in Hs

Yes, here’s an implementation that uses flatparse:

{-# LANGUAGE BangPatterns #-}
import qualified Data.HashMap.Strict as M
import qualified Data.ByteString.Char8 as BS
import FlatParse.Basic hiding (char, isDigit)
import Data.Char

data T = T !Float !Float !Float !Float
data S = S !BS.ByteString !Float

anyFloat :: Parser () Float
anyFloat = withOption (satisfyAscii (== '-')) (\_ -> negate <$> anyPositiveFloat) anyPositiveFloat where
  anyPositiveFloat = do
    x <- anyAsciiDecimalInt <* skipSatisfyAscii (== '.') 
    c <- satisfyAscii isDigit
    pure $! fromIntegral x + fromIntegral (ord c - ord '0') * 0.1

parseLine :: Parser () S
parseLine = do
  name <- byteStringOf (skipSome (satisfyAscii (/= ';'))) <* skipAnyAsciiChar
  f <- anyFloat <* skipSatisfyAscii (== '\n')
  return $! S name f

parseMeasurements :: Parser () (M.HashMap BS.ByteString T)
parseMeasurements = go M.empty where
  go !m = withOption parseLine (\(S k v) -> go (M.insertWith f k (T v v v 1) m)) (pure m)
  f (T a b c d) (T a' b' c' d') = T (min a a') (max b b') (c+c') (d+d')

main = do
  str <- BS.readFile "measurements.txt"
  OK x _ <- pure $ runParser parseMeasurements str
  M.foldrWithKey (\k (T a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) x

That’s the fastest solution I’ve seen on my machine up to now, clocking in at about 1 second for 10 million rows.


It doesn’t seem to output anything when I run it, but so far as I can see it should print the tuples.


I’ve fixed it now. The problem was that parseMeasurements returned M.empty when it couldn’t parse any more lines, but it should of course have returned the accumulated m.
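To see why the fallback matters, here is a minimal flatparse-free sketch of the same accumulator loop. It assumes a simplified, hypothetical input format of name;integer lines, and the names parseOne and sumAll are illustrative, not from the thread. The Nothing branch plays the role of withOption’s fallback: it has to return the accumulator m, because returning Map.empty there would throw away every line parsed so far.

```haskell
{-# LANGUAGE BangPatterns #-}
import qualified Data.ByteString.Char8 as BS
import qualified Data.Map.Strict as Map

-- Hypothetical one-line parser for "name;count\n".
-- Returns Nothing when the input holds no complete line.
parseOne :: BS.ByteString -> Maybe ((BS.ByteString, Int), BS.ByteString)
parseOne s = do
  let (name, r1) = BS.break (== ';') s
  (_, r2) <- BS.uncons r1          -- skip the ';'
  (n, r3) <- BS.readInt r2
  ('\n', r4) <- BS.uncons r3       -- require the terminating newline
  pure ((name, n), r4)

-- Accumulator loop mirroring parseMeasurements' `go !m`.
sumAll :: BS.ByteString -> Map.Map BS.ByteString Int
sumAll = go Map.empty
  where
    go !m s = case parseOne s of
      Just ((k, v), rest) -> go (Map.insertWith (+) k v m) rest
      Nothing -> m  -- the fix: keep what was parsed (not Map.empty)
```

In GHCi, sumAll (BS.pack "a;1\nb;2\na;3\n") yields a map with a summed to 4 and b to 2; a trailing partial line such as "b;" is simply left unparsed.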


Very impressive: it’s at about 2s on my machine at just 25 LoC. It’d finish 1B rows in about 3m20s single-threaded; I suspect that on 8 cores with 75% efficiency it would finish in around 30 seconds.
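The back-of-the-envelope estimate, assuming running time scales linearly with the number of rows and that the 75% parallel efficiency holds for the whole run:

```latex
\frac{10^{9}}{10^{7}} \times 2\,\mathrm{s} = 200\,\mathrm{s} = 3\,\mathrm{min}\;20\,\mathrm{s},
\qquad
\frac{200\,\mathrm{s}}{8 \times 0.75} \approx 33\,\mathrm{s}
```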


Impressive! I appreciate the strict return $! and pure $! calls.

Yeah, I think it would be great if flatparse provided a “value-strict” Parser.
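For readers unfamiliar with the idiom: pure $! x forces x to weak head normal form before wrapping it, whereas plain pure x just stores a thunk. A small demonstration with Maybe, using a hypothetical pureStrict helper (not part of flatparse or base):

```haskell
import Control.Exception (ErrorCall, evaluate, try)

-- Hypothetical helper: `pure`, but force the value to WHNF first.
-- Equivalent to `pure $! x`, since f $! x = x `seq` f x.
pureStrict :: Applicative f => a -> f a
pureStrict x = x `seq` pure x

-- Lazy `pure` wraps the value without evaluating it, so matching on
-- the Just constructor succeeds even though the payload is an error.
lazyIsFine :: Bool
lazyIsFine = case (pure (error "boom") :: Maybe Int) of
  Just _ -> True
  Nothing -> False

-- `pureStrict` evaluates the payload first, so the error surfaces as
-- soon as the wrapped result itself is demanded.
strictThrows :: IO Bool
strictThrows = do
  r <- try (evaluate (pureStrict (error "boom") :: Maybe Int)) :: IO (Either ErrorCall (Maybe Int))
  pure (either (const True) (const False) r)
```

In the parsers in this thread, the same effect means the parsed Float (or S value) is already evaluated when it is returned, instead of being left as a thunk to be forced later.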


I have now also implemented a little adapter to make flatparse work with lazy bytestrings:

-- also needs: import qualified Data.ByteString.Lazy as B
-- f needs to be top-level now
f :: T -> T -> T
f (T a b c d) (T a' b' c' d') = T (min a a') (max b b') (c+c') (d+d')

runParserLazy :: (a -> a -> a) -> a -> Parser e a -> B.ByteString -> Result e a
runParserLazy k z p = B.foldlChunks step (OK z mempty) where
  step (OK x bs) bs' = k x <$> runParser p (bs <> bs')
  step x _ = x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  OK x _ <- pure $ runParserLazy (M.unionWith f) M.empty parseMeasurements str
  M.foldrWithKey (\k (T a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) x

That doesn’t change the time on my machine by much: it is still 1.15 seconds. But it reduces the maximum residency from 138,019,888 bytes to 207,456 bytes.


Sounds like a good idea to make pure/return strict by default. For some reason I thought that this was already the case but apparently not.

The runParserLazy also looks very useful! However, if I’m correct, it only works if the bytestring chunks are the same as the chunks accepted by the parser. I think it should be possible to do a version which works with arbitrary chunking.


It doesn’t require the “parser chunks” to be exactly the same as the “bytestring chunks”.

The main assumption is that the parser is parsing some repetitive structure, and requires that you can provide an a -> a -> a function to combine the parser chunks.

The way it works is: when the parser stops before the end of a bytestring chunk with an OK result then runParserLazy simply appends the remainder of the chunk onto the beginning of the next chunk (this is technically a slow O(n) operation, but practically it seems fast enough).

So it must be noted that the parser chunks should be significantly smaller than the bytestring chunks, otherwise I think this approach will be slower. And this approach won’t work at all if the parser chunks are larger than the bytestring chunks.

So, I’m not sure how generally applicable this approach is. I guess it works well for parsing logs like this with many small entries, but I don’t think it would work well for arbitrary JSON or things like that.
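The carry-over strategy described above can be sketched without flatparse. This assumes a hypothetical chunk parser, parseChunk, that consumes complete name;integer lines and hands back the partial tail. runLazy mirrors the shape of runParserLazy (combine function, neutral element, per-chunk parser), but the names and the line format are illustrative assumptions:

```haskell
{-# LANGUAGE BangPatterns #-}
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.Map.Strict as Map

-- One "name;value\n" line, or Nothing if no complete line is available.
parseOne :: BS.ByteString -> Maybe ((BS.ByteString, Int), BS.ByteString)
parseOne s = do
  let (name, r1) = BS.break (== ';') s
  (_, r2) <- BS.uncons r1
  (n, r3) <- BS.readInt r2
  ('\n', r4) <- BS.uncons r3
  pure ((name, n), r4)

-- Per-chunk parser: consume complete lines, stop at the first partial one
-- and return it as the unconsumed remainder.
parseChunk :: BS.ByteString -> (Map.Map BS.ByteString Int, BS.ByteString)
parseChunk = go Map.empty
  where
    go !m s = case parseOne s of
      Just ((k, v), rest) -> go (Map.insertWith (+) k v m) rest
      Nothing -> (m, s)

-- Carry-over driver: prepend the previous leftover to each new chunk
-- (an O(n) copy), run the chunk parser, and combine results with k.
runLazy :: (a -> a -> a) -> a -> (BS.ByteString -> (a, BS.ByteString))
        -> BL.ByteString -> (a, BS.ByteString)
runLazy k z p = BL.foldlChunks step (z, BS.empty)
  where
    step (!acc, leftover) chunk =
      let (x, rest) = p (leftover <> chunk)
          !acc' = k acc x
       in (acc', rest)
```

With chunks "a;1\nb;", "2\na;" and "3\n", the boundaries fall in the middle of lines; the leftovers b; and a; are glued onto the following chunk before parsing, so runLazy (Map.unionWith (+)) Map.empty parseChunk gives the same totals as parsing the whole input at once.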

OK, I was missing that. But as you say it doesn’t work with arbitrary chunking. I remember now that I considered generic lazy bytestring parsing before, and it would basically require duplicating the current codebase to support suspension and resumption for the parsing thread… Not too difficult technically but rather annoying to implement. Your version is interesting in that it does not require generic resumption.
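To make “suspension and resumption” concrete, here is a minimal sketch of a resumable parser result, similar in spirit to attoparsec’s Partial results. The Step type, lineP and feed are hypothetical and are not flatparse’s API; a real implementation would have to thread such continuations through every combinator, which is the duplication mentioned above.

```haskell
import qualified Data.ByteString.Char8 as BS

-- Hypothetical resumable result: either finished (value plus leftover
-- input) or suspended until more input arrives. Feeding `Nothing` to a
-- suspension signals end of input.
data Step a
  = Done a BS.ByteString
  | More (Maybe BS.ByteString -> Step a)

-- Resumable parser for one '\n'-terminated line. It suspends at the end
-- of every chunk that does not yet contain the newline.
lineP :: Step BS.ByteString
lineP = go BS.empty
  where
    go acc = More $ \input -> case input of
      Nothing -> Done acc BS.empty  -- end of input: return what we have
      Just chunk -> case BS.break (== '\n') chunk of
        (pre, post)
          | BS.null post -> go (acc <> pre)  -- no newline yet: suspend again
          | otherwise -> Done (acc <> pre) (BS.drop 1 post)

-- Drive a Step over a list of chunks. Returns the result and the
-- unconsumed input, or Nothing if the parser never finished.
feed :: Step a -> [BS.ByteString] -> Maybe (a, [BS.ByteString])
feed (Done a rest) cs = Just (a, rest : cs)
feed (More k) (c : cs) = feed (k (Just c)) cs
feed (More k) [] = case k Nothing of
  Done a rest -> Just (a, [rest])
  More _ -> Nothing
```

For example, feed lineP (map BS.pack ["Ham", "burg;1", "2.3\nBer"]) returns the line Hamburg;12.3 with Ber left over: the parser suspends twice and resumes where it left off.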

Your output isn’t sorted, the station names are not properly encoded, and it has a different format.
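A minimal sketch of the output step, assuming the challenge’s {name=min/mean/max, ...} format with one decimal place. The Stats type and the render, printResults and renderString names are hypothetical. The main point is that writing the raw bytes through a Builder keeps UTF-8 station names intact, whereas print on a ByteString escapes every non-ASCII byte.

```haskell
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.ByteString.Builder (Builder, byteString, char7, hPutBuilder, string7, toLazyByteString)
import Data.List (intersperse, sortOn)
import System.IO (stdout)
import Text.Printf (printf)

-- Per-station aggregate in the thread's field order: min, max, sum, count.
data Stats = Stats !Float !Float !Float !Float

-- Render "{name=min/mean/max, ...}\n", sorted by the raw bytes of the
-- station name (assumption: byte order is an acceptable sort order).
render :: [(BS.ByteString, Stats)] -> Builder
render xs =
  char7 '{' <> mconcat (intersperse (string7 ", ") (map entry (sortOn fst xs))) <> string7 "}\n"
  where
    entry (name, Stats mn mx s c) =
      byteString name <> char7 '=' <> dec mn <> char7 '/' <> dec (s / c) <> char7 '/' <> dec mx
    -- One decimal place via printf; its rounding of exact ties may differ
    -- from the official reference implementation (assumption).
    dec :: Float -> Builder
    dec = string7 . printf "%.1f"

-- Write the UTF-8 bytes unchanged to stdout.
printResults :: [(BS.ByteString, Stats)] -> IO ()
printResults = hPutBuilder stdout . render

-- Convenience for checking the output in GHCi.
renderString :: [(BS.ByteString, Stats)] -> String
renderString = BL.unpack . toLazyByteString . render
```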

This needs a bit more than 5 minutes for the “real” data on my machine but, more importantly, 30GB of RAM.
So a bit more than double the time (2min 20s on my computer) of

Fair enough; I thought we were dropping those constraints for now while exploring the simple solutions. I don’t think implementing those features will have a big impact on the running time, because the output is much smaller than the input.

Then you’re probably not using the runParserLazy function that I added in a later post: One Billion Row challenge in Hs - #66 by jaror

It’s not about the time needed, but the possibility to check the output.

No, because I’d need to do more than just cut and paste and change the import to use the lazy bytestring.

It’s really just a matter of copy-pasting, removing the old main function, adding import qualified Data.ByteString.Lazy as B, and optionally removing the local f binding from parseMeasurements, which I’ve now floated to the top level.

But here's the full file if you just want to copy and paste one thing:
{-# LANGUAGE BangPatterns #-}
import qualified Data.HashMap.Strict as M
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as B
import FlatParse.Basic hiding (char, isDigit)
import Data.Char

data T = T !Float !Float !Float !Float
data S = S !BS.ByteString !Float

anyFloat :: Parser () Float
anyFloat = withOption (satisfyAscii (== '-')) (\_ -> negate <$> anyPositiveFloat) anyPositiveFloat where
  anyPositiveFloat = do
    x <- anyAsciiDecimalInt <* skipSatisfyAscii (== '.') 
    c <- satisfyAscii isDigit
    pure $! fromIntegral x + fromIntegral (ord c - ord '0') * 0.1

parseLine :: Parser () S
parseLine = do
  name <- byteStringOf (skipSome (satisfyAscii (/= ';'))) <* skipAnyAsciiChar
  f <- anyFloat <* skipSatisfyAscii (== '\n')
  return $! S name f

parseMeasurements :: Parser () (M.HashMap BS.ByteString T)
parseMeasurements = go M.empty where
  go !m = withOption parseLine (\(S k v) -> go (M.insertWith f k (T v v v 1) m)) (pure m)

f :: T -> T -> T
f (T a b c d) (T a' b' c' d') = T (min a a') (max b b') (c+c') (d+d')

runParserLazy :: (a -> a -> a) -> a -> Parser e a -> B.ByteString -> Result e a
runParserLazy k z p = B.foldlChunks step (OK z mempty) where
  step (OK x bs) bs' = k x <$> runParser p (bs <> bs')
  step x _ = x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  OK x _ <- pure $ runParserLazy (M.unionWith f) M.empty parseMeasurements str
  M.foldrWithKey (\k (T a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) x

No, that didn’t work because of the type of name. I had just been too lazy to look into getting it to compile ;).

This version now needs only 64MB of RAM and 23 threads (?), although only one of them does any work, but it takes 5:43.67 to complete, about 30s more than the original version.

Those results are very strange. On my machine the concurrent implementation by @vaibhavsagar takes 83 seconds (which is now the fastest I’ve tested) and my single-threaded implementation takes about 110 seconds.

Are you using any special compilation options? Which GHC version are you using?

I’m using cabal like this:

cabal run -O2 exes -- +RTS -s

With GHC 9.4.8.

My single-threaded statistics:
 775,835,964,136 bytes allocated in the heap
  12,633,241,992 bytes copied during GC
         207,784 bytes maximum residency (2 sample(s))
          32,920 bytes maximum slop
               7 MiB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     181936 colls,     0 par    5.041s   5.112s     0.0000s    0.0001s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time  105.478s  (105.368s elapsed)
  GC      time    5.042s  (  5.113s elapsed)
  EXIT    time    0.000s  (  0.009s elapsed)
  Total   time  110.520s  (110.490s elapsed)

  %GC     time       0.0%  (0.0% elapsed)

  Alloc rate    7,355,418,908 bytes per MUT second

  Productivity  95.4% of total user, 95.4% of total elapsed
Statistics of the other concurrent implementation:
 475,839,555,232 bytes allocated in the heap
   2,203,512,336 bytes copied during GC
  13,795,569,976 bytes maximum residency (7 sample(s))
         815,080 bytes maximum slop
           13219 MiB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0     111308 colls, 111308 par   13.931s   3.891s     0.0000s    0.0010s
  Gen  1         7 colls,     6 par    0.004s   0.454s     0.0649s    0.4529s

  Parallel GC work balance: 0.03% (serial 0%, perfect 100%)

  TASKS: 26 (1 bound, 25 peak workers (25 total), using -N12)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.003s  (  0.002s elapsed)
  MUT     time   88.960s  ( 78.875s elapsed)
  GC      time   13.935s  (  4.345s elapsed)
  EXIT    time    0.455s  (  0.009s elapsed)
  Total   time  103.354s  ( 83.230s elapsed)

  Alloc rate    5,348,887,140 bytes per MUT second

  Productivity  86.1% of total user, 94.8% of total elapsed

Hmm, GHC 9.6.4 (ARM64) with -O2 -threaded +RTS -N; all the other flags are just warnings, plus GHC2021.

Running under the same config, my program does indeed slow down a bit, but only to 130 seconds. I think that is caused by the -N option: if you use -N1 instead it should already be a bit faster, and disabling threading altogether should shave off a few more seconds. Perhaps the remaining difference is just the difference between AMD64 and ARM64?

You could also try -fllvm; perhaps the LLVM backend produces better code for ARM than GHC’s native code generator.

I've now tried out vector-hashtables for mutable hashtables, and it improves performance further: it now takes less than 80 seconds for the full 1 billion rows:
{-# LANGUAGE BangPatterns #-}
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as B
import FlatParse.Basic hiding (char, isDigit)
import Data.Char
import Control.Monad.IO.Class
import qualified Data.Vector.Mutable as VM
import qualified Data.Vector.Unboxed.Mutable as UM
import Data.Vector.Hashtables
import Linear.V4

type Hashtable k v = Dictionary (PrimState IO) VM.MVector k UM.MVector v

data S = S !BS.ByteString !Float

anyFloat :: ParserT st () Float
anyFloat = withOption (satisfyAscii (== '-')) (\_ -> negate <$> anyPositiveFloat) anyPositiveFloat where
  anyPositiveFloat = do
    x <- anyAsciiDecimalInt <* skipSatisfyAscii (== '.') 
    c <- satisfyAscii isDigit
    pure $! fromIntegral x + fromIntegral (ord c - ord '0') * 0.1

parseLine :: ParserT st () S
parseLine = do
  name <- byteStringOf (skipSome (satisfyAscii (/= ';'))) <* skipAnyAsciiChar
  temp <- anyFloat <* skipSatisfyAscii (== '\n')
  return $! S name temp

parseMeasurements :: Hashtable BS.ByteString (V4 Float) -> ParserIO () ()
parseMeasurements !m = go where
  go = withOption parseLine 
    (\(S k v) -> do
      liftIO $ alter m (\x -> Just $ case x of Nothing -> (V4 v v v 1); Just (V4 a b c d) -> V4 (min a v) (max b v) (c+v) (d+1)) k
      go)
    (pure ())

runParserLazy :: ParserIO e () -> B.ByteString -> IO ()
runParserLazy p bs0 = B.foldrChunks step (\_ -> pure ()) bs0 (OK () mempty) where
  step bs' go (OK () bs) = go =<< runParserIO p (bs <> bs')
  step _ go x = go x

main :: IO ()
main = do
  str <- B.readFile "measurements.txt"
  ht <- initialize 10000 :: IO (Hashtable BS.ByteString (V4 Float))
  _ <- runParserLazy (parseMeasurements ht) str
  ls <- toList ht
  foldr (\(k, V4 a b c d) go -> print (k, a, b, c/d) *> go) (pure ()) ls

This also requires the linear package, which lets us store the 4 floats next to each other in one big unboxed array, improving memory locality.
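The interleaved layout can be illustrated with just the array package. This is a minimal sketch, assuming four Float fields per station stored at offsets 4*i to 4*i+3 of one IOUArray, which is the arrangement the post describes for the unboxed V4 vector; the names newStats, record and summary are hypothetical. The contrasting layout keeps one separate array per field.

```haskell
import Data.Array.IO (IOUArray, newArray, readArray, writeArray)

-- Station i's min, max, sum and count live at offsets 4*i .. 4*i+3,
-- so one update touches a single small contiguous region.
type Stats = IOUArray Int Float

newStats :: Int -> IO Stats
newStats n = newArray (0, 4 * n - 1) 0

-- Record a measurement for station i; `fresh` marks its first sighting.
record :: Stats -> Int -> Bool -> Float -> IO ()
record arr i fresh v
  | fresh = do
      writeArray arr base v
      writeArray arr (base + 1) v
      writeArray arr (base + 2) v
      writeArray arr (base + 3) 1
  | otherwise = do
      mn <- readArray arr base
      mx <- readArray arr (base + 1)
      s <- readArray arr (base + 2)
      c <- readArray arr (base + 3)
      writeArray arr base (min mn v)
      writeArray arr (base + 1) (max mx v)
      writeArray arr (base + 2) (s + v)
      writeArray arr (base + 3) (c + 1)
  where
    base = 4 * i

-- Read back (min, max, mean) for station i.
summary :: Stats -> Int -> IO (Float, Float, Float)
summary arr i = do
  let base = 4 * i
  mn <- readArray arr base
  mx <- readArray arr (base + 1)
  s <- readArray arr (base + 2)
  c <- readArray arr (base + 3)
  pure (mn, mx, s / c)
```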

I think now it’s time to try to parallelize this.


So, finally, here is my solution. I’m sorry that I didn’t have more time to add the correct output format.

This single threaded version now takes about 100s and 15GB of RAM on my computer, which is about twice as long as the equivalent Go solution.

{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE ViewPatterns #-}

module Main (main) where

import Data.Array.IO qualified as A
import Data.ByteString.Char8 qualified as BS
import Data.Char (ord)
import Data.Foldable (traverse_)
import Data.HashMap.Strict qualified as HM
import Data.Word (Word64)

data TemperatureData = TemperatureData (A.IOUArray Word64 Word64) (A.IOUArray Word64 Int) (A.IOUArray Word64 Int) (A.IOUArray Word64 Int)

parse :: Word64 -> BS.ByteString -> (HM.HashMap BS.ByteString Word64, TemperatureData) -> IO (HM.HashMap BS.ByteString Word64, TemperatureData)
parse _ (BS.null -> True) acc = pure acc
parse idx content (accMap, TemperatureData countsT sumT minT maxT) = do
  let (stationName, rest') = BS.break (== ';') content
  let rest2 = BS.drop 1 rest'
  let (temp, rest) = case BS.uncons rest2 of
        Nothing -> (0, rest2)
        Just ('-', rest2') -> case BS.uncons rest2' of
          Nothing -> (0, rest2')
          Just (ch1, rest3) -> case BS.uncons rest3 of
            Nothing -> (0, rest3)
            Just ('.', rest4) -> case BS.uncons rest4 of
              Nothing -> (0, rest4)
              Just (ch2, rest5) -> ((-1) * (10 * ord ch1 + ord ch2 - 11 * ord '0'), rest5)
            Just (ch2, rest4) -> case BS.uncons rest4 of
              Nothing -> (0, rest4)
              -- Must be '.'
              Just (_, rest5) -> case BS.uncons rest5 of
                Nothing -> (0, rest5)
                Just (ch3, rest6) -> ((-1) * (100 * ord ch1 + 10 * ord ch2 + ord ch3 - 111 * ord '0'), rest6)
        Just (ch1, rest3) -> case BS.uncons rest3 of
          Nothing -> (0, rest3)
          Just ('.', rest4) -> case BS.uncons rest4 of
            Nothing -> (0, rest4)
            Just (ch2, rest5) -> (10 * ord ch1 + ord ch2 - 11 * ord '0', rest5)
          Just (ch2, rest4) -> case BS.uncons rest4 of
            Nothing -> (0, rest4)
            -- Must be '.'
            Just (_, rest5) -> case BS.uncons rest5 of
              Nothing -> (0, rest5)
              Just (ch3, rest6) -> (100 * ord ch1 + 10 * ord ch2 + ord ch3 - 111 * ord '0', rest6)
  (newAcc, newIdx) <- case HM.lookup stationName accMap of
    Nothing -> do
      A.writeArray countsT idx 1
      A.writeArray sumT idx temp
      A.writeArray minT idx temp
      A.writeArray maxT idx temp
      pure ((HM.insert stationName idx accMap, TemperatureData countsT sumT minT maxT), idx + 1)
    Just idxM -> do
      A.modifyArray' countsT idxM (+ 1)
      A.modifyArray' sumT idxM (+ temp)
      A.modifyArray' minT idxM (min temp)
      A.modifyArray' maxT idxM (max temp)
      pure ((accMap, TemperatureData countsT sumT minT maxT), idx)
  parse newIdx (BS.drop 1 rest) newAcc

main :: IO ()
main = do
  content <- BS.readFile "../measurements_big.txt"
  c <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  s <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  m <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  n <- A.newArray_ (0 :: Word64, 10000 :: Word64)
  (ma, TemperatureData rC rS rM rN) <- parse 0 content (HM.empty, TemperatureData c s m n)
  let keys = HM.keys ma
  traverse_
    ( \k ->
        do
          let i = HM.findWithDefault 0 k ma
          count <- A.readArray rC i
          sum <- A.readArray rS i
          tMin <- A.readArray rM i
          tMax <- A.readArray rN i
          print (k, i, count, sum, tMin, tMax)
    )
    keys
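The nested uncons cascade above computes the temperature as an integer number of tenths of a degree, relying on the identity 10 * ord ch1 + ord ch2 - 11 * ord '0' = 10 * (ord ch1 - ord '0') + (ord ch2 - ord '0'). A more compact equivalent, as a sketch that assumes well-formed input (optional '-', one or two digits, '.', one digit) and uses a hypothetical parseTenths name:

```haskell
import qualified Data.ByteString.Char8 as BS
import Data.Char (ord)

-- Parse "-12.3" to -123 or "4.5" to 45 (tenths of a degree) and return
-- the remaining input. Digits are not validated, like the original.
parseTenths :: BS.ByteString -> Maybe (Int, BS.ByteString)
parseTenths s0 = case BS.uncons s0 of
  Just ('-', s) -> fmap (\(t, rest) -> (negate t, rest)) (unsigned s)
  _ -> unsigned s0
  where
    digit c = ord c - ord '0'
    -- Either "d.d" or "dd.d".
    unsigned s = do
      (a, s1) <- BS.uncons s
      (b, s2) <- BS.uncons s1
      if b == '.'
        then do
          (c, s3) <- BS.uncons s2
          pure (10 * digit a + digit c, s3)
        else do
          ('.', s3) <- BS.uncons s2  -- pattern failure gives Nothing
          (c, s4) <- BS.uncons s3
          pure (100 * digit a + 10 * digit b + digit c, s4)
```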

This takes more than a minute less on my computer, and is now at 4 minutes. What I really like about your solution is the memory footprint, still only 60MB!
