Socket.BeginReceive issue with thread blocking

Hi All,
I am having trouble understanding the correct functionality of BeginReceive in the .NET socket object. According to MSDN, BeginReceive will return immediately and the callback function will be run on a separate thread when data is received.
However, I have found that blocking the main thread also causes the receive process to be blocked. From the code below, I would expect to see the following output:
"Listening"
"Data received"
"Return"
This would indicate that the data is correctly received while the main thread sleeps (in real life, it would be doing other work). However, no data is received while the thread sleeps, instead all calls to OnDataReceived only occur after the sleep has ended.
Could anyone shed any light on this?
Many thanks,
Samuele Armondi
public class Client
    {
        private Socket socket;
        private AutoResetEvent autoResetEvent;
        private IPEndPoint serverEndPoint;
        private byte[] buffer;
 
        public bool Connect()
        {
            //connection code
            socket.Connect(serverEndPoint);
                       
            //start listening for data
            Debug.Print("Listening");
            socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnDataReceived), null);
 
            Thread.Sleep(1000);
            Debug.Print("Return");
        }
 
        //Async method called when data is received.
        private void OnDataReceived(IAsyncResult res)
        {
            //handle data
            Debug.Print("Data received.");
            
            //carry on receiving
            socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnDataReceived), null);            
        }
    }

Open in new window

LVL 1
peddleAsked:
Who is Participating?
I wear a lot of hats...

"The solutions and answers provided on Experts Exchange have been extremely helpful to me over the last few years. I wear a lot of hats - Developer, Database Administrator, Help Desk, etc., so I know a lot of things but not a lot about one thing. Experts Exchange gives me answers from people who do know a lot about one thing, in a easy to use platform." -Todd S.

MogalManicCommented:
How are you creating the socket?  If you are using the SocketInformation class there is an option in SocketInformationOptions to create a NonBlocking socket.

I made some simple modifictaions to your sample and it worked for me.
  1)  Increased the delay (one second might not be long enough):
        Debug.Print("Sleeping");
        do {
            Thread.Sleep(1000);  //Keep sleeping until async result is processed
        } while (!Done);
        Debug.Print("Return");
   2) did not receive again if aync result was complete also set the Done flag:
    private void OnDataReceived(IAsyncResult res)
    {
        //handle data
        Debug.Print("Data received:"+Encoding.UTF8.GetString(buffer));

        if (res.IsCompleted == false)
        {
            //carry on receiving
            socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnDataReceived), null);
        }
        Debug.Print("All Done");
        Done = true;
    }
0
peddleAuthor Commented:
Thanks for the information. However increasing the delay has the same effect - no receiving seems to take place on the socket while the main thread is sleeping or waiting, which is in contrast to multi-threaded functionality!
See my comments in the ConnectToServer function for more details.

Here is my complete code (apologies for the lengthy snippet!). I'm not sure whether the problem is arising in PacketHelper.ReadPacket - this uses a blocking call to Receive by design; maybe this is somehow 'breaking' the async call chain?
Many thank
using System;
using System.Net;
using System.Text;
using System.Threading;
using System.Collections.Generic;
using System.Net.Sockets;
 
public class Client
{
    #region Private Variables
    private string userName;
    private string machineName;
    private IPAddress serverIP;
    private int serverPort;
    private bool connected;
    private Socket socket;
    private Guid clientID;
    private byte[] buffer;
    private StringBuilder log;
    private AutoResetEvent[] autoResetEvents;
    private bool raiseUpdateEvents;
    #endregion
 
    #region Constructor
    public Client()
    {
        userName = Environment.UserName;
        machineName = Environment.MachineName;
        connected = false;
        buffer = new byte[PacketHelper.PacketStartMarker.Length];
        log = new StringBuilder();
 
        //holds four events, one each for: commodities, cal periods, efa periods, prices.
        autoResetEvents = new AutoResetEvent[] {
            new AutoResetEvent(false),
            new AutoResetEvent(false),
            new AutoResetEvent(false),
            new AutoResetEvent(false)};
 
       raiseUpdateEvents = false;
    }
    #endregion
 
    #region Private Methods
    private void OnDataReceived(IAsyncResult res)
    {
        try
        {
            //get the client which sent the data.
            SocketError errorCode;
            int bytesReceived = socket.EndReceive(res, out errorCode);
            Log("data received");
            if (bytesReceived == 0)
            {
                //socket disconnect message - this means the server disconnected the client.
                LoggedIn = false;
                RaiseConnectionEvent(ConnectionEventArgs.EventType.Disconnected, ConnectionEventArgs.EventSource.Server);
                Log("Server disconnected.");
            }
            if (bytesReceived > 0)
            {
                //make sure this is a proper packet
                if (bytesReceived == PacketHelper.PacketStartMarker.Length && PacketHelper.CompareArrays(buffer, PacketHelper.PacketStartMarker))
                {
                    byte[] data = PacketHelper.ReadPacketData(socket, true);
                    IDataPacket packet = PacketHelper.Decode(data);
 
                    if (packet is LoginResponse)
                    {
                        ProcessLoginResponse(packet as LoginResponse);
                    }
                    else if (packet is ServerMessage)
                        RaiseServerMessageEvent(packet as ServerMessage);
                    else if (packet is UpdateNotificationBase)
                        ProcessUpdateNotification(packet as UpdateNotificationBase);
                    else if (packet is DataQueryResponse)
                        ProcessQueryResponse(packet as DataQueryResponse);
                    else if (packet is LogoutAcknowledgement)
                    {
                        CloseSocket();
                        RaiseConnectionEvent(ConnectionEventArgs.EventType.Disconnected);
                    }
                }
            }
        }
        catch (ObjectDisposedException)
        {
            Log("Socket closed from client {0}.", clientID.ToString());
        }
        catch (SocketException se)
        {
            if (se.ErrorCode == 10054) // Error code for Connection reset by client
            {
                Log("Client {0} disconnected, client removed from server.", clientID.ToString());
            }
            else
            {
                Log("Error receiving data from client {0}.\r\nError details:\r\n{1}", clientID.ToString(), se.Message);
            }
        }
        catch (Exception e)
        {
            Log("Error: {0}", e.Message);
        }
        finally
        {
            //make sure we continue listening on this socket
            if (socket != null && socket.Connected)
            {
                buffer = new byte[PacketHelper.PacketStartMarker.Length];
                socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnDataReceived), null);
            }
        }
    }
 
    private void ProcessLoginResponse(LoginResponse Packet)
    {
        try
        {
            if (Packet.LoggedIn == true)
            {
                LoggedIn = true;
                clientID = Packet.ClientID;
                RaiseConnectionEvent(ConnectionEventArgs.EventType.Connected);
                Log("logged in");
            }
        }
        catch (Exceptions.SocketClosedException)
        {
            LoggedIn = false;
            RaiseConnectionEvent(ConnectionEventArgs.EventType.Disconnected);
        }
    }
 
    private void ProcessQueryResponse(DataQueryResponse Response)
    {
        switch (Response.QueryID)
        {
            case "CommodityNames":
                if (Response.DataType == commodityNames.GetType())
                {
                    commodityNames = new Dictionary<PriceCommodity, string>(Response.GetTypedResult<Dictionary<PriceCommodity, string>>());
                    Log("Got commodity names.");
                    autoResetEvents[0].Set(); ;
                }
                break;
 
            case "CalPeriodNames":
                if (Response.DataType == calendarPeriodNames.GetType())
                {
                    calendarPeriodNames = new Dictionary<PricePeriod, string>(Response.GetTypedResult<Dictionary<PricePeriod, string>>());
                    Log("Got cal period names.");
                    autoResetEvents[1].Set();
                }
                break;
 
            case "EfaPeriodNames":
                if (Response.DataType == efaPeriodNames.GetType())
                {
                    efaPeriodNames = new Dictionary<PricePeriod, string>(Response.GetTypedResult<Dictionary<PricePeriod, string>>());
                    Log("Got efa period names.");
                    autoResetEvents[2].Set();
                }
                break;
 
            case "CurrPrices":
                if (Response.DataType == prices.GetType())
                {
                    prices = new Dictionary<PriceCommodity, Dictionary<string, PriceContainer>>(Response.GetTypedResult<Dictionary<PriceCommodity, Dictionary<string, PriceContainer>>>());
                    Log("Got prices.");
                    autoResetEvents[3].Set();
                }
                break;
            default:
                break;
        }
    }
    
    private void QueryCommodityNames(object State)
    {
        DataQueryRequest request = new DataQueryRequest("CommodityNames", "GET CommodityNames", commodityNames.GetType());
        byte[] packet = PacketHelper.Encode(request);
 
        PacketHelper.WritePacketData(packet, socket);
 
        //wait for the query to complete
        autoResetEvents[0].WaitOne();
    }
 
    private void QueryPeriodNames(object State)
    {
        DataQueryRequest calRequest = new DataQueryRequest("CalPeriodNames", "GET CalendarPeriodNames", calendarPeriodNames.GetType());
        DataQueryRequest efaRequest = new DataQueryRequest("EfaPeriodNames", "GET EfaPeriodNames", efaPeriodNames.GetType());
        byte[] calPacket = PacketHelper.Encode(calRequest);
        byte[] efaPacket = PacketHelper.Encode(efaRequest);
 
        PacketHelper.WritePacketData(calPacket, socket);
        PacketHelper.WritePacketData(efaPacket, socket);
        
        //wait for the query to complete        
        autoResetEvents[1].WaitOne();     
        autoResetEvents[2].WaitOne();
    }
 
    private void QueryPrices(object State)
    {
        DataQueryRequest request = new DataQueryRequest("CurrPrices", "GET CurrentPrices", prices.GetType());
        byte[] packet = PacketHelper.Encode(request);
 
        PacketHelper.WritePacketData(packet, socket);
 
        //wait for the query to complete        
        autoResetEvents[3].WaitOne();
    }
    #endregion
 
    #region Public Methods
    /// <summary>
    /// Connects to the price server. Throws an Exceptions.ServerNotRunningException if the remote server is unavailable.
    /// </summary>
    /// <param name="ServerIP">The IP address of the server.</param>
    /// <param name="ServerPort">The port number to connect to.</param>
    public void ConnectToServer(IPAddress ServerIP, int ServerPort)
    {
        try
        {
            //if the socket has never been opened or it has been closed, it will be null.
            if (socket != null)
                return;
 
            serverIP = ServerIP;
            serverPort = ServerPort;
 
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
 
            IPEndPoint serverEndPoint = new IPEndPoint(serverIP, serverPort);
            socket.Connect(serverEndPoint);
 
            if (socket.Connected)
            {
                Log("Connected to server.");
                connected = true;
 
                //start listening for data									
                socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(OnDataReceived), null);
 
                //send the login request
                LoginRequest loginRequest = new LoginRequest(userName, machineName);
                PacketHelper.WritePacketData(PacketHelper.Encode(loginRequest), socket);
 
                //run initial queries
 
                //code runs fine if the queries are run on a ThreadPool thread, but 
                //this function returns before all queries have completed.
                //ThreadPool.QueueUserWorkItem(new WaitCallback(QueryCommodityNames));
                //ThreadPool.QueueUserWorkItem(new WaitCallback(QueryPeriodNames));
                //ThreadPool.QueueUserWorkItem(new WaitCallback(QueryPrices));
 
                //running the query functions directly, each one should block until a response has been received.
                //however, the code stops indefinitely when calling WaitOne on any of the WaitHandles.
                //QueryCommodityNames(null);
                //QueryPeriodNames(null);
                //QueryPrices(null);
 
                raiseUpdateEvents = true;
            }
 
            Log("return");
        }
        catch (SocketException ex)
        {
            socket = null;
            throw new Exceptions.ServerNotRunningException(ex);
        }
        catch (Exception ex)
        {
            Debug.Write(ex.Message);
            throw new Exceptions.ConnectionErrorException(ex);
        }
    }
    #endregion
}
 
//PacketHelper class
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Net.Sockets;
using System.Diagnostics;
 
namespace DataPackets
{
    public class PacketHelper
    {
        public static IDataPacket Decode(byte[] PacketData)
        {
            if (PacketData == null)
                return null;
 
            BinaryFormatter binaryFormatter = new BinaryFormatter();
            MemoryStream ms = new MemoryStream();
 
            try
            {
                ms = new MemoryStream(PacketData);
                IDataPacket packet = (IDataPacket)binaryFormatter.Deserialize(ms);
                return packet;
            }
            catch (Exception e)
            {
                throw new Exceptions.DecodingException(e, PacketData.ToString());
            }
            finally
            {
                ms.Close();
            }
        }
 
        public static byte[] Encode(IDataPacket Packet)
        {
            if (Packet == null)
                return new byte[] { };
 
            BinaryFormatter binaryFormatter = new BinaryFormatter();
            MemoryStream ms = new MemoryStream();
            try
            {
                binaryFormatter.Serialize(ms, Packet);
                byte[] data = ms.ToArray();
                return data;
            }
            catch (Exception e)
            {
                throw new Exceptions.DecodingException(e, Packet.ToString());
            }
            finally
            {
                ms.Close();
            }
        }
 
        /// <summary>
        /// Reads a packet from a socket.
        /// </summary>
        /// <param name="SourceSocket">To socket to read the data from.</param>
        /// <returns>A byte[] representing an IDataPacket serialized in binary format.</returns>
        public static byte[] ReadPacketData(Socket SourceSocket)
        {
            return ReadPacketData(SourceSocket, false);
        }
 
        /// <summary>
        /// Reads a packet from a socket.
        /// </summary>
        /// <param name="SourceSocket">To socket to read the data from.</param>
        /// <param name="SkipPacketStartMarker">If true, the function will assume that the PacketStartMarker
        /// byte is present and has already been read.</param>
        /// <returns>A byte[] representing an IDataPacket serialized in binary format plus 
        /// size information and start and end markers. This can be turned into an IDataPacket object
        /// using the Decode function.</returns>
        public static byte[] ReadPacketData(Socket SourceSocket, bool SkipPacketStartMarker)
        {
            if (SourceSocket == null)
                throw new ArgumentNullException("SourceSocket");
            if (!SourceSocket.Connected)
                throw new Exceptions.SocketClosedException(null);
 
            try
            {
				byte[] b = new byte[PacketStartMarker.Length];
				int receivedBytes = 0;
 
                if (!SkipPacketStartMarker)
					receivedBytes = SourceSocket.Receive(b);
 
                if (SkipPacketStartMarker || (receivedBytes == PacketStartMarker.Length && CompareArrays(b, PacketStartMarker)))
                {
                    byte[] lenBytes = new byte[TotalPacketLenghtStringSize];
					receivedBytes = SourceSocket.Receive(lenBytes);
					if (receivedBytes != TotalPacketLenghtStringSize)
						throw new Exceptions.InvalidPacketException();
					
					string packetLengthString = Encoding.UTF8.GetString(lenBytes).Replace("x", "");
					int packetLength = 0;
					if (!int.TryParse(packetLengthString, out packetLength))
						throw new Exceptions.InvalidPacketException();
 
                    //swallop up the packet length end marker
					b = new byte[PacketDataStartMarker.Length];
					receivedBytes = SourceSocket.Receive(b);
					if (receivedBytes != PacketDataStartMarker.Length)
						throw new Exceptions.InvalidPacketException();
 
                    byte[] packet = new byte[packetLength];                    
                    receivedBytes = SourceSocket.Receive(packet);
 
					int retries = 0;
					while (receivedBytes < packetLength)
					{
						if (retries == MaximumPacketReadAttempts)
							throw new Exceptions.InvalidPacketException();
 
						retries++;
						int bytes = SourceSocket.Receive(packet, receivedBytes, packetLength - receivedBytes, SocketFlags.None);
						receivedBytes += bytes;
					}
 
					//swallop up the packet end marker
					b = new byte[PacketEndMarker.Length];
					receivedBytes = SourceSocket.Receive(b);
					if (receivedBytes != PacketEndMarker.Length)
						throw new Exceptions.InvalidPacketException();
                    
                    return packet;
                }
 
                return new byte[] { };
            }
            catch (SocketException sockEx)
            {
                //ignore this exception, generated when the listener is closed.
                if (sockEx.ErrorCode != 10004)
                    throw new Exceptions.SocketErrorException(sockEx.ErrorCode, sockEx);
                else
                    return new byte[] { };
            }
            catch (ObjectDisposedException objEx)
            {
                throw new Exceptions.SocketClosedException(objEx);
            }
            catch (Exception e)
            {
                throw new Exceptions.PacketSendingException(e);
            }
        }
 
        public static void WritePacketData(byte[] Data, Socket DestinationSocket)
        {
            if (Data == null)
                throw new ArgumentNullException("Data");
            if (Data.Length == 0)
                throw new ArgumentNullException("Data");
            if (DestinationSocket == null)
                throw new ArgumentNullException("DestinationSocket");
            if (!DestinationSocket.Connected)
                throw new Exceptions.SocketClosedException(null);
 
            try
            {
                string packetLength = Data.Length.ToString();
				if (packetLength.Length > TotalPacketLenghtStringSize)
					throw new ArgumentException("Data length is greater than maximum allowed.");
 
				packetLength = packetLength.PadLeft(TotalPacketLenghtStringSize, 'x');
                byte[] lengthBytes = Encoding.UTF8.GetBytes(packetLength);
				//total lenght = <PS> + length bytes + <PD> + data + <PE>
                byte[] packet = new byte[PacketStartMarker.Length 
										+ lengthBytes.Length 
										+ PacketDataStartMarker.Length 
										+ Data.Length 
										+ PacketEndMarker.Length];
 
                int currentPosition = 0;
				PacketStartMarker.CopyTo(packet, currentPosition);
				currentPosition += PacketStartMarker.Length;
 
                lengthBytes.CopyTo(packet, currentPosition);
                currentPosition += lengthBytes.Length;
 
				PacketDataStartMarker.CopyTo(packet, currentPosition);
				currentPosition += PacketDataStartMarker.Length;
 
                Data.CopyTo(packet, currentPosition);
                currentPosition += Data.Length;
 
				PacketEndMarker.CopyTo(packet, currentPosition);
 
                //DestinationSocket.Send(packet, SocketFlags.None);
                IAsyncResult res = DestinationSocket.BeginSend(packet, 0, packet.Length, SocketFlags.None, new AsyncCallback(OnSendCompleted), DestinationSocket);
                Debug.Assert(DestinationSocket.Connected);
                res.AsyncWaitHandle.WaitOne();
            }
            catch (SocketException sockEx)
            {
                throw new Exceptions.SocketErrorException(sockEx.ErrorCode, sockEx);
            }
            catch (ObjectDisposedException objEx)
            {
                throw new Exceptions.SocketClosedException(objEx);
            }
            catch (Exception e)
            {
                throw new Exceptions.PacketSendingException(e);
            }
        }
 
        private static void OnSendCompleted(IAsyncResult res)
        {
            try
            {
                Socket socket = res.AsyncState as Socket;
                socket.EndSend(res);
            }
            catch
            {
            }
        }
 
		public static bool CompareArrays(byte[] Rhs, byte[] Lhs)
		{
			if (Rhs.Length != Lhs.Length)
				return false;
 
			for (int i = 0; i < Rhs.Length; i++)
			{
				if (Rhs[i] != Lhs[i])
					return false;
			}
 
			return true;
		}
 
        public static byte[] PacketStartMarker
        {
			get { return new byte[] { 60, 80, 83, 62 }; /* byte equivalent of <PS>*/ }
        }
 
		public static byte[] PacketEndMarker
        {
			get { return new byte[] { 60, 80, 69, 62 }; /* byte equivalent of <PE>*/ }
        }
 
		public static byte[] PacketDataStartMarker
        {
			get { return new byte[] { 60, 80, 68, 62 }; /* byte equivalent of <PD>*/ }
        }
 
		public static int TotalPacketLenghtStringSize
		{
			get { return 6; }
		}
 
		public static int MaximumPacketReadAttempts
		{
			get { return 10; }
		}
    }
}

Open in new window

0

Experts Exchange Solution brought to you by

Your issues matter to us.

Facing a tech roadblock? Get the help and guidance you need from experienced professionals who care. Ask your question anytime, anywhere, with no hassle.

Start your 7-day free trial
It's more than this solution.Get answers and train to solve all your tech problems - anytime, anywhere.Try it for free Edge Out The Competitionfor your dream job with proven skills and certifications.Get started today Stand Outas the employee with proven skills.Start learning today for free Move Your Career Forwardwith certification training in the latest technologies.Start your trial today
.NET Programming

From novice to tech pro — start learning today.