// NetServer: aggregates the other classes.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
namespace IocpCore
{
/// <summary>
/// Server facade that aggregates the listeners, the outbound connection
/// manager, the packet assembler and the per-client send/read scheduling.
/// Socket events are queued internally and delivered to
/// <see cref="OnSocketPacketEvent"/> from a dedicated worker thread.
/// </summary>
public class NetServer
{
    #region configuration and state

    /// <summary>
    /// Callback invoked for socket events: directly from the event worker when
    /// <see cref="PacketMinLen"/> is 0, otherwise through
    /// <see cref="PutClientPacket"/> by the packet assembler.
    /// </summary>
    public Action<SocketEventParam> OnSocketPacketEvent;

    /// <summary>Send buffer size, in bytes, for each connection.</summary>
    public int SendBufferBytePerClient { get; set; } = 1024 * 100;

    // volatile: written by StartListen and polled by the worker threads.
    volatile bool _serverStart = false;

    // Guards the one-time start-up of worker threads and event wiring.
    readonly object _startLock = new object();

    readonly List<NetListener> _listListener = new List<NetListener>();

    // Assembles received byte streams into complete packets.
    ClientPacketManage _clientPacketManage;

    // Counters are updated from I/O callbacks on arbitrary threads, so they are
    // kept in plain fields and accessed only through Interlocked.
    long _sendByteCount;
    long _readByteCount;

    readonly List<ListenParam> _listListenPort = new List<ListenParam>();

    // Minimum / maximum packet length configured through SetPacketParam.
    int _packetMinLen;
    int _packetMaxLen;

    // Queue of socket events waiting to be delivered by NetPacketProcess.
    readonly ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();

    // Connected clients keyed by their socket; every access locks this dictionary.
    readonly Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();

    // Work queues drained by NetSendProcess and NetReadProcess respectively.
    readonly ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
    readonly ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();

    readonly NetConnectManage _netConnectManage = new NetConnectManage();

    /// <summary>Total number of bytes reported as sent by all clients.</summary>
    public Int64 SendByteCount
    {
        get { return Interlocked.Read(ref _sendByteCount); }
        set { Interlocked.Exchange(ref _sendByteCount, value); }
    }

    /// <summary>Total number of bytes reported as read from all clients.</summary>
    public Int64 ReadByteCount
    {
        get { return Interlocked.Read(ref _readByteCount); }
        set { Interlocked.Exchange(ref _readByteCount, value); }
    }

    /// <summary>Minimum packet length set by <see cref="SetPacketParam"/>.</summary>
    public int PacketMinLen
    {
        get { return _packetMinLen; }
    }

    /// <summary>Maximum packet length set by <see cref="SetPacketParam"/>.</summary>
    public int PacketMaxLen
    {
        get { return _packetMaxLen; }
    }

    /// <summary>
    /// Signature of the callback that returns the total length of a packet
    /// given a buffer and an offset into it.
    /// </summary>
    public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);

    /// <summary>
    /// Callback used to obtain a packet's total length. It is not invoked inside
    /// this class; presumably the packet assembler calls it (confirm there).
    /// </summary>
    public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

    /// <summary>Number of clients currently registered.</summary>
    public int ClientCount
    {
        get
        {
            lock (_clientGroup)
            {
                return _clientGroup.Count;
            }
        }
    }

    /// <summary>Snapshot copy of the sockets of all registered clients.</summary>
    public List<Socket> ClientList
    {
        get
        {
            lock (_clientGroup)
            {
                return _clientGroup.Keys.ToList();
            }
        }
    }

    /// <summary>
    /// Sets the minimum and maximum packet length.
    /// When <paramref name="minLen"/> is 0 the server treats incoming data as a
    /// raw byte stream and skips packet assembly.
    /// </summary>
    /// <param name="minLen">Minimum packet length; must be &gt;= 0.</param>
    /// <param name="maxLen">Maximum packet length; must be greater than <paramref name="minLen"/>.</param>
    public void SetPacketParam(int minLen, int maxLen)
    {
        Debug.Assert(minLen >= 0);
        Debug.Assert(maxLen > minLen);
        _packetMinLen = minLen;
        _packetMaxLen = maxLen;
    }

    #endregion

    #region listen port

    /// <summary>Registers a port to be opened by the next <see cref="StartListen"/> call.</summary>
    /// <param name="port">Port number to listen on.</param>
    /// <param name="tag">Caller-defined object stored with the listen parameters.</param>
    public void AddListenPort(int port, object tag)
    {
        _listListenPort.Add(new ListenParam(port, tag));
    }

    /// <summary>
    /// Starts the worker threads (first call only) and opens a listener for
    /// every port added through <see cref="AddListenPort"/>.
    /// </summary>
    /// <param name="listenFault">Receives the ports that failed to start listening.</param>
    /// <returns><c>true</c> when every port started listening.</returns>
    /// <remarks>
    /// The listener list is rebuilt on every call; listeners from an earlier
    /// call are only dropped from the list here, this method does not stop them.
    /// </remarks>
    public bool StartListen(out List<int> listenFault)
    {
        EnsureWorkersStarted();

        _listListener.Clear();
        listenFault = new List<int>();
        foreach (ListenParam param in _listListenPort)
        {
            NetListener listener = new NetListener(this);
            listener._listenParam = param;
            listener.OnAcceptSocket += Listener_OnAcceptSocket;
            if (listener.StartListen())
            {
                _listListener.Add(listener);
                NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
            }
            else
            {
                listenFault.Add(param._port);
            }
        }
        return listenFault.Count == 0;
    }

    /// <summary>
    /// Creates the packet assembler, wires the event handlers and starts the
    /// three worker threads exactly once. Repeating this on every StartListen
    /// call would subscribe SocketConnectEvent again (each connect would then be
    /// handled twice) and spawn duplicate worker threads.
    /// </summary>
    private void EnsureWorkersStarted()
    {
        lock (_startLock)
        {
            if (_serverStart)
                return;

            _clientPacketManage = new ClientPacketManage(this);
            _clientPacketManage.OnSocketPacketEvent += PutClientPacket;
            _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;
            _serverStart = true;

            new Thread(new ThreadStart(NetPacketProcess)).Start();
            new Thread(new ThreadStart(NetSendProcess)).Start();
            new Thread(new ThreadStart(NetReadProcess)).Start();
        }
    }

    /// <summary>Handles a socket accepted by one of the listeners.</summary>
    private void Listener_OnAcceptSocket(ListenParam listenParam, AsyncSocketClient client)
    {
        try
        {
            RegisterClient(client);

            SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
            param.ClientInfo = client.ClientInfo;

            // Queue the accept notification BEFORE scheduling the first read;
            // otherwise a fast read completion could enqueue its data event
            // ahead of the accept event.
            _socketEventPool.PutObj(param);
            _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
        }
        catch (Exception ex)
        {
            LogException("Listener_OnAcceptSocket", ex);
        }
    }

    #endregion

    #region event delivery

    /// <summary>Forwards an event to <see cref="OnSocketPacketEvent"/>.</summary>
    /// <param name="param">Event to deliver.</param>
    public void PutClientPacket(SocketEventParam param)
    {
        OnSocketPacketEvent?.Invoke(param);
    }

    /// <summary>
    /// Worker thread body: drains the socket event queue, then waits up to one
    /// second for the queue to be signalled again.
    /// </summary>
    private void NetPacketProcess()
    {
        while (_serverStart)
        {
            try
            {
                DealEventPool();
            }
            catch (Exception ex)
            {
                LogException("DealEventPool", ex);
            }
            _socketEventPool.WaitOne(1000);
        }
    }

    /// <summary>
    /// Delivers every queued socket event. Close events first remove the client
    /// from the registry; events then go straight to the callback (byte-stream
    /// mode) or to the packet assembler.
    /// </summary>
    private void DealEventPool()
    {
        while (true)
        {
            SocketEventParam param = _socketEventPool.GetObj();
            if (param == null)
                return;

            if (param.SocketEvent == EN_SocketEvent.close)
            {
                lock (_clientGroup)
                {
                    _clientGroup.Remove(param.Socket);
                }
            }

            if (_packetMinLen == 0)
            {
                // Byte-stream mode: no packet assembly.
                OnSocketPacketEvent?.Invoke(param);
            }
            else
            {
                // Let the assembler build complete packets.
                _clientPacketManage.PutSocketParam(param);
            }
        }
    }

    /// <summary>
    /// Called when a received packet reports an invalid length: logs the length
    /// and closes the offending client.
    /// </summary>
    /// <param name="socket">Socket the packet was received on.</param>
    /// <param name="buffer">Receive buffer (not used here).</param>
    /// <param name="offset">Offset into <paramref name="buffer"/> (not used here).</param>
    /// <param name="packetLen">The reported packet length.</param>
    internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
    {
        try
        {
            lock (_clientGroup)
            {
                AsyncSocketClient client;
                if (!_clientGroup.TryGetValue(socket, out client))
                {
                    Debug.Assert(false);
                    return;
                }
                NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
                client.CloseSocket();
            }
        }
        catch (Exception ex)
        {
            LogException("OnRcvPacketLenError", ex);
        }
    }

    #endregion

    #region client registration and callbacks

    /// <summary>
    /// Adds the client to the registry and subscribes to its close/read/send
    /// notifications. Shared by the accept and connect paths.
    /// </summary>
    private void RegisterClient(AsyncSocketClient client)
    {
        lock (_clientGroup)
        {
            // A socket must not already be registered; replace defensively.
            bool remove = _clientGroup.Remove(client.ConnectSocket);
            Debug.Assert(!remove);
            _clientGroup.Add(client.ConnectSocket, client);
        }
        client.OnSocketClose += Client_OnSocketClose;
        client.OnReadData += Client_OnReadData;
        client.OnSendData += Client_OnSendData;
    }

    /// <summary>
    /// Handles the result of an outbound connection attempt. A null socket or
    /// null client means the connection failed; the event is still queued so
    /// the consumer is informed.
    /// </summary>
    private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
    {
        try
        {
            bool connected = param.Socket != null && client != null;
            if (connected)
                RegisterClient(client);

            // Notify first, then start reading, so the connect event always
            // precedes the first read event of this client.
            _socketEventPool.PutObj(param);

            if (connected)
                _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
        }
        catch (Exception ex)
        {
            LogException("SocketConnectEvent", ex);
        }
    }

    /// <summary>Queues received data for delivery and schedules the next read.</summary>
    private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
    {
        try
        {
            SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
            param.ClientInfo = client.ClientInfo;
            param.Data = readData;
            _socketEventPool.PutObj(param);
            Interlocked.Add(ref _readByteCount, readData.Length);
        }
        catch (Exception ex)
        {
            LogException("Client_OnReadData", ex);
        }

        // Schedule the next read only AFTER this chunk is queued; scheduling it
        // first would let the next chunk overtake this one in the event queue.
        _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
    }

    /// <summary>Schedules the next send for the client and updates the sent-byte counter.</summary>
    private void Client_OnSendData(AsyncSocketClient client, int sendCount)
    {
        _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
        Interlocked.Add(ref _sendByteCount, sendCount);
    }

    /// <summary>Queues a close event for the client.</summary>
    private void Client_OnSocketClose(AsyncSocketClient client)
    {
        try
        {
            SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
            param.ClientInfo = client.ClientInfo;
            _socketEventPool.PutObj(param);
        }
        catch (Exception ex)
        {
            LogException("Client_OnSocketClose", ex);
        }
    }

    #endregion

    #region send / read workers

    /// <summary>Worker thread body for the send queue.</summary>
    private void NetSendProcess()
    {
        while (_serverStart)
        {
            DealSendEvent();
            _listSendEvent.WaitOne(1000);
        }
    }

    /// <summary>Worker thread body for the read queue.</summary>
    private void NetReadProcess()
    {
        while (_serverStart)
        {
            DealReadEvent();
            _listReadEvent.WaitOne(1000);
        }
    }

    /// <summary>
    /// Drains the send queue. For each item, keeps sending while the client
    /// reports <c>HaveSend</c>. A failure on one client is logged and does not
    /// stop the worker thread or the remaining items.
    /// </summary>
    private void DealSendEvent()
    {
        SocketEventDeal item;
        while ((item = _listSendEvent.GetObj()) != null)
        {
            if (item.SocketEvent != EN_SocketDealEvent.send)
            {
                // Only send items belong in this queue.
                Debug.Assert(false);
                continue;
            }

            try
            {
                while (item.Client.SendNextData() == EN_SocketSendResult.HaveSend)
                {
                    // Keep going until the client has nothing more to send right now.
                }
            }
            catch (Exception ex)
            {
                LogException("DealSendEvent", ex);
            }
        }
    }

    /// <summary>
    /// Drains the read queue. For each item, keeps reading while the client
    /// reports <c>HaveRead</c>. A failure on one client is logged and does not
    /// stop the worker thread or the remaining items.
    /// </summary>
    private void DealReadEvent()
    {
        SocketEventDeal item;
        while ((item = _listReadEvent.GetObj()) != null)
        {
            if (item.SocketEvent != EN_SocketDealEvent.read)
            {
                // Only read items belong in this queue.
                Debug.Assert(false);
                continue;
            }

            try
            {
                while (item.Client.ReadNextData() == EN_SocketReadResult.HaveRead)
                {
                    // Keep going until the client has nothing more to read right now.
                }
            }
            catch (Exception ex)
            {
                LogException("DealReadEvent", ex);
            }
        }
    }

    #endregion

    #region public send API

    /// <summary>
    /// Places data into the send buffer of the client that owns
    /// <paramref name="socket"/> and schedules a send when it was accepted.
    /// </summary>
    /// <param name="socket">Socket of the target client.</param>
    /// <param name="data">Bytes to send.</param>
    /// <returns>
    /// <c>no_client</c> when the socket is null or not registered; otherwise the
    /// result returned by the client's <c>PutSendData</c>.
    /// </returns>
    public EN_SendDataResult SendData(Socket socket, byte[] data)
    {
        if (socket == null)
            return EN_SendDataResult.no_client;

        lock (_clientGroup)
        {
            AsyncSocketClient client;
            if (!_clientGroup.TryGetValue(socket, out client))
                return EN_SendDataResult.no_client;

            EN_SendDataResult result = client.PutSendData(data);
            if (result == EN_SendDataResult.ok)
            {
                // Wake the send worker for this client.
                _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
            }
            return result;
        }
    }

    /// <summary>Sets the send buffer size of a single connection.</summary>
    /// <param name="socket">Socket of the target client.</param>
    /// <param name="byteCount">New send buffer size in bytes.</param>
    /// <returns><c>false</c> when the socket is null or not registered; otherwise <c>true</c>.</returns>
    public bool SetClientSendBuffer(Socket socket, int byteCount)
    {
        // Dictionary lookups throw on a null key; treat it as "no such client",
        // the same way SendData does.
        if (socket == null)
            return false;

        lock (_clientGroup)
        {
            AsyncSocketClient client;
            if (!_clientGroup.TryGetValue(socket, out client))
                return false;

            client.SendBufferByteCount = byteCount;
            return true;
        }
    }

    #endregion

    #region connect process

    /// <summary>Starts an asynchronous connection to a peer.</summary>
    /// <param name="peerIp">Peer IP address.</param>
    /// <param name="peerPort">Peer port.</param>
    /// <param name="tag">Caller-defined object passed to the connection manager.</param>
    /// <returns>The value returned by the connection manager.</returns>
    public bool ConnectAsyn(string peerIp, int peerPort, object tag)
    {
        return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
    }

    /// <summary>Connects synchronously to a peer.</summary>
    /// <param name="peerIp">Peer IP address.</param>
    /// <param name="peerPort">Peer port.</param>
    /// <param name="tag">Caller-defined object passed to the connection manager.</param>
    /// <param name="socket">Receives the socket produced by the connection manager.</param>
    /// <returns>The value returned by the connection manager.</returns>
    public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
    {
        return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
    }

    #endregion

    /// <summary>Logs an exception in the format used throughout this class.</summary>
    private static void LogException(string source, Exception ex)
    {
        NetLogger.Log(string.Format("{0} 异常 {1}***{2}", source, ex.Message, ex.StackTrace));
    }
}
/// <summary>
/// Kind of I/O work carried by a queued per-client work item.
/// </summary>
enum EN_SocketDealEvent
{
    /// <summary>The work item asks for a read on the client.</summary>
    read = 0,

    /// <summary>The work item asks for a send on the client.</summary>
    send = 1,
}
/// <summary>
/// Work item pairing a client with the kind of I/O operation to perform on it.
/// </summary>
class SocketEventDeal
{
    /// <summary>Creates a work item for the given client and operation.</summary>
    /// <param name="client">Client the operation applies to.</param>
    /// <param name="socketEvent">Operation to perform.</param>
    public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
    {
        this.Client = client;
        this.SocketEvent = socketEvent;
    }

    /// <summary>Client the operation applies to.</summary>
    public AsyncSocketClient Client { get; set; }

    /// <summary>Operation to perform on <see cref="Client"/>.</summary>
    public EN_SocketDealEvent SocketEvent { get; set; }
}
}










