AsyncSocketClient socket收发处理
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
namespace IocpCore
{
public class AsyncSocketClient
{
public static int IocpReadLen = 1024;
public readonly Socket ConnectSocket;
protected SocketAsyncEventArgs m_receiveEventArgs;
public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
protected byte[] m_asyncReceiveBuffer;
protected SocketAsyncEventArgs m_sendEventArgs;
public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
protected byte[] m_asyncSendBuffer;
public event Action<AsyncSocketClient, byte[]> OnReadData;
public event Action<AsyncSocketClient, int> OnSendData;
public event Action<AsyncSocketClient> OnSocketClose;
static object releaseLock = new object();
public static int createCount = 0;
public static int releaseCount = 0;
~AsyncSocketClient()
{
lock (releaseLock)
{
releaseCount++;
}
}
public AsyncSocketClient(Socket socket)
{
lock (releaseLock)
{
createCount++;
}
ConnectSocket = socket;
m_receiveEventArgs = new SocketAsyncEventArgs();
m_asyncReceiveBuffer = new byte[IocpReadLen];
m_receiveEventArgs.AcceptSocket = ConnectSocket;
m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;
m_sendEventArgs = new SocketAsyncEventArgs();
m_asyncSendBuffer = new byte[IocpReadLen * 2];
m_sendEventArgs.AcceptSocket = ConnectSocket;
m_sendEventArgs.Completed += SendEventArgs_Completed;
}
SocketClientInfo _clientInfo;
public SocketClientInfo ClientInfo
{
get
{
return _clientInfo;
}
}
internal void CreateClientInfo(NetListener netListener)
{
_clientInfo = new SocketClientInfo();
try
{
_clientInfo.Tag = netListener._listenParam._tag;
IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
Debug.Assert(netListener._listenParam._port == ip.Port);
_clientInfo.LocalIp = ip.Address.ToString();
_clientInfo.LocalPort = netListener._listenParam._port;
ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
_clientInfo.PeerIp = ip.Address.ToString();
_clientInfo.PeerPort = ip.Port;
}
catch (Exception ex)
{
NetLogger.Log("CreateClientInfo", ex);
}
}
internal void SetClientInfo(SocketClientInfo clientInfo)
{
_clientInfo = clientInfo;
}
#region read process
bool _inReadPending = false;
public EN_SocketReadResult ReadNextData()
{
lock (this)
{
if (_socketError)
return EN_SocketReadResult.ReadError;
if (_inReadPending)
return EN_SocketReadResult.InAsyn;
if(!ConnectSocket.Connected)
{
OnReadError();
return EN_SocketReadResult.ReadError;
}
try
{
m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
_inReadPending = true;
bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投递接收请求
if (!willRaiseEvent)
{
_inReadPending = false;
ProcessReceive();
if (_socketError)
{
OnReadError();
return EN_SocketReadResult.ReadError;
}
return EN_SocketReadResult.HaveRead;
}
else
{
return EN_SocketReadResult.InAsyn;
}
}
catch (Exception ex)
{
NetLogger.Log("ReadNextData", ex);
_inReadPending = false;
OnReadError();
return EN_SocketReadResult.ReadError;
}
}
}
private void ProcessReceive()
{
if (ReceiveEventArgs.BytesTransferred > 0
&& ReceiveEventArgs.SocketError == SocketError.Success)
{
int offset = ReceiveEventArgs.Offset;
int count = ReceiveEventArgs.BytesTransferred;
byte[] readData = new byte[count];
Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);
_inReadPending = false;
if (!_socketError)
OnReadData?.Invoke(this, readData);
}
else
{
_inReadPending = false;
OnReadError();
}
}
private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
lock (this)
{
_inReadPending = false;
ProcessReceive();
if (_socketError)
{
OnReadError();
}
}
}
bool _socketError = false;
private void OnReadError()
{
lock (this)
{
if (_socketError == false)
{
_socketError = true;
OnSocketClose?.Invoke(this);
}
CloseClient();
}
}
#endregion
#region send process
int _sendBufferByteCount = 102400;
public int SendBufferByteCount
{
get
{
return _sendBufferByteCount;
}
set
{
if (value < 1024)
{
_sendBufferByteCount = 1024;
}
else
{
_sendBufferByteCount = value;
}
}
}
SendBufferPool _sendDataPool = new SendBufferPool();
internal EN_SendDataResult PutSendData(byte[] data)
{
if (_socketError)
return EN_SendDataResult.no_client;
if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
{
return EN_SendDataResult.buffer_overflow;
}
if (data.Length <= IocpReadLen)
{
_sendDataPool.PutObj(data);
}
else
{
List<byte[]> dataItems = SplitData(data, IocpReadLen);
foreach (byte[] item in dataItems)
{
_sendDataPool.PutObj(item);
}
}
return EN_SendDataResult.ok;
}
bool _inSendPending = false;
public EN_SocketSendResult SendNextData()
{
lock (this)
{
if (_socketError)
{
return EN_SocketSendResult.SendError;
}
if (_inSendPending)
{
return EN_SocketSendResult.InAsyn;
}
int sendByteCount = GetSendData();
if (sendByteCount == 0)
{
return EN_SocketSendResult.NoSendData;
}
//防止抛出异常,否则影响性能
if (!ConnectSocket.Connected)
{
OnSendError();
return EN_SocketSendResult.SendError;
}
try
{
m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
_inSendPending = true;
bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
if (!willRaiseEvent)
{
_inSendPending = false;
ProcessSend(m_sendEventArgs);
if (_socketError)
{
OnSendError();
return EN_SocketSendResult.SendError;
}
else
{
OnSendData?.Invoke(this, sendByteCount);
//继续发下一条
return EN_SocketSendResult.HaveSend;
}
}
else
{
return EN_SocketSendResult.InAsyn;
}
}
catch (Exception ex)
{
NetLogger.Log("SendNextData", ex);
_inSendPending = false;
OnSendError();
return EN_SocketSendResult.SendError;
}
}
}
private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
{
lock (this)
{
try
{
_inSendPending = false;
ProcessSend(m_sendEventArgs);
int sendCount = 0;
if (sendEventArgs.SocketError == SocketError.Success)
{
sendCount = sendEventArgs.BytesTransferred;
}
OnSendData?.Invoke(this, sendCount);
if (_socketError)
{
OnSendError();
}
}
catch (Exception ex)
{
NetLogger.Log("SendEventArgs_Completed", ex);
}
}
}
private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
{
if (sendEventArgs.SocketError == SocketError.Success)
{
return true;
}
else
{
OnSendError();
return false;
}
}
private int GetSendData()
{
int dataLen = 0;
while (true)
{
byte[] data = _sendDataPool.GetObj();
if (data == null)
return dataLen;
Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
dataLen += data.Length;
if (dataLen > IocpReadLen)
break;
}
return dataLen;
}
private void OnSendError()
{
lock (this)
{
if (_socketError == false)
{
_socketError = true;
OnSocketClose?.Invoke(this);
}
CloseClient();
}
}
#endregion
internal void CloseSocket()
{
try
{
ConnectSocket.Close();
}
catch (Exception ex)
{
NetLogger.Log("CloseSocket", ex);
}
}
static object socketCloseLock = new object();
public static int closeSendCount = 0;
public static int closeReadCount = 0;
bool _disposeSend = false;
void CloseSend()
{
if (!_disposeSend && !_inSendPending)
{
lock (socketCloseLock)
closeSendCount++;
_disposeSend = true;
m_sendEventArgs.SetBuffer(null, 0, 0);
m_sendEventArgs.Completed -= SendEventArgs_Completed;
m_sendEventArgs.Dispose();
}
}
bool _disposeRead = false;
void CloseRead()
{
if (!_disposeRead && !_inReadPending)
{
lock (socketCloseLock)
closeReadCount++;
_disposeRead = true;
m_receiveEventArgs.SetBuffer(null, 0, 0);
m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
m_receiveEventArgs.Dispose();
}
}
private void CloseClient()
{
try
{
CloseSend();
CloseRead();
ConnectSocket.Close();
}
catch (Exception ex)
{
NetLogger.Log("CloseClient", ex);
}
}
//发送缓冲大小
private List<byte[]> SplitData(byte[] data, int maxLen)
{
List<byte[]> items = new List<byte[]>();
int start = 0;
while (true)
{
int itemLen = Math.Min(maxLen, data.Length - start);
if (itemLen == 0)
break;
byte[] item = new byte[itemLen];
Array.Copy(data, start, item, 0, itemLen);
items.Add(item);
start += itemLen;
}
return items;
}
}
public enum EN_SocketReadResult
{
InAsyn,
HaveRead,
ReadError
}
public enum EN_SocketSendResult
{
InAsyn,
HaveSend,
NoSendData,
SendError
}
class SendBufferPool
{
ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();
public Int64 _bufferByteCount = 0;
public bool PutObj(byte[] obj)
{
if (_bufferPool.PutObj(obj))
{
lock (this)
{
_bufferByteCount += obj.Length;
}
return true;
}
else
{
return false;
}
}
public byte[] GetObj()
{
byte[] result = _bufferPool.GetObj();
if (result != null)
{
lock (this)
{
_bufferByteCount -= result.Length;
}
}
return result;
}
}
}










