欢迎光临
我们一直在努力

MaNGOS-Zero源码学习之mangosd游戏逻辑服务器(二):socket的处理方式

    本篇将介绍客户端与游戏逻辑服务器连接建立以后,mangosd如何接收、解析和处理客户端发过来的协议。本篇不再讨论mangosd与客户端的认证及建立最终RC4流加密的过程,想了解这部分内容请看该系列的第一篇

 

 

一、acceptor socket的监听启动及注册

    mangosd的main ()函数调用单例对象sMaster的Run ()函数,启动监听socket的代码如下:

 

 1: int Master::Run()
 2: {
 3: ........
 4:  
 5: ///- Launch the world listener socket
 6: uint16 wsport = sWorld.getConfig (CONFIG_UINT32_PORT_WORLD);
 7: std::string bind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
 8:  
 9: if (sWorldSocketMgr->StartNetwork (wsport, bind_ip) == -1)
 10: {
 11: sLog.outError ("Failed to start network");
 12: Log::WaitBeforeContinueIfNeed();
 13: World::StopNow(ERROR_EXIT_CODE);
 14: // go down and shutdown the server
 15: }
 16:  
 17: sWorldSocketMgr->Wait ();
 18:  
 19: ........
 20: }

   

    mangosd用WorldSocketMgr类来管理socket。StartNetwork ()会调用StartReactiveIO ()来启动监听socket,处理代码如下:

 1: int WorldSocketMgr::StartReactiveIO (ace_UINT16 port, const char* address)
 2: {
 3: ........
 4:  
 5: //(1)
 6: m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
 7: m_NetThreads = new reactorRunnable[m_NetThreadsCount];
 8:  
 9: // -1 means use default
 10: m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
 11: m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
 12: if ( m_SockOutUBuff <= 0 )
 13: {
 14: sLog.outError ("Network.OutUBuff is wrong in your config file");
 15: return -1;
 16: }
 17:  
 18: //(2)
 19: WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
 20: m_Acceptor = acc;
 21:  
 22: ACE_INET_Addr listen_addr (port, address);
 23: if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
 24: {
 25: sLog.outError ("Failed to open acceptor ,check if the port is free");
 26: return -1;
 27: }
 28:  
 29: //(3)
 30: for (size_t i = 0; i < m_NetThreadsCount; ++i)
 31: m_NetThreads[i].Start ();
 32:  
 33: return 0;
 34: }

(1)ReactorRunnable类继承了ACE_Task_Base,ACE_Task_Base 是ACE 中的任务或主动对象“处理结构”的基类。在ACE 中使用了此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须从此类派生。可以把ACE_TASK 看作是更高级的、更为面向对象的线程类[1]。ACE_Task_Base调用时继承类必须重写svc方法,并且在使用时保证调用了activate ()方法。

(2)指定监听地址,端口并把Acceptor绑定到第一个线程的Reactor上。启动Acceptor开始监听网络IO。

(3)启动所有线程,每个线程上有一个单独的ACE_Reactor* m_Reactor;,这里的Reactor使用的是多线程的ACE_TP_Reactor。可以各自单独完成事件的多路复用。

 

 

二、线程体函数

    线程体函数ReactorRunnable::svc () 如下:

 1: virtual int svc ()
 2: {
 3: //(1)
 4: WorldDatabase.ThreadStart ();
 5: 
 6: SocketSet::iterator i, t;
 7: while (!m_Reactor->reactor_event_loop_done ())
 8: {
 9: // dont be too smart to move this outside the loop
 10: // the run_reactor_event_loop will modify interval
 11: ACE_Time_Value interval (0, 10000);
 12:  
 13: //(2)
 14: if (m_Reactor->run_reactor_event_loop (interval) == -1)
 15: break;
 16:  
 17: //(3)
 18: AddNewSockets ();
 19: 
 20: for (i = m_Sockets.begin (); i != m_Sockets.end ();)
 21: {
 22: //(4)
 23: if ((*i)->Update () == -1)
 24: {
 25: t = i;
 26: ++i;
 27: (*t)->CloseSocket ();
 28: (*t)->RemoveReference ();
 29: --m_Connections;
 30: m_Sockets.erase (t);
 31: }
 32: else
 33: ++i;
 34: }
 35: }
 36:  
 37: WorldDatabase.ThreadEnd ();
 38: DEBUG_LOG ("Network Thread Exitting");
 39:  
 40: return 0;
 41: }

(1)会调用mysql_thread_init ()函数,初始化与该线程相关的变量。

(2)run_reactor_event_loop ()函数为多路复用的等待函数,当注册的事件发生、运行超时或者出现错误时返回。

(3)AddNewSockets ()函数会将缓存在m_NewSockets里的新到达的socket添加到SocketSet m_Sockets;里,同时检查并处理m_Sockets;里已经closed的socket。

(4)循环每一个WorldSocket,调用其Update ()方法,这里只处理每个socket的handle_output,即每个在此线程上的写事件,向客户端发送数据。下一节详细介绍:

 

 

三、WorldSocket::Update () 方法

    Update方法用于处理每个socket的输出:

 1: int WorldSocket::Update (void)
 2: {
 3: if (closing_)
 4: return -1;
 5:  
 6: //(1)
 7: if (m_OutActive || m_OutBuffer->length () == 0)
 8: return 0;
 9:  
 10: return handle_output (get_handle ());
 11: }

(1)m_OutBuffer有数据时才会调用handle_output,handle_output ()用于处理输出,如果输出不能一次性做完,会调用schedule_wakeup_output ()再次激活write事件。当输出处理完毕后则调用cancel_wakeup_output ()取消激活write事件,使reactor恢复到正常的loop ()循环中。详细过程如下:

 

 1: int WorldSocket::handle_output (ACE_HANDLE)
 2: {
 3: //(1)
 4: ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
 5:  
 6: if (closing_)
 7: return -1;
 8:  
 9: const size_t send_len = m_OutBuffer->length ();
 10: if (send_len == 0)
 11: return cancel_wakeup_output (Guard);
 12:  
 13: #ifdef MSG_NOSIGNAL
 14: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
 15: #else
 16: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
 17: #endif // MSG_NOSIGNAL
 18:  
 19: if (n == 0)
 20: return -1;
 21: else if (n == -1)
 22: {
 23: if (errno == EWOULDBLOCK || errno == EAGAIN) //----------(2)
 24: return schedule_wakeup_output (Guard);
 25:  
 26: return -1;
 27: }
 28: else if (n < (ssize_t)send_len) //now n > 0 //----------(3)
 29: {
 30: m_OutBuffer->rd_ptr (static_cast<size_t> (n));
 31:  
 32: // move the data to the base of the buffer
 33: m_OutBuffer->crunch ();
 34:  
 35: return schedule_wakeup_output (Guard);
 36: }
 37: else //now n == send_len //----------(4)
 38: {
 39: m_OutBuffer->reset ();
 40:  
 41: if (!iFlushPacketQueue ())
 42: return cancel_wakeup_output (Guard);
 43: else
 44: return schedule_wakeup_output (Guard);
 45: }
 46:  
 47: ACE_NOTREACHED (return 0);
 48: }

(1)对m_OutBuffer加锁。

(2)考虑信号打断的情况等,暂时不能写。

(3)只发送了部分数据则继续wakeup该线程对应的Reactor。

(4)检查m_OutBuffer数据发送完毕同时等待buffer(PacketQueueT m_PacketQueue;)里已经没有数据时,cancel wakeup让Reactor恢复正常。

 

 

四、socket到对应线程的指派

    上一节内容可以看到线程内如何处理socket,及新到达的socket。但从第一节中可知只有第一个线程注册为acceptor线程,那么新连接到达时,是如何被指派到对应的“接待”线程的呢?

    可以先看一下ACE_Acceptor的处理时序图:

MaNGOS-Zero源码学习之mangosd游戏逻辑服务器(二):socket的处理方式

图3.1 连接到达处理时序 [2]

   

    上图可以看出,当连接到达时,acceptor会调用对应的SVC_HANDLER的open ()函数,在mangosd里就是acceptor对应的int WorldSocket::open (void *a),如下:

 1: int WorldSocket::open (void *a)
 2: {
 3: ........
 4:  
 5: // Hook for the manager.
 6: if (sWorldSocketMgr->OnSocketOpen (this) == -1)
 7: return -1;
 8:  
 9: ........
 10: }

 

OnSocketOpen方法:

 1: int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
 2: {
 3: ........
 4:  
 5: // we skip the Acceptor Thread
 6: size_t min = 1;
 7:  
 8: MANGOS_ASSERT (m_NetThreadsCount >= 1);
 9:  
 10: //(1)
 11: for (size_t i = 1; i < m_NetThreadsCount; ++i)
 12: if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
 13: min = i;
 14:  
 15: return m_NetThreads[min].AddSocket (sock);
 16: }

(1)将WorldSocket均衡的分配给每个线程。AddSocket ()将socket添加到m_NewSockets中做缓存,待该线程自行调用AddNewSockets ()添加到处理队列里。

 

  

总结:

    mangosd对socket的处理因为使用了ACE,逻辑处理代码相对比较简单,写事件的异常处理主要涉及

(1)一次不能写完则不断的wakeup Reactor。

(2)信号中断等错误的判断。似乎这里并没有考虑全面(见附录)

(3)使用另一个buffer缓存,因写缓存m_OutBuffer满而带来的多出的数据。

 

References:

[1] http://blog.csdn.net/yecao_kinux/article/details/1546914

[2] http://postfiles12.naver.net/data41/2009/4/11/187/33_kbkim007.jpg?type=w3

 

 

附录:

不同平台不同版本的read/write、send/recv有些不同,挺郁闷的一件事……………

EWOULDBLOCK:基于 berkeley 实现,当客户端异常终止连接时

ECONNABORTED:基于 posix 实现,当客户端异常终止连接时

EPROTO:基于 SVR4 实现,当客户端异常终止连接时)以及 EINTR

  • 海报
海报图正在生成中...
赞(0) 打赏
声明:
1、本博客不从事任何主机及服务器租赁业务,不参与任何交易,也绝非中介。博客内容仅记录博主个人感兴趣的服务器测评结果及一些服务器相关的优惠活动,信息均摘自网络或来自服务商主动提供;所以对本博客提及的内容不作直接、间接、法定、约定的保证,博客内容也不具备任何参考价值及引导作用,访问者需自行甄别。
2、访问本博客请务必遵守有关互联网的相关法律、规定与规则;不能利用本博客所提及的内容从事任何违法、违规操作;否则造成的一切后果由访问者自行承担。
3、未成年人及不能独立承担法律责任的个人及群体请勿访问本博客。
4、一旦您访问本博客,即表示您已经知晓并接受了以上声明通告。
文章名称:《MaNGOS-Zero源码学习之mangosd游戏逻辑服务器(二):socket的处理方式》
文章链接:https://www.456zj.com/41046.html
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址