August Feng

reading files concurrently for fun

About

I was curious how reading a file concurrently in chunks compares, performance-wise, with a single sequential read.

Experiment

With my concurrent-read implementation, both approaches took roughly the same time. However, someone else who ran a similar experiment online reported significant performance improvements.

  package main

  import (
  	"crypto/md5"
  	"fmt"
  	"io"
  	"log"
  	"os"
  	"slices"
  	"sync"
  	"time"
  )

  // Types shared by the chunked reader.
  type (
  	// ReadChunkOperation describes one byte range to read: seq is its
  	// position in the reassembled output, while offset and length locate
  	// the range in the source.
  	ReadChunkOperation struct {
  		seq    int
  		offset int
  		length int
  	}

  	// Chunk carries the bytes read for one ReadChunkOperation, tagged with
  	// that operation's seq so chunks can be put back in order.
  	Chunk struct {
  		seq   int
  		bytes []byte
  	}
  )

  // buildReadOps splits the byte range [0, size) into consecutive read
  // operations numbered from seq 0. Every operation is size/count bytes long
  // except possibly the last, which holds whatever is left over. When size is
  // not a multiple of count, this can therefore yield more than count
  // operations.
  //
  // A count below 1 is treated as 1 (avoiding a division by zero). Chunks are
  // always at least one byte long, so a count larger than size cannot produce
  // zero-length chunks and an endless loop. A non-positive size yields no
  // operations.
  func buildReadOps(size int, count int) (operations []ReadChunkOperation) {
  	if size <= 0 {
  		return nil
  	}
  	count = max(count, 1)
  	length := max(size/count, 1)

  	operations = make([]ReadChunkOperation, 0, (size+length-1)/length)
  	for seq, offset := 0, 0; offset < size; seq++ {
  		n := min(length, size-offset)
  		operations = append(operations, ReadChunkOperation{seq, offset, n})
  		offset += n
  	}
  	return operations
  }

  // readSectionFromFile reads operation.length bytes from fh starting at
  // operation.offset and returns them tagged with operation.seq.
  //
  // The returned chunk always holds exactly operation.length bytes. If the
  // read comes up short (for example, if the range extends past the end of the
  // input), the unread tail stays zeroed and the failure is logged instead of
  // being silently ignored.
  func readSectionFromFile(fh io.ReaderAt, operation ReadChunkOperation) Chunk {
  	b := make([]byte, operation.length)
  	n, err := fh.ReadAt(b, int64(operation.offset))
  	if n < len(b) {
  		// io.ReaderAt guarantees a non-nil err whenever n < len(b).
  		log.Printf("chunk %d: read %d of %d bytes at offset %d: %v",
  			operation.seq, n, len(b), operation.offset, err)
  	}
  	return Chunk{operation.seq, b}
  }

  // order receives from chunksCh until the channel is closed and returns
  // every chunk it got, sorted by ascending seq.
  func order(chunksCh chan Chunk) (chunks []Chunk) {
  	for {
  		c, ok := <-chunksCh
  		if !ok {
  			break
  		}
  		chunks = append(chunks, c)
  	}
  	slices.SortFunc(chunks, func(x, y Chunk) int { return x.seq - y.seq })
  	return chunks
  }

  // readConcurrently reads each operation's byte range from fh in its own
  // goroutine and returns the ranges concatenated in ascending seq order.
  //
  // Each goroutine reads directly into its own disjoint window of a single
  // pre-sized result buffer. The alternative is reading into per-chunk buffers
  // and then copying them into the result. That holds the data in memory
  // twice, which matters for multi-gigabyte inputs. It also adds a copy pass,
  // so the timing would no longer measure just the reads. A short read leaves
  // the rest of its window zeroed and is logged.
  //
  // Operations are handed to the goroutines as arguments, so each goroutine
  // reads its own operation even on Go versions before 1.22's per-iteration
  // loop variables.
  func readConcurrently(fh io.ReaderAt, operations []ReadChunkOperation) (content []byte) {
  	// Sort a copy so the caller's slice is left untouched.
  	ops := slices.Clone(operations)
  	slices.SortStableFunc(ops, func(a, b ReadChunkOperation) int { return a.seq - b.seq })

  	// starts[i] is where ops[i] begins within content.
  	starts := make([]int, len(ops))
  	size := 0
  	for i, op := range ops {
  		starts[i] = size
  		size += op.length
  	}
  	content = make([]byte, size)

  	var wg sync.WaitGroup
  	for i, op := range ops {
  		wg.Add(1)
  		go func(op ReadChunkOperation, dst []byte) {
  			defer wg.Done()
  			n, err := fh.ReadAt(dst, int64(op.offset))
  			if n < len(dst) {
  				log.Printf("chunk %d: read %d of %d bytes at offset %d: %v",
  					op.seq, n, len(dst), op.offset, err)
  			}
  		}(op, content[starts[i]:starts[i]+op.length])
  	}
  	wg.Wait()
  	return content
  }

  // timer calls fn exactly once and reports how much time the call took.
  func timer(fn func()) time.Duration {
  	begin := time.Now()
  	fn()
  	elapsed := time.Since(begin)
  	return elapsed
  }

  // main times reading the test file with readConcurrently against a single
  // sequential ReadAt of the whole file, printing each result's MD5 next to
  // its duration, and warns if the two reads returned different data.
  //
  // Create the input first, e.g.:
  //
  //	dd if=/dev/urandom of=foobar.bin bs=1G count=16
  //
  // NOTE(review): the concurrent read always runs first, so whatever part of
  // the file the OS still has cached may make the sequential read look
  // faster. For a fair comparison, drop caches between runs or alternate the
  // order.
  func main() {
  	const (
  		filename   = "foobar.bin"
  		chunkCount = 32
  	)

  	fi, err := os.Stat(filename)
  	if err != nil {
  		log.Fatal(err)
  	}

  	size := fi.Size()
  	// NOTE(review): int(size) overflows on 32-bit platforms for files of 2 GiB or more.
  	operations := buildReadOps(int(size), chunkCount)

  	fh, err := os.Open(filename)
  	if err != nil {
  		log.Fatal(err)
  	}
  	defer fh.Close()

  	var concurrent []byte
  	concurrencyReadDuration := timer(func() {
  		concurrent = readConcurrently(fh, operations)
  	})
  	concurrentSum := md5.Sum(concurrent)
  	fmt.Printf("concurrent read (%x) performance: %s\n", concurrentSum, concurrencyReadDuration)

  	var sequential []byte
  	var readErr error
  	sequentialReadDuration := timer(func() {
  		sequential = make([]byte, size)
  		_, readErr = fh.ReadAt(sequential, 0)
  	})
  	// A short read would otherwise show up only as a mismatched checksum.
  	if readErr != nil {
  		log.Fatal(readErr)
  	}
  	sequentialSum := md5.Sum(sequential)
  	fmt.Printf("sequential read (%x) performance: %s\n", sequentialSum, sequentialReadDuration)

  	if concurrentSum != sequentialSum {
  		log.Print("checksums differ: the two reads returned different data")
  	}
  }