“Mongodb 源码阅读笔记”的版本间的差异

来自Dennis的知识库
跳转到: 导航搜索
网络层
网络层
第69行: 第69行:
  
 
     while (true) {
 
     while (true) {
     ...
+
     .........
 +
        // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
 +
        // database work for this request.
 +
        DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote());
 +
    ......
 
   }
 
   }
 
}
 
}
 
</pre>
 
</pre>
 +
 +
实际是转交给 ServiceEntryPointImpl 的子类 mongod 和 mongos 的 handleRequest 处理请求。
 +
 +
* transport_layer_legacy.cpp 就是调用 util/net 下面的类和方法,实现一个典型的 TCP 服务器了,其中 handleNewConnection 处理新建连接,构造函数里开始 listen:
 +
 +
<pre>
 +
TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts,
 +
                                          ServiceEntryPoint* sep)
 +
    : _sep(sep),
 +
      _listener(stdx::make_unique<ListenerLegacy>(
 +
          opts,
 +
          stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))),
 +
      _running(false),
 +
      _options(opts) {}
 +
 +
void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) {
 +
    if (!Listener::globalTicketHolder.tryAcquire()) {
 +
        log() << "connection refused because too many open connections: "
 +
              << Listener::globalTicketHolder.used();
 +
        amp->shutdown();
 +
        return;
 +
    }
 +
 +
    amp->setLogLevel(logger::LogSeverity::Debug(1));
 +
    auto session = LegacySession::create(std::move(amp), this);
 +
 +
    stdx::list<std::weak_ptr<LegacySession>> list;
 +
    auto it = list.emplace(list.begin(), session);
 +
 +
    {
 +
        // Add the new session to our list
 +
        stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
 +
        session->setIter(it);
 +
        _sessions.splice(_sessions.begin(), list, it);
 +
    }
 +
 +
    invariant(_sep);
 +
    _sep->startSession(std::move(session));
 +
}
 +
 +
</pre>
 +
 +
调用了 _sep->startSession(std::move(session));,其中 _sep 就是 ServiceEntryPoint 指针,在头文件 class 定义了。
 +
 +
* 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。

2017年5月24日 (三) 07:32的版本

网络层

  • 具体实现在 src/mongo/util/net 目录下:
├── abstract_message_port.h
├── asio_message_port.cpp
├── asio_message_port.h
├── asio_ssl_context.cpp
├── asio_ssl_context.h
├── hostandport.cpp
├── hostandport.h
├── hostandport_test.cpp
├── hostname_canonicalization.cpp
├── hostname_canonicalization.h
├── listen.cpp
├── listen.h
├── message.cpp
├── message.h
├── message_port.cpp
├── message_port.h
├── message_port_mock.cpp
├── message_port_mock.h
├── message_port_startup_param.cpp
├── message_port_startup_param.h
├── op_msg.cpp
├── op_msg.h
├── sock.cpp
├── sock.h
├── sock_test.cpp
├── sockaddr.cpp
├── sockaddr.h
├── socket_exception.cpp
├── socket_exception.h
├── socket_poll.cpp
├── socket_poll.h
├── ssl_expiration.cpp
├── ssl_expiration.h
├── ssl_manager.cpp
├── ssl_manager.h
├── ssl_options.cpp
├── ssl_options.h
├── ssl_types.h
├── thread_idle_callback.cpp
└── thread_idle_callback.h
  • 针对上层提供的服务接口定义在 src/mongo/transport 下,核心就是 transport_layer , transport_layer_legacy 和 service_entry_point,具体的关系是 TransportLayer 持有一个 acceptor ,当连接进来,包装成 session,然后调用ServiceEntryPoint.startSession 方法,进入一个读取请求-处理请求-应答请求的循环。 这里可能为了兼容老的代码,transport_layer_legacy 实现了 TransportLayer,并兼容老的代码。 ServiceEntryPoint 的子类 ServiceEntryPointImpl 里实现了 startSession 和 _sessionLoop 框架,每个连接启动一个线程处理:
//service_entry_point_impl.cpp
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
    // Pass ownership of the transport::SessionHandle into our worker thread. When this
    // thread exits, the session will end.
    launchWrappedServiceEntryWorkerThread(
        std::move(session), [this](const transport::SessionHandle& session) {
            _nWorkers.fetchAndAdd(1);
            auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); });

            _sessionLoop(session);
        });
}

void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) {
    Message inMessage;
    bool inExhaust = false;
    int64_t counter = 0;

    while (true) {
    .........
        // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
        // database work for this request.
        DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote());
     ......
   }
}

实际是转交给 ServiceEntryPointImpl 的子类 mongod 和 mongos 的 handleRequest 处理请求。

  • transport_layer_legacy.cpp 就是调用 util/net 下面的类和方法,实现一个典型的 TCP 服务器了,其中 handleNewConnection 处理新建连接,构造函数里开始 listen:
TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts,
                                           ServiceEntryPoint* sep)
    : _sep(sep),
      _listener(stdx::make_unique<ListenerLegacy>(
          opts,
          stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))),
      _running(false),
      _options(opts) {}

void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) {
    if (!Listener::globalTicketHolder.tryAcquire()) {
        log() << "connection refused because too many open connections: "
              << Listener::globalTicketHolder.used();
        amp->shutdown();
        return;
    }

    amp->setLogLevel(logger::LogSeverity::Debug(1));
    auto session = LegacySession::create(std::move(amp), this);

    stdx::list<std::weak_ptr<LegacySession>> list;
    auto it = list.emplace(list.begin(), session);

    {
        // Add the new session to our list
        stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
        session->setIter(it);
        _sessions.splice(_sessions.begin(), list, it);
    }

    invariant(_sep);
    _sep->startSession(std::move(session));
}

调用了 _sep->startSession(std::move(session));,其中 _sep 就是 ServiceEntryPoint 指针,在头文件 class 定义了。

  • 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。
个人工具
名字空间

变换
操作
导航
工具箱