Implementation Notes on a High-Performance Asynchronous Socket Wrapper Library in C#

Author: 源之缘  Date: 2023-12-01 15:20:45

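Preface

Sockets are one of the most common ways for programs to communicate. C# offers several ways to implement socket communication, and the asynchronous approach is the most efficient of them.

Asynchronous socket communication is built on Windows I/O completion ports (IOCP). For how completion ports work internally, see the many articles available online.

The point I want to stress: asynchronous communication based on completion ports is the most efficient communication mechanism on Windows, bar none!

Asynchronous communication is much harder to handle than synchronous communication, and writing the code involves many pitfalls. Without experience it is hard to get right.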
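One pitfall of this kind shows up repeatedly in the listings later in this article: ReceiveAsync, SendAsync and AcceptAsync return false when the operation completes synchronously, and in that case the Completed event is not raised, so the caller has to process the result itself. The following is a minimal sketch of that pattern, not code from the library. The class name ReceiveOnceSketch and the loopback setup are assumptions made purely for illustration.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

// Hypothetical demo class, not part of the library described in this article.
public static class ReceiveOnceSketch
{
    // Sends "ping" over a loopback connection and receives it with ReceiveAsync.
    public static string Run()
    {
        using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); // port 0: let the OS pick a free port
            listener.Listen(1);
            client.Connect(listener.LocalEndPoint);

            using (Socket server = listener.Accept())
            using (var args = new SocketAsyncEventArgs())
            using (var done = new ManualResetEventSlim(false))
            {
                client.Send(Encoding.ASCII.GetBytes("ping"));

                string received = null;
                EventHandler<SocketAsyncEventArgs> onCompleted = (sender, e) =>
                {
                    if (e.SocketError == SocketError.Success && e.BytesTransferred > 0)
                        received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);
                    done.Set();
                };

                args.SetBuffer(new byte[1024], 0, 1024);
                args.Completed += onCompleted;

                // true:  the receive is pending and Completed fires later.
                // false: the receive already finished and Completed is NOT raised,
                //        so the handler must be called by hand.
                bool willRaiseEvent = server.ReceiveAsync(args);
                if (!willRaiseEvent)
                    onCompleted(server, args);

                done.Wait(5000); // give the asynchronous path up to 5 seconds
                return received;
            }
        }
    }
}
```

Calling ReceiveOnceSketch.Run() should hand back the text that was sent, whichever of the two completion paths the runtime happens to take.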

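I collected a large amount of material and built a wrapper around asynchronous sockets. The library has now been running stably and efficiently for several months.

Looking at what is available online, I have not found a wrapper library I am satisfied with. Many articles mix data transfer and protocol handling together, which makes the code very hard to follow and impossible to extend.

This library was written to avoid those flaws: the logic is layered and modular, and it achieves both ease of use and high performance.

To give a first impression of the throughput, here is a test screenshot.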

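[Image: test screenshot]

Host configuration

[Image: host configuration screenshot]

The 100 Mbit link is essentially saturated and total CPU usage is 40%. My machine sits at about 20% CPU when idle, so the program itself accounts for roughly 20%.

The library is scalable: even with 100,000 connections, CPU usage stays roughly the same as long as the same amount of data is sent and received.

Library structure diagram

[Image: library structure diagram]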

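Goals

It can be used both as a server (listening) and as a client (actively connecting).

It works with any network protocol. Data is sent and received either as a raw byte stream or as complete packets; the library does not interpret the protocol payload.

Ease of use. The complex low-level handling is encapsulated behind a friendly external interface.

High performance. Processing is optimized as far as possible: a single machine can support tens of thousands of connections, with throughput reaching several hundred Mbit/s.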

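Implementation approach

The network handling logic breaks down into the following parts:

Network listening: Can listen on multiple ports. It creates sockets and hands them over for further processing. The listener module has a single, narrow responsibility and can be optimized further if necessary.

Active connection: Connects to the peer either asynchronously or synchronously. Note: whether a socket comes from the listener or from an outgoing connection, all subsequent processing is exactly the same.

Socket send/receive handling: Each socket has one send/receive instance, which deals only with byte streams. Both directions are optimized: when sending, queued data is coalesced into larger writes to improve send performance; when receiving, each posted receive uses a 1 KB buffer.

Packet assembly: Data packets usually carry a length indicator. For example, if the first two bytes of the header hold the length, that value is enough to assemble a complete packet.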
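The packet-assembly step can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's ClientPacketManage class, which is not listed in this article. The PacketAssembler class, its Feed method and the TwoByteLengthPrefix helper are hypothetical names, and the wire format is assumed to be a 2-byte big-endian body length followed by the body. The length callback takes (data, offset), the same shape as the GetPacketTotalLen_Callback delegate that appears in NetServer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the library described in this article.
public class PacketAssembler
{
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _headerLen;
    private readonly int _maxLen;
    private readonly Func<byte[], int, int> _getTotalLen;

    public PacketAssembler(int headerLen, int maxLen, Func<byte[], int, int> getTotalLen)
    {
        _headerLen = headerLen;
        _maxLen = maxLen;
        _getTotalLen = getTotalLen;
    }

    // Appends a received chunk and returns every complete packet now available.
    public List<byte[]> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        byte[] buffer = _pending.ToArray();
        var packets = new List<byte[]>();
        int offset = 0;

        // A length can only be read once a full header is buffered.
        while (buffer.Length - offset >= _headerLen)
        {
            int totalLen = _getTotalLen(buffer, offset);
            if (totalLen < _headerLen || totalLen > _maxLen)
                throw new InvalidOperationException("Invalid packet length: " + totalLen);
            if (buffer.Length - offset < totalLen)
                break; // packet is incomplete, wait for more bytes

            byte[] packet = new byte[totalLen];
            Array.Copy(buffer, offset, packet, 0, totalLen);
            packets.Add(packet);
            offset += totalLen;
        }

        _pending.RemoveRange(0, offset); // keep only the unconsumed tail
        return packets;
    }

    // Assumed wire format: 2-byte big-endian body length, then the body.
    // All reads are relative to offset, because several packets may share one buffer.
    public static int TwoByteLengthPrefix(byte[] data, int offset)
    {
        return 2 + ((data[offset] << 8) | data[offset + 1]);
    }
}
```

A possible use: create new PacketAssembler(2, 1024, PacketAssembler.TwoByteLengthPrefix) per connection and pass every received chunk to Feed. A chunk that ends in the middle of a packet yields nothing until the remaining bytes arrive, and a chunk that holds several packets yields all of them at once.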

NetListener: listening


using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
class NetListener
{
 private Socket listenSocket;
 public ListenParam _listenParam { get; set; }
 public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;

bool start;

NetServer _netServer;
 public NetListener(NetServer netServer)
 {
  _netServer = netServer;
 }

public int _acceptAsyncCount = 0;
 public bool StartListen()
 {
  try
  {
   start = true;
   IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
   listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
   listenSocket.Bind(listenPoint);
   listenSocket.Listen(200);

Thread thread1 = new Thread(new ThreadStart(NetProcess));
   thread1.Start();

StartAccept();
   return true;
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("**Listen failed! {0}", ex.Message));
   return false;
  }
 }

AutoResetEvent _acceptEvent = new AutoResetEvent(false);
 private void NetProcess()
 {
  while (start)
  {
   DealNewAccept();
   _acceptEvent.WaitOne(1000 * 10);
  }
 }

private void DealNewAccept()
 {
  try
  {
   if(_acceptAsyncCount <= 10)
   {
    StartAccept();
   }

while (true)
   {
    AsyncSocketClient client = _newSocketClientList.GetObj();
    if (client == null)
     break;

DealNewAccept(client);
   }
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("DealNewAccept exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

private void DealNewAccept(AsyncSocketClient client)
 {
  client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
  OnAcceptSocket?.Invoke(_listenParam, client);
 }

private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
 {
  try
  {
   Interlocked.Decrement(ref _acceptAsyncCount);
   _acceptEvent.Set();
   acceptEventArgs.Completed -= AcceptEventArg_Completed;
   ProcessAccept(acceptEventArgs);
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

public bool StartAccept()
 {
  SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
  acceptEventArgs.Completed += AcceptEventArg_Completed;

bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
  Interlocked.Increment(ref _acceptAsyncCount);

if (!willRaiseEvent)
  {
   Interlocked.Decrement(ref _acceptAsyncCount);
   _acceptEvent.Set();
   acceptEventArgs.Completed -= AcceptEventArg_Completed;
   ProcessAccept(acceptEventArgs);
  }
  return true;
 }

ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
 private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
 {
  try
  {
   using (acceptEventArgs)
   {
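    // Only wrap the socket when the accept actually succeeded.
    if (acceptEventArgs.SocketError == SocketError.Success && acceptEventArgs.AcceptSocket != null)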
    {
     AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
     client.CreateClientInfo(this);

_newSocketClientList.PutObj(client);
     _acceptEvent.Set();
    }
   }
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
  }
 }
}
}

NetConnectManage: connection handling


using System;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
class NetConnectManage
{
 public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

public bool ConnectAsyn(string peerIp, int peerPort, object tag)
 {
  try
  {
   Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
   SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
   socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
   socketEventArgs.Completed += SocketConnect_Completed;

SocketClientInfo clientInfo = new SocketClientInfo();
   socketEventArgs.UserToken = clientInfo;
   clientInfo.PeerIp = peerIp;
   clientInfo.PeerPort = peerPort;
   clientInfo.Tag = tag;

bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
   if (!willRaiseEvent)
   {
    ProcessConnect(socketEventArgs);
    socketEventArgs.Completed -= SocketConnect_Completed;
    socketEventArgs.Dispose();
   }
   return true;
  }
  catch (Exception ex)
  {
   NetLogger.Log("ConnectAsyn",ex);
   return false;
  }
 }

private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
 {
  ProcessConnect(socketEventArgs);
  socketEventArgs.Completed -= SocketConnect_Completed;
  socketEventArgs.Dispose();
 }

private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
 {
  SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
  if (socketEventArgs.SocketError == SocketError.Success)
  {
   DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
  }
  else
  {
   SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
   socketParam.ClientInfo = clientInfo;
   OnSocketConnectEvent?.Invoke(socketParam, null);
  }
 }

void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
 {
  clientInfo.SetClientInfo(socket);

AsyncSocketClient client = new AsyncSocketClient(socket);
  client.SetClientInfo(clientInfo);

  // Raise the connect event
  SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
  socketParam.ClientInfo = clientInfo;
  OnSocketConnectEvent?.Invoke(socketParam, client);
 }

public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
 {
  socket = null;
  try
  {
   Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

SocketClientInfo clientInfo = new SocketClientInfo();
   clientInfo.PeerIp = peerIp;
   clientInfo.PeerPort = peerPort;
   clientInfo.Tag = tag;

EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
   socketTmp.Connect(remoteEP);
   if (!socketTmp.Connected)
    return false;

DealConnectSocket(socketTmp, clientInfo);
   socket = socketTmp;
   return true;
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("Failed to connect to peer ({0}:{1})!", peerIp, peerPort), ex);
   return false;
  }
 }
}
}

AsyncSocketClient: socket send/receive handling


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;

namespace IocpCore
{
public class AsyncSocketClient
{
 public static int IocpReadLen = 1024;

public readonly Socket ConnectSocket;

protected SocketAsyncEventArgs m_receiveEventArgs;
 public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
 protected byte[] m_asyncReceiveBuffer;

protected SocketAsyncEventArgs m_sendEventArgs;
 public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
 protected byte[] m_asyncSendBuffer;

public event Action<AsyncSocketClient, byte[]> OnReadData;
 public event Action<AsyncSocketClient, int> OnSendData;
 public event Action<AsyncSocketClient> OnSocketClose;

static object releaseLock = new object();
 public static int createCount = 0;
 public static int releaseCount = 0;

~AsyncSocketClient()
 {
  lock (releaseLock)
  {
   releaseCount++;
  }
 }

public AsyncSocketClient(Socket socket)
 {
  lock (releaseLock)
  {
   createCount++;
  }

ConnectSocket = socket;

m_receiveEventArgs = new SocketAsyncEventArgs();
  m_asyncReceiveBuffer = new byte[IocpReadLen];
  m_receiveEventArgs.AcceptSocket = ConnectSocket;
  m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

m_sendEventArgs = new SocketAsyncEventArgs();
  m_asyncSendBuffer = new byte[IocpReadLen * 2];
  m_sendEventArgs.AcceptSocket = ConnectSocket;
  m_sendEventArgs.Completed += SendEventArgs_Completed;
 }

SocketClientInfo _clientInfo;

public SocketClientInfo ClientInfo
 {
  get
  {
   return _clientInfo;
  }
 }

internal void CreateClientInfo(NetListener netListener)
 {
  _clientInfo = new SocketClientInfo();
  try
  {
   _clientInfo.Tag = netListener._listenParam._tag;
   IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
   Debug.Assert(netListener._listenParam._port == ip.Port);

_clientInfo.LocalIp = ip.Address.ToString();
   _clientInfo.LocalPort = netListener._listenParam._port;

ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
   _clientInfo.PeerIp = ip.Address.ToString();
   _clientInfo.PeerPort = ip.Port;
  }
  catch (Exception ex)
  {
   NetLogger.Log("CreateClientInfo", ex);
  }
 }
 internal void SetClientInfo(SocketClientInfo clientInfo)
 {
  _clientInfo = clientInfo;
 }

#region read process
 bool _inReadPending = false;
 public EN_SocketReadResult ReadNextData()
 {
  lock (this)
  {
   if (_socketError)
    return EN_SocketReadResult.ReadError;
   if (_inReadPending)
    return EN_SocketReadResult.InAsyn;
   if(!ConnectSocket.Connected)
   {
    OnReadError();
    return EN_SocketReadResult.ReadError;
   }

try
   {
    m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
    _inReadPending = true;
    bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post the receive request
    if (!willRaiseEvent)
    {
     _inReadPending = false;
     ProcessReceive();
     if (_socketError)
     {
      OnReadError();
      return EN_SocketReadResult.ReadError;
     }
     return EN_SocketReadResult.HaveRead;
    }
    else
    {
     return EN_SocketReadResult.InAsyn;
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log("ReadNextData", ex);
    _inReadPending = false;
    OnReadError();
    return EN_SocketReadResult.ReadError;
   }
  }
 }

private void ProcessReceive()
 {
  if (ReceiveEventArgs.BytesTransferred > 0
   && ReceiveEventArgs.SocketError == SocketError.Success)
  {
   int offset = ReceiveEventArgs.Offset;
   int count = ReceiveEventArgs.BytesTransferred;

byte[] readData = new byte[count];
   Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

_inReadPending = false;
   if (!_socketError)
    OnReadData?.Invoke(this, readData);
  }
  else
  {
   _inReadPending = false;
   OnReadError();
  }
 }

private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
 {
  lock (this)
  {
   _inReadPending = false;
   ProcessReceive();
   if (_socketError)
   {
    OnReadError();
   }
  }
 }

bool _socketError = false;
 private void OnReadError()
 {
  lock (this)
  {
   if (_socketError == false)
   {
    _socketError = true;
    OnSocketClose?.Invoke(this);
   }
   CloseClient();
  }
 }
 #endregion

#region send process
 int _sendBufferByteCount = 102400;
 public int SendBufferByteCount
 {
  get
  {
   return _sendBufferByteCount;
  }
  set
  {
   if (value < 1024)
   {
    _sendBufferByteCount = 1024;
   }
   else
   {
    _sendBufferByteCount = value;
   }
  }
 }

SendBufferPool _sendDataPool = new SendBufferPool();
 internal EN_SendDataResult PutSendData(byte[] data)
 {
  if (_socketError)
   return EN_SendDataResult.no_client;

if (_sendDataPool._bufferByteCount >= _sendBufferByteCount)
  {
   return EN_SendDataResult.buffer_overflow;
  }

if (data.Length <= IocpReadLen)
  {
   _sendDataPool.PutObj(data);
  }
  else
  {
   List<byte[]> dataItems = SplitData(data, IocpReadLen);
   foreach (byte[] item in dataItems)
   {
    _sendDataPool.PutObj(item);
   }
  }

return EN_SendDataResult.ok;
 }

bool _inSendPending = false;
 public EN_SocketSendResult SendNextData()
 {
  lock (this)
  {
   if (_socketError)
   {
    return EN_SocketSendResult.SendError;
   }

if (_inSendPending)
   {
    return EN_SocketSendResult.InAsyn;
   }

int sendByteCount = GetSendData();
   if (sendByteCount == 0)
   {
    return EN_SocketSendResult.NoSendData;
   }

   // Check the connection first: a thrown exception would hurt performance
   if (!ConnectSocket.Connected)
   {
    OnSendError();
    return EN_SocketSendResult.SendError;
   }

try
   {
    m_sendEventArgs.SetBuffer(m_asyncSendBuffer, 0, sendByteCount);
    _inSendPending = true;
    bool willRaiseEvent = ConnectSocket.SendAsync(m_sendEventArgs);
    if (!willRaiseEvent)
    {
     _inSendPending = false;
     ProcessSend(m_sendEventArgs);
     if (_socketError)
     {
      OnSendError();
      return EN_SocketSendResult.SendError;
     }
     else
     {
      OnSendData?.Invoke(this, sendByteCount);
      // Continue with the next send
      return EN_SocketSendResult.HaveSend;
     }
    }
    else
    {
     return EN_SocketSendResult.InAsyn;
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log("SendNextData", ex);
    _inSendPending = false;
    OnSendError();
    return EN_SocketSendResult.SendError;
   }
  }
 }

private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
 {
  lock (this)
  {
   try
   {
    _inSendPending = false;
    ProcessSend(m_sendEventArgs);

int sendCount = 0;
    if (sendEventArgs.SocketError == SocketError.Success)
    {
     sendCount = sendEventArgs.BytesTransferred;
    }
    OnSendData?.Invoke(this, sendCount);

if (_socketError)
    {
     OnSendError();
    }
   }
   catch (Exception ex)
   {
    NetLogger.Log("SendEventArgs_Completed", ex);
   }
  }
 }

private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
 {
  if (sendEventArgs.SocketError == SocketError.Success)
  {
   return true;
  }
  else
  {
   OnSendError();
   return false;
  }
 }

private int GetSendData()
 {
  int dataLen = 0;
  while (true)
  {
   byte[] data = _sendDataPool.GetObj();
   if (data == null)
    return dataLen;
   Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
   dataLen += data.Length;
   if (dataLen > IocpReadLen)
    break;
  }
  return dataLen;
 }
 private void OnSendError()
 {
  lock (this)
  {
   if (_socketError == false)
   {
    _socketError = true;
    OnSocketClose?.Invoke(this);
   }
   CloseClient();
  }
 }
 #endregion

internal void CloseSocket()
 {
  try
  {
   ConnectSocket.Close();
  }
  catch (Exception ex)
  {
   NetLogger.Log("CloseSocket", ex);
  }
 }

static object socketCloseLock = new object();
 public static int closeSendCount = 0;
 public static int closeReadCount = 0;

bool _disposeSend = false;
 void CloseSend()
 {
  if (!_disposeSend && !_inSendPending)
  {
   lock (socketCloseLock)
    closeSendCount++;

_disposeSend = true;
   m_sendEventArgs.SetBuffer(null, 0, 0);
   m_sendEventArgs.Completed -= SendEventArgs_Completed;
   m_sendEventArgs.Dispose();
  }
 }

bool _disposeRead = false;
 void CloseRead()
 {
  if (!_disposeRead && !_inReadPending)
  {
   lock (socketCloseLock)
    closeReadCount++;

_disposeRead = true;
   m_receiveEventArgs.SetBuffer(null, 0, 0);
   m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
   m_receiveEventArgs.Dispose();
  }
 }
 private void CloseClient()
 {
  try
  {
   CloseSend();
   CloseRead();
   ConnectSocket.Close();
  }
  catch (Exception ex)
  {
   NetLogger.Log("CloseClient", ex);
  }
 }

 // Splits data into chunks of at most maxLen bytes
 private List<byte[]> SplitData(byte[] data, int maxLen)
 {
  List<byte[]> items = new List<byte[]>();

int start = 0;
  while (true)
  {
   int itemLen = Math.Min(maxLen, data.Length - start);
   if (itemLen == 0)
    break;
   byte[] item = new byte[itemLen];
   Array.Copy(data, start, item, 0, itemLen);
   items.Add(item);

start += itemLen;
  }
  return items;
 }
}

public enum EN_SocketReadResult
{
 InAsyn,
 HaveRead,
 ReadError
}

public enum EN_SocketSendResult
{
 InAsyn,
 HaveSend,
 NoSendData,
 SendError
}

class SendBufferPool
{
 ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

public Int64 _bufferByteCount = 0;
 public bool PutObj(byte[] obj)
 {
  if (_bufferPool.PutObj(obj))
  {
   lock (this)
   {
    _bufferByteCount += obj.Length;
   }
   return true;
  }
  else
  {
   return false;
  }
 }

public byte[] GetObj()
 {
  byte[] result = _bufferPool.GetObj();
  if (result != null)
  {
   lock (this)
   {
    _bufferByteCount -= result.Length;
   }
  }
  return result;
 }
}
}
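The send path above splits large payloads into chunks of at most IocpReadLen bytes (SplitData) and then packs queued chunks into one send buffer of twice that size (GetSendData), so that many small messages can go out in a single SendAsync call. The following stand-alone sketch shows just that idea. SendCoalescer, Split and Fill are hypothetical names, and a plain Queue<byte[]> stands in for SendBufferPool, whose underlying ObjectPool class is not listed in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone illustration of the split-then-coalesce send path.
public static class SendCoalescer
{
    // Splits data into chunks of at most maxLen bytes.
    public static List<byte[]> Split(byte[] data, int maxLen)
    {
        var items = new List<byte[]>();
        for (int start = 0; start < data.Length; start += maxLen)
        {
            int len = Math.Min(maxLen, data.Length - start);
            byte[] item = new byte[len];
            Array.Copy(data, start, item, 0, len);
            items.Add(item);
        }
        return items;
    }

    // Packs queued chunks into sendBuffer and returns the number of bytes packed.
    // Packing stops as soon as more than chunkLen bytes are in the buffer.
    // Assumes every chunk is at most chunkLen bytes and that
    // sendBuffer.Length >= 2 * chunkLen, so the last copy always fits.
    public static int Fill(Queue<byte[]> queue, byte[] sendBuffer, int chunkLen)
    {
        int dataLen = 0;
        while (dataLen <= chunkLen && queue.Count > 0)
        {
            byte[] chunk = queue.Dequeue();
            Array.Copy(chunk, 0, sendBuffer, dataLen, chunk.Length);
            dataLen += chunk.Length;
        }
        return dataLen;
    }
}
```

With a chunk length of 1024, a 2500-byte payload is split into chunks of 1024, 1024 and 452 bytes. The first call to Fill packs the first two chunks (2048 bytes) into one send and leaves the third for the next call. The factor of two in the buffer size is what guarantees that the last chunk copied never overruns the buffer.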

NetServer: aggregates the other classes


using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.Sockets;
using System.Threading;

namespace IocpCore
{
public class NetServer
{
 public Action<SocketEventParam> OnSocketPacketEvent;

 // Send buffer size per connection, in bytes
 public int SendBufferBytePerClient { get; set; } = 1024 * 100;

bool _serverStart = false;
 List<NetListener> _listListener = new List<NetListener>();

 // Assembles the received byte stream into complete packets
 ClientPacketManage _clientPacketManage;

public Int64 SendByteCount { get; set; }
 public Int64 ReadByteCount { get; set; }

List<ListenParam> _listListenPort = new List<ListenParam>();
 public void AddListenPort(int port, object tag)
 {
  _listListenPort.Add(new ListenParam(port, tag));
 }
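 /// <summary>
 /// Starts the worker threads and begins listening on every registered port.
 /// </summary>
 /// <param name="listenFault">Ports on which listening failed</param>
 /// <returns>true if listening started on every port</returns>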
 public bool StartListen(out List<int> listenFault)
 {
  _serverStart = true;

_clientPacketManage = new ClientPacketManage(this);
  _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

_netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

_listListener.Clear();
  Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
  thread1.Start();

Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
  thread2.Start();

Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
  thread3.Start();

listenFault = new List<int>();
  foreach (ListenParam param in _listListenPort)
  {
   NetListener listener = new NetListener(this);
   listener._listenParam = param;
   listener.OnAcceptSocket += Listener_OnAcceptSocket;
   if (!listener.StartListen())
   {
    listenFault.Add(param._port);
   }
   else
   {
    _listListener.Add(listener);
    NetLogger.Log(string.Format("Listening started. Port: {0}", param._port));
   }
  }

return listenFault.Count == 0;
 }

public void PutClientPacket(SocketEventParam param)
 {
  OnSocketPacketEvent?.Invoke(param);
 }

 // Minimum and maximum packet length
 int _packetMinLen;
 int _packetMaxLen;
 public int PacketMinLen
 {
  get { return _packetMinLen; }
 }
 public int PacketMaxLen
 {
  get { return _packetMaxLen; }
 }

 /// <summary>
 /// Sets the minimum and maximum packet length.
 /// When minLen is 0, incoming data is treated as a raw byte stream.
 /// </summary>
 /// <param name="minLen"></param>
 /// <param name="maxLen"></param>
 public void SetPacketParam(int minLen, int maxLen)
 {
  Debug.Assert(minLen >= 0);
  Debug.Assert(maxLen > minLen);
  _packetMinLen = minLen;
  _packetMaxLen = maxLen;
 }

 // Callback that returns the total length of a packet
 public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
 public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
 private void NetPacketProcess()
 {
  while (_serverStart)
  {
   try
   {
    DealEventPool();
   }
   catch (Exception ex)
   {
    NetLogger.Log(string.Format("DealEventPool exception {0}***{1}", ex.Message, ex.StackTrace));
   }
   _socketEventPool.WaitOne(1000);
  }
 }

Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
 public int ClientCount
 {
  get
  {
   lock (_clientGroup)
   {
    return _clientGroup.Count;
   }
  }
 }
 public List<Socket> ClientList
 {
  get
  {
   lock (_clientGroup)
   {
    return _clientGroup.Keys.ToList();
   }
  }
 }

private void DealEventPool()
 {
  while (true)
  {
   SocketEventParam param = _socketEventPool.GetObj();
   if (param == null)
    return;

if (param.SocketEvent == EN_SocketEvent.close)
   {
    lock (_clientGroup)
    {
     _clientGroup.Remove(param.Socket);
    }
   }

   if (_packetMinLen == 0) // byte-stream mode
   {
    OnSocketPacketEvent?.Invoke(param);
   }
   else
   {
    // Packet mode: assemble complete packets
    _clientPacketManage.PutSocketParam(param);
   }
  }
 }

private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
 {
  try
  {
   if (param.Socket == null || client == null) // connection failed
   {
    // Nothing to register; the failure event is still queued below.
   }
   else
   {
    lock (_clientGroup)
    {
     bool remove = _clientGroup.Remove(client.ConnectSocket);
     Debug.Assert(!remove);
     _clientGroup.Add(client.ConnectSocket, client);
    }

client.OnSocketClose += Client_OnSocketClose;
    client.OnReadData += Client_OnReadData;
    client.OnSendData += Client_OnSendData;

_listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
   }
   _socketEventPool.PutObj(param);
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("SocketConnectEvent exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
 {
  try
  {
   lock (_clientGroup)
   {
    if (!_clientGroup.ContainsKey(socket))
    {
     Debug.Assert(false);
     return;
    }

    NetLogger.Log(string.Format("Invalid packet length! Length: {0}", packetLen));
    AsyncSocketClient client = _clientGroup[socket];
    client.CloseSocket();
   }
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("OnRcvPacketLenError exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

#region listen port
 private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
 {
  try
  {
   lock (_clientGroup)
   {
    bool remove = _clientGroup.Remove(client.ConnectSocket);
    Debug.Assert(!remove);
    _clientGroup.Add(client.ConnectSocket, client);
   }

client.OnSocketClose += Client_OnSocketClose;
   client.OnReadData += Client_OnReadData;
   client.OnSendData += Client_OnSendData;

_listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
   param.ClientInfo = client.ClientInfo;

_socketEventPool.PutObj(param);
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("Listener_OnAcceptSocket exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
 private void NetSendProcess()
 {
  while (_serverStart) // same exit condition as NetPacketProcess
  {
   DealSendEvent();
   _listSendEvent.WaitOne(1000);
  }
 }

ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
 private void NetReadProcess()
 {
  while (_serverStart) // same exit condition as NetPacketProcess
  {
   DealReadEvent();
   _listReadEvent.WaitOne(1000);
  }
 }

private void DealSendEvent()
 {
  while (true)
  {
   SocketEventDeal item = _listSendEvent.GetObj();
   if (item == null)
    break;
   switch (item.SocketEvent)
   {
    case EN_SocketDealEvent.send:
     {
      while (true)
      {
       EN_SocketSendResult result = item.Client.SendNextData();
       if (result == EN_SocketSendResult.HaveSend)
        continue;
       else
        break;
      }
     }
     break;
    case EN_SocketDealEvent.read:
     {
      Debug.Assert(false);
     }
     break;    
   }
  }
 }

private void DealReadEvent()
 {
  while (true)
  {
   SocketEventDeal item = _listReadEvent.GetObj();
   if (item == null)
    break;
   switch (item.SocketEvent)
   {
    case EN_SocketDealEvent.read:
     {
      while (true)
      {
       EN_SocketReadResult result = item.Client.ReadNextData();
       if (result == EN_SocketReadResult.HaveRead)
        continue;
       else
        break;
      }
     }
     break;
    case EN_SocketDealEvent.send:
     {
      Debug.Assert(false);
     }
     break;
   }
  }
 }

private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
 {
  // Queue the next read
  _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

try
  {
   SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
   param.ClientInfo = client.ClientInfo;
   param.Data = readData;
   _socketEventPool.PutObj(param);

lock (this)
   {
    ReadByteCount += readData.Length;
   }
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("Client_OnReadData exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }
#endregion

private void Client_OnSendData(AsyncSocketClient client, int sendCount)
 {
  // Queue the next send
  _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
  lock (this)
  {
   SendByteCount += sendCount;
  }
 }

private void Client_OnSocketClose(AsyncSocketClient client)
 {
  try
  {
   SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
   param.ClientInfo = client.ClientInfo;
   _socketEventPool.PutObj(param);
  }
  catch (Exception ex)
  {
   NetLogger.Log(string.Format("Client_OnSocketClose exception {0}***{1}", ex.Message, ex.StackTrace));
  }
 }

/// <summary>
 /// Queues data in the connection's send buffer
 /// </summary>
 /// <param name="socket"></param>
 /// <param name="data"></param>
 /// <returns></returns>
 public EN_SendDataResult SendData(Socket socket, byte[] data)
 {
  if (socket == null)
   return EN_SendDataResult.no_client;
  lock (_clientGroup)
  {
   if (!_clientGroup.ContainsKey(socket))
    return EN_SendDataResult.no_client;
   AsyncSocketClient client = _clientGroup[socket];
   EN_SendDataResult result = client.PutSendData(data);
   if (result == EN_SendDataResult.ok)
   {
    // Queue the next send
    _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));    
   }
   return result;
  }
 }

/// <summary>
 /// Sets the send buffer size for one connection
 /// </summary>
 /// <param name="socket"></param>
 /// <param name="byteCount"></param>
 /// <returns></returns>
 public bool SetClientSendBuffer(Socket socket, int byteCount)
 {
  lock (_clientGroup)
  {
   if (!_clientGroup.ContainsKey(socket))
    return false;
   AsyncSocketClient client = _clientGroup[socket];
   client.SendBufferByteCount = byteCount;
   return true;
  }
 }

#region connect process
 NetConnectManage _netConnectManage = new NetConnectManage();
 /// <summary>
 /// Connects to a peer asynchronously
 /// </summary>
 /// <param name="peerIp"></param>
 /// <param name="peerPort"></param>
 /// <param name="tag"></param>
 /// <returns></returns>
 public bool ConnectAsyn(string peerIp, int peerPort, object tag)
 {
  return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
 }

/// <summary>
 /// Connects to a peer synchronously
 /// </summary>
 /// <param name="peerIp"></param>
 /// <param name="peerPort"></param>
 /// <param name="tag"></param>
 /// <param name="socket"></param>
 /// <returns></returns>
 public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
 {
  return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
 }
 #endregion
}

enum EN_SocketDealEvent
{
 read,
 send,
}
class SocketEventDeal
{
 public AsyncSocketClient Client { get; set; }
 public EN_SocketDealEvent SocketEvent { get; set; }
 public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
 {
  Client = client;
  SocketEvent = socketEvent;
 }
}
}
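NetServer and NetListener rely on ObjectPool<T> and ObjectPoolWithEvent<T>, which are not listed in this article. Judging only from how they are called (PutObj returns a bool, GetObj returns null when nothing is queued, WaitOne takes a timeout in milliseconds), they behave like thread-safe queues, the second one with a wake-up signal for the worker thread. The sketch below is one possible stand-in under those assumptions. The names QueuePool and QueuePoolWithEvent are hypothetical, and the real classes may differ.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the unlisted ObjectPool<T>: a thread-safe FIFO queue.
public class QueuePool<T> where T : class
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _sync = new object();

    // Enqueues an item; returns false if the item is rejected.
    public virtual bool PutObj(T obj)
    {
        if (obj == null)
            return false;
        lock (_sync)
        {
            _queue.Enqueue(obj);
        }
        return true;
    }

    // Dequeues the oldest item, or returns null when the queue is empty.
    public T GetObj()
    {
        lock (_sync)
        {
            return _queue.Count > 0 ? _queue.Dequeue() : null;
        }
    }
}

// Hypothetical stand-in for the unlisted ObjectPoolWithEvent<T>:
// the same queue plus a signal that wakes a waiting consumer thread.
public class QueuePoolWithEvent<T> : QueuePool<T> where T : class
{
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    public override bool PutObj(T obj)
    {
        bool ok = base.PutObj(obj);
        if (ok)
            _signal.Set(); // wake the consumer
        return ok;
    }

    // Blocks until an item is queued or the timeout elapses.
    public bool WaitOne(int millisecondsTimeout)
    {
        return _signal.WaitOne(millisecondsTimeout);
    }
}
```

A consumer written against this sketch follows the same loop as NetPacketProcess: drain the queue with GetObj until it returns null, then call WaitOne with a timeout. Because an AutoResetEvent holds at most one pending signal, several PutObj calls may produce a single wake-up, which is why the consumer drains the whole queue each time rather than taking one item per signal.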

Using the library

The library is very simple to use. An example follows:


using IocpCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using System.Windows;

namespace WarningClient
{
public class SocketServer
{
 public Action<SocketEventParam> OnSocketEvent;

public Int64 SendByteCount
 {
  get
  {
   if (_netServer == null)
    return 0;
   return _netServer.SendByteCount;
  }
 }
 public Int64 ReadByteCount
 {
  get
  {
   if (_netServer == null)
    return 0;
   return _netServer.ReadByteCount;
  }
 }

NetServer _netServer;
 EN_PacketType _packetType = EN_PacketType.byteStream;
 public void SetPacktType(EN_PacketType packetType)
 {
  _packetType = packetType;
  if (_netServer == null)
   return;
  if (packetType == EN_PacketType.byteStream)
  {
   _netServer.SetPacketParam(0, 1024);
  }
  else
  {
   _netServer.SetPacketParam(9, 1024);
  }
 }

public bool Init(List<int> listenPort)
 {
  NetLogger.OnLogEvent += NetLogger_OnLogEvent;
  _netServer = new NetServer();
  SetPacktType(_packetType);
  _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
  _netServer.OnSocketPacketEvent += SocketPacketDeal;

foreach (int n in listenPort)
  {
   _netServer.AddListenPort(n, n);
  }

List<int> listenFault;
  bool start = _netServer.StartListen(out listenFault);
  return start;
 }

int GetPacketTotalLen(byte[] data, int offset)
 {
  if (MainWindow._packetType == EN_PacketType.znss)
   return GetPacketZnss(data, offset);
  else
   return GetPacketAnzhiyuan(data, offset);
 }

int GetPacketAnzhiyuan(byte[] data, int offset)
 {
  int n = data[offset + 5] + 6;
  return n;
 }

int GetPacketZnss(byte[] data, int offset)
 {
  // Read the length byte relative to offset: the packet may not start at index 0.
  int packetLen = data[offset + 4] + 5;
  return packetLen;
 }

public bool ConnectAsyn(string peerIp, int peerPort, object tag)
 {
  return _netServer.ConnectAsyn(peerIp, peerPort, tag);
 }

public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
 {
  return _netServer.Connect(peerIp, peerPort, tag, out socket);
 }

private void NetLogger_OnLogEvent(string message)
 {
  AppLog.Log(message);
 }

Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

public int ClientCount
 {
  get
  {
   lock (_clientGroup)
   {
    return _clientGroup.Count;
   }
  }
 }
 public List<Socket> ClientList
 {
  get
  {
   if (_netServer != null)
    return _netServer.ClientList;
   return new List<Socket>();
  }
 }
 void AddClient(SocketEventParam socketParam)
 {
  lock (_clientGroup)
  {
   _clientGroup.Remove(socketParam.Socket);
   _clientGroup.Add(socketParam.Socket, socketParam);
  }
 }

void RemoveClient(SocketEventParam socketParam)
 {
  lock (_clientGroup)
  {
   _clientGroup.Remove(socketParam.Socket);
  }
 }

ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

public ObjectPool<SocketEventParam> ReadDataPool
 {
  get
  {
   return _readDataPool;
  }
 }

private void SocketPacketDeal(SocketEventParam socketParam)
 {
  OnSocketEvent?.Invoke(socketParam);
  if (socketParam.SocketEvent == EN_SocketEvent.read)
  {
   if (MainWindow._isShowReadPacket)
    _readDataPool.PutObj(socketParam);
  }
  else if (socketParam.SocketEvent == EN_SocketEvent.accept)
  {
   AddClient(socketParam);
   string peerIp = socketParam.ClientInfo.PeerIpPort;
   AppLog.Log(string.Format("Client connected! Local port: {0}, peer: {1}",
    socketParam.ClientInfo.LocalPort, peerIp));
  }
  else if (socketParam.SocketEvent == EN_SocketEvent.connect)
  {
   string peerIp = socketParam.ClientInfo.PeerIpPort;
   if (socketParam.Socket != null)
   {
    AddClient(socketParam);

    AppLog.Log(string.Format("Connected to peer! Local port: {0}, peer: {1}",
     socketParam.ClientInfo.LocalPort, peerIp));
   }
   else
   {
    AppLog.Log(string.Format("Failed to connect to peer! Local port: {0}, peer: {1}",
     socketParam.ClientInfo.LocalPort, peerIp));
   }
  }
  else if (socketParam.SocketEvent == EN_SocketEvent.close)
  {
   MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
   RemoveClient(socketParam);
   string peerIp = socketParam.ClientInfo.PeerIpPort;
   AppLog.Log(string.Format("Client disconnected! Local port: {0}, peer: {1}",
    socketParam.ClientInfo.LocalPort, peerIp));
  }
 }

public EN_SendDataResult SendData(Socket socket, byte[] data)
 {
  if(socket == null)
  {
   MessageBox.Show("Not connected yet!");
   return EN_SendDataResult.no_client;
  }
  return _netServer.SendData(socket, data);
 }

internal void SendToAll(byte[] data)
 {
  lock (_clientGroup)
  {
   foreach (Socket socket in _clientGroup.Keys)
   {
    SendData(socket, data);
   }
  }
 }
}
}

Source: http://www.cnblogs.com/yuanchenhui/archive/2017/11/28/asyn_scoket.html
