reading files concurrently for fun
About
I was curious how reading a file concurrently in chunks performs compared to a single sequential read operation.
Experiment
According to my concurrent read implementation, the performance is roughly the same as a sequential read. I found online that someone else had run the same experiment and measured significant performance improvements, though.
package main
import (
"crypto/md5"
"fmt"
"io"
"log"
"os"
"slices"
"sync"
"time"
)
// ReadChunkOperation describes one contiguous section of a file to read.
//
// seq is the section's position in the overall sequence of chunks, offset is
// the byte position in the file where the section starts, and length is the
// number of bytes the section spans.
type ReadChunkOperation struct {
	seq, offset, length int
}
// Chunk is the result of one read: the bytes that were read, tagged with the
// sequence number of the operation that produced them so that chunks arriving
// out of order can be put back in sequence.
type Chunk struct {
	seq   int    // sequence number copied from the originating read operation
	bytes []byte // data read for this section of the file
}
// buildReadOps splits a file of size bytes into at most count contiguous,
// non-overlapping read operations that together cover the range [0, size).
//
// Operations are numbered from 0 in ascending offset order. Every operation
// has the same length (size/count rounded up) except possibly the last one,
// which holds whatever remains. Rounding up keeps the number of operations
// within count and guarantees a non-zero length, so the loop always makes
// progress even when size < count.
//
// A count below 1 is treated as 1. A size of zero or less yields no
// operations.
func buildReadOps(size int, count int) (operations []ReadChunkOperation) {
	if size <= 0 {
		return nil
	}
	if count < 1 {
		count = 1
	}

	// Ceiling division, written without size+count-1 so it cannot overflow.
	length := size / count
	if size%count != 0 {
		length++
	}

	// Number of operations that will be produced: ceil(size / length).
	total := size / length
	if size%length != 0 {
		total++
	}
	operations = make([]ReadChunkOperation, 0, total)

	for remaining := size; remaining > 0; {
		n := length
		if remaining < n {
			n = remaining
		}
		operations = append(operations, ReadChunkOperation{
			seq:    len(operations),
			offset: size - remaining,
			length: n,
		})
		remaining -= n
	}
	return operations
}
// readSectionFromFile reads the byte range described by operation from fh and
// returns it as a Chunk tagged with the operation's sequence number.
//
// The returned slice always has operation.length bytes. If the read fails or
// comes up short, the failure is logged and the bytes that were not read stay
// zero. The function has no error return, so logging is the only way to make
// the problem visible instead of silently handing back incomplete data.
func readSectionFromFile(fh io.ReaderAt, operation ReadChunkOperation) Chunk {
	buf := make([]byte, operation.length)
	n, err := fh.ReadAt(buf, int64(operation.offset))
	// io.ReaderAt allows io.EOF to accompany a complete read, so EOF only
	// counts as a failure when fewer bytes than requested arrived.
	if err != nil && (err != io.EOF || n < len(buf)) {
		log.Printf("chunk %d: read %d of %d bytes at offset %d: %v",
			operation.seq, n, len(buf), operation.offset, err)
	}
	return Chunk{seq: operation.seq, bytes: buf}
}
// order drains chunksCh until it is closed and returns every chunk it
// received, sorted by ascending sequence number.
func order(chunksCh chan Chunk) (chunks []Chunk) {
	for {
		next, open := <-chunksCh
		if !open {
			break
		}
		chunks = append(chunks, next)
	}

	bySeq := func(left, right Chunk) int {
		return left.seq - right.seq
	}
	slices.SortFunc(chunks, bySeq)
	return chunks
}
// readConcurrently performs every read operation against fh in its own
// goroutine and returns the bytes of all chunks concatenated in ascending
// sequence order.
//
// readSectionFromFile returns only a Chunk, so read errors cannot be surfaced
// by this function. The result is always non-nil; it is empty when there are
// no operations.
func readConcurrently(fh io.ReaderAt, operations []ReadChunkOperation) (content []byte) {
	// One buffer slot per operation: no send can block, so the channel can be
	// closed right after the readers finish without a separate closer
	// goroutine.
	ch := make(chan Chunk, len(operations))

	var wg sync.WaitGroup
	for _, operation := range operations {
		wg.Add(1)
		// Pass the operation as an argument. Before Go 1.22 the loop variable
		// is shared by all iterations, so a closure capturing it could observe
		// a later operation by the time the goroutine actually runs.
		go func(op ReadChunkOperation) {
			defer wg.Done()
			ch <- readSectionFromFile(fh, op)
		}(operation)
	}
	wg.Wait()
	close(ch)

	chunks := order(ch)

	size := 0
	for _, chunk := range chunks {
		size += len(chunk.bytes)
	}
	content = make([]byte, 0, size)
	for _, chunk := range chunks {
		content = append(content, chunk.bytes...)
	}
	return content
}
// timer calls fn exactly once and returns how long the call took.
func timer(fn func()) time.Duration {
	start := time.Now()
	fn()
	// time.Since(start) is the idiomatic spelling of time.Now().Sub(start).
	return time.Since(start)
}
// main times two ways of reading foobar.bin into memory: first as chunked
// reads issued concurrently (chunks come from buildReadOps(size, 32)), then as
// one sequential ReadAt of the whole file. For each approach it prints the MD5
// of the bytes read and the elapsed time, so the results can be checked
// against each other.
func main() {
	filename := "foobar.bin" // dd if=/dev/urandom of=foobar.bin bs=1G count=16
	fi, err := os.Stat(filename)
	if err != nil {
		log.Fatal(err)
	}
	size := fi.Size()
	operations := buildReadOps(int(size), 32)

	fh, err := os.Open(filename)
	if err != nil {
		log.Fatal(err)
	}
	defer fh.Close()

	var concurrent []byte
	concurrencyReadDuration := timer(func() {
		concurrent = readConcurrently(fh, operations)
	})
	fmt.Printf("concurrent read (%x) performance: %s\n", md5.Sum(concurrent), concurrencyReadDuration)

	var (
		sequential []byte
		n          int
		readErr    error
	)
	sequentialReadDuration := timer(func() {
		sequential = make([]byte, size)
		n, readErr = fh.ReadAt(sequential, 0)
	})
	// io.EOF alongside a complete read is not a failure; anything else means
	// the checksum below would describe incomplete data.
	if readErr != nil && (readErr != io.EOF || n < len(sequential)) {
		log.Fatalf("sequential read of %s: read %d of %d bytes: %v", filename, n, len(sequential), readErr)
	}
	fmt.Printf("sequential read (%x) performance: %s\n", md5.Sum(sequential), sequentialReadDuration)
}