My idea is to put the data received from a socket into a more practical form of IObservable<byte>, so I can use the power of Rx to query the received data.
Related links:
Using Reactive Extensions (Rx) for socket programming practical?
1. First, I've created an extension class that wraps each Socket method I'm interested in into a Task:
/// <summary>
/// Extension methods that expose the Begin/End asynchronous operations of
/// <see cref="Socket"/> as tasks.
/// </summary>
public static class SocketTaskExtensions
{
    /// <summary>Starts accepting an incoming connection on <paramref name="socket"/>.</summary>
    /// <param name="socket">The listening socket.</param>
    /// <returns>A task whose result is the socket returned by <c>EndAccept</c>.</returns>
    public static Task<Socket> AcceptClient(this Socket socket)
    {
        return FromBeginEnd<Socket>(
            done => socket.BeginAccept(done, null),
            pending => socket.EndAccept(pending));
    }

    /// <summary>Starts connecting <paramref name="socket"/> to a remote end point.</summary>
    /// <param name="socket">The socket to connect.</param>
    /// <param name="remoteEndPoint">The end point passed to <c>BeginConnect</c>.</param>
    /// <returns>A task that completes once <c>EndConnect</c> has returned.</returns>
    public static Task ConnectTo(this Socket socket, EndPoint remoteEndPoint)
    {
        return FromBeginEnd<bool>(
            done => socket.BeginConnect(remoteEndPoint, done, null),
            pending =>
            {
                socket.EndConnect(pending);
                return true;
            });
    }

    /// <summary>Starts a single receive into <paramref name="buffer"/>.</summary>
    /// <param name="socket">The socket to read from.</param>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="offset">Position in <paramref name="buffer"/> at which to store data.</param>
    /// <param name="count">Maximum number of bytes to receive.</param>
    /// <param name="socketFlags">Flags forwarded to <c>BeginReceive</c>.</param>
    /// <returns>A task whose result is the value returned by <c>EndReceive</c>.</returns>
    public static Task<int> ReceiveData(this Socket socket, byte[] buffer, int offset, int count, SocketFlags socketFlags)
    {
        return FromBeginEnd<int>(
            done => socket.BeginReceive(buffer, offset, count, socketFlags, done, null),
            pending => socket.EndReceive(pending));
    }

    /// <summary>Starts a single send of the whole <paramref name="data"/> array.</summary>
    /// <param name="socket">The socket to write to.</param>
    /// <param name="data">The bytes to send, starting at index 0.</param>
    /// <returns>A task whose result is the value returned by <c>EndSend</c>.</returns>
    public static Task<int> SendData(this Socket socket, byte[] data)
    {
        return FromBeginEnd<int>(
            done => socket.BeginSend(data, 0, data.Length, SocketFlags.None, done, null),
            pending => socket.EndSend(pending));
    }

    /// <summary>
    /// Runs a Begin/End pair and reports the outcome through a task. An exception thrown
    /// either by <paramref name="begin"/> or inside the completion callback faults the
    /// returned task instead of propagating to the caller.
    /// </summary>
    private static Task<TResult> FromBeginEnd<TResult>(
        Func<AsyncCallback, IAsyncResult> begin,
        Func<IAsyncResult, TResult> end)
    {
        var completion = new TaskCompletionSource<TResult>();

        AsyncCallback onCompleted = pending =>
        {
            try
            {
                TResult outcome = end(pending);
                completion.SetResult(outcome);
            }
            catch (Exception error)
            {
                completion.SetException(error);
            }
        };

        try
        {
            begin(onCompleted);
        }
        catch (Exception error)
        {
            completion.SetException(error);
        }

        return completion.Task;
    }
}
2. I've created an extension class that adds a ReadToObservable method to the Socket class:
/// <summary>
/// Exposes the bytes received on a <see cref="Socket"/> as an observable sequence.
/// </summary>
public static class SocketRxExtensions
{
    private const int ReceiveBufferSize = 1024;

    /// <summary>
    /// Starts reading from <paramref name="socket"/> in the background and returns the
    /// received bytes as an observable sequence.
    /// </summary>
    /// <param name="socket">The connected socket to read from.</param>
    /// <param name="exceptionsHandler">
    /// Invoked for each exception raised while reading; return <c>true</c> to mark it as
    /// handled. When every exception is handled the sequence simply completes. Exceptions
    /// that are not handled (or all of them when this argument is <c>null</c>) are reported
    /// through the sequence's OnError.
    /// </param>
    /// <returns>
    /// A sequence that yields every received byte and terminates when the remote side
    /// closes the connection or a read fails. This method no longer blocks the caller
    /// while waiting for the first receive.
    /// </returns>
    public static IObservable<byte> ReadToObservable(this Socket socket, Func<Exception, bool> exceptionsHandler)
    {
        var chunks = new BlockingCollection<byte>();

        // Single slot written by the read loop just before CompleteAdding and read by the
        // consumer only after the consuming enumerable has ended.
        var failure = new Exception[1];

        ReadDataFromSocket(
            socket,
            (buffer, length) =>
            {
                for (var i = 0; i < length; i++)
                {
                    chunks.Add(buffer[i]);
                }
            },
            error =>
            {
                failure[0] = error;

                // Always finish the collection, otherwise consumers would wait forever.
                chunks.CompleteAdding();
            },
            exceptionsHandler);

        return DrainThenReport(chunks, failure).ToObservable<byte>(Scheduler.NewThread);
    }

    /// <summary>
    /// Yields every queued byte, then rethrows the stored failure (if any) so that it
    /// reaches subscribers as an error notification.
    /// </summary>
    private static System.Collections.Generic.IEnumerable<byte> DrainThenReport(BlockingCollection<byte> chunks, Exception[] failure)
    {
        foreach (var value in chunks.GetConsumingEnumerable())
        {
            yield return value;
        }

        if (failure[0] != null)
        {
            throw failure[0];
        }
    }

    /// <summary>
    /// Issues one receive and, from its continuation, either schedules the next receive
    /// or signals the end of the stream. Never blocks the calling thread.
    /// </summary>
    /// <param name="socket">The socket to read from.</param>
    /// <param name="onData">Called with the buffer and the number of valid bytes (always &gt; 0).</param>
    /// <param name="onFinished">
    /// Called exactly once when reading stops; the argument is <c>null</c> on a clean end
    /// or the exception that the handler did not accept.
    /// </param>
    /// <param name="exceptionsHandler">Predicate deciding whether an exception is handled.</param>
    private static void ReadDataFromSocket(
        Socket socket,
        Action<byte[], int> onData,
        Action<Exception> onFinished,
        Func<Exception, bool> exceptionsHandler)
    {
        var buffer = new byte[ReceiveBufferSize];

        socket.ReceiveData(buffer, 0, buffer.Length, SocketFlags.None).ContinueWith(task =>
        {
            bool keepReading = false;
            Exception unhandled = null;

            try
            {
                // Reading Result rethrows the receive failure as an AggregateException.
                int bytesRead = task.Result;

                if (bytesRead > 0)
                {
                    onData(buffer, bytesRead);
                    keepReading = true;
                }
                else
                {
                    // Zero bytes means the remote side closed its sending half.
                    socket.Shutdown(SocketShutdown.Receive);
                }
            }
            catch (Exception error)
            {
                unhandled = FilterUnhandled(error, exceptionsHandler);
            }

            if (keepReading)
            {
                ReadDataFromSocket(socket, onData, onFinished, exceptionsHandler);
            }
            else
            {
                onFinished(unhandled);
            }
        });
    }

    /// <summary>
    /// Runs <paramref name="exceptionsHandler"/> over <paramref name="error"/> and returns
    /// whatever was not handled, or <c>null</c> when everything was handled.
    /// </summary>
    private static Exception FilterUnhandled(Exception error, Func<Exception, bool> exceptionsHandler)
    {
        AggregateException aggregate = (error as AggregateException) ?? new AggregateException(error);

        if (exceptionsHandler == null)
        {
            return aggregate;
        }

        try
        {
            // Handle throws a new AggregateException containing the rejected exceptions.
            aggregate.Handle(exceptionsHandler);
            return null;
        }
        catch (Exception remaining)
        {
            return remaining;
        }
    }
}
3. Finally, I've used these classes to write an elegant query to do my socket work:
// Frames the byte stream into (length, payload) messages. Each message is a 4-byte
// big-endian length header followed by that many payload bytes.
//
// The stream is consumed through ONE subscription: every subscription to dataRead pulls
// from the same underlying queue, so combining Buffer(4) with a second Take(n)
// subscription would make the two compete for bytes and corrupt the framing.
var dataRead = socket.ReadToObservable(exceptionHandler).ObserveOn(Scheduler.CurrentThread);

const int HeaderSize = 4;
var header = new byte[HeaderSize];
int headerFilled = 0;
byte[] payload = null;   // null while the header is still being collected
int payloadFilled = 0;

// Feeds one byte into the frame assembler; returns a finished message or null.
Func<byte, Tuple<int, byte[]>> assemble = value =>
{
    if (payload == null)
    {
        header[headerFilled++] = value;
        if (headerFilled < HeaderSize)
        {
            return null;
        }

        headerFilled = 0;

        // Decode big-endian explicitly so the result does not depend on host byte order.
        int length = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
        if (length < 0)
        {
            throw new InvalidOperationException("Invalid message length in header: " + length);
        }

        // NOTE(review): length comes from the peer; consider enforcing an upper bound.
        payload = new byte[length];
        payloadFilled = 0;
    }
    else
    {
        payload[payloadFilled++] = value;
    }

    if (payloadFilled < payload.Length)
    {
        return null;
    }

    // Also reached right after the header when the declared length is zero.
    var completed = new Tuple<int, byte[]>(payload.Length, payload);
    payload = null;
    return completed;
};

var messages = dataRead
    .Select(assemble)
    .Where(message => message != null);

messages.Subscribe(x =>
{
    Console.WriteLine("message received length: " + x.Item1);
    Console.Write(Encoding.Unicode.GetChars(x.Item2));
});