欢迎光临
我们一直在努力

Mongodb源码分析--主程序入口main()

 

      VS2010 C++下编译调试mongodb源码

     http://www.cnblogs.com/daizhj/archive/2011/03/07/1973764.html

      好了,开始今天的正文吧。

    为了理解mongodb整体的运行机制,首先我们需要对其主要运行流程有一个大概的理解,而主入口函数main无疑是最佳突破口。首先我们在VS2010中打开db.sln文件,并打开db.cpp文件,找到主入口函数(位于文件613行),如下:

    该方法的开头代码(上面)主要是绑定一个配置操作选项的说明,包括命令行模式下的参数说明,因为内容较长,这里就不做过多描述了,需要说明options_description的是这些内容被放到了boost库(一个C++开源库)的options_description对象中,其类型结构可以理解为key/value模式,主要用于记录一系列的选项描述(符)信息,以便于通过名称查询相应选项信息。同时mongodb将选项大致归为8类,如上所述。

    接下说看一下其初始化时命令行参数的操作,如下:

  

     上面方法对main主函数参数argc,argv及上面的那些选项实例进行存储并以此绑定到params实例上,因为接下来会通过params来设置cmdLine对象(CmdLine类型),并最终以该对象做为最终在mongodb内部标记相应启动命令参数信息的对象。形如:

 

    当搜集到足够的启动信息(参数)后,mongodb开启执行下面两行代码:

 

    上面用params来配置加载的模块信息,而就目前而言,mongodb中的模块有两个:其类模式和MMS模块,后者是当mongodb监视服务有效情况下,以后台线程方式(BackgroundJob)运行的程序,类定义如下:

      因为相关代码比较简单,这里就不多作说明了,如果大家感兴趣的话,以后会专门写一篇介绍Module,BackgroundJob的文章 。

     回到正文,模块实始化完成了,就会运行如下代码:

    这里要说明的是dataFileSync类也派生自BackgroundJob类,而BackgroundJob的功能就是生成一个后台线程并执行相应任务。而当前dataFileSync的任务就是在一段时间后(cmdLine.syncdelay)将内存中的数据flash到磁盘上(因为mongodb使用mmap方式将数据先放入内存中),代码如下:

     main主函数完成上面方法后,就会启动侦听方法,开始侦听客户端的链接请求,如下:

     该侦听方法会最终调用db.cpp (467行)的如下方法,我们来看一下该方法做了些什么:

     首先是初始化一个名称“initandlisten”线程用于侦听客户端传来的操作信息(可能有误):

 

     接着判断当前系统是32或64位系统?并获取当前进程ID并输出进程ID及数据库路径,端口信息以及当前mongodb及系统信息(这些信息也就是我们在命令行下经常看到的启动mongodb信息)

 

    完成这一步之后,接下来mongodb就会对相应路径下的数据文件进行检查,如出现文件错误(文件不存在等):

        

   同时使用"路径锁"方式来移除指定路径下的临时文件夹信息,如下:

   接着,mongodb还会启动持久化功能,该功能貌似是1.7版本后引入到系统中的,主要用于解决因系统宕机时,内存中的数据未写入磁盘而造成的数据丢失。其机制主要是通过log方式定时将操作日志(如cud操作等)记录到db的journal文件夹下,这样当系统再次重启时从该文件夹下恢复丢失的(内存)数据。有关这部分内容我会专门写文章加以介绍。

     注:其命令行枚举定义如下

   完成这一步之后,系统还会初始化脚本引擎,因为mongodb支持脚本语法做为其操作数据库的语言,如下:

   当这些主要工作做完之后,最后系统会调用下面方法正式启动侦听操作:

 

    注意上面的OurListener类其initAndListen()方法位于message.cpp中,因为mongodb采用message相关类来封装c/s双在的数据和操作:

void Listener::initAndListen() {
     checkTicketNumbers();
        vector
<sockAddr> mine = ipToAddrs(_ip.c_str(), _port);
        vector
<int> socks;
        SOCKET maxfd 
= 0// needed for select()

        
for (vector<SockAddr>::iterator it=mine.begin(), end=mine.end(); it != end; ++it) {
            SockAddr
& me = *it;

            SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0);
            
if ( sock == INVALID_SOCKET ) {
                log() 
<< "ERROR: listen(): invalid socket? " << errnoWithDescription() << endl;
            }

            if (me.getType() == AF_UNIX) {
#if !defined(_WIN32)
                
if (unlink(me.getAddr().c_str()) == -1) {
                    
int x = errno;
                    
if (x != ENOENT) {
                        log() 
<< "couldn't unlink socket file " << me << errnoWithDescription(x) << " skipping" << endl;
                        
continue;
                    }
                }
#endif
            }
            
else if (me.getType() == AF_INET6) {
                
// IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1)
                
// That causes a conflict if we don't do set it to IPV6_ONLY
                const int one = 1;
                setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (
const char*&one, sizeof(one));
            }

            prebindOptions( sock );

            if ( ::bind(sock, me.raw(), me.addressSize) != 0 ) {
                
int x = errno;
                log() 
<< "listen(): bind() failed " << errnoWithDescription(x) << " for socket: " << me.toString() << endl;
                
if ( x == EADDRINUSE )
                    log() 
<< "  addr already in use" << endl;
                closesocket(sock);
                
return;
            }

#if !defined(_WIN32)
            
if (me.getType() == AF_UNIX) {
                
if (chmod(me.getAddr().c_str(), 0777== -1) {
                    log() 
<< "couldn't chmod socket file " << me << errnoWithDescription() << endl;
                }

                ListeningSockets::get()->addPath( me.getAddr() );
            }
#endif

            if ( ::listen(sock, 128!= 0 ) {
                log() 
<< "listen(): listen() failed " << errnoWithDescription() << endl;
                closesocket(sock);
                
return;
            }

            ListeningSockets::get()->add( sock );

            socks.push_back(sock);
            if (sock > maxfd)
                maxfd 
= sock;
        }

        static long connNumber = 0;
        
struct timeval maxSelectTime;
        
while ( ! inShutdown() ) {
            fd_set fds[
1];
            FD_ZERO(fds);

            for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
                FD_SET(
*it, fds);
            }

            maxSelectTime.tv_sec = 0;
            maxSelectTime.tv_usec 
= 10000;
            
const int ret = select(maxfd+1, fds, NULL, NULL, &maxSelectTime);

            if (ret == 0) {
#if defined(__linux__)
                _elapsedTime 
+= ( 10000 - maxSelectTime.tv_usec ) / 1000;
#else
                _elapsedTime 
+= 10;
#endif
                
continue;
            }
            _elapsedTime 
+= ret; // assume 1ms to grab connection. very rough

            
if (ret < 0) {
                
int x = errno;
#ifdef EINTR
                
if ( x == EINTR ) {
                    log() 
<< "select() signal caught, continuing" << endl;
                    
continue;
                }
#endif
                
if ( ! inShutdown() )
                    log() 
<< "select() failure: ret=" << ret << " " << errnoWithDescription(x) << endl;
                
return;
            }

            for (vector<int>::iterator it=socks.begin(), end=socks.end(); it != end; ++it) {
                
if (! (FD_ISSET(*it, fds)))
                    
continue;

                SockAddr from;
                int s = accept(*it, from.raw(), &from.addressSize);
                
if ( s < 0 ) {
                    
int x = errno; // so no global issues
                    if ( x == ECONNABORTED || x == EBADF ) {
                        log() 
<< "Listener on port " << _port << " aborted" << endl;
                        
return;
                    }
                    
if ( x == 0 && inShutdown() ) {
                        
return;   // socket closed
                    }
                    
if!inShutdown() )
                        log() 
<< "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
                    
continue;
                }
                
if (from.getType() != AF_UNIX)
                    disableNagle(s);
                
if ( _logConnect && ! cmdLine.quiet )
                    log() 
<< "connection accepted from " << from.toString() << " #" << ++connNumber << endl;
                accepted(s, from);
            }
        }
    }

    上面方法基本上就是一个无限循环( while ( ! inShutdown() ) )的侦听服务端,它调用操作系统的底层socket api接口,并将侦听到的结果使用accepted()方法进行接收。这里要注意的是因为最终我们使用的是OurListener进行的侦听,所以最终系统会调用OurListener所实现的虚(virtual)方法,如下:

    上面方法中的try{}语句中包含的是boost库中的thread方法,其主要提供了跨操作系统的线程创建方式及相关并行操作(相关信息参数boost官方网站),我们这里只要知道,通过该语句,我们最终用一个线程来运行connThread方法及其所需参数mp即可。下面看一下connThread方法的代码:

 

    上面代码主要工作就是不断循环[while ( 1 )]获取当前客户端发来的信息(上面已封装成了message)并将其信息进行分析,并根据相应操作标志位确定当前操作是CRUD或构建索引等[

()],如果一些正常,则向客户端发送应答信息:

  

    运行到这里,main函数的使命就完成了,本来想用一张时序图来大致回顾一下,只有等有时间再补充了。

    好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起查询操作时,Mongodb的执行流程和运行机制。

    原文链接:

    作者: daizhj, 代震军   

    微博:http://t.sina.com.cn/daizhj

   

Tags: mongodb,c++,source code

Tags: mongodb,c++,source code

  • 海报
海报图正在生成中...
赞(0) 打赏
声明:
1、本博客不从事任何主机及服务器租赁业务,不参与任何交易,也绝非中介。博客内容仅记录博主个人感兴趣的服务器测评结果及一些服务器相关的优惠活动,信息均摘自网络或来自服务商主动提供;所以对本博客提及的内容不作直接、间接、法定、约定的保证,博客内容也不具备任何参考价值及引导作用,访问者需自行甄别。
2、访问本博客请务必遵守有关互联网的相关法律、规定与规则;不能利用本博客所提及的内容从事任何违法、违规操作;否则造成的一切后果由访问者自行承担。
3、未成年人及不能独立承担法律责任的个人及群体请勿访问本博客。
4、一旦您访问本博客,即表示您已经知晓并接受了以上声明通告。
文章名称:《Mongodb源码分析--主程序入口main()》
文章链接:https://www.456zj.com/20691.html
本站资源仅供个人学习交流,请于下载后24小时内删除,不允许用于商业用途,否则法律问题自行承担。

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址