System.IO.Pipelines in F#
About
The .NET documentation includes sample code that uses System.IO.Pipelines. I wanted to learn how to write the same thing in an idiomatic F# style.
open System
open System.Buffers
open System.IO.Pipelines
open System.Net
open System.Net.Sockets
open System.Text.Json
open System.Threading.Tasks
/// Starts a TCP listener on the given address and port, then blocks until
/// one client connects and returns the accepted socket.
///
/// `ip` is parsed with `IPAddress.Parse`; `port` is passed straight to the
/// `TcpListener` constructor. The listener itself is not stopped here.
let startServer (ip : string) port =
    let address = IPAddress.Parse ip
    let server = new TcpListener(address, port)
    server.Start()
    // Synchronous accept: returns only once a connection has been made.
    server.AcceptSocket()
/// Creates a new `Pipe` and returns its two ends as a `(reader, writer)` tuple.
let buildPipe () =
    let channel = Pipe()
    (channel.Reader, channel.Writer)
/// Pumps bytes received from `socket` into `pipe`.
///
/// Each iteration asks the writer for a buffer, receives into it, commits
/// the received byte count with `Advance`, and flushes. The loop stops when
/// either
///   * the receive returns 0 bytes (documented by .NET as the remote side
///     having shut down the connection), or
///   * the flush result reports `IsCompleted`.
/// The writer is then completed with `CompleteAsync`.
let write (pipe : PipeWriter) (socket : Socket) = task {
    let mutable keepGoing = true
    while keepGoing do
        let memory = pipe.GetMemory()
        let! received = socket.ReceiveAsync(memory)
        if received = 0 then
            // Nothing more will arrive; without this check the loop would
            // spin forever advancing by zero bytes.
            keepGoing <- false
        else
            // Tell the pipe how many bytes of the buffer were actually filled.
            pipe.Advance(received)
            let! flushed = pipe.FlushAsync()
            if flushed.IsCompleted then keepGoing <- false
            do! Task.Yield()
    do! pipe.CompleteAsync()
}
/// Tries to extract the first line from `buffer`.
///
/// Looks for the first `'\n'` byte. If one is found, returns `Some` of the
/// bytes before it (the newline itself is excluded) decoded as UTF-8;
/// otherwise returns `None`. The buffer is not modified.
let tryReadLine (buffer : ReadOnlySequence<byte>) =
    buffer.PositionOf(byte('\n'))
    |> Option.ofNullable
    |> Option.map (fun newline ->
        // The slice ends at the newline's position, so only the line's own
        // bytes are copied out for decoding.
        let lineBytes = buffer.Slice(0, newline).ToArray()
        System.Text.Encoding.UTF8.GetString lineBytes
    )
/// Reads from `pipe` and prints each complete line (as found by
/// `tryReadLine`) to standard output.
///
/// After every read, all complete lines in the buffer are printed and
/// consumed; any trailing partial line is left in the pipe and marked as
/// examined, so the next `ReadAsync` waits for more data instead of
/// returning the same bytes again. The loop ends when the read result
/// reports `IsCompleted`, after which the reader is completed with
/// `CompleteAsync`.
let read (pipe : PipeReader) =
    // Prints every complete line at the front of `buffer` and returns the
    // unconsumed remainder (which contains no newline).
    let rec printLines (buffer : ReadOnlySequence<byte>) =
        match tryReadLine buffer with
        | Some line ->
            printfn $"{line}"
            // tryReadLine returned Some, so a newline is present in `buffer`.
            let newline = buffer.PositionOf(byte('\n')).Value
            // Continue just past the newline so the line is not seen again.
            printLines (buffer.Slice(buffer.GetPosition(1L, newline)))
        | None -> buffer
    task {
        let mutable keepGoing = true
        while keepGoing do
            let! result = pipe.ReadAsync()
            let remaining = printLines result.Buffer
            // Consumed: everything before `remaining`. Examined: the whole
            // buffer, since the remainder holds no complete line.
            pipe.AdvanceTo(remaining.Start, remaining.End)
            if result.IsCompleted then keepGoing <- false
            do! Task.Yield()
        do! pipe.CompleteAsync()
    }
/// Program entry point.
///
/// Builds a pipe, waits for one TCP client on 127.0.0.1:1234, then runs the
/// socket-to-pipe writer and the pipe-to-console reader concurrently and
/// blocks until both have finished. Returns exit code 0.
[<EntryPoint>]
let main _ =
    let reader, writer = buildPipe ()
    // `use` disposes the accepted socket when main returns.
    use socket = startServer "127.0.0.1" 1234
    let writing = write writer socket
    let reading = read reader
    // Block until both tasks finish; discarding the combined task would let
    // the process exit before any data is pumped.
    Task.WhenAll(writing, reading).GetAwaiter().GetResult() |> ignore
    0