August Feng

System.IO.Pipelines in F#

About

The .NET documentation demonstrates code that uses System.IO.Pipelines. I wanted to learn about it in F# style.

open System
open System.Buffers
open System.IO.Pipelines
open System.Net
open System.Net.Sockets
open System.Text.Json
open System.Threading.Tasks

let startServer (ip : string) port =
    let ip = IPAddress.Parse(ip)
    let listener = new TcpListener(ip, port)
    listener.Start()
    listener.AcceptSocket()

let buildPipe () =
    let pipe = Pipe()
    pipe.Reader, pipe.Writer

let write (pipe : PipeWriter) (socket : Socket) = task {
    let mutable c = true
    while c do
        let m = pipe.GetMemory()
        let! n = socket.ReceiveAsync(m)
        pipe.Advance(n)
        let! fr = pipe.FlushAsync()
        if fr.IsCompleted then c <- false
        do! Task.Yield()
    do! pipe.CompleteAsync()
}

let tryReadLine (buffer : ReadOnlySequence<byte>) =
    let b = buffer.Slice(0, buffer.Length) |> _.ToArray()
    buffer.PositionOf(byte('\n'))
    |> Option.ofNullable
    |> Option.map (fun i ->
        let bytes = buffer.Slice(0, i) |> _.ToArray()
        System.Text.Encoding.UTF8.GetString bytes
        )

let read (pipe : PipeReader) = task {
    let mutable c = true
    while c do
        let! r = pipe.ReadAsync()
        let b = r.Buffer
        match tryReadLine(b) with
        | Some s ->
            printfn $"{s}"
            pipe.AdvanceTo(b.Start, b.End)
        | None -> ()
        if r.IsCompleted then c <- false
        do! Task.Yield()
    do! pipe.CompleteAsync()
}

[<EntryPoint>]
let main _ =
    let reader, writer = buildPipe ()
    let socket = startServer "127.0.0.1" 1234
    let writing = write writer socket
    let reading = read reader
    Task.WhenAll(writing, reading) |> ignore
    0