Pipes And Performance
Efficient and reliable data transmission is the backbone of seamless communication. At the heart of this process lies TCP (Transmission Control Protocol) networking, a fundamental technology that ensures data is sent and received accurately across the internet.
I never had to create a TCP server/client with .NET but I always wanted to learn, I was never compelled to do this seeing as the process was too complex and mistakes were easy to create.
A great article about writing a TCP server can be read here https://learn.microsoft.com/en-us/dotnet/standard/io/pipelines . And while it’s true that we can write a simpler server that wouldn’t be as performant (memory and CPU wise).
Pipelines were created for 2 reasons:
- reduce the boiler place code required to handle stream processing
- create a high performance framework (with the use of the new Memory/Span and rented buffers, etc) for handling stream processing .
- for more on this read https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
What has this produced? Let’s see the benchmarks and where is kestrel situated
Using pipes is pretty straightforward, most of the code I’ve copied from David Fowler’s example. For my playground, I abstracted away some of the boilerplate and created a TMessage that should always be shared between the server and the client
The Server: when started it will continuously listen for new connections. The number of lines of code may be intimidating at first, but if you take a closer look the code is quite simple. Processing the client only involves reading the pipe and iterating the buffer (for my simple test I just iterated the entire thing in an array)
public class TestTcpSever<TMessage> { private readonly IPAddress _ipAddress; private readonly int _port; private readonly IMessageSerialization<TMessage> _messageSerialization; internal TestTcpSever(IPAddress ipAddress, int port, IMessageSerialization<TMessage> messageSerialization) { _ipAddress = ipAddress; _port = port; _messageSerialization = messageSerialization; } public async Task StartAsync(Action<TMessage> onMessage) { var listener = new TcpListener(_ipAddress, _port); listener.Start(); Console.WriteLine($"Server listening on port {_port}"); while (true) { TcpClient client = await listener.AcceptTcpClientAsync(); var remote = client.Client.RemoteEndPoint.Serialize().ToString(); var handle = client.Client.Handle.ToInt64(); Console.WriteLine($"new connection from client {remote} with handle {handle}"); _ = ProcessClientAsync(client, onMessage); } } private async Task ProcessClientAsync(TcpClient client, Action<TMessage> onMessage) { NetworkStream stream = client.GetStream(); PipeReader pipeReader = PipeReader.Create(stream); while (true) { ReadResult result = await pipeReader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; byte[] messageBytes = ArrayPool<byte>.Shared.Rent((int)buffer.Length); int lastIndex = 0; foreach (ReadOnlyMemory<byte> segment in buffer) { Array.Copy(segment.ToArray(), 0, messageBytes, lastIndex, segment.Length); lastIndex += segment.Length; } TMessage messageReceived = _messageSerialization .Deserialize(messageBytes); onMessage(messageReceived); pipeReader.AdvanceTo(buffer.End); if (result.IsCompleted) break; string received = $"received at {DateTime.UtcNow.ToLongDateString()}"; await stream.WriteAsync( Encoding.UTF8.GetBytes(received), 0, Encoding.UTF8.GetBytes(received).Length); } pipeReader.Complete(); } }
The TCP Client (similar to the TCP Server I made it always using the same TMessage)
public class TestTcpClient<TMessage> : IDisposable, IAsyncDisposable { private readonly NetworkStream _stream; private readonly TcpClient _client; private readonly IMessageSerialization<TMessage> _messageSerialization; internal TestTcpClient(TcpClient client, IMessageSerialization<TMessage> messageSerialization) { _client = client; _messageSerialization = messageSerialization; _stream = client.GetStream(); } public void Dispose() { _stream.Dispose(); _client.Dispose(); } public async ValueTask DisposeAsync() { _client.Dispose(); await _stream.DisposeAsync(); } public async Task SendMessageAsync( TMessage message) { PipeWriter pipeWriter = PipeWriter.Create(_stream); byte[] messageBytes = _messageSerialization.Serialize(message); await pipeWriter.WriteAsync(messageBytes); await pipeWriter.FlushAsync(); } public async Task ProcessIncoming(CancellationToken cancellationToken) { PipeReader pipeReader = PipeReader.Create(_stream); while (!cancellationToken.IsCancellationRequested) { ReadResult result = await pipeReader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; foreach (var segment in buffer) { byte[] messageBytes = segment.ToArray(); string messageString = Encoding.UTF8.GetString(messageBytes); Console.WriteLine($"Received message from server: {messageString}"); } pipeReader.AdvanceTo(buffer.End); // this should happen when client closes connection // or when there is a response indicating that the no more // data will be sent if (result.IsCompleted) break; } } }
Now the starter
public class TestTcpConnection<TMessage>( IPAddress ipAddress, int port, IMessageSerialization<TMessage> serialization) { public TestTcpSever<TMessage> CreateServer() { return new TestTcpSever<TMessage>( ipAddress, port, serialization); } public async Task<TestTcpClient<TMessage>> ConnectAsync() { TcpClient client = new TcpClient(); CancellationTokenSource cts = new(); await client.ConnectAsync( ipAddress, port, cts.Token); return new TestTcpClient<TMessage>(client, serialization); } }
Using our Server/Client
Finally we can use our boiler plate code and create a server/client
The Server
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>( IPAddress.Any, 8080, new TestMessageSerialization()); TestTcpSever<TestMessage> server = connection.CreateServer(); await server.StartAsync((message) => { Console.WriteLine($"Server: Received message {message}"); });
The Client
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>( IPAddress.Loopback, 8080, new TestMessageSerialization()); TestTcpClient<TestMessage> client = await connection.ConnectAsync(); CancellationTokenSource cts = new(); client.ProcessIncoming(cts.Token); while (!cts.IsCancellationRequested) { Console.WriteLine("Send:"); var input = Console.ReadLine(); await client.SendMessageAsync(new TestMessage() { Content = input ?? "empty", Timestamp = DateTime.UtcNow }); } SpinWait.SpinUntil(() => cts.IsCancellationRequested);