“Mongodb 源码阅读笔记”的版本间的差异
来自Dennis的知识库
Dennis zhuang(讨论 | 贡献) (→网络层) |
Dennis zhuang(讨论 | 贡献) (→网络层) |
||
第122行: | 第122行: | ||
* 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。 | * 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。 | ||
+ | |||
+ | == db 层 == | ||
+ | |||
+ | * db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。 | ||
+ | * mongod 是 ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法: | ||
+ | |||
+ | <pre> | ||
+ | DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, | ||
+ | const Message& request, | ||
+ | const HostAndPort& client) { | ||
+ | return assembleResponse(opCtx, request, client); | ||
+ | } | ||
+ | |||
+ | </pre> | ||
+ | |||
+ | 转交调用了 assembleResponse 方法,定义在 assemble_response.cpp 中: | ||
+ | |||
+ | <pre> | ||
+ | DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) { | ||
+ | // before we lock... | ||
+ | NetworkOp op = m.operation(); | ||
+ | bool isCommand = false; | ||
+ | |||
+ | DbMessage dbmsg(m); | ||
+ | |||
+ | if (op == dbQuery) { | ||
+ | if (nsString.isCommand()) { | ||
+ | isCommand = true; | ||
+ | opwrite(m); | ||
+ | } | ||
+ | // TODO: remove this entire code path after 3.2. Refs SERVER-7775 | ||
+ | else if (nsString.isSpecialCommand()) { | ||
+ | opwrite(m); | ||
+ | |||
+ | if (nsString.coll() == "$cmd.sys.inprog") { | ||
+ | return receivedPseudoCommand(opCtx, c, m, "currentOp"); | ||
+ | } | ||
+ | if (nsString.coll() == "$cmd.sys.killop") { | ||
+ | return receivedPseudoCommand(opCtx, c, m, "killOp"); | ||
+ | } | ||
+ | if (nsString.coll() == "$cmd.sys.unlock") { | ||
+ | return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock"); | ||
+ | } | ||
+ | } else { | ||
+ | opread(m); | ||
+ | } | ||
+ | } else if (op == dbGetMore) { | ||
+ | opread(m); | ||
+ | } else if (op == dbCommand || op == dbMsg) { | ||
+ | isCommand = true; | ||
+ | opwrite(m); | ||
+ | } else { | ||
+ | opwrite(m); | ||
+ | } | ||
+ | |||
+ | CurOp& currentOp = *CurOp::get(opCtx); | ||
+ | { | ||
+ | stdx::lock_guard<Client> lk(*opCtx->getClient()); | ||
+ | // Commands handling code will reset this if the operation is a command | ||
+ | // which is logically a basic CRUD operation like query, insert, etc. | ||
+ | currentOp.setNetworkOp_inlock(op); | ||
+ | currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); | ||
+ | } | ||
+ | |||
+ | OpDebug& debug = currentOp.debug(); | ||
+ | |||
+ | long long logThresholdMs = serverGlobalParams.slowMS; | ||
+ | bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); | ||
+ | |||
+ | DbResponse dbresponse; | ||
+ | if (op == dbQuery) { | ||
+ | dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m) | ||
+ | : receivedQuery(opCtx, nsString, c, m); | ||
+ | } else if (op == dbMsg) { | ||
+ | dbresponse = receivedMsg(opCtx, c, m); | ||
+ | } else if (op == dbCommand) { | ||
+ | dbresponse = receivedRpc(opCtx, c, m); | ||
+ | } else if (op == dbGetMore) { | ||
+ | dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); | ||
+ | } else { | ||
+ | </pre> | ||
+ | |||
+ | 这就是一个典型的根据命令类型派发的过程,比较奇怪的是没有使用枚举+ switch 的方式,而是一堆 if .. else。 | ||
+ | |||
+ | 派发的核心是转发 commands 和 query,分别调用的是 runCommands 和 runQuery 方法,前者定义在 run_commands.cpp,后者定义在 query/find.cpp 里,一个是各种命令,一个是查询。 |
2017年5月24日 (三) 07:42的版本
网络层
- 具体实现在 src/mongo/util/net 目录下:
├── abstract_message_port.h ├── asio_message_port.cpp ├── asio_message_port.h ├── asio_ssl_context.cpp ├── asio_ssl_context.h ├── hostandport.cpp ├── hostandport.h ├── hostandport_test.cpp ├── hostname_canonicalization.cpp ├── hostname_canonicalization.h ├── listen.cpp ├── listen.h ├── message.cpp ├── message.h ├── message_port.cpp ├── message_port.h ├── message_port_mock.cpp ├── message_port_mock.h ├── message_port_startup_param.cpp ├── message_port_startup_param.h ├── op_msg.cpp ├── op_msg.h ├── sock.cpp ├── sock.h ├── sock_test.cpp ├── sockaddr.cpp ├── sockaddr.h ├── socket_exception.cpp ├── socket_exception.h ├── socket_poll.cpp ├── socket_poll.h ├── ssl_expiration.cpp ├── ssl_expiration.h ├── ssl_manager.cpp ├── ssl_manager.h ├── ssl_options.cpp ├── ssl_options.h ├── ssl_types.h ├── thread_idle_callback.cpp └── thread_idle_callback.h
- 针对上层提供的服务接口定义在 src/mongo/transport 下,核心就是 transport_layer , transport_layer_legacy 和 service_entry_point,具体的关系是 TransportLayer 持有一个 acceptor ,当连接进来,包装成 session,然后调用ServiceEntryPoint.startSession 方法,进入一个读取请求-处理请求-应答请求的循环。 这里可能为了兼容老的代码,transport_layer_legacy 实现了 TransportLayer,并兼容老的代码。 ServiceEntryPoint 的子类 ServiceEntryPointImpl 里实现了 startSession 和 _sessionLoop 框架,每个连接启动一个线程处理:
//service_entry_point_impl.cpp void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // Pass ownership of the transport::SessionHandle into our worker thread. When this // thread exits, the session will end. launchWrappedServiceEntryWorkerThread( std::move(session), [this](const transport::SessionHandle& session) { _nWorkers.fetchAndAdd(1); auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); }); _sessionLoop(session); }); } void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) { Message inMessage; bool inExhaust = false; int64_t counter = 0; while (true) { ......... // The handleRequest is implemented in a subclass for mongod/mongos and actually all the // database work for this request. DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote()); ...... } }
实际是转交给 ServiceEntryPointImpl 的子类 mongod 和 mongos 的 handleRequest 处理请求。
- transport_layer_legacy.cpp 就是调用 util/net 下面的类和方法,实现一个典型的 TCP 服务器了,其中 handleNewConnection 处理新建连接,构造函数里开始 listen:
TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts, ServiceEntryPoint* sep) : _sep(sep), _listener(stdx::make_unique<ListenerLegacy>( opts, stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))), _running(false), _options(opts) {} void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) { if (!Listener::globalTicketHolder.tryAcquire()) { log() << "connection refused because too many open connections: " << Listener::globalTicketHolder.used(); amp->shutdown(); return; } amp->setLogLevel(logger::LogSeverity::Debug(1)); auto session = LegacySession::create(std::move(amp), this); stdx::list<std::weak_ptr<LegacySession>> list; auto it = list.emplace(list.begin(), session); { // Add the new session to our list stdx::lock_guard<stdx::mutex> lk(_sessionsMutex); session->setIter(it); _sessions.splice(_sessions.begin(), list, it); } invariant(_sep); _sep->startSession(std::move(session)); }
调用了 _sep->startSession(std::move(session));,其中 _sep 就是 ServiceEntryPoint 指针,在头文件 class 定义了。
- 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。
db 层
- db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。
- mongod 是 ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法:
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& request, const HostAndPort& client) { return assembleResponse(opCtx, request, client); }
转交调用了 assembleResponse 方法,定义在 assemble_response.cpp 中:
DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) { // before we lock... NetworkOp op = m.operation(); bool isCommand = false; DbMessage dbmsg(m); if (op == dbQuery) { if (nsString.isCommand()) { isCommand = true; opwrite(m); } // TODO: remove this entire code path after 3.2. Refs SERVER-7775 else if (nsString.isSpecialCommand()) { opwrite(m); if (nsString.coll() == "$cmd.sys.inprog") { return receivedPseudoCommand(opCtx, c, m, "currentOp"); } if (nsString.coll() == "$cmd.sys.killop") { return receivedPseudoCommand(opCtx, c, m, "killOp"); } if (nsString.coll() == "$cmd.sys.unlock") { return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock"); } } else { opread(m); } } else if (op == dbGetMore) { opread(m); } else if (op == dbCommand || op == dbMsg) { isCommand = true; opwrite(m); } else { opwrite(m); } CurOp& currentOp = *CurOp::get(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. currentOp.setNetworkOp_inlock(op); currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); } OpDebug& debug = currentOp.debug(); long long logThresholdMs = serverGlobalParams.slowMS; bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); DbResponse dbresponse; if (op == dbQuery) { dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m) : receivedQuery(opCtx, nsString, c, m); } else if (op == dbMsg) { dbresponse = receivedMsg(opCtx, c, m); } else if (op == dbCommand) { dbresponse = receivedRpc(opCtx, c, m); } else if (op == dbGetMore) { dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); } else {
这就是一个典型的根据命令类型派发的过程,比较奇怪的是没有使用枚举+ switch 的方式,而是一堆 if .. else。
派发的核心是转发 commands 和 query,分别调用的是 runCommands 和 runQuery 方法,前者定义在 run_commands.cpp,后者定义在 query/find.cpp 里,一个是各种命令,一个是查询。