“Mongodb 源码阅读笔记”的版本间的差异

来自Dennis的知识库
跳转到: 导航搜索
db 层
第125行: 第125行:
 
== db 层 ==
 
== db 层 ==
  
* db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。
+
* db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。ops 里面是各种写入(插入、更新和删除等)的 ops 实现,query 专门处理查询,catalog 管理 db 和 collection。
 
* mongod 是  ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法:
 
* mongod 是  ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法:
  
第306行: 第306行:
  
 
插入对非 capped 集合做一次 all-at-once 批量插入,如果不行,再循环走一次 one-at-a-time 插入。
 
插入对非 capped 集合做一次 all-at-once 批量插入,如果不行,再循环走一次 one-at-a-time 插入。
 +
 +
insertDocuments 定义在 src/mongo/db/catalog/collection_impl.cpp 里,每个 collection 都保存了一个 StorageEngine 创建的 RecordStore,调用 recordStore 做插入:
 +
 +
<pre>
 +
Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
 +
                                        const vector<BSONObj>::const_iterator begin,
 +
                                        const vector<BSONObj>::const_iterator end,
 +
                                        bool enforceQuota,
 +
                                        OpDebug* opDebug) {
 +
 +
    Status status = _recordStore->insertRecords(opCtx, &records, _enforceQuota(enforceQuota));
 +
    if (!status.isOK())
 +
        return status;
 +
 +
    status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted);
 +
    if (opDebug) {
 +
        opDebug->keysInserted += keysInserted;
 +
    }
 +
 +
</pre>
 +
 +
可以看到除了插入数据,还要更新下索引。

2017年5月25日 (四) 03:25的版本

网络层

  • 具体实现在 src/mongo/util/net 目录下:
├── abstract_message_port.h
├── asio_message_port.cpp
├── asio_message_port.h
├── asio_ssl_context.cpp
├── asio_ssl_context.h
├── hostandport.cpp
├── hostandport.h
├── hostandport_test.cpp
├── hostname_canonicalization.cpp
├── hostname_canonicalization.h
├── listen.cpp
├── listen.h
├── message.cpp
├── message.h
├── message_port.cpp
├── message_port.h
├── message_port_mock.cpp
├── message_port_mock.h
├── message_port_startup_param.cpp
├── message_port_startup_param.h
├── op_msg.cpp
├── op_msg.h
├── sock.cpp
├── sock.h
├── sock_test.cpp
├── sockaddr.cpp
├── sockaddr.h
├── socket_exception.cpp
├── socket_exception.h
├── socket_poll.cpp
├── socket_poll.h
├── ssl_expiration.cpp
├── ssl_expiration.h
├── ssl_manager.cpp
├── ssl_manager.h
├── ssl_options.cpp
├── ssl_options.h
├── ssl_types.h
├── thread_idle_callback.cpp
└── thread_idle_callback.h
  • 针对上层提供的服务接口定义在 src/mongo/transport 下,核心就是 transport_layer , transport_layer_legacy 和 service_entry_point,具体的关系是 TransportLayer 持有一个 acceptor ,当连接进来,包装成 session,然后调用ServiceEntryPoint.startSession 方法,进入一个读取请求-处理请求-应答请求的循环。 这里可能为了兼容老的代码,transport_layer_legacy 实现了 TransportLayer,并兼容老的代码。 ServiceEntryPoint 的子类 ServiceEntryPointImpl 里实现了 startSession 和 _sessionLoop 框架,每个连接启动一个线程处理:
//service_entry_point_impl.cpp
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
    // Pass ownership of the transport::SessionHandle into our worker thread. When this
    // thread exits, the session will end.
    launchWrappedServiceEntryWorkerThread(
        std::move(session), [this](const transport::SessionHandle& session) {
            _nWorkers.fetchAndAdd(1);
            auto guard = MakeGuard([&] { _nWorkers.fetchAndSubtract(1); });

            _sessionLoop(session);
        });
}

void ServiceEntryPointImpl::_sessionLoop(const transport::SessionHandle& session) {
    Message inMessage;
    bool inExhaust = false;
    int64_t counter = 0;

    while (true) {
    .........
        // The handleRequest is implemented in a subclass for mongod/mongos and actually all the
        // database work for this request.
        DbResponse dbresponse = this->handleRequest(opCtx.get(), inMessage, session->remote());
     ......
   }
}

实际是转交给 ServiceEntryPointImpl 的子类 mongod 和 mongos 的 handleRequest 处理请求。

  • transport_layer_legacy.cpp 就是调用 util/net 下面的类和方法,实现一个典型的 TCP 服务器了,其中 handleNewConnection 处理新建连接,构造函数里开始 listen:
TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts,
                                           ServiceEntryPoint* sep)
    : _sep(sep),
      _listener(stdx::make_unique<ListenerLegacy>(
          opts,
          stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))),
      _running(false),
      _options(opts) {}

void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagingPort> amp) {
    if (!Listener::globalTicketHolder.tryAcquire()) {
        log() << "connection refused because too many open connections: "
              << Listener::globalTicketHolder.used();
        amp->shutdown();
        return;
    }

    amp->setLogLevel(logger::LogSeverity::Debug(1));
    auto session = LegacySession::create(std::move(amp), this);

    stdx::list<std::weak_ptr<LegacySession>> list;
    auto it = list.emplace(list.begin(), session);

    {
        // Add the new session to our list
        stdx::lock_guard<stdx::mutex> lk(_sessionsMutex);
        session->setIter(it);
        _sessions.splice(_sessions.begin(), list, it);
    }

    invariant(_sep);
    _sep->startSession(std::move(session));
}

调用了 _sep->startSession(std::move(session));,其中 _sep 就是 ServiceEntryPoint 指针,在头文件 class 定义了。

  • 网络层来看, accept 采用了 select 调用,读写请求还是同步阻塞的过程。

db 层

  • db 这一层源码都在 src/mongo/db/ 下,其中 commands 对应各种命令, storage 是各种存储引擎,比如 MMAPv1, wiredtiger 等, query 就是专门用于查询的。ops 里面是各种写入(插入、更新和删除等)的 ops 实现,query 专门处理查询,catalog 管理 db 和 collection。
  • mongod 是 ServiceEntryPointImpl 的实现,在 src/mongo/db/service_entry_point_mongod.cpp ,实现 handleRequest 方法:
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx,
                                                  const Message& request,
                                                  const HostAndPort& client) {
    return assembleResponse(opCtx, request, client);
}

转交调用了 assembleResponse 方法,定义在 assemble_response.cpp 中:

DbResponse assembleResponse(OperationContext* opCtx, const Message& m, const HostAndPort& remote) {
    // before we lock...
    NetworkOp op = m.operation();
    bool isCommand = false;

    DbMessage dbmsg(m);

  if (op == dbQuery) {
        if (nsString.isCommand()) {
            isCommand = true;
            opwrite(m);
        }
        // TODO: remove this entire code path after 3.2. Refs SERVER-7775
        else if (nsString.isSpecialCommand()) {
            opwrite(m);

            if (nsString.coll() == "$cmd.sys.inprog") {
                return receivedPseudoCommand(opCtx, c, m, "currentOp");
            }
            if (nsString.coll() == "$cmd.sys.killop") {
                return receivedPseudoCommand(opCtx, c, m, "killOp");
            }
            if (nsString.coll() == "$cmd.sys.unlock") {
                return receivedPseudoCommand(opCtx, c, m, "fsyncUnlock");
            }
        } else {
            opread(m);
        }
    } else if (op == dbGetMore) {
        opread(m);
    } else if (op == dbCommand || op == dbMsg) {
        isCommand = true;
        opwrite(m);
    } else {
        opwrite(m);
    }

 CurOp& currentOp = *CurOp::get(opCtx);
    {
        stdx::lock_guard<Client> lk(*opCtx->getClient());
        // Commands handling code will reset this if the operation is a command
        // which is logically a basic CRUD operation like query, insert, etc.
        currentOp.setNetworkOp_inlock(op);
        currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op));
    }

    OpDebug& debug = currentOp.debug();

    long long logThresholdMs = serverGlobalParams.slowMS;
    bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));

    DbResponse dbresponse;
    if (op == dbQuery) {
        dbresponse = isCommand ? receivedCommand(opCtx, nsString, c, m)
                               : receivedQuery(opCtx, nsString, c, m);
    } else if (op == dbMsg) {
        dbresponse = receivedMsg(opCtx, c, m);
    } else if (op == dbCommand) {
        dbresponse = receivedRpc(opCtx, c, m);
    } else if (op == dbGetMore) {
        dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);
    } else {

这就是一个典型的根据命令类型派发的过程,比较奇怪的是没有使用枚举+ switch 的方式,而是一堆 if .. else。

派发的核心是转发 commands 和 query,分别调用的是 runCommands 和 runQuery 方法,前者定义在 run_commands.cpp,后者定义在 query/find.cpp 里,一个是各种命令,一个是查询。不过在 assembleResponse 里还有一些 receivedInsert, receivedDelete 之类的处理逻辑,看代码注释是说『// The remaining operations do not return any response. They are fire-and-forget.』,也就是这些请求都不需要应答,因此应该不是正常的插入、更新和删除的请求处理过程,具体用于什么暂时不清楚。

  • 插入、删除和更新过程还是 run_commands.cpp 里, 本质上在 commands.cpp 保存了一张表 ,映射了名字到具体的 command, Command 的每个具体子类都会在构造函数里将自己注册到这里。

    //commands.h
    static CommandMap* _commands;
    static CommandMap* _commandsByBestName;


   //commands.cpp

Command::Command(StringData name, bool webUI, StringData oldName)
    : _name(name.toString()),
      _webUI(webUI),
      _commandsExecutedMetric("commands." + _name + ".total", &_commandsExecuted),
      _commandsFailedMetric("commands." + _name + ".failed", &_commandsFailed) {
    // register ourself.
    if (_commands == 0)
        _commands = new CommandMap();
    if (_commandsByBestName == 0)
        _commandsByBestName = new CommandMap();
    Command*& c = (*_commands)[name];
    if (c)
        log() << "warning: 2 commands with name: " << _name;
    c = this;
    (*_commandsByBestName)[name] = this;

    if (!oldName.empty())
        (*_commands)[oldName.toString()] = this;
}


比如 WriteCommand(用于数据写入,包括插入、更新删除),定义在 src/mongo/db/commands/write_commands/write_commands.cpp:

class WriteCommand : public Command {
public:
    explicit WriteCommand(StringData name) : Command(name) {}

而插入 CmdInsert 就是 WriteCommand 的一个子类,其他更新、删除类似:

class CmdInsert final : public WriteCommand {
public:
    CmdInsert() : WriteCommand("insert") {}


    void runImpl(OperationContext* opCtx,
                 const std::string& dbname,
                 const BSONObj& cmdObj,
                 BSONObjBuilder& result) final {
        const auto batch = parseInsertCommand(dbname, cmdObj);
        const auto reply = performInserts(opCtx, batch);
        serializeReply(opCtx,
                       ReplyStyle::kNotUpdate,
                       batch.continueOnError,
                       batch.documents.size(),
                       reply,
                       &result);
    }
} cmdInsert;

实际的插入执行的是 performInsert 方法,定义在 src/mongo/db/ops/write_ops_exec.cpp 中:

WriteResult performInserts(OperationContext* opCtx, const InsertOp& wholeOp) {
    invariant(!opCtx->lockState()->inAWriteUnitOfWork());  // Does own retries.
    auto& curOp = *CurOp::get(opCtx);
    ......
    for (auto&& doc : wholeOp.documents) {
        const bool isLastDoc = (&doc == &wholeOp.documents.back());
        auto fixedDoc = fixDocumentForInsert(opCtx->getServiceContext(), doc);
        ......
                bool canContinue = insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out);
       ...


}

实际执行的是 insertBatchAndHandleErrors 方法,逻辑是比较清楚了,先找到 ns ,获取对应的 db 和 collection,然后执行 collection 的 insertDocuments 方法:

  try {
        acquireCollection();
   ....
   ....
    insertDocuments(opCtx, collection->getCollection(), batch.begin(), batch.end());

插入对非 capped 集合做一次 all-at-once 批量插入,如果不行,再循环走一次 one-at-a-time 插入。

insertDocuments 定义在 src/mongo/db/catalog/collection_impl.cpp 里,每个 collection 都保存了一个 StorageEngine 创建的 RecordStore,调用 recordStore 做插入:

Status CollectionImpl::_insertDocuments(OperationContext* opCtx,
                                        const vector<BSONObj>::const_iterator begin,
                                        const vector<BSONObj>::const_iterator end,
                                        bool enforceQuota,
                                        OpDebug* opDebug) {

    Status status = _recordStore->insertRecords(opCtx, &records, _enforceQuota(enforceQuota));
    if (!status.isOK())
        return status;

    status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted);
    if (opDebug) {
        opDebug->keysInserted += keysInserted;
    }

可以看到除了插入数据,还要更新下索引。

个人工具
名字空间

变换
操作
导航
工具箱