One Billion Row challenge in Hs

I would probably need to profile whether parsing is actually the bottleneck, but have you tried replacing the 5 unsafeIndexes by one 4-byte aligned read of a Word64 and then shifting stuff around? Perhaps the LLVM backend will take care this for you; on the other hand it’s a bit unlikely through all those withForeignPtr abstractions. Otherwise, 5 separate reads seems a bit expensive.

Also perhaps there is a large number of clashes in your hash table, resulting in a non-negligible number of go iterations? Unlikely, but perhaps worth checking.

If parsing is a bottleneck now, then we might want to explore using rank-select bit strings: Haskell Works Blog - Introduction to the rank-select bit-string

There’s a whole series of blog posts including CSV parsing examples which are similar to what we’re parsing: Haskell Works Blog - Archives

That slowed it down from 15.375s to 17.676s for me. And doesn’t lines already use a fast SIMD memchr?

Also, if I remove the hash table insertion from the program (but still make sure parseLine runs) the running time is 9.830s for me, so it seems more than 60% of the time is spent on parsing. If I also remove parseLine then it takes only about 5 seconds.

It’s good to know about the parsing time!
For me the custom newline search was a small but reliable improvement. Memchr is fast indeed, but our searches are very short and I thought that the out-of-line C call could eat up the bonus.

A couple of benchmarking notes for those comparing to the Danny van Kooten version:

  1. His processor – AMD Ryzen 4800U and 16 GB of RAM – can run single threaded at 4.2Ghz (mine does 2.8Ghz). Further, its an 8 core processor: 16 with hyper threading.

  2. Assuming the effect of mmap is a constant (its just read speed, so it should be), his best single threaded version is about 20 - (6-1.6) = 15.6s.

  3. His input file is 12Gb. Mine is 15.5Gb, about 30% bigger.

@ReleaseCandidate – what’s the file size, processor speed and RAM on the computer you’re using? Also is it running off of an encrypted disk or vanilla? We might be beating the above already given like for like conditions.

I’ve already looked at Danny van Kooten’s version and I would say it isn’t optimised much at all (except for the parallel reading and a trick with forking and un-mapping the data file). I could not get it to run successfully even without this forking and piping, I always get a segfault in one of the worker threads.

I quickly made a single threaded version out of it to get some baseline comparison (and fixed the segfault).

C code
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_DISTINCT_GROUPS    10000
#define MAX_GROUPBY_KEY_LENGTH 100

// Capacity of our hashmap
// Needs to be a power of 2
// so we can bit-and instead of modulo
#define HASHMAP_CAPACITY 16384
#define HASHMAP_INDEX(h) (h & (HASHMAP_CAPACITY - 1))

struct Group
{
    unsigned int count;
    int min;
    int max;
    long sum;
    char key[MAX_GROUPBY_KEY_LENGTH];
};

struct Result
{
    unsigned int n;
    unsigned int map[HASHMAP_CAPACITY];
    struct Group groups[MAX_DISTINCT_GROUPS];
};

// parses a floating point number as an integer
// this is only possible because we know our data file has only a single decimal
static inline const char *parse_number(int *dest, const char *s)
{
    // parse sign
    int mod = 1;
    if (*s == '-')
    {
        mod = -1;
        s++;
    }

    if (s[1] == '.')
    {
        *dest = ((s[0] * 10) + s[2] - ('0' * 11)) * mod;
        return s + 4;
    }

    *dest = (s[0] * 100 + s[1] * 10 + s[3] - ('0' * 111)) * mod;
    return s + 5;
}

static inline int min(int a, int b)
{
    return a < b ? a : b;
}
static inline int max(int a, int b)
{
    return a > b ? a : b;
}

// qsort callback
static inline int cmp(const void *ptr_a, const void *ptr_b)
{
    return strcmp(((struct Group *)ptr_a)->key, ((struct Group *)ptr_b)->key);
}

static struct Result *process_chunk(char *data, size_t data_size)
{
    // initialize result
    struct Result *result = malloc(sizeof(*result));
    if (!result)
    {
        perror("malloc error");
        exit(EXIT_FAILURE);
    }
    result->n = 0;

    // we could do this in a single call to memset
    // since the two are contiguous in memory
    // but this code is only called NTHREADS times
    // so not really worth it
    memset(result->map, 0, HASHMAP_CAPACITY * sizeof(*result->map));
    memset(result->groups, 0, MAX_DISTINCT_GROUPS * sizeof(*result->groups));

    const char *s            = data;
    const char *end          = &data[data_size - 1];
    const char *last_but_one = end - 1;

    // flaming hot loop
    while (s < last_but_one)
    {
        const char *linestart = s;

        // find position of ;
        // while simulatenuously hashing everything up to that point
        unsigned int len = 0;
        unsigned int h   = 0;
        while (s[len] != ';')
        {
            h = (h * 31) + (unsigned char)s[len];
            len += 1;
        }

        // parse decimal number as int
        int temperature;
        s = parse_number(&temperature, linestart + len + 1);

        // probe map until free spot or match
        unsigned int *c = &result->map[HASHMAP_INDEX(h)];
        while (*c > 0 && memcmp(result->groups[*c].key, linestart, len) != 0)
        {
            h += 1;
            c = &result->map[HASHMAP_INDEX(h)];
        }

        // new hashmap entry
        if (*c == 0)
        {
            *c = result->n;
            memcpy(result->groups[*c].key, linestart, len);
            result->n++;
        }

        // existing entry
        result->groups[*c].count += 1;
        result->groups[*c].min = min(result->groups[*c].min, temperature);
        result->groups[*c].max = max(result->groups[*c].max, temperature);
        result->groups[*c].sum += temperature;
    }

    return result;
}

static void result_to_str(char *dest, const struct Result *result)
{
    char buf[128];
    *dest++ = '{';
    for (unsigned int i = 0; i < result->n; i++)
    {
        size_t n = (size_t)snprintf(
            buf, 128, "%s=%.1f/%.1f/%.1f%s", result->groups[i].key,
            (float)result->groups[i].min / 10.0,
            ((float)result->groups[i].sum / (float)result->groups[i].count)
                / 10.0,
            (float)result->groups[i].max / 10.0,
            i < (result->n - 1) ? ", " : "");

        memcpy(dest, buf, n);
        dest += n;
    }
    *dest++ = '}';
    *dest   = 0x0;
}

int main(int argc, char **argv)
{
    char *file = "../measurements_big.txt";
    if (argc > 1)
    {
        file = argv[1];
    }

    int fd = open(file, O_RDONLY);
    if (!fd)
    {
        perror("error opening file");
        exit(EXIT_FAILURE);
    }

    struct stat sb;
    if (fstat(fd, &sb) == -1)
    {
        perror("error getting file size");
        exit(EXIT_FAILURE);
    }

    // mmap entire file into memory
    size_t sz  = (size_t)sb.st_size;
    char *data = mmap(NULL, sz, PROT_READ, MAP_SHARED, fd, 0);
    if (data == MAP_FAILED)
    {
        perror("error mmapping file");
        exit(EXIT_FAILURE);
    }

    struct Result *result = process_chunk(data, sz);

    // sort results alphabetically
    qsort(result->groups, (size_t)result->n, sizeof(*result->groups), cmp);

    char *buf = malloc(240551);
    result_to_str(buf, result);
    puts(buf);

    // clean-up
    munmap((void *)data, sz);
    close(fd);

    return EXIT_SUCCESS;
}

I compiled it using his flags:

clang -std=c11 -O3 -march=native -mtune=native -flto -Wall -Wextra -Wpedantic -Wformat=2 -Wconversion -Wundef -Winline -Wimplicit-fallthrough test.c -o testc 

The result contains negative zeroes -0.0 and a , at the end, but is correct.
The benchmark: runtime is 27.5s, needs 8MB of RAM.

% hyperfine -r 5 -w 1 './testc > solution.txt'
Benchmark 1: ./testc > solution.txt
  Time (mean ± σ):     27.489 s ±  0.027 s    [User: 25.152 s, System: 2.271 s]
  Range (min … max):   27.452 s … 27.521 s    5 runs

My data

Apple M1 Max Pro
10Cores - 8 performance 2 efficiency
Max. Clock 3206 MHz (it did run at that frequency, I checked that)
L3 Cache 48MB
SSD has a measured speed of 4629.7MB/s write 5180.3 MB/s read

My data file is 14.9GB.

Btw. looking for a Fortran version I’ve found GitHub - sshestov/1brc-fortran, which (because of too many hash collisions) only returns 8350 instead of 8939 stations as a result and runs in 3.5s using 16 threads. That’s why I always check the results :wink:

If somebody has a AVX2 CPU on Linux: this should be the fastest C solution (if the title is true)

2 Likes

That takes 0.7 seconds on my machine with 6 workers, very impressive.

Yeah me and sshestov seem to be the only people with Fortran solutions at the moment :slight_smile: My latest hash + mmap version is clocking in at 38s (single threaded). I’ve tried to hand code some SIMD stuff but its difficult to beat the intrinsic functions. I’ll make it parallel soon and put it on a decent AWS machine for a fair go.

I’ve played around with it a bit. Looks like there’s a GHC RTS limitation. When I run -N8 with 8 threads (no allocations except at the end) it somehow blocks I/O in such a way that the program cannot use more than 400-600% CPU. If I add yield it can help with one or two read threads, but not more. If I use -N9 (ideally nProcessThreads + nReadThreads) then it works good, but indeed uses more than 800% CPU making it an incorrect benchmark.

I don’t know yet what is the source of the RTS behavior, so I just replaced parallel read threads with a simple reading in the same thread.

I also found that mmap works worse in a parallel scenario than parallel hGetBuf (perhaps too many conflicting page table mappings), while it’s slightly better in the sequential case.

The challenge seems to be to make as few memory accesses as possible, so I replaced the array-based hash table with a flat C-like blob (the same way others did, but using plain Foreign.Ptr.peek/poke without any intrinsics).

I have updated my gist. It runs faster than @jaror’s version, but slower than yours (and probably @AndrasKovacs’) because it doesn’t have any short-string optimisations (BTW, your version speeds up well with a -fllvm flag).

In multi-threaded variant it’s slightly slower than 7.c version (but faster with 24 threads). So:

  • a safe no-brainer version (unboxed array for counters) runs 2-4 times slower than C.
  • unsafe C-like version built on Foreign.Ptr runs about 1-1.25 times slower than C.
  • Data.Primitive looks more safe, but requires digging into intrinsics and might not always give better performance than Ptr.peek/poke
  • -fllvm can help.
  • minimizing memory access and any computing overhead matters more than any smart algorithms in this case.
  • mmap might not always be the best.
  • there are some issues in GHC RTS with tight non-allocating loops + I/O.

AVX is cool, I don’t think it’s doable in Haskell (that why we have FFI).

2 Likes

That’s interesting, because on my computer I had to remove it to gain about 300ms. By adding some bangs I shaved off another 1.5s, but nothing significant or interesting.

Talking about SIMD and auto-vectorisation of unrolled loops: does GHC even have the concept of loop unrolling?

Your version needs 103MB RAM, and uses 7 CPUs: 23s

hyperfine -w 1 -r 5  './exe-exe > solution.txt'
Benchmark 1: ./exe-exe > solution.txt
  Time (mean ± σ):     22.919 s ±  0.027 s    [User: 153.921 s, System: 7.697 s]
  Range (min … max):   22.897 s … 22.962 s    5 runs

Parallel version
Using 10 threads (the actual number of physical cores) did yield the best result, 6s using 25MB of RAM.

gfortran -fopenmp -march=native -ffast-math -O2 -o testf test.f90
hyperfine -w 1 -r 5  './testf > solution.txt'
Benchmark 1: ./testf > solution.txt
  Time (mean ± σ):      5.911 s ±  0.023 s    [User: 54.546 s, System: 1.320 s]
  Range (min … max):    5.881 s …  5.938 s    5 runs

22s total / 154s user times doesn’t look right. Have you compiled with -O2? (I’ve added it to the gist not to have such issues in future)

On my machine it was about 15s->13s total time difference. A very nice boost.

Oh, sorry, no. Using -O2 changes the time to (with a VM in the background, so I should get better results later): 4.5s!

Benchmark 1: ./exe-exe > solution.txt
  Time (mean ± σ):      4.312 s ±  0.012 s    [User: 32.158 s, System: 1.299 s]
  Range (min … max):    4.297 s …  4.324 s    5 runs

The used GHC options:

ghc-options:
  - -fspec-constr-count=100
  - -threaded
  - -O2
  - -Wall
  - -Wno-gadt-mono-local-binds
  - -Wno-type-defaults
1 Like

If you have time try changing the code to integer, parameter :: parts = 20, that will change the number of threads to 20 and allocate one part of the buffer to each thread. Given hyper-threading, the number of parts should be double the number of CPUs. I have a 4 core CPU, the best time I’ve got with 8 parts is 6.2s!

I did try that - I’ve tried 8, 10, 16 (your default), 20, 40 and 200. 10 had been the fastest, followed by 16, 20 and 8 (I’m not sure about 40).

1 Like

I got a WIP attempt ( just using print atm) with fairly idiomatic Haskell, now if there isn’t any bugs, on my 5900X with the 10M sample it’s 8s single core, and 0.6s with 6 cores, performance declines with more cores.

The problem is that it needs up to 91GB, but I only have 32GB, so it’s swapping. After a runtime of about 30 minutes I’ve stopped it.

The time is minimally better, but still at 4.5s.

% hyperfine -w 1 -r 5  './exe-exe  > solution.txt'
Benchmark 1: ./exe-exe > solution.txt
  Time (mean ± σ):      4.306 s ±  0.009 s    [User: 32.170 s, System: 1.294 s]
  Range (min … max):    4.295 s …  4.317 s    5 runs

I quickly found the culprit, using a bounded queue kept it under 100MB, a pure single threaded implementation with strict foldl’ and strict BS reads also slowed it down by 10x and causes a space leak (perhaps reads aren’t blocked), not sure why… Haskell has some bizarre performance issues that go completely against my intuition.

Irony that things only got slower the more I tried profiling the code lol.

Update:

Pushed a new version with 14s on 100M entries.

Map.unionWith (<>) . Map.singleton is faster than Map.alter

Gotten majority of the time down to parsing

Haskell’s multithreading RTS is really bad at migrating threads around passing -qm to a single threaded implementation can double the performance.

I see lots of weird performance qwerks because of this, for example running -N5 and spawning 1 reader, and 4 processing threads is far faster than -N and expecting the RTS to “do the right thing”, performance will degrade with more threads as well.

My hunch is that Haskell’s heap and threading system makes too many assumptions about memory latency, and hates the L3 cache on my Zen 3 CPU.

Oh yes, this is better :smiley:

The format of the result is different, but the number of stations is correct and the values look correct too.

In multi-threaded mode it needs 21MB of RAM and 8 threads, but uses only 120% of CPU (instead of 800% for 8 fully used cores). The time is therefore longer than in the single-thread version, 320s, 5min 20s.

% hyperfine -w 1 -r 5 './1brc m > solution.txt'
Benchmark 1: ./1brc m > solution.txt
  Time (mean ± σ):     318.603 s ± 10.467 s    [User: 365.078 s, System: 9.115 s]
  Range (min … max):   305.822 s … 332.892 s    5 runs

In single-threaded mode: 16MB RAM, 100% CPU and a runtime of 290s, 4m 50s.

% hyperfine -w 1 -r 5 './1brc s > solution.txt'
Benchmark 1: ./1brc s > solution.txt
  Time (mean ± σ):     290.503 s ±  4.688 s    [User: 285.638 s, System: 2.397 s]
  Range (min … max):   283.823 s … 294.683 s    5 runs

The multithreaded one needs +RTS -N<n>, 120% CPU and only 20MB of RAM would hint it’s set to 1 (the default), mine seems to bottleneck around 4-5 threads.