C#中一个高性能异步socket封装库的实现思路分享

2019-12-30 18:35:44 丽君

NetServer 聚合其他类


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
 public class NetServer
 {
  // Callback that receives socket events. It is invoked by PutClientPacket and,
  // when the packet minimum length is 0 (byte-stream mode), directly by DealEventPool.
  public Action<SocketEventParam> OnSocketPacketEvent;

  // Send buffer size in bytes for each connection (default 1024 * 100).
  // NOTE(review): not read anywhere in this class; presumably consumed by the client objects - confirm.
  public int SendBufferBytePerClient { get; set; } = 1024 * 100;

  // Set to true by StartListen; NetPacketProcess keeps looping while it is true.
  bool _serverStart = false;
  // Listeners that StartListen managed to start successfully.
  List<NetListener> _listListener = new List<NetListener>();

  // Assembles the received byte stream into complete packets
  ClientPacketManage _clientPacketManage;

  // Running total of bytes sent, increased in Client_OnSendData.
  public Int64 SendByteCount { get; set; }
  // Running total of bytes received, increased in Client_OnReadData.
  public Int64 ReadByteCount { get; set; }

  // Ports registered through AddListenPort; StartListen creates one listener per entry.
  List<ListenParam> _listListenPort = new List<ListenParam>();
  /// <summary>
  /// Registers a port that <see cref="StartListen"/> will try to listen on.
  /// </summary>
  /// <param name="port">Port number to listen on.</param>
  /// <param name="tag">Caller-defined object stored together with the port.</param>
  public void AddListenPort(int port, object tag)
  {
   ListenParam listenParam = new ListenParam(port, tag);
   _listListenPort.Add(listenParam);
  }
  /// <summary>
  /// Starts the server: creates the packet assembler, hooks the connect-result handler,
  /// launches the three worker threads (event dispatch, send, read) and starts one
  /// listener for every port registered through <see cref="AddListenPort"/>.
  /// </summary>
  /// <remarks>
  /// Safe to call again: the worker threads are started only on the first call and the
  /// connect handler is never subscribed twice. The packet assembler and the listener
  /// list are rebuilt on every call, exactly as before.
  /// </remarks>
  /// <param name="listenFault">Receives the ports whose listener failed to start; empty when all succeeded.</param>
  /// <returns><c>true</c> when every registered port is listening; otherwise <c>false</c>.</returns>
  public bool StartListen(out List<int> listenFault)
  {
   // Nothing in this class ever resets _serverStart, so if it is already true the
   // worker threads from the earlier call are still running and must not be duplicated.
   bool alreadyStarted = _serverStart;
   _serverStart = true;

   _clientPacketManage = new ClientPacketManage(this);
   _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

   // Remove before adding so a repeated call cannot register the handler twice
   // (removing a handler that is not subscribed is a no-op).
   _netConnectManage.OnSocketConnectEvent -= SocketConnectEvent;
   _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

   _listListener.Clear();

   if (!alreadyStarted)
   {
    Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
    thread1.Start();

    Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
    thread2.Start();

    Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
    thread3.Start();
   }

   listenFault = new List<int>();
   foreach (ListenParam param in _listListenPort)
   {
    NetListener listener = new NetListener(this);
    listener._listenParam = param;
    listener.OnAcceptSocket += Listener_OnAcceptSocket;

    if (listener.StartListen())
    {
     _listListener.Add(listener);
     NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
    }
    else
    {
     // Report the port to the caller instead of aborting the whole start-up
     listenFault.Add(param._port);
    }
   }

   return listenFault.Count == 0;
  }

  /// <summary>
  /// Forwards a socket event to the <see cref="OnSocketPacketEvent"/> callback, if one is set.
  /// </summary>
  /// <param name="param">The event to forward.</param>
  public void PutClientPacket(SocketEventParam param)
  {
   Action<SocketEventParam> handler = OnSocketPacketEvent;
   if (handler != null)
   {
    handler(param);
   }
  }

  // Packet length limits, assigned by SetPacketParam
  int _packetMinLen;
  int _packetMaxLen;

  /// <summary>Minimum packet length; 0 means byte-stream mode.</summary>
  public int PacketMinLen => _packetMinLen;

  /// <summary>Maximum packet length.</summary>
  public int PacketMaxLen => _packetMaxLen;

  /// <summary>
  /// Sets the minimum and maximum packet length.
  /// When <paramref name="minLen"/> is 0 the server works in byte-stream mode:
  /// events are handed to the callback without packet assembly.
  /// </summary>
  /// <param name="minLen">Minimum packet length; must not be negative (checked in debug builds only).</param>
  /// <param name="maxLen">Maximum packet length; must be greater than <paramref name="minLen"/> (checked in debug builds only).</param>
  public void SetPacketParam(int minLen, int maxLen)
  {
   bool minIsValid = minLen >= 0;
   Debug.Assert(minIsValid);

   bool rangeIsValid = maxLen > minLen;
   Debug.Assert(rangeIsValid);

   _packetMinLen = minLen;
   _packetMaxLen = maxLen;
  }

  // Delegate that returns the total length of a packet, given a buffer and an offset into it.
  // NOTE(review): the callback is not invoked in this class; presumably the packet
  // assembler calls it to find packet boundaries - confirm against ClientPacketManage.
  public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
  public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

  // Queue of socket events (accept / connect / read / close) waiting to be dispatched
  ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();

  /// <summary>
  /// Thread body of the event-dispatch worker: drains the event queue, then waits on
  /// the queue (with a 1000 timeout argument) before draining again, for as long as
  /// the server is marked as started.
  /// </summary>
  private void NetPacketProcess()
  {
   if (!_serverStart)
   {
    return;
   }

   do
   {
    try
    {
     DealEventPool();
    }
    catch (Exception ex)
    {
     // Log and keep the worker alive; one bad event must not stop dispatching
     string message = string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace);
     NetLogger.Log(message);
    }

    _socketEventPool.WaitOne(1000);
   }
   while (_serverStart);
  }

  // All known connections, keyed by socket. Every access in this class takes lock (_clientGroup).
  Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();

  /// <summary>Number of connections currently registered.</summary>
  public int ClientCount
  {
   get
   {
    int count;
    lock (_clientGroup)
    {
     count = _clientGroup.Count;
    }
    return count;
   }
  }

  /// <summary>Snapshot (copy) of the sockets of all registered connections.</summary>
  public List<Socket> ClientList
  {
   get
   {
    lock (_clientGroup)
    {
     return new List<Socket>(_clientGroup.Keys);
    }
   }
  }

  /// <summary>
  /// Drains the socket event queue. A close event first removes its socket from the
  /// client table. Each event is then either passed straight to the callback
  /// (byte-stream mode, minimum packet length 0) or handed to the packet assembler.
  /// Returns when the queue yields null.
  /// </summary>
  private void DealEventPool()
  {
   SocketEventParam eventParam;
   while ((eventParam = _socketEventPool.GetObj()) != null)
   {
    if (eventParam.SocketEvent == EN_SocketEvent.close)
    {
     lock (_clientGroup)
     {
      _clientGroup.Remove(eventParam.Socket);
     }
    }

    bool packetMode = _packetMinLen != 0;
    if (packetMode)
    {
     // Assemble complete packets before they reach the callback
     _clientPacketManage.PutSocketParam(eventParam);
     continue;
    }

    // Byte-stream mode: deliver the event as-is
    OnSocketPacketEvent?.Invoke(eventParam);
   }
  }

  /// <summary>
  /// Handles the result of an outgoing connection attempt reported by the connect manager.
  /// On success the client is registered, its close/read/send events are hooked and the
  /// first read is scheduled. The connect event itself is always queued for dispatch,
  /// including on failure.
  /// </summary>
  /// <param name="param">Connect event; a null <c>Socket</c> means the connection failed.</param>
  /// <param name="client">The connected client, or null when the connection failed.</param>
  private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
  {
   try
   {
    bool connected = param.Socket != null && client != null;
    if (connected)
    {
     lock (_clientGroup)
     {
      // The socket is not expected to be registered already
      bool remove = _clientGroup.Remove(client.ConnectSocket);
      Debug.Assert(!remove);
      _clientGroup.Add(client.ConnectSocket, client);
     }

     client.OnSocketClose += Client_OnSocketClose;
     client.OnReadData += Client_OnReadData;
     client.OnSendData += Client_OnSendData;
    }

    // Queue the connect event BEFORE scheduling the first read. Reads are handled on
    // another thread, so scheduling the read first would let a read event be queued
    // ahead of the connect event it belongs to.
    _socketEventPool.PutObj(param);

    if (connected)
    {
     _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  /// <summary>
  /// Called when a packet with an invalid length was received: logs the length and
  /// closes the offending connection. Asserts (debug builds) if the socket is unknown.
  /// </summary>
  /// <param name="socket">Socket the bad packet arrived on.</param>
  /// <param name="buffer">Not used by this method.</param>
  /// <param name="offset">Not used by this method.</param>
  /// <param name="packetLen">The reported packet length, written to the log.</param>
  internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
  {
   try
   {
    lock (_clientGroup)
    {
     AsyncSocketClient client;
     if (_clientGroup.TryGetValue(socket, out client))
     {
      NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
      client.CloseSocket();
     }
     else
     {
      // A length error for a socket that is not registered is unexpected
      Debug.Assert(false);
     }
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }

  #region listen port
  /// <summary>
  /// Handles a connection accepted by one of the listeners: registers the client,
  /// hooks its close/read/send events, queues an accept event for dispatch and then
  /// schedules the first read.
  /// </summary>
  /// <param name="listenPatam">Parameters of the listener that accepted the connection (not used here).</param>
  /// <param name="client">The newly accepted client.</param>
  private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
  {
   try
   {
    lock (_clientGroup)
    {
     // The socket is not expected to be registered already
     bool remove = _clientGroup.Remove(client.ConnectSocket);
     Debug.Assert(!remove);
     _clientGroup.Add(client.ConnectSocket, client);
    }

    client.OnSocketClose += Client_OnSocketClose;
    client.OnReadData += Client_OnReadData;
    client.OnSendData += Client_OnSendData;

    // Queue the accept event BEFORE scheduling the first read. Reads are handled on
    // another thread, so scheduling the read first would let a read event be queued
    // ahead of the accept event for the same connection.
    SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
    param.ClientInfo = client.ClientInfo;
    _socketEventPool.PutObj(param);

    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
  }


  // Queue of pending "send next chunk" requests, consumed by the send worker thread
  ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();

  /// <summary>
  /// Thread body of the send worker: drains the send queue, then waits on the queue
  /// (with a 1000 timeout argument) before draining again. Runs while the server is
  /// marked as started, matching <c>NetPacketProcess</c>.
  /// </summary>
  private void NetSendProcess()
  {
   while (_serverStart)
   {
    try
    {
     DealSendEvent();
    }
    catch (Exception ex)
    {
     // An unhandled exception would end this thread and stop all sending;
     // log it and keep the worker alive, as NetPacketProcess does.
     NetLogger.Log(string.Format("DealSendEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
    }
    _listSendEvent.WaitOne(1000);
   }
  }

  // Queue of pending "read next chunk" requests, consumed by the read worker thread
  ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();

  /// <summary>
  /// Thread body of the read worker: drains the read queue, then waits on the queue
  /// (with a 1000 timeout argument) before draining again. Runs while the server is
  /// marked as started, matching <c>NetPacketProcess</c>.
  /// </summary>
  private void NetReadProcess()
  {
   while (_serverStart)
   {
    try
    {
     DealReadEvent();
    }
    catch (Exception ex)
    {
     // An unhandled exception would end this thread and stop all reading;
     // log it and keep the worker alive, as NetPacketProcess does.
     NetLogger.Log(string.Format("DealReadEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
    }
    _listReadEvent.WaitOne(1000);
   }
  }

  
  /// <summary>
  /// Drains the send queue. For each send request, keeps calling the client's
  /// <c>SendNextData</c> for as long as it reports <c>HaveSend</c>. A read request
  /// in this queue is unexpected and trips a debug assertion.
  /// </summary>
  private void DealSendEvent()
  {
   for (SocketEventDeal request = _listSendEvent.GetObj();
     request != null;
     request = _listSendEvent.GetObj())
   {
    EN_SocketDealEvent kind = request.SocketEvent;
    if (kind == EN_SocketDealEvent.send)
    {
     EN_SocketSendResult sendResult;
     do
     {
      sendResult = request.Client.SendNextData();
     }
     while (sendResult == EN_SocketSendResult.HaveSend);
    }
    else if (kind == EN_SocketDealEvent.read)
    {
     // Read requests belong in the read queue
     Debug.Assert(false);
    }
   }
  }

  /// <summary>
  /// Drains the read queue. For each read request, keeps calling the client's
  /// <c>ReadNextData</c> for as long as it reports <c>HaveRead</c>. A send request
  /// in this queue is unexpected and trips a debug assertion.
  /// </summary>
  private void DealReadEvent()
  {
   for (SocketEventDeal request = _listReadEvent.GetObj();
     request != null;
     request = _listReadEvent.GetObj())
   {
    EN_SocketDealEvent kind = request.SocketEvent;
    if (kind == EN_SocketDealEvent.read)
    {
     EN_SocketReadResult readResult;
     do
     {
      readResult = request.Client.ReadNextData();
     }
     while (readResult == EN_SocketReadResult.HaveRead);
    }
    else if (kind == EN_SocketDealEvent.send)
    {
     // Send requests belong in the send queue
     Debug.Assert(false);
    }
   }
  }

  /// <summary>
  /// Called when a client has received data: queues a read event carrying the data,
  /// adds the byte count to <see cref="ReadByteCount"/>, then schedules the next read
  /// for that client.
  /// </summary>
  /// <param name="client">Client the data was read from.</param>
  /// <param name="readData">The bytes that were read.</param>
  private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
  {
   try
   {
    SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
    param.ClientInfo = client.ClientInfo;
    param.Data = readData;
    _socketEventPool.PutObj(param);

    // NOTE(review): lock (this) can be taken by outside code as well; a private lock
    // object would be safer, but that needs a new field outside this method.
    lock (this)
    {
     ReadByteCount += readData.Length;
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace));
   }
   finally
   {
    // Schedule the next read only AFTER this chunk has been queued. Scheduling it
    // first would let the read worker complete another read and queue its data ahead
    // of this chunk, delivering the client's bytes out of order. Kept in finally so
    // reading continues even if queuing this chunk failed.
    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
   }
  }
#endregion

  /// <summary>
  /// Called when a client has sent data: schedules the next send for that client and
  /// adds the sent byte count to <see cref="SendByteCount"/>.
  /// </summary>
  /// <param name="client">Client that completed the send.</param>
  /// <param name="sendCount">Number of bytes sent.</param>
  private void Client_OnSendData(AsyncSocketClient client, int sendCount)
  {
   // Queue the next chunk for this client
   SocketEventDeal nextSend = new SocketEventDeal(client, EN_SocketDealEvent.send);
   _listSendEvent.PutObj(nextSend);

   // NOTE(review): lock (this) can be taken by outside code as well; consider a private lock object.
   lock (this)
   {
    SendByteCount = SendByteCount + sendCount;
   }
  }

  /// <summary>
  /// Called when a client's socket has closed: queues a close event for dispatch.
  /// The client is removed from the client table later, when that event is dispatched.
  /// </summary>
  /// <param name="client">Client whose socket closed.</param>
  private void Client_OnSocketClose(AsyncSocketClient client)
  {
   try
   {
    SocketEventParam closeEvent = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket)
    {
     ClientInfo = client.ClientInfo
    };
    _socketEventPool.PutObj(closeEvent);
   }
   catch (Exception ex)
   {
    string message = string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace);
    NetLogger.Log(message);
   }
  }

  /// <summary>
  /// Places data into the send buffer of the connection identified by
  /// <paramref name="socket"/> and, when that succeeds, schedules a send for it.
  /// </summary>
  /// <param name="socket">Socket of the target connection.</param>
  /// <param name="data">Bytes to send.</param>
  /// <returns>
  /// <c>no_client</c> when <paramref name="socket"/> is null or not registered;
  /// otherwise the result reported by the client's <c>PutSendData</c>.
  /// </returns>
  public EN_SendDataResult SendData(Socket socket, byte[] data)
  {
   if (socket == null)
   {
    return EN_SendDataResult.no_client;
   }

   lock (_clientGroup)
   {
    AsyncSocketClient client;
    if (!_clientGroup.TryGetValue(socket, out client))
    {
     return EN_SendDataResult.no_client;
    }

    EN_SendDataResult result = client.PutSendData(data);
    if (result != EN_SendDataResult.ok)
    {
     return result;
    }

    // Data was buffered: ask the send worker to send it
    _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
    return result;
   }
  }

  /// <summary>
  /// Sets the send buffer size of a single connection.
  /// </summary>
  /// <param name="socket">Socket of the target connection.</param>
  /// <param name="byteCount">New send buffer size in bytes.</param>
  /// <returns>
  /// <c>true</c> when the connection was found and updated; <c>false</c> when
  /// <paramref name="socket"/> is null or not registered.
  /// </returns>
  public bool SetClientSendBuffer(Socket socket, int byteCount)
  {
   // A null key would make the dictionary lookup throw; treat it as "no such client",
   // the same way SendData does.
   if (socket == null)
   {
    return false;
   }

   lock (_clientGroup)
   {
    AsyncSocketClient client;
    if (!_clientGroup.TryGetValue(socket, out client))
    {
     return false;
    }

    client.SendBufferByteCount = byteCount;
    return true;
   }
  }


  #region connect process
  // Performs outgoing connections; StartListen subscribes SocketConnectEvent to its connect event
  NetConnectManage _netConnectManage = new NetConnectManage();

  /// <summary>
  /// Starts an asynchronous connection to a remote peer by delegating to the connect manager.
  /// </summary>
  /// <param name="peerIp">IP address of the peer.</param>
  /// <param name="peerPort">Port of the peer.</param>
  /// <param name="tag">Caller-defined object passed through to the connect manager.</param>
  /// <returns>The value returned by the connect manager's <c>ConnectAsyn</c>.</returns>
  public bool ConnectAsyn(string peerIp, int peerPort, object tag)
   => _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);

  /// <summary>
  /// Connects synchronously to a remote peer by delegating to the connect manager.
  /// </summary>
  /// <param name="peerIp">IP address of the peer.</param>
  /// <param name="peerPort">Port of the peer.</param>
  /// <param name="tag">Caller-defined object passed through to the connect manager.</param>
  /// <param name="socket">Receives the socket produced by the connect manager.</param>
  /// <returns>The value returned by the connect manager's <c>Connect</c>.</returns>
  public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
   => _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
  #endregion
 }

 // Kind of work item queued for the read and send worker threads.
 enum EN_SocketDealEvent
 {
  // Read the next data from the client (handled by DealReadEvent)
  read,
  // Send the next data to the client (handled by DealSendEvent)
  send,
 }
 // Work item for the read/send worker threads: pairs a client with the operation to perform on it.
 class SocketEventDeal
 {
  // Client the operation applies to
  public AsyncSocketClient Client { get; set; }
  // Operation to perform (read or send)
  public EN_SocketDealEvent SocketEvent { get; set; }
  // Creates a work item for the given client and operation.
  public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
  {
   Client = client;
   SocketEvent = socketEvent;
  }
 }
}