Pipes And Performance
Efficient and reliable data transmission is the backbone of seamless communication. At the heart of this process lies TCP (Transmission Control Protocol) networking, a fundamental technology that ensures data is sent and received accurately across the internet.
I never had to create a TCP server/client with .NET but I always wanted to learn, I was never compelled to do this seeing as the process was too complex and mistakes were easy to create.
A great article about writing a TCP server can be read here https://learn.microsoft.com/en-us/dotnet/standard/io/pipelines . And while it’s true that we can write a simpler server that wouldn’t be as performant (memory and CPU wise).
Pipelines were created for 2 reasons:
- reduce the boiler place code required to handle stream processing
- create a high performance framework (with the use of the new Memory/Span and rented buffers, etc) for handling stream processing .
- for more on this read https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
What has this produced? Let’s see the benchmarks and where is kestrel situated
https://www.techempower.com/benchmarks/#section=data-r16&hw=ph&test=plaintext
Usage
Using pipes is pretty straightforward, most of the code I’ve copied from David Fowler’s example. For my playground, I abstracted away some of the boilerplate and created a TMessage that should always be shared between the server and the client
The Server: when started it will continuously listen for new connections. The number of lines of code may be intimidating at first, but if you take a closer look the code is quite simple. Processing the client only involves reading the pipe and iterating the buffer (for my simple test I just iterated the entire thing in an array)
public class TestTcpSever<TMessage>
{
private readonly IPAddress _ipAddress;
private readonly int _port;
private readonly IMessageSerialization<TMessage> _messageSerialization;
internal TestTcpSever(IPAddress ipAddress,
int port,
IMessageSerialization<TMessage> messageSerialization)
{
_ipAddress = ipAddress;
_port = port;
_messageSerialization = messageSerialization;
}
public async Task StartAsync(Action<TMessage> onMessage)
{
var listener = new TcpListener(_ipAddress, _port);
listener.Start();
Console.WriteLine($"Server listening on port {_port}");
while (true)
{
TcpClient client = await listener.AcceptTcpClientAsync();
var remote = client.Client.RemoteEndPoint.Serialize().ToString();
var handle = client.Client.Handle.ToInt64();
Console.WriteLine($"new connection from client {remote} with handle {handle}");
_ = ProcessClientAsync(client, onMessage);
}
}
private async Task ProcessClientAsync(TcpClient client, Action<TMessage> onMessage)
{
NetworkStream stream = client.GetStream();
PipeReader pipeReader = PipeReader.Create(stream);
while (true)
{
ReadResult result = await pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
byte[] messageBytes = ArrayPool<byte>.Shared.Rent((int)buffer.Length);
int lastIndex = 0;
foreach (ReadOnlyMemory<byte> segment in buffer)
{
Array.Copy(segment.ToArray(), 0, messageBytes, lastIndex, segment.Length);
lastIndex += segment.Length;
}
TMessage messageReceived = _messageSerialization
.Deserialize(messageBytes);
onMessage(messageReceived);
pipeReader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
string received = $"received at {DateTime.UtcNow.ToLongDateString()}";
await stream.WriteAsync(
Encoding.UTF8.GetBytes(received),
0,
Encoding.UTF8.GetBytes(received).Length);
}
pipeReader.Complete();
}
}
The TCP Client (similar to the TCP Server I made it always using the same TMessage)
public class TestTcpClient<TMessage> : IDisposable, IAsyncDisposable
{
private readonly NetworkStream _stream;
private readonly TcpClient _client;
private readonly IMessageSerialization<TMessage> _messageSerialization;
internal TestTcpClient(TcpClient client,
IMessageSerialization<TMessage> messageSerialization)
{
_client = client;
_messageSerialization = messageSerialization;
_stream = client.GetStream();
}
public void Dispose()
{
_stream.Dispose();
_client.Dispose();
}
public async ValueTask DisposeAsync()
{
_client.Dispose();
await _stream.DisposeAsync();
}
public async Task SendMessageAsync(
TMessage message)
{
PipeWriter pipeWriter = PipeWriter.Create(_stream);
byte[] messageBytes = _messageSerialization.Serialize(message);
await pipeWriter.WriteAsync(messageBytes);
await pipeWriter.FlushAsync();
}
public async Task ProcessIncoming(CancellationToken cancellationToken)
{
PipeReader pipeReader = PipeReader.Create(_stream);
while (!cancellationToken.IsCancellationRequested)
{
ReadResult result = await pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
foreach (var segment in buffer)
{
byte[] messageBytes = segment.ToArray();
string messageString = Encoding.UTF8.GetString(messageBytes);
Console.WriteLine($"Received message from server: {messageString}");
}
pipeReader.AdvanceTo(buffer.End);
// this should happen when client closes connection
// or when there is a response indicating that the no more
// data will be sent
if (result.IsCompleted)
break;
}
}
}
Now the starter
public class TestTcpConnection<TMessage>(
IPAddress ipAddress,
int port,
IMessageSerialization<TMessage> serialization)
{
public TestTcpSever<TMessage> CreateServer()
{
return new TestTcpSever<TMessage>(
ipAddress,
port,
serialization);
}
public async Task<TestTcpClient<TMessage>> ConnectAsync()
{
TcpClient client = new TcpClient();
CancellationTokenSource cts = new();
await client.ConnectAsync(
ipAddress,
port,
cts.Token);
return new TestTcpClient<TMessage>(client, serialization);
}
}
Using our Server/Client
Finally we can use our boiler plate code and create a server/client
The Server
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>(
IPAddress.Any,
8080,
new TestMessageSerialization());
TestTcpSever<TestMessage> server = connection.CreateServer();
await server.StartAsync((message) =>
{
Console.WriteLine($"Server: Received message {message}");
});
The Client
TestTcpConnection<TestMessage> connection = new TestTcpConnection<TestMessage>(
IPAddress.Loopback,
8080,
new TestMessageSerialization());
TestTcpClient<TestMessage> client = await connection.ConnectAsync();
CancellationTokenSource cts = new();
client.ProcessIncoming(cts.Token);
while (!cts.IsCancellationRequested)
{
Console.WriteLine("Send:");
var input = Console.ReadLine();
await client.SendMessageAsync(new TestMessage()
{
Content = input ?? "empty",
Timestamp = DateTime.UtcNow
});
}
SpinWait.SpinUntil(() => cts.IsCancellationRequested);








