查看Mongodb 源码阅读笔记的源代码
←
Mongodb 源码阅读笔记
跳转到:
导航
、
搜索
因为以下原因,你没有权限编辑本页:
您刚才请求的操作只有这个用户组中的用户才能使用:
用户
您可以查看并复制此页面的源代码:
== db 层 == * db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。ops 里面是各种写入(插入、更新和删除等)的 ops 实现,query 专门处理查询,catalog 管理 db 和 collection。 * mongod 是 ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法: <pre> DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& request, const HostAndPort& client) { return assembleResponse(opCtx, request, client); } </pre> 转交调用了 assembleResponse 方法,定义在 assemble_response.cpp 中: <pre> DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) { // before we lock... NetworkOp op = m.operation(); bool isCommand = false; DbMessage dbmsg(m); if (op == dbQuery) { if (nsString.isCommand()) { isCommand = true; opwrite(m); } // TODO: remove this entire code path after 3.2. Refs SERVER-7775 else if (nsString.isSpecialCommand()) { opwrite(m); if (nsString.coll() == "$cmd.sys.inprog") { return receivedPseudoCommand(opCtx, c, m, "currentOp"); } if (nsString.coll() == "$cmd.sys.killop") { return receivedPseudoCommand(opCtx, c, m, "killOp"); } if (nsString.coll() == "$cmd.sys.unlock") { return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock"); } } else { opread(m); } } else if (op == dbGetMore) { opread(m); } else if (op == dbCommand || op == dbMsg) { isCommand = true; opwrite(m); } else { opwrite(m); } CurOp& currentOp = *CurOp::get(opCtx); { stdx::lock_guard<Client> lk(*opCtx->getClient()); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. currentOp.setNetworkOp_inlock(op); currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); } OpDebug& debug = currentOp.debug(); long long logThresholdMs = serverGlobalParams.slowMS; bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); DbResponse dbresponse; if (op == dbQuery) { dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m) : receivedQuery(opCtx, nsString, c, m); } else if (op == dbMsg) { dbresponse = receivedMsg(opCtx, c, m); } else if (op == dbCommand) { dbresponse = receivedRpc(opCtx, c, m); } else if (op == dbGetMore) { dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); } else { </pre> 这就是一个典型的根据命令类型派发的过程,比较奇怪的是没有使用枚举+ switch 的方式,而是一堆 if .. else。 派发的核心是转发 commands 和 query,分别调用的是 runCommands 和 runQuery 方法,前者定义在 run_commands.cpp,后者定义在 query/find.cpp 里,一个是各种命令,一个是查询。不过在 assembleResponse 里还有一些 receivedInsert, receivedDelete 之类的处理逻辑,看代码注释是说『// The remaining operations do not return any response. They are fire-and-forget.』,也就是这些请求都不需要应答,因此应该不是正常的插入、更新和删除的请求处理过程,具体用于什么暂时不清楚。 * 插入、删除和更新过程还是 run_commands.cpp 里, 本质上在 commands.cpp 保存了一张表 ,映射了名字到具体的 command, Command 的每个具体子类都会在构造函数里将自己注册到这里。 <pre> //commands.h static CommandMap* _commands; static CommandMap* _commandsByBestName; //commands.cpp Command::Command(StringData name, bool webUI, StringData oldName) : _name(name.toString()), _webUI(webUI), _commandsExecutedMetric("commands." + _name + ".total", &_commandsExecuted), _commandsFailedMetric("commands." + _name + ".failed", &_commandsFailed) { // register ourself. if (_commands == 0) _commands = new CommandMap(); if (_commandsByBestName == 0) _commandsByBestName = new CommandMap(); Command*& c = (*_commands)[name]; if (c) log() << "warning: 2 commands with name: " << _name; c = this; (*_commandsByBestName)[name] = this; if (!oldName.empty()) (*_commands)[oldName.toString()] = this; } </pre> 比如 WriteCommand(用于数据写入,包括插入、更新删除),定义在 src/mongo/db/commands/write_commands/write_commands.cpp: <pre> class WriteCommand : public Command { public: explicit WriteCommand(StringData name) : Command(name) {} </pre> 而插入 CmdInsert 就是 WriteCommand 的一个子类,其他更新、删除类似: <pre> class CmdInsert final : public WriteCommand { public: CmdInsert() : WriteCommand("insert") {} void runImpl(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder& result) final { const auto batch = parseInsertCommand(dbname, cmdObj); const auto reply = performInserts(opCtx, batch); serializeReply(opCtx, ReplyStyle::kNotUpdate, batch.continueOnError, batch.documents.size(), reply, &result); } } cmdInsert; </pre> 实际的插入执行的是 performInsert 方法,定义在 src/mongo/db/ops/write_ops_exec.cpp 中: <pre> WriteResult performInserts(OperationContext* opCtx, const InsertOp& wholeOp) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Does own retries. auto& curOp = *CurOp::get(opCtx); ...... for (auto&& doc : wholeOp.documents) { const bool isLastDoc = (&doc == &wholeOp.documents.back()); auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc); ...... bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out); ... } </pre> 实际执行的是 insertBatchAndHandleErrors 方法,逻辑是比较清楚了,先找到 ns ,获取对应的 db 和 collection,然后执行 collection 的 insertDocuments 方法: <pre> try { acquireCollection(); .... .... insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end()); </pre> 插入对非 capped 集合做一次 all-at-once 批量插入,如果不行,再循环走一次 one-at-a-time 插入。 insertDocuments 定义在 src/mongo/db/catalog/collection_impl.cpp 里,每个 collection 都保存了一个 StorageEngine 创建的 RecordStore,调用 recordStore 做插入: <pre> Status CollectionImpl::_insertDocuments(OperationContext* opCtx, const vector<BSONObj>::const_iterator begin, const vector<BSONObj>::const_iterator end, bool enforceQuota, OpDebug* opDebug) { Status status = _recordStore->insertRecords(opCtx, &records, _enforceQuota(enforceQuota)); if (!status.isOK()) return status; status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted); if (opDebug) { opDebug->keysInserted += keysInserted; } </pre> 可以看到除了插入数据,还要更新下索引。
返回到
Mongodb 源码阅读笔记
。
个人工具
登录
名字空间
页面
讨论
变换
查看
阅读
查看源代码
查看历史
操作
搜索
导航
首页
社区专页
新闻动态
最近更改
随机页面
帮助
工具箱
链入页面
相关更改
特殊页面