Impala Source Code Analysis: BE (Backend)

In Impala, the entry point for a SQL query is void ImpalaServer::query(QueryHandle& query_handle, const Query& query). Its source is shown below:

void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(// 为当前连接返回唯一标识,标记会话为使用中并保存
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  // 将 Query 转化为 TQueryCtx
  // raise general error for request conversion error;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  // raise Syntax error or access violation; it's likely to be syntax/analysis error
  // TODO: that may not be true; fix this
  shared_ptr<QueryExecState> exec_state;
  	// 开始异步执行查询,内部调用 ImpalaServer::Execute() 函数
  	// 将 TQueryCtx 转换为 QueryExecState,注册并调用 Coordinator::Execute()
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // start thread to wait for results to become available, which will allow
  // us to advance query state to FINISHED or EXCEPTION
  exec_state->WaitAsync();
  // Once the query is running do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}
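
Two details are worth noting in query(): ScopedSessionState is an RAII guard that marks the connection's session as in use and releases it again when the handler returns, even on an error path, while WaitAsync() moves the wait for results onto a separate thread so the RPC can return promptly. The RAII idea behind the former is sketched below with made-up SessionRegistry/ScopedSessionGuard classes; this is only an illustration of the pattern, not Impala's actual ScopedSessionState:

#include <mutex>
#include <set>
#include <string>

// Toy registry of sessions that are currently "in use".
class SessionRegistry {
 public:
  void MarkInUse(const std::string& id) {
    std::lock_guard<std::mutex> l(lock_);
    in_use_.insert(id);
  }
  void Release(const std::string& id) {
    std::lock_guard<std::mutex> l(lock_);
    in_use_.erase(id);
  }
 private:
  std::mutex lock_;
  std::set<std::string> in_use_;
};

// RAII guard: mark the session in use on construction, release on destruction,
// so the release happens even if the RPC handler returns early on an error.
class ScopedSessionGuard {
 public:
  ScopedSessionGuard(SessionRegistry* registry, std::string id)
      : registry_(registry), id_(std::move(id)) {
    registry_->MarkInUse(id_);
  }
  ~ScopedSessionGuard() { registry_->Release(id_); }
 private:
  SessionRegistry* registry_;
  std::string id_;
};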

Internally it calls ImpalaServer::Execute() (ImpalaServer.h), which converts the TQueryCtx into a TExecRequest; the actual work is delegated to ImpalaServer::ExecuteInternal(). The code is as follows:

Status ImpalaServer::Execute(TQueryCtx* query_ctx,
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  PrepareQueryContext(query_ctx);
  bool registered_exec_state;
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);

  // Redact the SQL stmt and update the query context
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  Redact(&stmt);
  query_ctx->request.__set_redacted_stmt((const string) stmt);
	// 实现 Execute() 逻辑,出错时不取消注册查询
  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
      exec_state);
  if (!status.ok() && registered_exec_state) {
    UnregisterQuery((*exec_state)->query_id(), false, &status);
  }
  return status;
}

The function above calls ImpalaServer::ExecuteInternal() (ImpalaServer.h), which invokes frontend.createExecRequest() through the JNI interface to generate the TExecRequest:

Status ImpalaServer::ExecuteInternal(
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
  if (IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }
  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
      this, session_state));

  (*exec_state)->query_events()->MarkEvent("Start execution");

  TExecRequest result;
  {
    // Keep a lock on exec_state so that registration and setting
    // result_metadata are atomic.
    //
    // Note: this acquires the exec_state lock *before* the
    // query_exec_state_map_ lock. This is the opposite of
    // GetQueryExecState(..., true), and therefore looks like a
    // candidate for deadlock. The reason this works here is that
    // GetQueryExecState cannot find exec_state (under the exec state
    // map lock) and take it's lock until RegisterQuery has
    // finished. By that point, the exec state map lock will have been
    // given up, so the classic deadlock interleaving is not possible.
    lock_guard<mutex> l(*(*exec_state)->lock());

    // register exec state as early as possible so that queries that
    // take a long time to plan show up, and to handle incoming status
    // reports before execution starts.
    RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
    *registered_exec_state = true;

    RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
    // 通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest
        exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
    (*exec_state)->query_events()->MarkEvent("Planning finished");
    (*exec_state)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    if (result.__isset.result_set_metadata) {
      (*exec_state)->set_result_metadata(result.result_set_metadata);
    }
  }
  VLOG(2) << "Execution request: " << ThriftDebugString(result);

  // 开始执行查询;同时开始 fragment 状态报告
  RETURN_IF_ERROR((*exec_state)->Exec(&result));
  if (result.stmt_type == TStmtType::DDL) {
    Status status = UpdateCatalogMetrics();
    if (!status.ok()) {
      VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
    }
  }

  if ((*exec_state)->coord() != NULL) {
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
        query_locations_[port].insert((*exec_state)->query_id());
      }
    }
  }
  return Status::OK();
}

Frontend::GetExecRequest() (Frontend.h) calls frontend.createExecRequest() through JNI to generate the TExecRequest; that process is described in detail in my previous post, Impala Source Code Analysis: FE. Once the TExecRequest for the query is available, it is handed over to the impala backend for execution, and fragment status reporting starts at the same time. Backend execution begins in Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) (QueryExecState.h), implemented as follows:

/**
 * 初始化 exec_request 执行
 */
Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
  MarkActive();
  exec_request_ = *exec_request;

  profile_.AddChild(&server_profile_);
  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));

  // 根据不同的请求类型调用不同的处理函数
  switch (exec_request->stmt_type) {
    // QUERY 或 DML 操作
    case TStmtType::QUERY:
    case TStmtType::DML:
      DCHECK(exec_request_.__isset.query_exec_request);
      return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
    // EXPLAIN 操作
    case TStmtType::EXPLAIN: {
      request_result_set_.reset(new vector<TResultRow>(
          exec_request_.explain_result.results));
      return Status::OK();
    }
    // DDL 操作
    case TStmtType::DDL: {
      DCHECK(exec_request_.__isset.catalog_op_request);
      return ExecDdlRequest();
    }
    // 加载数据
    case TStmtType::LOAD: {
      DCHECK(exec_request_.__isset.load_data_request);
      TLoadDataResp response;
      RETURN_IF_ERROR(
          // 调用 FE LoadData
          frontend_->LoadData(exec_request_.load_data_request, &response));
      request_result_set_.reset(new vector<TResultRow>);
      request_result_set_->push_back(response.load_summary);

      // 刷新表元数据
      TCatalogOpRequest reset_req;
      reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
      reset_req.__set_reset_metadata_params(TResetMetadataRequest());
      reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
      reset_req.reset_metadata_params.__set_is_refresh(true);
      reset_req.reset_metadata_params.__set_table_name(
          exec_request_.load_data_request.table_name);
      catalog_op_executor_.reset(
          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
          // 调用 CatalogOpExecutor.Exec 执行指定的 Catalog 操作
      RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
          *catalog_op_executor_->update_catalog_result(),
          exec_request_.query_options.sync_ddl));
      return Status::OK();
    }
    case TStmtType::SET: {
      DCHECK(exec_request_.__isset.set_query_option_request);
      lock_guard<mutex> l(session_->lock);
      if (exec_request_.set_query_option_request.__isset.key) {
        // "SET key=value" updates the session query options.
        DCHECK(exec_request_.set_query_option_request.__isset.value);
        RETURN_IF_ERROR(SetQueryOption(
            exec_request_.set_query_option_request.key,
            exec_request_.set_query_option_request.value,
            &session_->default_query_options));
      } else {
        // "SET" returns a table of all query options.
        map<string, string> config;
        TQueryOptionsToMap(
            session_->default_query_options, &config);
        vector<string> keys, values;
        map<string, string>::const_iterator itr = config.begin();
        for (; itr != config.end(); ++itr) {
          keys.push_back(itr->first);
          values.push_back(itr->second);
        }
        SetResultSet(keys, values);
      }
      return Status::OK();
    }
    default:
      stringstream errmsg;
      errmsg << "Unknown  exec request stmt type: " << exec_request_.stmt_type;
      return Status(errmsg.str());
  }
}

Let us start with QUERY and DML operations, which call the private QueryExecState member function ExecQueryOrDmlRequest() (QueryExecState.h), implemented as follows:

/**
 * 发起 query 或 dml 执行请求的核心逻辑;
 * 如果有的话初始化 plan fragments 执行;
 * 为后续调用 FetchRows() 设置输出表达式
 * 同时设置 profile 以及预执行计数器
 * 非阻塞
 */
Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
    const TQueryExecRequest& query_exec_request) {
  // we always need at least one plan fragment
  DCHECK_GT(query_exec_request.fragments.size(), 0);

    // 有 plan fragments
  if (query_exec_request.__isset.query_plan) {
    stringstream plan_ss;
    // 添加分隔符以区分 plan 开始和 profile 的结束
    plan_ss << "\n----------------\n"
            << query_exec_request.query_plan
            << "----------------";
    summary_profile_.AddInfoString("Plan", plan_ss.str());
  }
  // 添加 CM 使用的一些信息:预估内存/核数以及表缺失状态
  if (query_exec_request.__isset.per_host_mem_req) {
    stringstream ss;
    ss << query_exec_request.per_host_mem_req;
    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
  }
  if (query_exec_request.__isset.per_host_vcores) {
    stringstream ss;
    ss << query_exec_request.per_host_vcores;
    summary_profile_.AddInfoString(PER_HOST_VCORES_KEY, ss.str());
  }
  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
      query_exec_request.query_ctx.__isset.tables_missing_stats &&
      !query_exec_request.query_ctx.tables_missing_stats.empty()) {
    stringstream ss;
    const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats;
    for (int i = 0; i < tbls.size(); ++i) {
      if (i != 0) ss << ",";
      ss << tbls[i].db_name << "." << tbls[i].table_name;
    }
    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
  }

  // 如果没有设置 desc_tbl,SELECT 语句没有 FROM 从句,这种情况下,
  // 查询只能有一个 fragment,而且该 fragment 应该由 coordinator执行
  // 如果设置了 desc_tbl,查询可能有也可能没有 coordinator fragment
  bool has_coordinator_fragment =
      query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
  DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);

  if (FLAGS_enable_rm) {
    DCHECK(exec_env_->resource_broker() != NULL);
  }
  // 设置查询调度器
  schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
      exec_request_.query_options, effective_user(), &summary_profile_, query_events_));
  // 设置 coordinator
  coord_.reset(new Coordinator(exec_env_, query_events_));
  // 调度
  Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
  summary_profile_.AddInfoString("Request Pool", schedule_->request_pool());
  if (FLAGS_enable_rm) {
    if (status.ok()) {
      stringstream reservation_request_ss;
      reservation_request_ss << schedule_->reservation_request();
      summary_profile_.AddInfoString("Resource reservation request",
          reservation_request_ss.str());
    }
  }

  {
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(UpdateQueryStatus(status));
  }

  if (FLAGS_enable_rm && schedule_->HasReservation()) {
    // 添加已有的预留到查询信息
    stringstream reservation_ss;
    reservation_ss << *schedule_->reservation();
    summary_profile_.AddInfoString("Granted resource reservation", reservation_ss.str());
    query_events_->MarkEvent("Resources reserved");
  }
  // coordinator 开始调度执行查询
  status = coord_->Exec(*schedule_, &output_expr_ctxs_);
  {
    lock_guard<mutex> l(lock_);
    RETURN_IF_ERROR(UpdateQueryStatus(status));
  }

  profile_.AddChild(coord_->query_profile());
  return Status::OK();
}

The two key pieces of the function above are the schedule and the coordinator. Start with the schedule: Scheduler is an abstract class, and concrete schedulers inherit from it and implement its methods to provide the actual scheduling policy. Impala currently ships only one simple implementation, SimpleScheduler. A hedged sketch of roughly what the base interface declares is given right below, followed by the implementation of its scheduling method Schedule() (SimpleScheduler.h):
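
The Scheduler base class is not reproduced in this post, so the following is only a minimal sketch inferred from how it is used (exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get()) in ExecQueryOrDmlRequest() above); treat the exact declaration as an assumption rather than Impala's literal code.

class Status;          // forward declarations standing in for Impala's real types
class Coordinator;
class QuerySchedule;

// Hypothetical sketch of the abstract scheduler interface.
class Scheduler {
 public:
  virtual ~Scheduler() {}

  // Decide which backends run which plan fragments of the query coordinated by
  // 'coord', and record the result in 'schedule'.
  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule) = 0;
};

// SimpleScheduler is currently the only concrete subclass:
// class SimpleScheduler : public Scheduler { ... };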

/**
 * 填充给定查询的调度,该查询由 coord 协调
 * 根据 scan ranges 将查询执行请求的 fragments 分发给各个主机
 * 如果启用了资源管理,也会从运行查询的中央资源管理器(Yarn via Llama) 预留资源
 * 该函数会一直阻塞直到预留请求得到满足或被拒绝
 */
Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
  // 没有有效用户
  if (schedule->effective_user().empty()) {
    if (FLAGS_require_username) return Status(ERROR_USER_NOT_SPECIFIED);
    // 回退到 default 用户以便查询能继续执行
    VLOG(2) << "No user specified: using user=default";
  }
  // 使用有效用户或默认用户
  const string& user =
    schedule->effective_user().empty() ? DEFAULT_USER : schedule->effective_user();
  VLOG(3) << "user='" << user << "'";
  string pool;
  // 通过 request_pool_service_ 确定用户池和查询选项
  RETURN_IF_ERROR(GetRequestPool(user, schedule->query_options(), &pool));
  schedule->set_request_pool(pool);
  // 如果刚启动 Statestore 可能还没有更新(impalad 会向 statestore 注册)
  // 但至少已经有这个 backend
  schedule->set_num_hosts(max(num_backends_metric_->value(), 1L));

  if (!FLAGS_disable_admission_control) {
  // 准入控制器判断是否允许执行查询
    RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
  }
  // 判断 Impalad 是否在线
  if (ExecEnv::GetInstance()->impala_server()->IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }
  // 为调度器中的每个 scan node 计算分配的 scan ranges
  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
  
  // 为 exec_request 中的每个 fragment 计算运行该 fragment 
  // 并保存结果到 fragment_exec_params_.hosts 的 host 实例
  ComputeFragmentHosts(schedule->request(), schedule);
  
  // 填充 schedule 中的 fragment_exec_params_
  ComputeFragmentExecParams(schedule->request(), schedule);
  
  if (!FLAGS_enable_rm) return Status::OK();
  schedule->PrepareReservationRequest(pool, user);
  const TResourceBrokerReservationRequest& reservation_request =
      schedule->reservation_request();
  if (!reservation_request.resources.empty()) {
    Status status = resource_broker_->Reserve(
        reservation_request, schedule->reservation());
    if (!status.ok()) {
      // Warn about missing table and/or column stats if necessary.
      const TQueryCtx& query_ctx = schedule->request().query_ctx;
      if(!query_ctx.__isset.parent_query_id &&
          query_ctx.__isset.tables_missing_stats &&
          !query_ctx.tables_missing_stats.empty()) {
        status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
      }
      return status;
    }
    RETURN_IF_ERROR(schedule->ValidateReservation());
    AddToActiveResourceMaps(*schedule->reservation(), coord);
  }
  return Status::OK();
}

The most important parts of SimpleScheduler::Schedule() above are the three Compute* functions, introduced one by one below. The first is ComputeScanRangeAssignment() (SimpleScheduler.h):

/**
 * 为调度器中的每个 scan node 计算分配的 scan ranges
 */
Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  map<TPlanNodeId, vector<TScanRangeLocations> >::const_iterator entry;
  for (entry = exec_request.per_node_scan_ranges.begin();
      entry != exec_request.per_node_scan_ranges.end(); ++entry) {
    int fragment_idx = schedule->GetFragmentIdx(entry->first);
    const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
    bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);

    FragmentScanRangeAssignment* assignment =
        &(*schedule->exec_params())[fragment_idx].scan_range_assignment;
    RETURN_IF_ERROR(ComputeScanRangeAssignment(
        entry->first, entry->second, exec_request.host_list, exec_at_coord,
        schedule->query_options(), assignment));
    schedule->AddScanRanges(entry->second.size());
  }
  return Status::OK();
}

This function delegates the actual logic to a private overload of ComputeScanRangeAssignment() (SimpleScheduler.h):

/**
 * 基于 scan ranges locations 列表为一个 scan node 计算 scan range
 * 如果 exec_at_coord 值为 true,所有 scan ranges 都会分配给 coord node
 */
Status SimpleScheduler::ComputeScanRangeAssignment(
    PlanNodeId node_id, const vector<TScanRangeLocations>& locations,
    const vector<TNetworkAddress>& host_list, bool exec_at_coord,
    const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
  // 如果启用了 cached reads,会优先使用 cached 备份而不是 非cached 备份。
  // 由于很可能只缓存了一个复制,会导致热点,这也就是能通过查询选项控制的原因
  //
  // 我们按照如下顺序进行贪婪调度:
  // cached collocated replicas > collocated replicas > remote (cached or not) replicas.
  // 停用 cached reads 的查询选项会移除第一组
  bool schedule_with_caching = !query_options.disable_cached_reads;

  // 映射 datanode host 到总的分配字节数
  // 如果 datanode 没有并列的 impalad,实际分配的字节数是:
  // "total assigned - numeric_limits<int64_t>::max()".
  unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;
  unordered_set<TNetworkAddress> remote_hosts;
  int64_t remote_bytes = 0L;
  int64_t local_bytes = 0L;
  int64_t cached_bytes = 0L;

  BOOST_FOREACH(const TScanRangeLocations& scan_range_locations, locations) {
    // assign this scan range to the host w/ the fewest assigned bytes
    uint64_t min_assigned_bytes = numeric_limits<uint64_t>::max();
    const TNetworkAddress* data_host = NULL;  // data server; not necessarily backend
    int volume_id = -1;
    bool is_cached = false;

    // 区分 cached 复制和 非cached 复制
    vector<const TScanRangeLocation*> cached_locations;
    if (schedule_with_caching) {
      BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
        // 根据查询选项和是否并列调整设置该复制是否为 cached,如果 DN 不是并列的,标记为 非cached
        // 网络传输就是该情形
        // TODO: measure this in a cluster setup. Are remote reads better with caching?
        if (location.is_cached && HasLocalBackend(host_list[location.host_idx])) {
          cached_locations.push_back(&location);
        }
      }
    }
    // 如果没有 cached 复制,根据分配的字节查找
    if (cached_locations.size() == 0) {
      BOOST_FOREACH(const TScanRangeLocation& location, scan_range_locations.locations) {
        DCHECK_LT(location.host_idx, host_list.size());
        const TNetworkAddress& replica_host = host_list[location.host_idx];
        // 通过设置一个很高的初始字节降低 非并列 数据结点的优先级
        uint64_t initial_bytes =
            HasLocalBackend(replica_host) ? 0L : numeric_limits<int64_t>::max();
        uint64_t* assigned_bytes =
            FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
        // 如果有较空闲的主机,更新分配
        if (*assigned_bytes < min_assigned_bytes) {
          min_assigned_bytes = *assigned_bytes;
          data_host = &replica_host;
          volume_id = location.volume_id;
          is_cached = false;
        }
      }
    } else {
      // 根据提取的 cached local hosts 随机选择一个 cached host
      size_t rand_host = rand() % cached_locations.size();
      const TNetworkAddress& replica_host = host_list[cached_locations[rand_host]->host_idx];
      uint64_t initial_bytes = 0L;
      min_assigned_bytes = *FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
      data_host = &replica_host;
      volume_id = cached_locations[rand_host]->volume_id;
      is_cached = true;
    }

    int64_t scan_range_length = 0;
    if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
      scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
    }
    bool remote_read = min_assigned_bytes >= numeric_limits<int64_t>::max();
    if (remote_read) {
      remote_bytes += scan_range_length;
      remote_hosts.insert(*data_host);
    } else {
      local_bytes += scan_range_length;
      if (is_cached) cached_bytes += scan_range_length;
    }
    assigned_bytes_per_host[*data_host] += scan_range_length;

    // 转换 datahost 为 backend host
    DCHECK(data_host != NULL);

    TNetworkAddress exec_hostport;
    if (!exec_at_coord) {
      TBackendDescriptor backend;
      RETURN_IF_ERROR(GetBackend(*data_host, &backend));
      exec_hostport = backend.address;
    } else {
      exec_hostport = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
    }

    PerNodeScanRanges* scan_ranges =
        FindOrInsert(assignment, exec_hostport, PerNodeScanRanges());
    vector<TScanRangeParams>* scan_range_params_list =
        FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
    // 添加扫描范围
    TScanRangeParams scan_range_params;
    scan_range_params.scan_range = scan_range_locations.scan_range;
    // 显式设置 optional 域
    scan_range_params.__set_volume_id(volume_id);
    scan_range_params.__set_is_cached(is_cached);
    scan_range_params.__set_is_remote(remote_read);
    scan_range_params_list->push_back(scan_range_params);
  }

  if (VLOG_FILE_IS_ON) {
    VLOG_FILE << "Total remote scan volume = " <<
        PrettyPrinter::Print(remote_bytes, TUnit::BYTES);
    VLOG_FILE << "Total local scan volume = " <<
        PrettyPrinter::Print(local_bytes, TUnit::BYTES);
    VLOG_FILE << "Total cached scan volume = " <<
        PrettyPrinter::Print(cached_bytes, TUnit::BYTES);
    if (remote_hosts.size() > 0) {
      stringstream remote_node_log;
      remote_node_log << "Remote data node list: ";
      BOOST_FOREACH(const TNetworkAddress& remote_host, remote_hosts) {
        remote_node_log << remote_host << " ";
      }
    }

    BOOST_FOREACH(FragmentScanRangeAssignment::value_type& entry, *assignment) {
      VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
      BOOST_FOREACH(PerNodeScanRanges::value_type& per_node_scan_ranges, entry.second) {
        stringstream str;
        BOOST_FOREACH(TScanRangeParams& params, per_node_scan_ranges.second) {
          str << ThriftDebugString(params) << " ";
        }
        VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
      }
    }
  }

  return Status::OK();
}
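
The core of the overload above is a greedy rule: give each scan range to the candidate replica whose host currently has the fewest assigned bytes, and penalize hosts without a collocated impalad with a huge initial value so that local replicas win whenever one exists. The self-contained sketch below isolates just that rule with made-up simplified types (Replica, ScanRange); it is an illustration, not Impala code:

#include <cstdint>
#include <limits>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical simplified types, for illustration only.
struct Replica { std::string host; bool collocated; };
struct ScanRange { int64_t length; std::vector<Replica> replicas; };

// For each range, pick the replica whose host currently has the fewest assigned
// bytes; hosts without a local impalad start with a huge penalty so collocated
// replicas always win when one exists. Returns host -> ranges assigned to it.
std::unordered_map<std::string, std::vector<const ScanRange*>> GreedyAssign(
    const std::vector<ScanRange>& ranges) {
  const int64_t kRemotePenalty = std::numeric_limits<int64_t>::max() / 2;
  std::unordered_map<std::string, int64_t> assigned_bytes;
  std::unordered_map<std::string, std::vector<const ScanRange*>> assignment;
  for (const ScanRange& range : ranges) {
    const Replica* best = nullptr;
    int64_t best_bytes = std::numeric_limits<int64_t>::max();
    for (const Replica& r : range.replicas) {
      // The first time we see a host it starts at 0 (local) or at the penalty (remote).
      auto it = assigned_bytes.emplace(r.host, r.collocated ? 0 : kRemotePenalty).first;
      if (it->second < best_bytes) { best_bytes = it->second; best = &r; }
    }
    if (best == nullptr) continue;  // range had no replicas at all
    assigned_bytes[best->host] += range.length;
    assignment[best->host].push_back(&range);
  }
  return assignment;
}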

The second is ComputeFragmentHosts() (SimpleScheduler.h), which finds, for every PlanFragment, the backend instances that will execute it. An UNPARTITIONED PlanFragment runs on the host where the Coordinator lives; a PlanFragment containing a ScanNode is scheduled onto the DataNodes that hold the relevant HDFS/HBase blocks, so those DataNodes become the backend instances executing this query.

/**
 * 为 exec_request 中的每个 fragment 计算运行该 fragment 
 * 并保存结果到 fragment_exec_params_.hosts 的 host 实例
 */
void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
  DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
  vector<TPlanNodeType::type> scan_node_types;
  scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
  scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
  scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);

  // 先为 producer fragment 计算 hosts,consumer fragment(s) 的 hosts 可能
  // 继承前者的 hosts 集合
  for (int i = exec_request.fragments.size() - 1; i >= 0; --i) {
    const TPlanFragment& fragment = exec_request.fragments[i];
    FragmentExecParams& params = (*fragment_exec_params)[i];
    if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
      // 所有单节点 fragments 都在 coordinator 主机上运行
      params.hosts.push_back(coord);
      continue;
    }

    // UnionNodes 比较特殊,它们可能需要多个 partitioned 输入,
    // 以及在相同 fragment 上执行多次扫描。
    // 包含 UnionNode 的 Fragments 会在所有 fragment 的 scan 主机集合
    // 以及所有输入 fragments 的主机上执行(s.t.
    // a UnionNode with partitioned joins or grouping aggregates as children runs on
    // at least as many hosts as the input to those children).
    if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
      vector<TPlanNodeId> scan_nodes;
      FindNodes(fragment.plan, scan_node_types, &scan_nodes);
      vector<TPlanNodeId> exch_nodes;
      FindNodes(fragment.plan,
          vector<TPlanNodeType::type>(1, TPlanNodeType::EXCHANGE_NODE),
          &exch_nodes);

      // 添加 scan nodes 的主机
      vector<TNetworkAddress> scan_hosts;
      for (int j = 0; j < scan_nodes.size(); ++j) {
        GetScanHosts(scan_nodes[j], exec_request, params, &scan_hosts);
      }
      unordered_set<TNetworkAddress> hosts(scan_hosts.begin(), scan_hosts.end());

      // 所有输入 fragments 的主机
      for (int j = 0; j < exch_nodes.size(); ++j) {
        int input_fragment_idx = FindSenderFragment(exch_nodes[j], i, exec_request);
        const vector<TNetworkAddress>& input_fragment_hosts =
            (*fragment_exec_params)[input_fragment_idx].hosts;
        hosts.insert(input_fragment_hosts.begin(), input_fragment_hosts.end());
      }
      DCHECK(!hosts.empty()) << "no hosts for fragment " << i << " with a UnionNode";

      params.hosts.assign(hosts.begin(), hosts.end());
      continue;
    }

    PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
    if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
      // 没有 leftmost scan;我们把相同的 hosts 设置为 leftmost input fragment 的主机
      // (因此一个 partitioned aggregation fragment 会在提供输入数据的 hosts 上运行)
      int input_fragment_idx = FindLeftmostInputFragment(i, exec_request);
      DCHECK_GE(input_fragment_idx, 0);
      DCHECK_LT(input_fragment_idx, fragment_exec_params->size());
      params.hosts = (*fragment_exec_params)[input_fragment_idx].hosts;
      // 如果 input fragment 是 unpartitioned,切换到 unpartitioned/coord execution
      //  (可能从分布式中降级)
      continue;
    }

    // 该 fragment 在 scan rangs 包括了 leftmost scan 的主机上运行
    GetScanHosts(leftmost_scan_id, exec_request, params, &params.hosts);
  }

  unordered_set<TNetworkAddress> unique_hosts;
  BOOST_FOREACH(const FragmentExecParams& exec_params, *fragment_exec_params) {
    unique_hosts.insert(exec_params.hosts.begin(), exec_params.hosts.end());
  }

  schedule->SetUniqueHosts(unique_hosts);
}

The last one is ComputeFragmentExecParams() (SimpleScheduler.h). Using the hosts already chosen for each PlanFragment in TQueryExecRequest.fragments, it fills in fragment_exec_params_ in the schedule: it assigns an instance_id to every host that executes each PlanFragment, and for each FragmentExecParams it fills in destinations (the destination PlanFragment of its Data Sink) and per_exch_num_senders (how many PlanFragments will send data to a given ExchangeNode).

/**
 * 填充 schedule 中的 fragment_exec_params_
 */
void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
    QuerySchedule* schedule) {
  vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
  // 设置实例 id
  int64_t num_backends = 0;
  BOOST_FOREACH(FragmentExecParams& params, *fragment_exec_params) {
    for (int j = 0; j < params.hosts.size(); ++j) {
      int instance_num = num_backends + j;
      // 我们将 instance_num 添加到 query_id.lo 以创建全局唯一的实例 id
      TUniqueId instance_id;
      instance_id.hi = schedule->query_id().hi;
      DCHECK_LT(
          schedule->query_id().lo, numeric_limits<int64_t>::max() - instance_num - 1);
      instance_id.lo = schedule->query_id().lo + instance_num + 1;
      params.instance_ids.push_back(instance_id);
    }
    num_backends += params.hosts.size();
  }
  if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
    // 根 fragment 由 coordinator 直接执行
    --num_backends;
  }
  schedule->set_num_backends(num_backends);

  // 为每个 exchange node 计算目的地和发送者数目
  // (根 fragment 没有 destination)
  for (int i = 1; i < fragment_exec_params->size(); ++i) {
    FragmentExecParams& params = (*fragment_exec_params)[i];
    int dest_fragment_idx = exec_request.dest_fragment_idx[i - 1];
    DCHECK_LT(dest_fragment_idx, fragment_exec_params->size());
    FragmentExecParams& dest_params = (*fragment_exec_params)[dest_fragment_idx];

    // 设置发送者数目
    DCHECK(exec_request.fragments[i].output_sink.__isset.stream_sink);
    const TDataStreamSink& sink = exec_request.fragments[i].output_sink.stream_sink;
    // 此时我们只能处理 unpartitioned (= broadcast), random-partitioned 或
    // hash-partitioned 输出
    DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED
           || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
           || sink.output_partition.type == TPartitionType::RANDOM);
    PlanNodeId exch_id = sink.dest_node_id;
    // 可能会有多个 fragments 发送到该 exchange node
    // (分布式 MERGE), 这就是需要发送者数目的原因
    params.sender_id_base = dest_params.per_exch_num_senders[exch_id];
    dest_params.per_exch_num_senders[exch_id] += params.hosts.size();

    // 为每个目标主机创建一个 TPlanFragmentDestination
    params.destinations.resize(dest_params.hosts.size());
    for (int j = 0; j < dest_params.hosts.size(); ++j) {
      TPlanFragmentDestination& dest = params.destinations[j];
      dest.fragment_instance_id = dest_params.instance_ids[j];
      dest.server = dest_params.hosts[j];
      VLOG_RPC  << "dest for fragment " << i << ":"
                << " instance_id=" << dest.fragment_instance_id
                << " server=" << dest.server;
    }
  }
}
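
The instance-id assignment above is easiest to see with a tiny worked example: every fragment instance shares query_id.hi, and its lo part is query_id.lo + instance_num + 1, where instance_num numbers (fragment, host) pairs consecutively across all fragments. The snippet below reproduces only that arithmetic with plain types (the UniqueId struct is a stand-in for TUniqueId):

#include <cstdint>
#include <cstdio>
#include <vector>

struct UniqueId { int64_t hi; int64_t lo; };  // stand-in for TUniqueId

int main() {
  UniqueId query_id{0x1234, 100};
  // Suppose fragment 0 runs on 1 host (the coordinator) and fragment 1 on 3 hosts.
  std::vector<int> hosts_per_fragment = {1, 3};

  int64_t num_backends = 0;
  for (size_t f = 0; f < hosts_per_fragment.size(); ++f) {
    for (int j = 0; j < hosts_per_fragment[f]; ++j) {
      int64_t instance_num = num_backends + j;
      UniqueId instance_id{query_id.hi, query_id.lo + instance_num + 1};
      std::printf("fragment %zu host %d -> instance lo=%lld\n",
                  f, j, static_cast<long long>(instance_id.lo));
    }
    num_backends += hosts_per_fragment[f];
  }
  // Output: lo = 101 for the coordinator fragment, then 102, 103, 104 for fragment 1.
  return 0;
}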

Having covered the schedule part of QUERY/DML requests, we now turn to the coordinator part. The Coordinator drives query execution mainly through Coordinator::Exec() (Coordinator.h):

/**
 * 用指定的调度初始化异步执行查询
 * 当所有 plan fragments 都在对应的 backend 开始执行时返回
 * ‘schedule’ 必须至少包含一个 Coordinator plan fragment
 * 如果有的话,填充并准备 Coordinator fragment 的 output_expr_ctxs
 * 并和 fragment 的其它 exprs 一起进行 LLVM 优化
 * Exec() 必须必其它成员函数之前调用
 */
Status Coordinator::Exec(QuerySchedule& schedule,
    vector<ExprContext*>* output_expr_ctxs) {
  const TQueryExecRequest& request = schedule.request();
  DCHECK_GT(request.fragments.size(), 0);
  needs_finalization_ = request.__isset.finalize_params;
  if (needs_finalization_) {
    finalize_params_ = request.finalize_params;
  }

  VLOG_QUERY << "Exec() query_id=" << schedule.query_id();
  stmt_type_ = request.stmt_type;
  query_id_ = schedule.query_id();
  desc_tbl_ = request.desc_tbl;
  query_ctx_ = request.query_ctx;

  query_profile_.reset(
      new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id_)));
  finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");

  SCOPED_TIMER(query_profile_->total_time_counter());

  vector<FragmentExecParams>* fragment_exec_params = schedule.exec_params();
  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);

  // 为了简单,直到所有 plan fragments 执行都初始化了才异步调用 Cancle(),
  // 否则我们可能在 fragment 执行还没在 backend 启动时就尝试取消
  lock_guard<mutex> l(lock_);

  // 如果 root fragment 是 unpartitioned,那么在本地运行
  bool has_coordinator_fragment =
      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;

  // 如果有 Coordinator PlanFragment,那么先 new PlanFragmentExecutor() 生成
  // 这个 PlanFragment 所对应的 PlanFragmentExecutor。然后填充其对应的 TExecPlanFragmentParams。
  if (has_coordinator_fragment) {
    executor_.reset(new PlanFragmentExecutor(
            exec_env_, PlanFragmentExecutor::ReportStatusCallback()));
    // 如果请求一个 Coordinator fragment(多数查询都是这种情况,除了并行 INSERT 查询),
    // backend threads 在启动任何其它 plan fragments 之前先启动这个 fragment,
    // 否则的话本地 exchange node 在注册到 stream mgr 之前其它 plan fragments 就开始发送数据
    TExecPlanFragmentParams rpc_params;
    SetExecPlanFragmentParams(schedule, 0, request.fragments[0], 0,
        (*fragment_exec_params)[0], 0, coord, &rpc_params);
    // 调用 PlanFragmentExecutor.Prepare() 准备执行
    RETURN_IF_ERROR(executor_->Prepare(rpc_params));

    // 在优化 LLVM 模块之前准备 output_expr_ctxs。该 Coordinator fragment 的其它
    // exprs 已经在 executor_->Prepare() 准备好了
    DCHECK(output_expr_ctxs != NULL);
    
    // 在 ‘pool’ 中从 output_exprs 中包括的 list of  nodes 创建表达式树
    // 在 output_expr_ctxs 中返回对应的 ExprContexts
    RETURN_IF_ERROR(Expr::CreateExprTrees(
        runtime_state()->obj_pool(), request.fragments[0].output_exprs,
        output_expr_ctxs));
    MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
        -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
    RETURN_IF_ERROR(Expr::Prepare(
        *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
  } else {
    // 尽管没有 coordinator fragment,Coordinator 实例还是需要一个 mem tracker
    // 例如 result-caching 通过查询 mem tracker 跟踪 memory。
    // 如果有 fragment,上面创建的 fragment executor 会初始化 query mem tracker,
    // 否则, 在这里创建 query mem tracker。
    int64_t query_limit = -1;
    if (query_ctx_.request.query_options.__isset.mem_limit &&
        query_ctx_.request.query_options.mem_limit > 0) {
      query_limit = query_ctx_.request.query_options.mem_limit;
    }
    MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
        schedule.request_pool(), exec_env_->process_mem_tracker());
    query_mem_tracker_ =
        MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);

    executor_.reset(NULL);
  }

  // 初始化执行配置文件结构体
  InitExecProfile(request);

  DebugOptions debug_options;
  ProcessQueryOptions(schedule.query_options(), &debug_options);

  // 从左至右启动 fragment 实例,以便发送者开始发送之前接收者已经准备好了
  backend_exec_states_.resize(schedule.num_backends());
  num_remaining_backends_ = schedule.num_backends();
  VLOG_QUERY << "starting " << schedule.num_backends()
             << " backends for query " << query_id_;

  query_events_->MarkEvent("Ready to start remote fragments");
  int backend_num = 0;

  // TODO: 添加一个运行时执行信息统计机制,
  // 使得这里不需要创建一个没注册的 TMetricDef
  TMetricDef md;
  md.__set_key("fragment-latencies");
  md.__set_units(TUnit::TIME_NS);
  md.__set_kind(TMetricKind::STATS);
  StatsMetric<double> latencies(md);
  // 下面是个双层循环:外层遍历 PlanFragment,内层遍历 backend instance,
  // 生成与每个 instance 关联的 BackendExecState(主要是生成 TExecPlanFragmentParams
  // 用于 Coordinator 与多个 backend instance 交互时的参数),并加入 backend_exec_states_ 列表,
  // 用于 Coordinator 对所有的 backend instance 执行状况的管理。
  for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
       fragment_idx < request.fragments.size(); ++fragment_idx) {
    const FragmentExecParams& params = (*fragment_exec_params)[fragment_idx];

    // 设置执行状态
    int num_hosts = params.hosts.size();
    DCHECK_GT(num_hosts, 0);
    for (int instance_idx = 0; instance_idx < num_hosts; ++instance_idx) {
      DebugOptions* backend_debug_options =
          (debug_options.phase != TExecNodePhase::INVALID
            && (debug_options.backend_num == -1
                || debug_options.backend_num == backend_num)
            ? &debug_options
            : NULL);
      // TODO: pool of pre-formatted BackendExecStates?
      BackendExecState* exec_state =
          obj_pool()->Add(new BackendExecState(schedule, this, coord, backend_num,
              request.fragments[fragment_idx], fragment_idx,
              params, instance_idx, backend_debug_options, obj_pool()));
      backend_exec_states_[backend_num] = exec_state;
      ++backend_num;
      VLOG(2) << "Exec(): starting instance: fragment_idx=" << fragment_idx
              << " instance_id=" << params.instance_ids[instance_idx];
    }
    fragment_profiles_[fragment_idx].num_instances = num_hosts;

    // 并行启动所有 rpcs,向每个 instance 发起 RPC 请求开始执行
    // 用 num_hosts 个线程并行调用 Coordinator::ExecRemoteFragment
    // 执行的延迟分布会记录到 latencies
    Status fragments_exec_status = ParallelExecutor::Exec(
        bind<Status>(mem_fn(&Coordinator::ExecRemoteFragment), this, _1),
        reinterpret_cast<void**>(&backend_exec_states_[backend_num - num_hosts]),
        num_hosts, &latencies);

    if (!fragments_exec_status.ok()) {
      DCHECK(query_status_.ok());  // nobody should have been able to cancel
      query_status_ = fragments_exec_status;
      // tear down running fragments and return
      CancelInternal();
      return fragments_exec_status;
    }
  }

  query_events_->MarkEvent("Remote fragments started");
  query_profile_->AddInfoString("Fragment start latencies",
      latencies.ToHumanReadable());

  // 如果我们有一个 Coordinator fragment 和 remote fragment(通常情况),
  // 释放 Coordinator fragment 上的线程令牌。该 fragment 用大部分时间等待而完成很少的工作。 
  // 保留该令牌会导致机器低使用率。如果该节点有 12 个查询,就会无故保存 12 个令牌
  if (has_coordinator_fragment && request.fragments.size() > 1) {
    executor_->ReleaseThreadToken();
  }

  PrintBackendInfo();

  stringstream ss;
  ss << "Query " << query_id_;
  progress_ = ProgressUpdater(ss.str(), schedule.num_scan_ranges());

  return Status::OK();
}
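
RuntimeProfiles are organized as a tree: a parent profile collects its children via AddChild(), as seen with profile_.AddChild(coord_->query_profile()) earlier and profile()->AddChild(plan_->runtime_profile()) later in PlanFragmentExecutor::Prepare(). The toy sketch below mimics only that tree shape with a simplified Profile class; it is not Impala's RuntimeProfile API, which additionally carries counters, timers, and info strings.

#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for RuntimeProfile, only to illustrate the tree structure.
class Profile {
 public:
  explicit Profile(std::string name) : name_(std::move(name)) {}
  void AddChild(Profile* child) { children_.push_back(child); }
  void Print(int indent = 0) const {
    std::cout << std::string(indent, ' ') << name_ << "\n";
    for (const Profile* c : children_) c->Print(indent + 2);
  }
 private:
  std::string name_;
  std::vector<Profile*> children_;
};

int main() {
  Profile query("Execution Profile <query_id>");
  Profile coord_fragment("Coordinator Fragment");
  Profile scan_node("HDFS_SCAN_NODE (id=0)");
  coord_fragment.AddChild(&scan_node);   // ExecNode profile under its fragment
  query.AddChild(&coord_fragment);       // fragment profile under the query profile
  query.Print();
  return 0;
}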

Every Coordinator, PlanFragmentExecutor, and ExecNode owns a RuntimeProfile, and these RuntimeProfiles form a tree that records information about each execution node as the query runs. The Coordinator has a member boost::scoped_ptr<RuntimeProfile> query_profile_ representing all profile information for the query, plus an aggregate_profile_ dedicated to aggregate-related profiling. Coordinator::Exec() above contains four important calls: PlanFragmentExecutor::Prepare(), Expr::CreateExprTrees(), Expr::Prepare(), and ParallelExecutor::Exec(). We discuss them in turn, starting with PlanFragmentExecutor::Prepare():

/**
 * 准备执行。在 Open() 之前调用
 * 该调用不会阻塞
 * runtime_state() row_desc() 无效,直到调用了 Prepare()
 * 如果 request.query_options.mem_limit > 0,是该查询运行时消耗的字节数的大概限制
 * 如果超过该限制,查询会退出(MEM_LIMIT_EXCEEDED)
 */
Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
  fragment_sw_.Start();
  const TPlanFragmentExecParams& params = request.params;
  query_id_ = request.fragment_instance_ctx.query_ctx.query_id;

  VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id="
             << PrintId(request.fragment_instance_ctx.fragment_instance_id);
  VLOG(2) << "params:\n" << ThriftDebugString(params);

  if (request.__isset.reserved_resource) {
    VLOG_QUERY << "Executing fragment in reserved resource:\n"
               << request.reserved_resource;
  }

  string cgroup = "";
  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
    cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
  }

  runtime_state_.reset(
      new RuntimeState(request, cgroup, exec_env_));

  // total_time_counter() is in the runtime_state_ so start it up now.
  SCOPED_TIMER(profile()->total_time_counter());

  // 设置完 runtime_state_ 后注册,以便保证正确的清除
  if (FLAGS_enable_rm && !cgroup.empty() && request.__isset.reserved_resource) {
    bool is_first;
    // 注册 fragment
    RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment(
        request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first));
    // 使用 cgroup 的第一个 fragment 基于预留的资源设置 cgroup 的 CPU 共享
    if (is_first) {
      DCHECK(request.__isset.reserved_resource);
      int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares(
          request.reserved_resource.v_cpu_cores);
      RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares));
    }
  }

  // TODO: Find the reservation id when the resource request is not set
  if (FLAGS_enable_rm && request.__isset.reserved_resource) {
    TUniqueId reservation_id;
    reservation_id << request.reserved_resource.reservation_id;

    // TODO: Combine this with RegisterFragment() etc.
    QueryResourceMgr* res_mgr;
    bool is_first = exec_env_->resource_broker()->GetQueryResourceMgr(query_id_,
        reservation_id, request.local_resource_address, &res_mgr);
    DCHECK(res_mgr != NULL);
    runtime_state_->SetQueryResourceMgr(res_mgr);
    if (is_first) {
      runtime_state_->query_resource_mgr()->InitVcoreAcquisition(
          request.reserved_resource.v_cpu_cores);
    }
  }

  // reservation or a query option.
  int64_t bytes_limit = -1;
  if (runtime_state_->query_options().__isset.mem_limit &&
      runtime_state_->query_options().mem_limit > 0) {
    bytes_limit = runtime_state_->query_options().mem_limit;
    VLOG_QUERY << "Using query memory limit from query options: "
               << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
  }

  int64_t rm_reservation_size_bytes = -1;
  if (request.__isset.reserved_resource && request.reserved_resource.memory_mb > 0) {
    rm_reservation_size_bytes =
        static_cast<int64_t>(request.reserved_resource.memory_mb) * 1024L * 1024L;
    // 使用超过 hard limit 的查询会被杀死,因此预留比限制更大的资源并无用处,
    // 限制预留字节数为 hard limit
    if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) {
      runtime_state_->LogError(ErrorMsg(TErrorCode::FRAGMENT_EXECUTOR,
          PrettyPrinter::PrintBytes(rm_reservation_size_bytes),
          PrettyPrinter::PrintBytes(bytes_limit)));
      rm_reservation_size_bytes = bytes_limit;
    }
    VLOG_QUERY << "Using RM reservation memory limit from resource reservation: "
               << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES);
  }

  DCHECK(!params.request_pool.empty());
  runtime_state_->InitMemTrackers(query_id_, &params.request_pool,
      bytes_limit, rm_reservation_size_bytes);
  RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());

  // 从 pool 中预留一个主线程
  runtime_state_->resource_pool()->AcquireThreadToken();
  if (runtime_state_->query_resource_mgr() != NULL) {
    runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
  }
  has_thread_token_ = true;

  average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
          runtime_state_->resource_pool()));
  mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
      TUnit::BYTES,
      bind<int64_t>(mem_fn(&MemTracker::consumption),
          runtime_state_->instance_mem_tracker()));
  thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
      TUnit::UNIT,
      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
          runtime_state_->resource_pool()));

  // 设置 desc tbl
  DescriptorTbl* desc_tbl = NULL;
  DCHECK(request.__isset.desc_tbl);
  RETURN_IF_ERROR(
      DescriptorTbl::Create(obj_pool(), request.desc_tbl, &desc_tbl));
  runtime_state_->set_desc_tbl(desc_tbl);
  VLOG_QUERY << "descriptor table for fragment="
             << request.fragment_instance_ctx.fragment_instance_id
             << "\n" << desc_tbl->DebugString();

  // 设置 plan
  DCHECK(request.__isset.fragment);
  RETURN_IF_ERROR(
      ExecNode::CreateTree(obj_pool(), request.fragment.plan, *desc_tbl, &plan_));
  runtime_state_->set_fragment_root_id(plan_->id());

  if (request.params.__isset.debug_node_id) {
    DCHECK(request.params.__isset.debug_action);
    DCHECK(request.params.__isset.debug_phase);
    ExecNode::SetDebugOptions(request.params.debug_node_id,
        request.params.debug_phase, request.params.debug_action, plan_);
  }

  // 调用 Prepare() 之前设置 exchange nodes 的发送者数目
  vector<ExecNode*> exch_nodes;
  plan_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
  BOOST_FOREACH(ExecNode* exch_node, exch_nodes)
  {
    DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
    int num_senders = FindWithDefault(params.per_exch_num_senders,
        exch_node->id(), 0);
    DCHECK_GT(num_senders, 0);
    static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
  }

  // 设置 scan ranges
  vector<ExecNode*> scan_nodes;
  vector<TScanRangeParams> no_scan_ranges;
  plan_->CollectScanNodes(&scan_nodes);
  for (int i = 0; i < scan_nodes.size(); ++i) {
    ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
    const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
        params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
    scan_node->SetScanRanges(scan_ranges);
  }

  RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "PrepareTime");
  {
    SCOPED_TIMER(prepare_timer);
    RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get()));
  }

  PrintVolumeIds(params.per_node_scan_ranges);

  // 如果需要的话设置 sink
  if (request.fragment.__isset.output_sink) {
    RETURN_IF_ERROR(DataSink::CreateDataSink(
        obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
        params, row_desc(), &sink_));
    RETURN_IF_ERROR(sink_->Prepare(runtime_state()));

    RuntimeProfile* sink_profile = sink_->profile();
    if (sink_profile != NULL) {
      profile()->AddChild(sink_profile);
    }
  } else {
    sink_.reset(NULL);
  }

  // 设置 profile 计数器
  profile()->AddChild(plan_->runtime_profile());
  rows_produced_counter_ =
      ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
  per_host_mem_usage_ =
      ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);

  row_batch_.reset(new RowBatch(plan_->row_desc(), runtime_state_->batch_size(),
        runtime_state_->instance_mem_tracker()));
  VLOG(2) << "plan_root=\n" << plan_->DebugString();
  prepared_ = true;
  return Status::OK();
}

The function above calls ExecNode::CreateTree(), which builds the exec node tree from the list of nodes contained in the plan via a depth-first traversal:

/**
 * 通过深度优先遍历从 plan 包含的 list of nodes 中创建 exec node tree
 * 所有结点都保存在 pool 中
 * 如果 plan 崩溃,返回 error,否则成功
 */
Status ExecNode::CreateTree(ObjectPool* pool, const TPlan& plan,
                            const DescriptorTbl& descs, ExecNode** root) {
  if (plan.nodes.size() == 0) {
    *root = NULL;
    return Status::OK();
  }
  int node_idx = 0;
  // 调用 Protected 函数 CreateTreeHelper 实现具体逻辑
  Status status = CreateTreeHelper(pool, plan.nodes, descs, NULL, &node_idx, root);
  if (status.ok() && node_idx + 1 != plan.nodes.size()) {
    status = Status(
        "Plan tree only partially reconstructed. Not all thrift nodes were used.");
  }
  if (!status.ok()) {
    LOG(ERROR) << "Could not construct plan tree:\n"
               << apache::thrift::ThriftDebugString(plan);
  }
  return status;
}

CreateTree() delegates the actual work to the protected helper CreateTreeHelper():

Status ExecNode::CreateTreeHelper(
    ObjectPool* pool,
    const vector<TPlanNode>& tnodes,
    const DescriptorTbl& descs,
    ExecNode* parent,
    int* node_idx,
    ExecNode** root) {
  // 初始 node_idx 比结点数目还大,传播错误信息
  if (*node_idx >= tnodes.size()) {
    return Status("Failed to reconstruct plan tree from thrift.");
  }
  const TPlanNode& tnode = tnodes[*node_idx];

  int num_children = tnode.num_children;
  ExecNode* node = NULL;
  RETURN_IF_ERROR(CreateNode(pool, tnode, descs, &node));
  if (parent != NULL) {
    parent->children_.push_back(node);
  } else {
    // no parent: this node is the root of the tree
    *root = node;
  }
  for (int i = 0; i < num_children; ++i) {
    ++*node_idx;
    // 对每个孩子递归调用自身
    RETURN_IF_ERROR(CreateTreeHelper(pool, tnodes, descs, node, node_idx, NULL));
    // we are expecting a child, but have used all nodes
    // this means we have been given a bad tree and must fail
    if (*node_idx >= tnodes.size()) {
      return Status("Failed to reconstruct plan tree from thrift.");
    }
  }
  return Status::OK();
}
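
To make the traversal concrete, the self-contained sketch below rebuilds a tree from a flattened preorder list in the same way, using a made-up PlanNodeLite struct (each element records how many children follow it); it illustrates the algorithm and is not Impala's types:

#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Made-up miniature of a thrift plan node: preorder position + child count.
struct PlanNodeLite {
  std::string name;
  int num_children;
  std::vector<PlanNodeLite*> children;  // filled in while rebuilding
};

// Rebuild the tree rooted at nodes[*idx]; advances *idx past that subtree.
PlanNodeLite* BuildTree(std::vector<PlanNodeLite>& nodes, size_t* idx) {
  assert(*idx < nodes.size());
  PlanNodeLite* node = &nodes[*idx];
  for (int i = 0; i < node->num_children; ++i) {
    ++*idx;                                  // move to the next preorder element
    node->children.push_back(BuildTree(nodes, idx));
  }
  return node;
}

int main() {
  // Preorder for: HASH_JOIN(HDFS_SCAN, HDFS_SCAN)
  std::vector<PlanNodeLite> nodes = {
      {"HASH_JOIN", 2, {}}, {"HDFS_SCAN(lhs)", 0, {}}, {"HDFS_SCAN(rhs)", 0, {}}};
  size_t idx = 0;
  PlanNodeLite* root = BuildTree(nodes, &idx);
  assert(root->name == "HASH_JOIN" && root->children.size() == 2);
  assert(idx + 1 == nodes.size());  // same "all nodes used" check as CreateTree()
  return 0;
}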

Next, the coordinator's second important call, Expr::CreateExprTrees(), which creates in 'pool' a vector of ExprContexts containing the exprs built from the given vector of TExprs:

/**
 * 在 ‘pool’ 中根据给定的 TExprs 向量创建包含 exprs 的 ExprContexts 向量
 * 如果任何单独的转换出错,返回 error,否则 OK
 */
Status Expr::CreateExprTrees(ObjectPool* pool, const vector<TExpr>& texprs,
                             vector<ExprContext*>* ctxs) {
  ctxs->clear();
  for (int i = 0; i < texprs.size(); ++i) {
    ExprContext* ctx;
    RETURN_IF_ERROR(CreateExprTree(pool, texprs[i], &ctx));
    ctxs->push_back(ctx);
  }
  return Status::OK();
}

CreateExprTrees() delegates to CreateExprTree(), shown below:

/**
 * 在 ‘pool’ 中从 texpr 中包含的 list of nodes 创建表达式树
 * 返回 expr 中的根表达式以及在 ‘ctx’ 中返回对应的 ExprContext
 */
Status Expr::CreateExprTree(ObjectPool* pool, const TExpr& texpr, ExprContext** ctx) {
  // 输入为空
  if (texpr.nodes.size() == 0) {
    *ctx = NULL;
    return Status::OK();
  }
  int node_idx = 0;
  Expr* e;
  Status status = CreateTreeFromThrift(pool, texpr.nodes, NULL, &node_idx, &e, ctx);
  if (status.ok() && node_idx + 1 != texpr.nodes.size()) {
    status = Status(
        "Expression tree only partially reconstructed. Not all thrift nodes were used.");
  }
  if (!status.ok()) {
    LOG(ERROR) << "Could not construct expr tree.\n" << status.GetDetail() << "\n"
               << apache::thrift::ThriftDebugString(texpr);
  }
  return status;
}

The actual construction of the expression tree is done by CreateTreeFromThrift():

/**
 * 通过深度优先遍历为以 ‘node_idx’ 为根的结点创建 expr tree
 * 参数:
 *  nodes:需要转换的 thrift 表达式结点向量
 *  parent:node_idx 处结点的 parent(or NULL for node_idx == 0)
 *  node_idx:
 *      in:TExprNode tree 的根
 *      out:nodes 中不是 tree 一部分的下一个结点
 *  root_expr:out:构建的 expr 树的根
 *  ctx:out:构建的 expr 树的 context
 *  return:
 *      如果成功返回 status.ok()
 *      如果树不一致或崩溃,返回!status.ok()
 */
Status Expr::CreateTreeFromThrift(ObjectPool* pool, const vector<TExprNode>& nodes,
    Expr* parent, int* node_idx, Expr** root_expr, ExprContext** ctx) {
  // 传播错误消息
  if (*node_idx >= nodes.size()) {
    return Status("Failed to reconstruct expression tree from thrift.");
  }
  int num_children = nodes[*node_idx].num_children;
  Expr* expr = NULL;
  // 调用 CreateExpr 创建 Expr
  RETURN_IF_ERROR(CreateExpr(pool, nodes[*node_idx], &expr));
  DCHECK(expr != NULL);
  if (parent != NULL) {
    parent->AddChild(expr);
  } else {
    DCHECK(root_expr != NULL);
    DCHECK(ctx != NULL);
    *root_expr = expr;
    *ctx = pool->Add(new ExprContext(expr));
  }
  for (int i = 0; i < num_children; i++) {
    *node_idx += 1;
    // 递归调用自身实现深度优先遍历
    RETURN_IF_ERROR(CreateTreeFromThrift(pool, nodes, expr, node_idx, NULL, NULL));
    // we are expecting a child, but have used all nodes
    // this means we have been given a bad tree and must fail
    if (*node_idx >= nodes.size()) {
      return Status("Failed to reconstruct expression tree from thrift.");
    }
  }
  return Status::OK();
}

CreateTreeFromThrift() calls CreateExpr() to create each new Expr; its implementation is as follows:

/**
 * 在 ‘pool’ 中根据 texpr_node.node_type 创建一个新的 Expr
 */
Status Expr::CreateExpr(ObjectPool* pool, const TExprNode& texpr_node, Expr** expr) {
  switch (texpr_node.node_type) {
    // 字面常量
    case TExprNodeType::BOOL_LITERAL:
    case TExprNodeType::FLOAT_LITERAL:
    case TExprNodeType::INT_LITERAL:
    case TExprNodeType::STRING_LITERAL:
    case TExprNodeType::DECIMAL_LITERAL:
      *expr = pool->Add(new Literal(texpr_node));
      return Status::OK();
    // case 表达式
    case TExprNodeType::CASE_EXPR:
      if (!texpr_node.__isset.case_expr) {
        return Status("Case expression not set in thrift node");
      }
      *expr = pool->Add(new CaseExpr(texpr_node));
      return Status::OK();
    // And Or Not 复合谓词
    case TExprNodeType::COMPOUND_PRED:
      if (texpr_node.fn.name.function_name == "and") {
        *expr = pool->Add(new AndPredicate(texpr_node));
      } else if (texpr_node.fn.name.function_name == "or") {
        *expr = pool->Add(new OrPredicate(texpr_node));
      } else {
        DCHECK_EQ(texpr_node.fn.name.function_name, "not");
        *expr = pool->Add(new ScalarFnCall(texpr_node));
      }
      return Status::OK();
    // 空字面常量
    case TExprNodeType::NULL_LITERAL:
      *expr = pool->Add(new NullLiteral(texpr_node));
      return Status::OK();
    // 引用
    case TExprNodeType::SLOT_REF:
      if (!texpr_node.__isset.slot_ref) {
        return Status("Slot reference not set in thrift node");
      }
      *expr = pool->Add(new SlotRef(texpr_node));
      return Status::OK();
    // 空元组谓词
    case TExprNodeType::TUPLE_IS_NULL_PRED:
      *expr = pool->Add(new TupleIsNullPredicate(texpr_node));
      return Status::OK();
    // 函数调用
    case TExprNodeType::FUNCTION_CALL:
      if (!texpr_node.__isset.fn) {
        return Status("Function not set in thrift node");
      }
      // 特殊情况的函数有它们自己的 Expr 类
      // TODO: is there a better way to do this?
      if (texpr_node.fn.name.function_name == "if") {
        *expr = pool->Add(new IfExpr(texpr_node));
      } else if (texpr_node.fn.name.function_name == "nullif") {
        *expr = pool->Add(new NullIfExpr(texpr_node));
      } else if (texpr_node.fn.name.function_name == "isnull" ||
                 texpr_node.fn.name.function_name == "ifnull" ||
                 texpr_node.fn.name.function_name == "nvl") {
        *expr = pool->Add(new IsNullExpr(texpr_node));
      } else if (texpr_node.fn.name.function_name == "coalesce") {
        *expr = pool->Add(new CoalesceExpr(texpr_node));

      } else if (texpr_node.fn.binary_type == TFunctionBinaryType::HIVE) {
        *expr = pool->Add(new HiveUdfCall(texpr_node));
      } else {
        *expr = pool->Add(new ScalarFnCall(texpr_node));
      }
      return Status::OK();
    default:
      stringstream os;
      os << "Unknown expr node type: " << texpr_node.node_type;
      return Status(os.str());
  }
}

Depending on texpr_node.node_type, CreateExpr() instantiates many different Exprs, such as Literal, AndPredicate, SlotRef, and so on, all of which are direct or indirect subclasses of Expr. For their concrete implementations, see the source files under Exprs.
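
The common shape of these subclasses can be sketched as follows; this is a toy hierarchy for illustration only, not Impala's actual Expr API, which also handles codegen, FunctionContexts, typed return values, and so on:

#include <memory>
#include <vector>

struct Row {};  // placeholder for a row of input data

// Toy base class: children plus a virtual evaluation hook.
class ExprLite {
 public:
  virtual ~ExprLite() {}
  virtual bool Eval(const Row& row) const = 0;
  void AddChild(std::unique_ptr<ExprLite> child) {
    children_.push_back(std::move(child));
  }
 protected:
  std::vector<std::unique_ptr<ExprLite>> children_;
};

class TrueLiteral : public ExprLite {
 public:
  bool Eval(const Row&) const override { return true; }
};

class AndPredicateLite : public ExprLite {
 public:
  // True only if every child evaluates to true, mirroring AndPredicate's role above.
  bool Eval(const Row& row) const override {
    for (const std::unique_ptr<ExprLite>& c : children_) {
      if (!c->Eval(row)) return false;
    }
    return true;
  }
};

int main() {
  AndPredicateLite root;
  root.AddChild(std::unique_ptr<ExprLite>(new TrueLiteral));
  root.AddChild(std::unique_ptr<ExprLite>(new TrueLiteral));
  Row row;
  return root.Eval(row) ? 0 : 1;  // evaluates to true, so exit code is 0
}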

Now for the coordinator's third important call, the public Expr::Prepare():

/**
 * 用于准备 multiple expr trees 的函数
 * 如果提供了 ‘tracker’,会使用这个而不是默认的 'state' 中的 UDF tracker
 */
Status Expr::Prepare(const vector<ExprContext*>& ctxs, RuntimeState* state,
                     const RowDescriptor& row_desc, MemTracker* tracker) {
  for (int i = 0; i < ctxs.size(); ++i) {
  // 通过调用 ExprContext.Prepare()
    RETURN_IF_ERROR(ctxs[i]->Prepare(state, row_desc, tracker));
  }
  return Status::OK();
}

The public Expr::Prepare() does its work by calling the public ExprContext::Prepare():

/**
 * 准备 expr tree 进行评估
 * 如果提供了 ‘tracker’,会使用这个而不是默认的 'state' 中的 UDF tracker
 */
Status ExprContext::Prepare(RuntimeState* state, const RowDescriptor& row_desc,
                            MemTracker* tracker) {
  DCHECK(tracker != NULL);
  DCHECK(pool_.get() == NULL);
  prepared_ = true;
  pool_.reset(new MemPool(tracker));
  // 调用 Expr.Prepare()
  return root_->Prepare(state, row_desc, this);
}

The public ExprContext::Prepare() in turn calls the protected Expr::Prepare(), a virtual function that Expr subclasses override as needed.

/**
 * 初始化该 expr 实例。这不包括初始化 ExprContext 中的 state;
 * ‘context’ 只能被用于通过 RegisterFunctionContext() 注册一个 FunctionContext;
 * 任何 IR(中间表达式) 函数都在这里生成
 *
 * 子类重写该函数时应调用 Expr::Prepare() 来在 expr tree 上递归调用 Prepare()
 */
Status Expr::Prepare(RuntimeState* state, const RowDescriptor& row_desc,
                     ExprContext* context) {
  DCHECK(type_.type != INVALID_TYPE);
  for (int i = 0; i < children_.size(); ++i) {
  // 每个 children 递归调用自身
    RETURN_IF_ERROR(children_[i]->Prepare(state, row_desc, context));
  }
  return Status::OK();
}

That completes the third important call in Coordinator::Exec(), Expr::Prepare(). Finally, let us look at the Coordinator's last important call, ParallelExecutor::Exec(), which is where execution is actually kicked off.

/**
 * 用 num_args 个线程并行调用函数 (args[i]) num_args 次
 * 如果有任何 item 失败,返回第一个失败 item 的 Status
 * 否则当所有 item 执行完后返回 Status::OK
 * 调用函数可能会传递一个 StatsMetric 用于收集任务执行时的延迟分布
 */
Status ParallelExecutor::Exec(Function function, void** args, int num_args,
    StatsMetric<double>* latencies) {
  Status status;
  ThreadGroup worker_threads;
  mutex lock;

  for (int i = 0; i < num_args; ++i) {
    stringstream ss;
    ss << "worker-thread(" << i << ")";
    worker_threads.AddThread(new Thread("parallel-executor", ss.str(),
        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies));
  }
  worker_threads.JoinAll();

  return status;
}
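
The pattern here is a simple fan-out-and-join: run function(args[i]) once per argument, each on its own thread, and keep only the first failing Status. The sketch below reproduces that pattern with standard C++ threads and a string standing in for Status; it is an illustration of the pattern, not Impala's ParallelExecutor:

#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// A string stands in for Status here: empty means OK, non-empty is an error.
using SimpleStatus = std::string;

SimpleStatus ParallelExec(const std::function<SimpleStatus(void*)>& fn,
                          void** args, int num_args) {
  SimpleStatus first_error;  // stays empty if every call succeeds
  std::mutex lock;
  std::vector<std::thread> workers;
  for (int i = 0; i < num_args; ++i) {
    workers.emplace_back([&, i]() {
      SimpleStatus s = fn(args[i]);
      if (!s.empty()) {
        std::lock_guard<std::mutex> l(lock);
        if (first_error.empty()) first_error = s;  // keep the first failure only
      }
    });
  }
  for (std::thread& t : workers) t.join();  // wait for all workers, like JoinAll()
  return first_error;
}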

In the Coordinator it is invoked as:

Status fragments_exec_status = ParallelExecutor::Exec(
        bind<Status>(mem_fn(&Coordinator::ExecRemoteFragment), this, _1),
        reinterpret_cast<void**>(&backend_exec_states_[backend_num - num_hosts]),
        num_hosts, &latencies);

Consequently, Coordinator::ExecRemoteFragment() is called num_hosts times, in parallel, on num_hosts threads. Let us look at its implementation:

/**
 * ExecPlanFragment() rpc 的封装。该函数会在多个线程中并行调用;
 * rpc 之前先获取 exec_state->lock,UpdateFragmentExecStatus() 才能正确序列化;
 * exec_state 包括所有发射 rpc 需要的信息
 * 'coordinator' 总是该类的一个实例,'exec_state' 总是 BackendExecState 的一个实例
 */
Status Coordinator::ExecRemoteFragment(void* exec_state_arg) {
  BackendExecState* exec_state = reinterpret_cast<BackendExecState*>(exec_state_arg);
  VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_
            << " instance_id=" << exec_state->fragment_instance_id
            << " host=" << exec_state->backend_address;
  lock_guard<mutex> l(exec_state->lock);

  Status status;
  // 实例化连接
  ImpalaInternalServiceConnection backend_client(
      exec_env_->impalad_client_cache(), exec_state->backend_address, &status);
  RETURN_IF_ERROR(status);

  TExecPlanFragmentResult thrift_result;
  // rpc
  Status rpc_status = backend_client.DoRpc(&ImpalaInternalServiceClient::ExecPlanFragment,
      exec_state->rpc_params, &thrift_result);
  if (!rpc_status.ok()) {
    stringstream msg;
    msg << "ExecPlanRequest rpc query_id=" << query_id_
        << " instance_id=" << exec_state->fragment_instance_id
        << " failed: " << rpc_status.msg().msg();
    VLOG_QUERY << msg.str();
    exec_state->status = Status(msg.str());
    return status;
  }

  exec_state->status = thrift_result.status;
  if (exec_state->status.ok()) {
    exec_state->initiated = true;
    exec_state->stopwatch.Start();
  }
  return exec_state->status;
}

The most important line in the function above is the Status rpc_status = backend_client.DoRpc(...) call. DoRpc() is implemented as follows:

/**
 * Performs the RPC call f(request, response), with some error handling for the case
 * where the TCP connection cannot be used because the client was closed unexpectedly.
 * Note that this may cause f() to be called twice: depending on the error received
 * from the first attempt, f() may be retried once.
 * TODO: Detect already-closed cnxns and only retry in that case.
 * Returns RPC_TIMEOUT if a timeout occurred, and RPC_GENERAL_ERROR if the RPC could
 * not be completed for any other reason. Application-level failures should be
 * signalled through the response type.
 */
  template <class F, class Request, class Response>
  Status DoRpc(const F& f, const Request& request, Response* response) {
    DCHECK(response != NULL);
    try {
      (client_->*f)(*response, request);
    } catch (const apache::thrift::TException& e) {
      if (IsTimeoutTException(e)) return Status(TErrorCode::RPC_TIMEOUT);

      // The client may have been closed unexpectedly; reopen it and retry
      // TODO: ThriftClient should return proper error codes
      RETURN_IF_ERROR(Reopen());
      try {
        (client_->*f)(*response, request);
      } catch (apache::thrift::TException& e) {
        // By this point the RPC has failed for good
        return Status(TErrorCode::RPC_GENERAL_ERROR, e.what());
      }
    }
    return Status::OK();
  }
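
Stripped of the Thrift details, the retry logic is: make the call once, and if it throws a non-timeout error, reopen the connection and try exactly one more time. The following stand-alone sketch shows that shape; Connection, RpcError and DoRpcWithRetry are invented names used only for illustration.

#include <iostream>
#include <stdexcept>
#include <string>

struct RpcError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

struct Connection {
  bool healthy = false;
  void Reopen() { healthy = true; }
  void Call(const std::string& payload) {
    if (!healthy) throw RpcError("connection was closed");
    std::cout << "sent: " << payload << "\n";
  }
};

// Returns true on success, false if both attempts failed.
bool DoRpcWithRetry(Connection& conn, const std::string& payload) {
  try {
    conn.Call(payload);
  } catch (const RpcError& e) {
    std::cout << "first attempt failed (" << e.what() << "), reopening\n";
    conn.Reopen();         // like RETURN_IF_ERROR(Reopen()) in DoRpc()
    try {
      conn.Call(payload);  // second and last attempt
    } catch (const RpcError& e2) {
      std::cout << "rpc failed: " << e2.what() << "\n";
      return false;
    }
  }
  return true;
}

int main() {
  Connection conn;  // starts out "closed", so the first call throws
  return DoRpcWithRetry(conn, "ExecPlanFragment") ? 0 : 1;
}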

The function pointer passed to backend_client.DoRpc() is ImpalaInternalServiceClient::ExecPlanFragment; on the server side this RPC is handled by the ExecPlanFragment() method defined in ImpalaInternalService.h, shown below:

  virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
      const TExecPlanFragmentParams& params) {
    fragment_mgr_->ExecPlanFragment(params).SetTStatus(&return_val);
  }

This handler simply delegates to FragmentMgr::ExecPlanFragment(), whose code is shown below:

/**
 * Registers a new FragmentExecState, calls its Prepare(), and launches the thread
 * that runs FragmentExecThread() before returning.
 * Returns OK if there was no error; otherwise returns an error if the fragment is
 * malformed (e.g. no sink) or if Prepare() fails.
 */
Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
  VLOG_QUERY << "ExecPlanFragment() instance_id="
             << exec_params.fragment_instance_ctx.fragment_instance_id
             << " coord=" << exec_params.fragment_instance_ctx.query_ctx.coord_address
             << " backend#=" << exec_params.fragment_instance_ctx.backend_num;

  if (!exec_params.fragment.__isset.output_sink) {
    return Status("missing sink in plan fragment");
  }

  // Create a new FragmentExecState
  shared_ptr<FragmentExecState> exec_state(
      new FragmentExecState(exec_params.fragment_instance_ctx, ExecEnv::GetInstance()));
  // Call Prepare() before registering exec_state, in case an exec_state->Cancel()
  // arrives: we may receive an asynchronous cancellation, and the executor requires
  // that Cancel() is not called before Prepare() returns.
  RETURN_IF_ERROR(exec_state->Prepare(exec_params));

  {
    lock_guard<mutex> l(fragment_exec_state_map_lock_);
    // Register exec_state before starting the exec thread
    fragment_exec_state_map_.insert(
        make_pair(exec_params.fragment_instance_ctx.fragment_instance_id, exec_state));
  }

  // Execute the plan fragment in a new thread.
  // TODO: manage the threads via a global thread pool.
  exec_state->set_exec_thread(new Thread("impala-server", "exec-plan-fragment",
      &FragmentMgr::FragmentExecThread, this, exec_state.get()));

  return Status::OK();
}
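
The ordering in this function matters: Prepare() first, then register the state in a lock-protected map, and only then start the execution thread, so that an asynchronous cancellation can always find a fully prepared state. Below is a minimal stand-alone sketch of that register-before-launch pattern; Manager and FragmentState are invented names, not Impala classes.

#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct FragmentState {
  std::string id;
  void Exec() { std::cout << "executing " << id << "\n"; }
};

class Manager {
 public:
  void Launch(const std::string& id) {
    auto state = std::make_shared<FragmentState>();
    state->id = id;
    {
      // Register before the thread starts, so a concurrent lookup can find it.
      std::lock_guard<std::mutex> l(map_lock_);
      states_[id] = state;
    }
    threads_.emplace_back([this, state] { state->Exec(); Remove(state->id); });
  }
  void JoinAll() { for (auto& t : threads_) t.join(); }

 private:
  void Remove(const std::string& id) {
    std::lock_guard<std::mutex> l(map_lock_);
    states_.erase(id);
  }
  std::mutex map_lock_;
  std::map<std::string, std::shared_ptr<FragmentState>> states_;
  std::vector<std::thread> threads_;
};

int main() {
  Manager mgr;
  mgr.Launch("instance-1");
  mgr.Launch("instance-2");
  mgr.JoinAll();
  return 0;
}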

The function above starts a new thread running FragmentMgr::FragmentExecThread(); this private member function is shown below:

/**
 * Calls exec_state->Exec() and then removes exec_state from the fragment map.
 * Runs in the fragment's execution thread.
 */
void FragmentMgr::FragmentExecThread(FragmentExecState* exec_state) {
  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
  exec_state->Exec();
  // The plan fragment has finished executing

  // The last reference to the FragmentExecState may be in the map. We do not want to
  // run the destructor while fragment_exec_state_map_lock_ is held, so grab a
  // reference before removing the entry from the map.
  shared_ptr<FragmentExecState> exec_state_reference;
  {
    lock_guard<mutex> l(fragment_exec_state_map_lock_);
    FragmentExecStateMap::iterator i =
        fragment_exec_state_map_.find(exec_state->fragment_instance_id());
    if (i != fragment_exec_state_map_.end()) {
      exec_state_reference = i->second;
      fragment_exec_state_map_.erase(i);
    } else {
      LOG(ERROR) << "missing entry in fragment exec state map: instance_id="
                 << exec_state->fragment_instance_id();
    }
  }
#ifndef ADDRESS_SANITIZER
  // tcmalloc and address sanitizer cannot be used together
  if (FLAGS_log_mem_usage_interval > 0) {
    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
    if (num_complete % FLAGS_log_mem_usage_interval == 0) {
      char buf[2048];
      // Log the amount of memory this impalad is currently using
      MallocExtension::instance()->GetStats(buf, 2048);
      LOG(INFO) << buf;
    }
  }
#endif
}
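
The shared_ptr copy is the key detail here: by taking a reference before erasing the map entry, the FragmentExecState's destructor runs after fragment_exec_state_map_lock_ has been released, not while it is held. A small stand-alone sketch of the same idiom follows; ExecState and RemoveState are invented names used only for illustration.

#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct ExecState {
  ~ExecState() { std::cout << "destroyed outside the lock\n"; }
};

std::mutex map_lock;
std::map<std::string, std::shared_ptr<ExecState>> state_map;

void RemoveState(const std::string& id) {
  std::shared_ptr<ExecState> reference;  // keeps the object alive
  {
    std::lock_guard<std::mutex> l(map_lock);
    auto it = state_map.find(id);
    if (it == state_map.end()) {
      std::cout << "missing entry: " << id << "\n";
      return;
    }
    reference = it->second;  // grab a reference first...
    state_map.erase(it);     // ...then drop the map's reference
  }
  // 'reference' goes out of scope here; the destructor runs lock-free.
}

int main() {
  state_map["instance-1"] = std::make_shared<ExecState>();
  RemoveState("instance-1");
  return 0;
}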

The function above calls FragmentExecState::Exec(), whose source code is:

/**
 * Main loop of plan fragment execution. Blocks until execution finishes.
 */
void FragmentMgr::FragmentExecState::Exec() {
  // Open() does the full execution, because all plan fragments have sinks
  executor_.Open();
  executor_.Close();
}

This function calls PlanFragmentExecutor.Open() and PlanFragmentExecutor.Close(), which are defined as follows:

/**
 * Starts execution. Call this prior to GetNext().
 * If this fragment has a sink, Open() will send all rows produced by the fragment to
 * that sink, so Open() blocks until all rows have been produced (and subsequent calls
 * to GetNext() will not return any rows).
 * This also starts the status-reporting thread, if the report interval flag is > 0
 * and a callback was specified in the c'tor.
 * If this fragment has a sink, report_status_cb will have been called for the last
 * time when Open() returns, and the status-reporting thread will have stopped.
 */
Status PlanFragmentExecutor::Open() {
  VLOG_QUERY << "Open(): instance_id="
      << runtime_state_->fragment_instance_id();
  // Start the profile-reporting thread before calling Open(), since Open() may block
  if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
    unique_lock<mutex> l(report_thread_lock_);
    report_thread_.reset(
        new Thread("plan-fragment-executor", "report-profile",
            &PlanFragmentExecutor::ReportProfile, this));
    // Make sure the thread has started, otherwise ReportProfile() may race with StopReportThread()
    report_thread_started_cv_.wait(l);
    report_thread_active_ = true;
  }

  // Optimize the code-generated LLVM module
  OptimizeLlvmModule();
  // The actual Open() logic is implemented in the private helper OpenInternal()
  Status status = OpenInternal();
  if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
    // Log the error message.
    // Queries that do not fetch results (e.g. insert) may never receive the message
    // directly; it is only available in the logs.
    runtime_state_->LogError(status.msg());
  }
  UpdateStatus(status);
  return status;
}  
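
The condition-variable wait at the top of Open() is a startup handshake: the caller blocks until the report thread has signalled that it is actually running, so that a later StopReportThread() cannot race with a thread that never got going. The stand-alone sketch below shows that handshake; it adds a predicate to the wait to guard against spurious wakeups, and the names (ReportLoop and so on) are invented, not Impala's.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex report_lock;
std::condition_variable report_started_cv;
bool report_thread_started = false;

void ReportLoop() {
  {
    std::lock_guard<std::mutex> l(report_lock);
    report_thread_started = true;
  }
  report_started_cv.notify_one();  // tell the caller we are really running
  std::cout << "report thread running\n";
}

int main() {
  std::unique_lock<std::mutex> l(report_lock);
  std::thread reporter(ReportLoop);
  // Block until the thread has signalled startup, so a later "stop reporting"
  // call cannot race with a thread that never started.
  report_started_cv.wait(l, [] { return report_thread_started; });
  l.unlock();
  reporter.join();
  std::cout << "report thread confirmed started\n";
  return 0;
}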

PlanFragmentExecutor.Close():

/**
 * Closes the underlying plan fragment and frees all resources allocated in
 * Open()/GetNext().
 */
void PlanFragmentExecutor::Close() {
  if (closed_) return;
  row_batch_.reset();
  // Prepare(), which sets runtime_state_, may not have been called
  if (runtime_state_.get() != NULL) {
    if (runtime_state_->query_resource_mgr() != NULL) {
      // Unregister the fragment from the cgroups mgr
      exec_env_->cgroups_mgr()->UnregisterFragment(
          runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
    }
    // Close the plan tree
    if (plan_ != NULL) plan_->Close(runtime_state_.get());
    // Close the sink
    if (sink_.get() != NULL) sink_->Close(runtime_state());
    BOOST_FOREACH(DiskIoMgr::RequestContext* context,
        *runtime_state_->reader_contexts()) {
      // Unregister the reader context from the DiskIoMgr
      runtime_state_->io_mgr()->UnregisterContext(context);
    }
    // Unregister the thread resource pool
    exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
  }
  if (mem_usage_sampled_counter_ != NULL) {
    PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
    mem_usage_sampled_counter_ = NULL;
  }
  closed_ = true;
}
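
Note that Close() is written to be idempotent: the closed_ flag turns any second call into a no-op before resources are touched. A tiny sketch of that guard follows; the Executor class below is an invented stand-in, not Impala's PlanFragmentExecutor.

#include <iostream>

class Executor {
 public:
  void Close() {
    if (closed_) return;  // idempotent: a second Close() does nothing
    std::cout << "releasing row batches, sinks, io contexts...\n";
    closed_ = true;
  }
 private:
  bool closed_ = false;
};

int main() {
  Executor exec;
  exec.Close();
  exec.Close();  // prints nothing the second time
  return 0;
}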

PlanFragmentExecutor.Open() calls two main functions, PlanFragmentExecutor.OpenInternal() and PlanFragmentExecutor.OptimizeLlvmModule(), which are defined below.

PlanFragmentExecutor.OpenInternal():

/**
 * Executes the Open() logic and returns the resulting status. Does not set status_.
 * If this plan fragment has no sink, OpenInternal() does nothing.
 * If this plan fragment has a sink and OpenInternal() returns without an error
 * condition, all rows will have been sent to the sink, the sink will have been
 * closed, a final report will have been sent, and the reporting thread will have
 * been stopped. sink_ is set to NULL after successful execution.
 */
Status PlanFragmentExecutor::OpenInternal() {
  {
    SCOPED_TIMER(profile()->total_time_counter());
    RETURN_IF_ERROR(plan_->Open(runtime_state_.get()));
  }

  if (sink_.get() == NULL) return Status::OK();

  RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));

  // If there is a sink, do all the work here: when this returns, the query has
  // effectively finished executing
  while (!done_) {
    RowBatch* batch;
    RETURN_IF_ERROR(GetNextInternal(&batch));
    if (batch == NULL) break;
    if (VLOG_ROW_IS_ON) {
      VLOG_ROW << "OpenInternal: #rows=" << batch->num_rows();
      for (int i = 0; i < batch->num_rows(); ++i) {
        VLOG_ROW << PrintRow(batch->GetRow(i), row_desc());
      }
    }

    SCOPED_TIMER(profile()->total_time_counter());
    RETURN_IF_ERROR(sink_->Send(runtime_state(), batch, done_));
  }

  // Close the sink *before* stopping the report thread: Close() may need to add some
  // important information to the final report (e.g. table sinks record the files they
  // wrote in this method). The coordinator's report channel waits until all backends
  // have either hit an error or sent a status report with done=true, so tearing down
  // any data stream state in Close() is safe.
  SCOPED_TIMER(profile()->total_time_counter());
  sink_->Close(runtime_state());
  done_ = true;

  FragmentComplete();
  return Status::OK();
}
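
The while (!done_) loop is the heart of fragment execution: pull row batches from the plan root and push them into the sink until the plan is exhausted, then close the sink. The stand-alone sketch below mirrors that pump; Plan, Sink and RowBatch here are invented stand-ins, not Impala's classes.

#include <iostream>
#include <vector>

struct RowBatch {
  std::vector<int> rows;
};

struct Plan {
  int batches_left = 3;
  // Returns false when there is nothing more to produce (like eos/done_).
  bool GetNext(RowBatch* batch) {
    if (batches_left == 0) return false;
    batch->rows = {batches_left * 10, batches_left * 20};
    --batches_left;
    return true;
  }
};

struct Sink {
  void Send(const RowBatch& batch) {
    std::cout << "sink received " << batch.rows.size() << " rows\n";
  }
  void Close() { std::cout << "sink closed\n"; }
};

int main() {
  Plan plan;
  Sink sink;
  RowBatch batch;
  while (plan.GetNext(&batch)) {  // mirrors the GetNextInternal()/done_ loop
    sink.Send(batch);             // mirrors sink_->Send(...)
  }
  sink.Close();                   // close the sink before reporting completion
  return 0;
}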

PlanFragmentExecutor.OptimizeLlvmModule():

/**
 * Optimizes the code-generated functions in runtime_state_->llvm_codegen().
 * Must be called between plan_->Prepare() and plan_->Open().
 * This is fairly time consuming, so we don't want to do it in
 * PlanFragmentExecutor::Prepare(); keeping it out of Prepare() lets plan fragments
 * start up faster and execute in parallel (in a deep plan tree, fragments are
 * started level by level).
 */
void PlanFragmentExecutor::OptimizeLlvmModule() {
  if (!runtime_state_->codegen_created()) return;
  LlvmCodeGen* codegen;
  Status status = runtime_state_->GetCodegen(&codegen, /* initalize */ false);
  DCHECK(status.ok());
  DCHECK(codegen != NULL);
  status = codegen->FinalizeModule();
  if (!status.ok()) {
    stringstream ss;
    ss << "Error with codegen for this query: " << status.GetDetail();
    runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
  }
}

PlanFragmentExecutor::OpenInternal() and PlanFragmentExecutor::OptimizeLlvmModule() in turn call several other important functions, namely:
PlanFragmentExecutor.GetNextInternal()
ExecNode.Open()
DataSink.Open()
DataSink.Send()
DataSink.Close()
RuntimeState.codegen_created()
RuntimeState.GetCodegen()
LlvmCodeGen.FinalizeModule()

At this point we have covered the four important functions called from Coordinator::Exec(), which completes the schedule and coordinator parts of the Query/DML request type. The other request types handled by ImpalaServer::QueryExecState::Exec are not covered in detail here.

With ImpalaServer::QueryExecState::Exec covered, the walkthrough of the BE is essentially complete.

Since the author has only recently started working with Impala, this analysis may contain mistakes; questions and suggestions are welcome.

