Pipes And Performance
Efficient and reliable data transmission is the backbone of seamless communication. At the heart of this process lies TCP (Transmission Control Protocol) networking, a fundamental technology that ensures data is sent and received accurately across the internet.
I never had to create a TCP server/client with .NET but I always wanted to learn, I was never compelled to do this seeing as the process was too complex and mistakes were easy to create.
A great article about writing a TCP server can be read here https://learn.microsoft.com/en-us/dotnet/standard/io/pipelines . And while it’s true that we can write a simpler server that wouldn’t be as performant (memory and CPU wise).
Pipelines were created for 2 reasons:
- reduce the boiler place code required to handle stream processing
- create a high performance framework (with the use of the new Memory/Span and rented buffers, etc) for handling stream processing .
- for more on this read https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
What has this produced? Let’s see the benchmarks and where is kestrel situated
https://www.techempower.com/benchmarks/#section=data-r16&hw=ph&test=plaintext
Usage
Using pipes is pretty straightforward, most of the code I’ve copied from David Fowler’s example. For my playground, I abstracted away some of the boilerplate and created a TMessage that should always be shared between the server and the client
The Server: when started it will continuously listen for new connections. The number of lines of code may be intimidating at first, but if you take a closer look the code is quite simple. Processing the client only involves reading the pipe and iterating the buffer (for my simple test I just iterated the entire thing in an array)
public class TestTcpSever<TMessage> { private readonly IPAddress _ipAddress; private readonly int _port; private readonly IMessageSerialization<TMessage> _messageSerialization; internal TestTcpSever(IPAddress ipAddress, int port, IMessageSerialization<TMessage> messageSerialization) { _ipAddress = ipAddress; _port = port; _messageSerialization = messageSerialization; } public async Task StartAsync(Action<TMessage> onMessage) { var listener = new TcpListener(_ipAddress, _port); listener.Start(); Console.WriteLine($"Server listening on port {_port}"); while (true) { TcpClient client = await listener.AcceptTcpClientAsync(); var remote = client.Client.RemoteEndPoint.Serialize().ToString(); var handle = client.Client.Handle.ToInt64(); Console.WriteLine($"new connection from client {remote} with handle {handle}"); _ = ProcessClientAsync(client, onMessage); } } private async Task ProcessClientAsync(TcpClient client, Action<TMessage> onMessage) { NetworkStream stream = client.GetStream(); PipeReader pipeReader = PipeReader.Create(stream); while (true) { ReadResult result = await pipeReader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; byte[] messageBytes = ArrayPool<byte>.Shared.Rent((int)buffer.Length); int lastIndex = 0; foreach (ReadOnlyMemory<byte> segment in buffer) { Array.Copy(segment.ToArray(), 0, messageBytes, lastIndex, segment.Length); lastIndex += segment.Length; } TMessage messageReceived = _messageSerialization .Deserialize(messageBytes); onMessage(messageReceived); pipeReader.AdvanceTo(buffer.End); if (result.IsCompleted) break; string received = $"received at {DateTime.UtcNow.ToLongDateString()}"; await stream.WriteAsync( Encoding.UTF8.GetBytes(received), 0, Encoding.UTF8.GetBytes(received).Length); } pipeReader.Complete(); } }
The TCP Client (similar to the TCP Server I made it always using the same TMessage)
public class TestTcpClient<TMessage> : IDisposable, IAsyncDisposable { private readonly NetworkStream _stream; private readonly TcpClient _client; private readonly IMessageSerialization<TMessage> _messageSerialization; internal TestTcpClient(TcpClient client, IMessageSerialization<TMessage> messageSerialization) { _client = client; _messageSerialization = messageSerialization; _stream = client.GetStream(); } public void Dispose() { _stream.Dispose(); _client.Dispose(); } public async ValueTask DisposeAsync() { _client.Dispose(); await _stream.DisposeAsync(); } public async Task SendMessageAsync( TMessage message) { PipeWriter pipeWriter = PipeWriter.Create(_stream); byte[] messageBytes = _messageSerialization.Serialize(message); await pipeWriter.WriteAsync(messageBytes); await pipeWriter.FlushAsync(); } public async Task ProcessIncoming(CancellationToken cancellationToken) { PipeReader pipeReader = PipeReader.Create(_stream); while (!cancellationToken.IsCancellationRequested) { ReadResult result = await pipeReader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; foreach (var segment in buffer) { byte[] messageBytes = segment.ToArray(); string messageString = Encoding.UTF8.GetString(messageBytes); Console.WriteLine($"Received message from server: {messageString}"); } pipeReader.AdvanceTo(buffer.End); // this should happen when client closes connection // or when there is a response indicating that the no more // data will be sent if (result.IsCompleted) break; } } }
Now the starter
public class TestTcpConnection<TMessage>( IPAddress ipAddress, int port, IMessageSerialization<TMessage> serialization) { public TestTcpSever<TMessage> CreateServer() { return new TestTcpSever<TMessage>( ipAddress, port, serialization); } public async Task<TestTcpClient<TMessage>> ConnectAsync() { TcpClient client = new TcpClient(); CancellationTokenSource cts = new(); await client.ConnectAsync( ipAddress, port, cts.Token); return new TestTcpClient<TMessage>(client, serialization); } }
Using our Server/Client
Finally we can use our boiler plate code and create a server/client
The Server
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>( IPAddress.Any, 8080, new TestMessageSerialization()); TestTcpSever<TestMessage> server = connection.CreateServer(); await server.StartAsync((message) => { Console.WriteLine($"Server: Received message {message}"); });
The Client
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>( IPAddress.Loopback, 8080, new TestMessageSerialization()); TestTcpClient<TestMessage> client = await connection.ConnectAsync(); CancellationTokenSource cts = new(); client.ProcessIncoming(cts.Token); while (!cts.IsCancellationRequested) { Console.WriteLine("Send:"); var input = Console.ReadLine(); await client.SendMessageAsync(new TestMessage() { Content = input ?? "empty", Timestamp = DateTime.UtcNow }); } SpinWait.SpinUntil(() => cts.IsCancellationRequested);