Impala 源码分析-FE

Impala 源代码目录结构

SQL 解析

Impala 的 SQL 解析与执行计划生成部分是由 impala-frontend(Java)实现的。客户端连接 impalad 的 Beeswax 服务(监听端口是 21000),通过
Beeswax 接口 BeeswaxService.query() 提交一个请求,在 impalad 端的处理逻辑是由
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) 这个函数
(ImpalaServer.h)完成的。

// Beeswax query() handler: starts asynchronous execution of 'query' and fills
// 'query_handle' with the id of the newly started query.
// Steps: look up the session for the current connection, convert the Beeswax Query
// into a TQueryCtx, start execution via Execute(), mark the query RUNNING, start
// waiting for results asynchronously, and add the query to the set of in-flight
// queries. Failures are reported through RAISE_IF_ERROR / RaiseBeeswaxException with
// the SQLSTATE code given at each call site.
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  // Look up the session that belongs to the current thread's connection id.
  // NOTE(review): WithSession() presumably also marks the session as in use for the
  // lifetime of 'session_handle' -- confirm against ScopedSessionState.
  RAISE_IF_ERROR(
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  // Convert the Beeswax Query into a TQueryCtx.
  // raise general error for request conversion error;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  // raise Syntax error or access violation; it's likely to be syntax/analysis error
  // TODO: that may not be true; fix this
  shared_ptr<QueryExecState> exec_state;
  // Start executing the query. Execute() delegates to ExecuteInternal(), which creates
  // a QueryExecState from the TQueryCtx, registers it and starts execution.
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // start thread to wait for results to become available, which will allow
  // us to advance query state to FINISHED or EXCEPTION
  exec_state->WaitAsync();
  // Once the query is running do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    // The query was already registered by Execute(), so unregister it before
    // reporting the failure to the client.
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
  // Hand the query id back to the client as the Beeswax query handle.
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}

其中 QueryToTQueryContext(query, &query_ctx) 将 Query 转换为 TQueryCtx。具体代码实现如下:
(ImpalaServer.h)

// Fills 'query_ctx' from the Beeswax 'query': copies the statement text, attaches the
// Thrift form of the current connection's session, and seeds the query options from
// the session defaults before applying any per-query overrides found in
// query.configuration. Returns the first error from the session lookup or from option
// parsing, otherwise Status::OK().
Status ImpalaServer::QueryToTQueryContext(const Query& query,
    TQueryCtx* query_ctx) {
  query_ctx->request.stmt = query.query;
  VLOG_QUERY << "query: " << ThriftDebugString(query);

  {
    const TUniqueId& conn_id = ThriftServer::GetThreadConnectionId();
    shared_ptr<SessionState> session_state;
    RETURN_IF_ERROR(GetSessionState(conn_id, &session_state));
    DCHECK(session_state != NULL);
    {
      // The session object exists from the moment the client connected, but the
      // transport may not have supplied a username then; fill it in now if it is
      // still unset. The session lock is held while reading/writing session fields.
      lock_guard<mutex> session_guard(session_state->lock);
      if (session_state->connected_user.empty()) {
        session_state->connected_user = query.hadoop_user;
      }
      query_ctx->request.query_options = session_state->default_query_options;
    }
    // Build the Thrift representation of this session inside the query context
    // (the context is later handed to the frontend).
    session_state->ToThrift(conn_id, &query_ctx->session);
  }

  // Without a Query.Configuration the session defaults are used unchanged.
  if (!query.__isset.configuration) return Status::OK();

  // Each configuration entry overrides the defaults copied above.
  BOOST_FOREACH(const string& option_str, query.configuration) {
    RETURN_IF_ERROR(ParseQueryOptions(option_str, &query_ctx->request.query_options));
  }
  VLOG_QUERY << "TClientRequest.queryOptions: "
             << ThriftDebugString(query_ctx->request.query_options);
  return Status::OK();
}

内部调用 ImpalaServer::Execute()
(ImpalaServer.h)
函数将 TQueryCtx 转换为 TExecRequest,具体逻辑通过调用 ImpalaServer::ExecuteInternal() 实现。代码如下:

// Starts execution of the query described by 'query_ctx' on behalf of
// 'session_state' and returns the resulting state object through 'exec_state'.
// Prepares the query context, bumps the query counter, stores a redacted single-line
// copy of the statement in the context, and then delegates to ExecuteInternal().
// If ExecuteInternal() fails after it has registered the query, the query is
// unregistered here before the error is returned.
Status ImpalaServer::Execute(TQueryCtx* query_ctx,
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  PrepareQueryContext(query_ctx);
  // Initialized so the check below never reads an indeterminate value, regardless of
  // which path ExecuteInternal() returns on.
  bool registered_exec_state = false;
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);

  // Redact the SQL stmt and update the query context
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  Redact(&stmt);
  // Pass the string directly; the former C-style cast only created a temporary copy.
  query_ctx->request.__set_redacted_stmt(stmt);
  // ExecuteInternal() implements the Execute() logic but does not unregister the
  // query on error; that cleanup is done below.
  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
      exec_state);
  if (!status.ok() && registered_exec_state) {
    UnregisterQuery((*exec_state)->query_id(), false, &status);
  }
  return status;
}

上面的函数调用 ImpalaServer::ExecuteInternal()
(ImpalaServer.h)
在这个函数里通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest,具体代码如下:

// Implements the body of Execute(): creates a QueryExecState for 'query_ctx',
// registers it, asks the frontend for a TExecRequest (planning), and starts execution.
// '*registered_exec_state' is set to true once RegisterQuery() has succeeded, so the
// caller knows whether it must unregister the query when an error is returned; this
// function itself never unregisters.
// Returns an error immediately if the server is offline.
Status ImpalaServer::ExecuteInternal(
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
  if (IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }
  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
      this, session_state));

  (*exec_state)->query_events()->MarkEvent("Start execution");

  TExecRequest result;
  {
    // Keep a lock on exec_state so that registration and setting
    // result_metadata are atomic.
    //
    // Note: this acquires the exec_state lock *before* the
    // query_exec_state_map_ lock. This is the opposite of
    // GetQueryExecState(..., true), and therefore looks like a
    // candidate for deadlock. The reason this works here is that
    // GetQueryExecState cannot find exec_state (under the exec state
    // map lock) and take its lock until RegisterQuery has
    // finished. By that point, the exec state map lock will have been
    // given up, so the classic deadlock interleaving is not possible.
    lock_guard<mutex> l(*(*exec_state)->lock());

    // register exec state as early as possible so that queries that
    // take a long time to plan show up, and to handle incoming status
    // reports before execution starts.
    RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
    *registered_exec_state = true;

    // Ask the frontend for the TExecRequest. Frontend::GetExecRequest() goes through
    // JNI to the Java frontend's createExecRequest(). The returned status is passed
    // through UpdateQueryStatus() on the exec state before being checked.
    RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
        exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
    (*exec_state)->query_events()->MarkEvent("Planning finished");
    (*exec_state)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    if (result.__isset.result_set_metadata) {
      (*exec_state)->set_result_metadata(result.result_set_metadata);
    }
  }
  VLOG(2) << "Execution request: " << ThriftDebugString(result);

  // start execution of query; also starts fragment status reports
  RETURN_IF_ERROR((*exec_state)->Exec(&result));
  if (result.stmt_type == TStmtType::DDL) {
    // A failure to refresh the catalog metrics is only logged; it does not fail the
    // DDL statement.
    Status status = UpdateCatalogMetrics();
    if (!status.ok()) {
      VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
    }
  }

  if ((*exec_state)->coord() != NULL) {
    // Record, for every host in the schedule, that this query has work there.
    // Note: despite its name, the loop variable 'port' is a full TNetworkAddress.
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
        query_locations_[port].insert((*exec_state)->query_id());
      }
    }
  }
  return Status::OK();
}

Frontend::GetExecRequest()
(Frontend.h)
通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest。具体实现代码如下:

// Asks the Java frontend to build the TExecRequest for 'query_ctx'. The call is made
// through JniUtil::CallJniMethod() on the frontend object 'fe_' using the method id
// 'create_exec_request_id_'; the response is written to '*result'.
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  Status jni_status =
      JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
  return jni_status;
}

JniUtil::CallJniMethod()
(jni-util.h)
的具体实现代码如下:

/// JNI call helper that removes the usual boilerplate. Kept in the header because it
/// is a template (the original note says it does not compile properly otherwise).
/// Serializes the Thrift struct 'arg' with SerializeThriftMsg() and passes the bytes as
/// the only argument to the Java method 'method' on 'obj'. The Java return value is
/// ignored. Returns the first error from pushing the local frame, from serialization,
/// or from RETURN_ERROR_IF_EXC after the call; otherwise Status::OK().
/// Note: this is the request-only overload. Callers that also pass a result
/// out-parameter (four arguments) use a different overload that is not shown here.
template <typename T>
static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
  JNIEnv* env = getJNIEnv();
  JniLocalFrame local_frame;
  RETURN_IF_ERROR(local_frame.push(env));
  jbyteArray serialized_arg;
  RETURN_IF_ERROR(SerializeThriftMsg(env, &arg, &serialized_arg));
  env->CallObjectMethod(obj, method, serialized_arg);
  RETURN_ERROR_IF_EXC(env);
  return Status::OK();
}

至此,请求参数经 Thrift 序列化后通过 JNI 传给 Java Frontend,由它生成执行计划树。
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
(Frontend.java)
是最重要的方法,它根据提供的 TQueryCtx 创建 TExecRequest。具体代码(分析部分)如下:

  /**
   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
   * This excerpt shows only the first step: the statement in queryCtx is analyzed via
   * analyzeStmt() and an "Analysis finished" event is recorded on the analyzer's
   * timeline. The remainder of the method body is elided (the lone '.' lines).
   *
   * @param queryCtx query context whose SQL statement is analyzed
   * @param explainString not used in the part of the method shown here
   * @return the populated TExecRequest (built in the elided part)
   * @throws ImpalaException as declared by the signature
   */
  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
      throws ImpalaException {
    // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
    timeline.markEvent("Analysis finished");
    .
    .
    .
    .
  }

首先通过调用 analyzeStmt()
(Frontend.java)
方法分析提交的 SQL 语句。analyzeStmt() 的具体实现代码如下:

/**
   * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
   * Analysis is retried in a loop: when it fails only because tables are missing, a
   * load of those tables is requested and analysis is attempted again. Authorization
   * is always performed in the finally block, whether analysis succeeded or threw.
   *
   * @param queryCtx query context; queryCtx.request.stmt is the statement text
   * @return the analysis result of the first successful analysis pass
   * @throws AnalysisException if analysis fails and no tables are missing
   * @throws AuthorizationException if the authorization check fails
   * @throws InternalException as declared by the signature
   */
  private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
      throws AnalysisException, InternalException, AuthorizationException {
    AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx,
        authzConfig_);
    LOG.debug("analyze query " + queryCtx.request.stmt);

    // Run analysis in a loop until one of the following happens:
    // 1) analysis completes successfully
    // 2) analysis fails with an AnalysisException that is not caused by missing tables
    // 3) analysis fails with an AuthorizationException
    try {
      while (true) {
        try {
          // The actual analysis logic lives in AnalysisContext.analyze().
          analysisCtx.analyze(queryCtx.request.stmt);
          Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
          return analysisCtx.getAnalysisResult();
        } catch (AnalysisException e) {
          Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
          // Only re-throw the AnalysisException if there were no missing tables.
          if (missingTbls.isEmpty()) throw e;

          // Some tables/views were missing, request and wait for them to load.
          // A timeout is only logged; the enclosing loop then retries the analysis.
          if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
            LOG.info(String.format("Missing tables were not received in %dms. Load " +
                "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
          }
        }
      }
    } finally {
      // Authorize all accesses.
      // AuthorizationExceptions must take precedence over any AnalysisException
      // that has been thrown, so perform the authorization first.
      analysisCtx.getAnalyzer().authorize(getAuthzChecker());
    }
  }

AnalysisContext.AnalysisResult 中的 Analyzer 对象是个存放这个 SQL 所涉及到的所有信息
(包含 Table、conjunct、slot、slotRefMap、eqJoinConjuncts 等)的知识库,所有跟这个
SQL 有关的东西都会存到 Analyzer 对象里面。该类的定义可以查看
Analyzer.java。
AnalysisContext.analyze()
(AnalysisContext.java)
的具体实现代码如下:

  /**
   * Parses and analyzes 'stmt' with a freshly created Analyzer by delegating to
   * analyze(String, Analyzer). As part of that call, a statement that requires a
   * rewrite (e.g. a nested query whose subqueries get unnested) is rewritten and the
   * transformed statement is re-analyzed with a new Analyzer.
   */
  public void analyze(String stmt) throws AnalysisException {
    analyze(stmt, new Analyzer(catalog_, queryCtx_, authzConfig_));
  }

上面的 analyze() 函数通过调用同名的重载函数 analyze(String stmt, Analyzer analyzer)
(AnalysisContext.java)
实现具体的分析,代码如下:

  /**
   * Parses 'stmt' and analyzes the resulting statement with 'analyzer'; a new Analyzer
   * is created when 'analyzer' is null. The outcome is stored in analysisResult_.
   * If the analyzed statement requires a rewrite, it is rewritten and re-analyzed in a
   * brand-new AnalysisResult/Analyzer. Any non-AnalysisException failure is wrapped in
   * an AnalysisException carrying the parser's error message.
   */
  public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
    SqlScanner scanner = new SqlScanner(new StringReader(stmt));
    SqlParser parser = new SqlParser(scanner);
    try {
      analysisResult_ = new AnalysisResult();
      analysisResult_.analyzer_ = (analyzer != null)
          ? analyzer
          : new Analyzer(catalog_, queryCtx_, authzConfig_);
      analysisResult_.stmt_ = (StatementBase) parser.parse().value;
      if (analysisResult_.stmt_ == null) {
        return;
      }

      // A CTAS (Create Table As Select) keeps a copy of its create statement, in case
      // a new CTAS statement has to be built after a query rewrite.
      if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
        CreateTableAsSelectStmt ctas = (CreateTableAsSelectStmt) analysisResult_.stmt_;
        analysisResult_.tmpCreateTableStmt_ = ctas.getCreateStmt().clone();
      }

      analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
      // Captured now because analysisResult_ is replaced below when rewriting.
      boolean explainRequested = analysisResult_.isExplainStmt();

      // Nothing more to do unless the statement has to be rewritten.
      if (!analysisResult_.requiresRewrite()) {
        return;
      }

      StatementBase rewritten = StmtRewriter.rewrite(analysisResult_);
      Preconditions.checkNotNull(rewritten);
      // Analyze the rewritten statement from scratch with a new result and analyzer.
      analysisResult_ = new AnalysisResult();
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
      analysisResult_.stmt_ = rewritten;
      analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
      LOG.trace("rewrittenStmt: " + rewritten.toSql());
      if (explainRequested) {
        analysisResult_.stmt_.setIsExplain();
      }
    } catch (AnalysisException e) {
      // Already the right type: rethrow as-is instead of wrapping it again.
      throw e;
    } catch (Exception e) {
      throw new AnalysisException(parser.getErrorMsg(stmt), e);
    }
  }

上面的函数通过调用 SqlScanner(词法分析)和 SqlParser(语法分析)类实现对 SQL 文本的解析。对应的词法、语法定义可以查看
sql-scanner.flex
和
sql-parser.y。

分析 SQL 语句的大概流程如下:

  1. 处理这个 SQL 所涉及到的 Table(即TableRefs),这些 Table 是在 from 从句中提取出来的(包含关键字
    from, join, on/using)。注意 JOIN 操作以及 on/using 条件是存储在参与 JOIN 操作的右边的表的 TableRef
    中并分析的。依次 analyze() 每个 TableRef,向 Analyzer 注册 registerBaseTableRef(填充TupleDescriptor)。
    如果对应的 TableRef 涉及到 JOIN 操作,还要 analyzeJoin()。在 analyzeJoin() 时会向 Analyzer registerConjunct()
    填充 Analyzer 的一些成员变量:conjuncts,tuplePredicates(TupleId 与 conjunct 的映射),slotPredicates(SlotId
    与 conjunct 的映射),eqJoinConjuncts。
  2. 处理 select 从句(包含关键字 select, MAX(), AVG()等聚集函数):分析这个 SQL 都 select 了哪几项,每一项都是个
    Expr 类型的子类对象,把这几项填入 resultExprs 数组和 colLabels。然后把 resultExprs 里面的 Expr 都递归 analyze
    一下,要分析到树的最底层,向 Analyzer 注册 SlotRef 等。
  3. 分析 where 从句(关键字 where),首先递归 Analyze 从句中 Expr 组成的树,然后向 Analyzer registerConjunct()
    填充 Analyzer 的一些成员变量(同1,此外还要填充 whereClauseConjuncts) 。
  4. 处理 sort 相关信息(关键字 order by)。先是解析 aliases 和 ordinals,然后从 order by 后面的从句中提取 Expr 填入
    orderingExprs,接着递归 Analyze 从句中 Expr 组成的树,最后创建 SortInfo 对象。
  5. 处理 aggregation 相关信息(关键字 group by, having, avg, max 等)。首先递归分析 group by 从句里的 Expr,然后如果有
    having 从句就像 where 从句一样,先是 analyze having 从句中 Expr 组成的树,然后向 Analyzer registerConjunct()等。
  6. 处理 InlineView。

至此,词法分析、语法分析以及语义分析(analyze)都完成了,回到 frontend.createExecRequest()
(Frontend.java)
函数,开始填充 TExecRequest 内的成员变量。代码如下(部分):

  /**
   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
   * This excerpt shows the part that handles statements which need no query plan:
   * catalog operations (DDL), LOAD DATA and SET. For those, the request is filled in
   * and returned right away; the only exception is CREATE TABLE AS SELECT, which
   * falls through to the elided remainder of the method (the lone '.' lines).
   *
   * @param queryCtx query context; supplies the statement and the query options
   * @param explainString not used in the part of the method shown here
   * @return the populated TExecRequest
   * @throws ImpalaException as declared by the signature
   */
  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
      throws ImpalaException {
    // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
    timeline.markEvent("Analysis finished");
    
    // Start populating the TExecRequest with what is common to all statement types.
    Preconditions.checkNotNull(analysisResult.getStmt());
    TExecRequest result = new TExecRequest();
    result.setQuery_options(queryCtx.request.getQuery_options());
    result.setAccess_events(analysisResult.getAccessEvents());
    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();

    if (analysisResult.isCatalogOp()) {
      result.stmt_type = TStmtType.DDL;
      createCatalogOpRequest(analysisResult, result);
      // Attach the lineage graph to the catalog request only when one was produced.
      String jsonLineageGraph = analysisResult.getJsonLineageGraph();
      if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
        result.catalog_op_request.setLineage_graph(jsonLineageGraph);
      }
      // All DDL operations except for CTAS are done with analysis at this point.
      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
    } else if (analysisResult.isLoadDataStmt()) {
      // LOAD DATA returns a single STRING column named "summary".
      result.stmt_type = TStmtType.LOAD;
      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
          new TColumn("summary", Type.STRING.toThrift()))));
      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
      return result;
    } else if (analysisResult.isSetStmt()) {
      // SET returns two STRING columns: "option" and "value".
      result.stmt_type = TStmtType.SET;
      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
          new TColumn("option", Type.STRING.toThrift()),
          new TColumn("value", Type.STRING.toThrift()))));
      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
      return result;
    }
    .
    .
    .
    .
    
  }

如果是 DDL 命令(use, show tables, show databases, describe 等 catalog 操作),那么调用 createCatalogOpRequest()。
如果是 Load Data 或者 Set 语句,就设置相应的结果集元数据(setResult_set_metadata),并把语句转换为 Thrift 请求后直接返回。

执行计划生成

另外一种情况就是 Query 或者 DML 命令,那么就得创建和填充 TQueryExecRequest 了。该部分代码如下:

/**
   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
   * This excerpt shows the part that handles statements which need a query plan
   * (query, DML or CREATE TABLE AS SELECT): a Planner turns the analysis result into
   * plan fragments, and every fragment is indexed and scanned for its ScanNodes.
   * The code before and after this part is elided (the lone '.' lines).
   *
   * @param queryCtx query context handed to the Planner
   * @param explainString not used in the part of the method shown here
   * @return the populated TExecRequest (completed in the elided part)
   * @throws ImpalaException as declared by the signature; kept identical to the other
   *     excerpts of this same method
   */
  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
      throws ImpalaException {
    .
    .
    .
    .
    .
    // Create a TQueryExecRequest; only query, DML and CTAS statements reach this point.
    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
        || analysisResult.isCreateTableAsSelectStmt());

    TQueryExecRequest queryExecRequest = new TQueryExecRequest();
    // create plan
    LOG.debug("create plan");
    Planner planner = new Planner(analysisResult, queryCtx);
    // Generate the execution plan (PlanNodes grouped into PlanFragments) from the
    // analyzed statement; the fragments are what the backends later execute.
    ArrayList<PlanFragment> fragments = planner.createPlan();

    List<ScanNode> scanNodes = Lists.newArrayList();
    // Map every fragment to its index in 'fragments';
    // queryExecRequest.dest_fragment_idx needs this mapping.
    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();

    for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
      PlanFragment fragment = fragments.get(fragmentId);
      Preconditions.checkNotNull(fragment.getPlanRoot());
      // Collect all ScanNodes found in this fragment's plan tree.
      fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
      fragmentIdx.put(fragment, fragmentId);
    }
    .
    .
    .
    .
    .
  }

上面的 createPlan() 函数是 frontend 最重要的函数:根据 SQL 解析的结果和 client 传入的 query options,
生成执行计划。执行计划是用 PlanFragment 的数组表示的,最后会序列化到 TQueryExecRequest.fragments
然后传给 backend 的 coordinator 去调度执行。现在让我们来看看 createPlan()
(Planner.java)
的具体实现:

  /**
   * Returns a list of plan fragments for executing an analyzed parse tree.
   * May return a single-node or distributed executable plan.
   *
   * Outline: build the single-node plan tree; for small queries (max rows processed
   * below exec_single_node_rows_threshold) force single-node execution without
   * codegen; wrap the tree into one fragment or split it into distributed fragments;
   * for INSERT/CTAS attach a table sink to the root fragment; set the root fragment's
   * output exprs; finalize all fragments; reverse the list before returning it; and
   * optionally compute the column lineage graph.
   */
  public ArrayList<PlanFragment> createPlan() throws ImpalaException {
    SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
    DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
    // First generate the single-node plan tree.
    PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
    ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
    ArrayList<PlanFragment> fragments = null;

    // Determine the maximum number of rows processed by any node in the plan tree
    MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
    singleNodePlan.accept(visitor);
    // A visitor result of -1 is treated as "unbounded" (Long.MAX_VALUE).
    long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
    boolean isSmallQuery =
        maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
    if (isSmallQuery) {
      // Execute on a single node and disable codegen for small results
      ctx_.getQueryOptions().setNum_nodes(1);
      ctx_.getQueryOptions().setDisable_codegen(true);
      // When batch_size is 0, 1024 is used as the comparison value instead.
      if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
          maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
        // Only one scanner thread for small queries
        ctx_.getQueryOptions().setNum_scanner_threads(1);
      }
    }

    if (ctx_.isSingleNodeExec()) { // single-node execution
      // Create one unpartitioned fragment that contains the entire single-node plan.
      fragments = Lists.newArrayList(new PlanFragment(
          ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
    } else { // distributed execution
      // create distributed plan
      fragments = distributedPlanner.createPlanFragments(singleNodePlan);
    }
    // At this point the last fragment in the list is the root fragment.
    PlanFragment rootFragment = fragments.get(fragments.size() - 1);
    if (ctx_.isInsertOrCtas()) {
      InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
      if (!ctx_.isSingleNodeExec()) {
        // repartition on partition keys
        rootFragment = distributedPlanner.createInsertFragment(
            rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
      }
      // set up table sink for root fragment
      rootFragment.setSink(insertStmt.createDataSink());
    }

    // Pick the result exprs and register the target column labels with the lineage
    // graph: from the insert statement for INSERT/CTAS, otherwise from the query.
    ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
    List<Expr> resultExprs = null;
    Table targetTable = null;
    if (ctx_.isInsertOrCtas()) {
      InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
      resultExprs = insertStmt.getResultExprs();
      targetTable = insertStmt.getTargetTable();
      graph.addTargetColumnLabels(targetTable);
    } else {
      resultExprs = ctx_.getQueryStmt().getResultExprs();
      graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
    }
    // Rewrite the result exprs through the root plan node's output substitution map.
    resultExprs = Expr.substituteList(resultExprs,
        rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
    rootFragment.setOutputExprs(resultExprs);
    LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
    LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
    LOG.debug("finalize plan fragments");
    for (PlanFragment fragment: fragments) {
      fragment.finalize(ctx_.getRootAnalyzer());
    }

    // Reverse the list so that the root fragment, which was last, comes first.
    Collections.reverse(fragments);
    ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");

    if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
      // Compute the column lineage graph
      if (ctx_.isInsertOrCtas()) {
        Preconditions.checkNotNull(targetTable);
        List<Expr> exprs = Lists.newArrayList();
        if (targetTable instanceof HBaseTable) {
          exprs.addAll(resultExprs);
        } else {
          // Non-HBase targets: partition key exprs first, then the first
          // getNonClusteringColumns().size() result exprs.
          exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
          exprs.addAll(resultExprs.subList(0,
              targetTable.getNonClusteringColumns().size()));
        }
        graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
      } else {
        graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
      }
      LOG.trace("lineage: " + graph.debugString());
      ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
    }

    return fragments;
  }

createPlan 包括 createSingleNodePlan 和 createPlanFragments
两个主要部分。其中第一个生成单节点计划树,所有片段只能在一个节点(coordinator)上执行;第二个生成分布式执行计划树,片段可以分配到不同的节点中运行。我们先来看看 SingleNodePlanner.createSingleNodePlan()
(SingleNodePlanner.java)
该方法根据 Planner Context 中已分析的语法树创建单节点执行计划树并返回根节点。计划过程递归处理语法树,分为两个阶段。自上而下阶段,对每个查询语句执行以下操作:

  1. materialize the slots required for evaluating expressions of that statement
  2. migrate conjuncts from parent blocks into inline views and union operands

自下而上阶段(In the bottom-up phase),为每个查询语句生成计划树:

  3. perform join-order optimization when generating the plan of the FROM clause of a select statement; requires that all materialized slots are known for an accurate estimate of row sizes needed for cost-based join ordering
  4. assign conjuncts that can be evaluated at that node and compute the stats of that node (cardinality, etc.)
  5. apply combined expression substitution map of child plan nodes; if a plan node re-maps its input, set a substitution map to be applied by parents

具体代码如下:

  /**
   * Builds the single-node plan for the analyzed statement held by the planner
   * context and returns the root PlanNode of that plan (never null).
   */
  public PlanNode createSingleNodePlan() throws ImpalaException {
    QueryStmt stmt = ctx_.getQueryStmt();
    // Work with the statement's own analyzer (it need not be the root analyzer) so
    // that empty result sets can be detected.
    Analyzer stmtAnalyzer = stmt.getAnalyzer();
    stmtAnalyzer.computeEquivClasses();
    stmtAnalyzer.getTimeline().markEvent("Equivalence classes computed");

    // Before any plan node is created, mark the slots referenced by the topmost
    // select block's result exprs as materialized. PlanNode.init() needs this to
    // compute the final memory layout of materialized tuples, and the byte size of
    // tuples feeds into cost computations.
    // TODO: instead of materializing everything produced by the plan root, derive
    // referenced slots from destination fragment and add a materialization node
    // if not all output is needed by destination fragment
    // TODO 2: should the materialization decision be cost-based?
    if (stmt.getBaseTblResultExprs() != null) {
      stmtAnalyzer.materializeSlots(stmt.getBaseTblResultExprs());
    }

    LOG.trace("desctbl: " + stmtAnalyzer.getDescTbl().debugString());
    boolean disableTopN = ctx_.getQueryOptions().isDisable_outermost_topn();
    PlanNode planRoot = createQueryPlan(stmt, stmtAnalyzer, disableTopN);
    return Preconditions.checkNotNull(planRoot);
  }

上面的函数通过调用私有的 createQueryPlan()
(SingleNodePlanner.java)
函数实现。该函数为单节点执行创建计划树。为查询语句中的
Select/Project/Join/Union [All]/Group by/Having/Order by
生成 PlanNode。具体实现代码如下:

  /**
   * Create plan tree for single-node execution. Generates PlanNodes for the
   * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
   *
   * @param stmt the query statement to plan; a SelectStmt or a UnionStmt
   * @param analyzer the analyzer of 'stmt'
   * @param disableTopN if true, a SortNode for ORDER BY ... LIMIT is created with
   *     useTopN == false
   * @return the root of the plan tree, with limit applied
   */
  private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
      throws ImpalaException {
    // If the analyzer determined that the result set is empty, return an empty node
    // right away.
    if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);

    PlanNode root;
    if (stmt instanceof SelectStmt) { // SELECT statement
      SelectStmt selectStmt = (SelectStmt) stmt;
      // Create the plan for the select block itself.
      root = createSelectPlan(selectStmt, analyzer);

      // insert possible AnalyticEvalNode before SortNode
      if (((SelectStmt) stmt).getAnalyticInfo() != null) {
        AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
        ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
        stmt.getMaterializedTupleIds(stmtTupleIds);
        AnalyticPlanner analyticPlanner =
            new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
        // Filled in by the analytic planner (passed as an output argument).
        List<Expr> inputPartitionExprs = Lists.newArrayList();
        AggregateInfo aggInfo = selectStmt.getAggInfo();
        root = analyticPlanner.createSingleNodePlan(root,
            aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
        if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
          // analytic computation will benefit from a partition on inputPartitionExprs
          aggInfo.setPartitionExprs(inputPartitionExprs);
        }
      }
    } else { // otherwise the statement must be a UNION
      Preconditions.checkState(stmt instanceof UnionStmt);
      root = createUnionPlan((UnionStmt) stmt, analyzer);
    }

    // Avoid adding a sort node if the sort tuple has no materialized slots.
    boolean sortHasMaterializedSlots = false;
    if (stmt.evaluateOrderBy()) {
      for (SlotDescriptor sortSlotDesc:
        stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
        if (sortSlotDesc.isMaterialized()) {
          sortHasMaterializedSlots = true;
          break;
        }
      }
    }

    if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
      long limit = stmt.getLimit();
      // TODO: External sort could be used for very large limits
      // not just unlimited order-by
      boolean useTopN = stmt.hasLimit() && !disableTopN;
      // Create the sort node on top of the current root.
      root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
          useTopN, stmt.getOffset());
      Preconditions.checkState(root.hasValidStats());
      root.setLimit(limit);
      root.init(analyzer);
    } else {
      // No sort node: apply the limit to the existing root and recompute its stats.
      root.setLimit(stmt.getLimit());
      root.computeStats(analyzer);
    }

    return root;
  }

SingleNodePlanner.createSelectPlan()
(SingleNodePlanner.java)
函数创建实现 select 查询语句块中
Select/Project/Join/Group by/Having 等从句的 PlanNode 树。具体实现代码如下:

  /**
   * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
   * of the selectStmt query block.
   *
   * @param selectStmt the select block to plan
   * @param analyzer the analyzer of 'selectStmt'
   * @return the root of the plan tree for this select block
   */
  private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
      throws ImpalaException {
    // no from clause -> materialize the select's exprs with a UnionNode
    // (a select that references no table gets a constant-select plan)
    if (selectStmt.getTableRefs().isEmpty()) {
      return createConstantSelectPlan(selectStmt, analyzer);
    }

    // Slot materialization:
    // We need to mark all slots as materialized that are needed during the execution
    // of selectStmt, and we need to do that prior to creating plans for the TableRefs
    // (because createTableRefNode() might end up calling computeMemLayout() on one or
    // more TupleDescriptors, at which point all referenced slots need to be marked).
    //
    // For non-join predicates, slots are marked as follows:
    // - for base table scan predicates, this is done directly by ScanNode.init(), which
    //   can do a better job because it doesn't need to materialize slots that are only
    //   referenced for partition pruning, for instance
    // - for inline views, non-join predicates are pushed down, at which point the
    //   process repeats itself.
    selectStmt.materializeRequiredSlots(analyzer);

    ArrayList<TupleId> rowTuples = Lists.newArrayList();
    // collect output tuples of subtrees
    for (TableRef tblRef: selectStmt.getTableRefs()) {
      rowTuples.addAll(tblRef.getMaterializedTupleIds());
    }

    // If the select-project-join portion of the select statement returns an empty
    // result set, create a plan that feeds the aggregation with an empty set.
    // Make sure the slots of the aggregation exprs and the tuples that they reference
    // are materialized (see IMPALA-1960).
    if (analyzer.hasEmptySpjResultSet()) {
      PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
      emptySetNode.init(analyzer);
      emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
      return createAggregationPlan(selectStmt, analyzer, emptySetNode);
    }

    // Create plans for the table refs. A list is used instead of a map so that the
    // TableRefs are visited in a deterministic order when the join plan is generated.
    List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
    for (TableRef ref: selectStmt.getTableRefs()) {
      PlanNode plan = createTableRefNode(analyzer, ref);
      Preconditions.checkState(plan != null);
      // Explicit type arguments instead of the raw Pair type, so that adding to the
      // parameterized list is type-checked.
      refPlans.add(new Pair<TableRef, PlanNode>(ref, plan));
    }
    // save state of conjunct assignment; needed for join plan generation
    for (Pair<TableRef, PlanNode> entry: refPlans) {
      entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
    }

    PlanNode root = null;
    // Unless STRAIGHT_JOIN was requested, try the cost-based (cheapest) join plan.
    // It yields null when no such plan can be built; in that case the conjunct
    // assignment state saved beforehand is restored.
    if (!selectStmt.getSelectList().isStraightJoin()) {
      Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
      root = createCheapestJoinPlan(analyzer, refPlans);
      if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
    }
    // Otherwise build the join plan in the order of the tables in the FROM clause.
    if (selectStmt.getSelectList().isStraightJoin() || root == null) {
      // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
      // keyword was in the select list: use the FROM clause order instead
      root = createFromClauseJoinPlan(analyzer, refPlans);
      Preconditions.checkNotNull(root);
    }

    // Add the aggregation plan on top if the statement aggregates.
    if (selectStmt.getAggInfo() != null) {
      root = createAggregationPlan(selectStmt, analyzer, root);
    }

    // All the conjuncts_ should be assigned at this point.
    // TODO: Re-enable this check here and/or elsewhere.
    //Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
    return root;
  }

上面函数中调用的主要私有方法有:
createTableRefNode()、createCheapestJoinPlan()、createFromClauseJoinPlan()、createAggregationPlan(),各个函数的具体实现如下:

createTableRefNode()

  /**
   * Builds the plan subtree for a single table reference.
   * A BaseTableRef or CollectionTableRef is planned via createScanNode(); an
   * InlineViewRef is planned via createInlineViewPlan(). Any other kind of TableRef
   * is rejected with an InternalException.
   */
  private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
      throws ImpalaException {
    // Base tables and collections are read directly by a scan node.
    boolean isScanTarget =
        tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef;
    if (isScanTarget) return createScanNode(analyzer, tblRef);

    if (!(tblRef instanceof InlineViewRef)) {
      throw new InternalException(
          "Unknown TableRef node: " + tblRef.getClass().getSimpleName());
    }
    // Inline views are planned through createInlineViewPlan().
    InlineViewRef viewRef = (InlineViewRef) tblRef;
    return createInlineViewPlan(analyzer, viewRef);
  }

createCheapestJoinPlan()

  /**
   * Returns the cheapest plan that materializes the joins of all TableRefs in
   * refPlans. Assumes that refPlans is in the same order as the refs appeared in the
   * original query.
   * For this plan:
   * - the plan is executable, ie, all non-cross joins have equi-join predicates
   * - the leftmost scan is over the largest of the inputs for which we can still
   *   construct an executable plan
   * - all rhs's (right-hand sides) are in decreasing order of selectiveness
   *   (percentage of rows they eliminate)
   * - outer/cross/semi joins: rhs serialized size is < lhs serialized size;
   *   enforced via join inversion, if necessary
   * Returns null if we can't create an executable plan.
   */
  private PlanNode createCheapestJoinPlan(
      Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
      throws ImpalaException {
    LOG.trace("createCheapestJoinPlan");
    if (refPlans.size() == 1) return refPlans.get(0).second;

    // collect eligible candidates for the leftmost input; list contains
    // (plan, materialized size)
    ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
    for (Pair<TableRef, PlanNode> entry: refPlans) {
      TableRef ref = entry.first;
      JoinOperator joinOp = ref.getJoinOp();

      // The rhs table of an outer/semi join can appear as the left-most input if we
      // invert the lhs/rhs and the join op. However, we may only consider this inversion
      // for the very first join in refPlans, otherwise we could reorder tables/joins
      // across outer/semi joins which is generally incorrect. The null-aware
      // left anti-join operator is never considered for inversion because we can't
      // execute the null-aware right anti-join efficiently.
      // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
      // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
      if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
          ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
        // ref cannot appear as the leftmost input
        continue;
      }

      PlanNode plan = entry.second;
      if (plan.getCardinality() == -1) {
        // use 0 for the size to avoid it becoming the leftmost input
        // TODO: Consider raw size of scanned partitions in the absence of stats.
        candidates.add(new Pair<TableRef, Long>(ref, Long.valueOf(0L)));
        LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
        continue;
      }
      Preconditions.checkNotNull(ref.getDesc());
      long materializedSize =
          (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
      candidates.add(new Pair<TableRef, Long>(ref, Long.valueOf(materializedSize)));
      LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
    }
    if (candidates.isEmpty()) return null;

    // order candidates by descending materialized size; we want to minimize the memory
    // consumption of the materialized hash tables required for the join sequence
    Collections.sort(candidates,
        new Comparator<Pair<TableRef, Long>>() {
          @Override
          public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
            // Compare the boxed values directly instead of subtracting them: a
            // subtraction-based comparator can overflow and report the wrong order.
            return b.second.compareTo(a.second);
          }
        });

    // Try the candidates in descending size order and return the first executable
    // join plan.
    for (Pair<TableRef, Long> candidate: candidates) {
      PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
      if (result != null) return result;
    }
    return null;
  }

createFromClauseJoinPlan()

  /**
   * Builds a left-deep join plan that joins the table refs strictly in the order in
   * which they appear in refPlans (i.e. the FROM-clause order).
   * refPlans must not be empty.
   */
  private PlanNode createFromClauseJoinPlan(
      Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
      throws ImpalaException {
    Preconditions.checkState(!refPlans.isEmpty());
    // The first ref's plan seeds the chain; each following ref becomes the inner
    // (rhs) input of a new join placed on top of the plan accumulated so far.
    Iterator<Pair<TableRef, PlanNode>> remaining = refPlans.iterator();
    PlanNode joinRoot = remaining.next().second;
    while (remaining.hasNext()) {
      Pair<TableRef, PlanNode> rhs = remaining.next();
      joinRoot = createJoinNode(analyzer, joinRoot, rhs.second, null, rhs.first);
      // node ids are handed out as the chain is built
      joinRoot.setId(ctx_.getNextNodeId());
    }
    return joinRoot;
  }

createAggregationPlan()

  /**
   * Places the aggregation of selectStmt on top of 'root' and returns the resulting
   * AggregationNode. For DISTINCT aggregation a second AggregationNode (built from the
   * second-phase agg info) is stacked on top of the first one and returned instead.
   * The conjuncts of the Having clause are assigned to the returned node.
   * selectStmt must have aggregation info.
   */
  private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
      PlanNode root) throws InternalException {
    AggregateInfo aggInfo = selectStmt.getAggInfo();
    Preconditions.checkState(aggInfo != null);

    AggregationNode firstPhase =
        new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
    root = firstPhase;
    root.init(analyzer);
    Preconditions.checkState(root.hasValidStats());

    if (!aggInfo.isDistinctAgg()) {
      // single-phase aggregation: the Having clause goes onto this node
      root.assignConjuncts(analyzer);
      return root;
    }

    // For DISTINCT agg fns the analyzer already created the 2nd phase agginfo.
    // The 1st phase must not finalize; its output is the 1st phase intermediate.
    firstPhase.unsetNeedsFinalize();
    firstPhase.setIntermediateTuple();
    root = new AggregationNode(ctx_.getNextNodeId(), root,
        aggInfo.getSecondPhaseDistinctAggInfo());
    root.init(analyzer);
    Preconditions.checkState(root.hasValidStats());
    // the Having clause is evaluated on top of the 2nd phase
    root.assignConjuncts(analyzer);
    return root;
  }

上面的 createCheapestJoinPlan() 和 createFromClauseJoinPlan()
方法调用了 createJoinNode() 和 createJoinPlan() 两个方法。它们的具体实现如下:

createJoinNode()

  /**
   * Creates a node that joins 'outer' with 'inner' and calls init() on it.
   * Exactly one of outerRef/innerRef must be non-null; that ref supplies the join op
   * and the conjuncts for the join. When outerRef is the non-null one, the lhs/rhs
   * of the collected equi-join conjuncts are swapped.
   * Returns a CrossJoinNode if no equi-join conjuncts were found, and a HashJoinNode
   * otherwise.
   * Throws NotImplementedException for an outer or semi join that has no equi-join
   * conjuncts.
   */
  private PlanNode createJoinNode(
      Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
      TableRef innerRef) throws ImpalaException {
    Preconditions.checkState(innerRef != null ^ outerRef != null);
    // the one non-null ref; always safe to dereference below
    TableRef tblRef = (innerRef != null) ? innerRef : outerRef;

    List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
    List<Expr> eqJoinPredicates = Lists.newArrayList();
    // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
    // are materialized)
    if (innerRef != null) {
      getHashLookupJoinConjuncts(
          analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
      // Outer joins should only use On-clause predicates as eqJoinConjuncts.
      if (!innerRef.getJoinOp().isOuterJoin()) {
        analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
            eqJoinConjuncts);
      }
    } else {
      getHashLookupJoinConjuncts(
          analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
      // Outer joins should only use On-clause predicates as eqJoinConjuncts.
      if (!outerRef.getJoinOp().isOuterJoin()) {
        analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
            eqJoinConjuncts);
      }
      // Reverse the lhs/rhs of the join conjuncts.
      for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
        Expr swapTmp = eqJoinConjunct.getChild(0);
        eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
        eqJoinConjunct.setChild(1, swapTmp);
      }
    }

    // Handle implicit cross joins
    if (eqJoinConjuncts.isEmpty()) {
      // Since our only implementation of semi and outer joins is hash-based, and we do
      // not re-order semi and outer joins, we must have eqJoinConjuncts here to execute
      // this query.
      // TODO: Revisit when we add more semi/join implementations. Pick up and pass in
      // the otherJoinConjuncts.
      if (tblRef.getJoinOp().isOuterJoin() ||
          tblRef.getJoinOp().isSemiJoin()) {
        // Use tblRef rather than innerRef for the alias: innerRef is null when this
        // method is called with outerRef set, and dereferencing it would replace the
        // intended NotImplementedException with a NullPointerException.
        throw new NotImplementedException(
            String.format("%s join with '%s' without equi-join " +
            "conjuncts is not supported.",
            tblRef.getJoinOp().isOuterJoin() ? "Outer" : "Semi",
            tblRef.getUniqueAlias()));
      }
      CrossJoinNode result =
          new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
      result.init(analyzer);
      return result;
    }

    // Handle explicit cross joins with equi-join conjuncts: treat them as inner joins
    if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
      tblRef.setJoinOp(JoinOperator.INNER_JOIN);
    }

    analyzer.markConjunctsAssigned(eqJoinPredicates);

    List<Expr> otherJoinConjuncts = Lists.newArrayList();
    if (tblRef.getJoinOp().isOuterJoin()) {
      // Outer join:
      // Also assign conjuncts from On clause. All remaining unassigned conjuncts
      // that can be evaluated by this join are assigned in createSelectPlan().
      otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
    } else if (tblRef.getJoinOp().isSemiJoin()) {
      // Semi join:
      // Unassigned conjuncts bound by the invisible tuple id of a semi join must have
      // come from the join's On-clause, and therefore, must be added to the other join
      // conjuncts to produce correct results.
      otherJoinConjuncts =
          analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
      if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) {
        // Null-aware left anti join:
        boolean hasNullMatchingEqOperator = false;
        // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
        // all the others in otherJoinConjuncts. The BE relies on this
        // separation for correct execution of the null-aware left anti join.
        Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
        while (it.hasNext()) {
          BinaryPredicate conjunct = it.next();
          if (!conjunct.isNullMatchingEq()) {
            otherJoinConjuncts.add(conjunct);
            it.remove();
          } else {
            // Only one null-matching eq conjunct is allowed
            Preconditions.checkState(!hasNullMatchingEqOperator);
            hasNullMatchingEqOperator = true;
          }
        }
        Preconditions.checkState(hasNullMatchingEqOperator);
      }
    }
    analyzer.markConjunctsAssigned(otherJoinConjuncts);

    HashJoinNode result =
        new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
    result.init(analyzer);
    return result;
  }

createJoinPlan()

  /**
   * Returns a plan with leftmostRef's plan as its leftmost input; the joins
   * are in decreasing order of selectiveness (percentage of rows they eliminate).
   * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
   * Returns null if, at some step, none of the remaining table refs yields a join
   * candidate.
   */
  private PlanNode createJoinPlan(
      Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
      throws ImpalaException {

    LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
    // the refs that still need to be joined
    List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
    PlanNode root = null;  // root of accumulated join plan
    for (Pair<TableRef, PlanNode> entry: refPlans) {
      if (entry.first == leftmostRef) {
        root = entry.second;
      } else {
        remainingRefs.add(entry);
      }
    }
    Preconditions.checkNotNull(root);
    // refs that have been joined. The union of joinedRefs and the refs in remainingRefs
    // are the set of all table refs.
    Set<TableRef> joinedRefs = Sets.newHashSet();
    joinedRefs.add(leftmostRef);

    // If the leftmost TblRef is an outer/semi/cross join, we must invert it.
    boolean planHasInvertedJoin = false;
    if (leftmostRef.getJoinOp().isOuterJoin()
        || leftmostRef.getJoinOp().isSemiJoin()
        || leftmostRef.getJoinOp().isCrossJoin()) {
      // TODO: Revisit the interaction of join inversion here and the analysis state
      // that is changed in analyzer.invertOuterJoin(). Changing the analysis state
      // should not be necessary because the semantics of an inverted outer join do
      // not change.
      leftmostRef.invertJoin(refPlans, analyzer);
      planHasInvertedJoin = true;
    }

    long numOps = 0;
    int i = 0;
    while (!remainingRefs.isEmpty()) {
      // We minimize the resulting cardinality at each step in the join chain,
      // which minimizes the number of hash table lookups.
      PlanNode newRoot = null;
      Pair<TableRef, PlanNode> minEntry = null;
      for (Pair<TableRef, PlanNode> entry: remainingRefs) {
        TableRef ref = entry.first;
        LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());

        // Determine whether we can or must consider this join at this point in the plan.
        // Place outer/semi joins at a fixed position in the plan tree (IMPALA-860),
        // s.t. all the tables appearing to the left/right of an outer/semi join in
        // the original query still remain to the left/right after join ordering. This
        // prevents join re-ordering across outer/semi joins which is generally wrong.
        // The checks below relies on remainingRefs being in the order as they originally
        // appeared in the query.
        JoinOperator joinOp = ref.getJoinOp();
        if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
          List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
          currentTids.add(ref.getId());
          // Place outer/semi joins at a fixed position in the plan tree. We know that
          // the join resulting from 'ref' must become the new root if the current
          // root materializes exactly those tuple ids corresponding to TableRefs
          // appearing to the left of 'ref' in the original query.
          List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
          if (!currentTids.containsAll(tableRefTupleIds) ||
              !tableRefTupleIds.containsAll(currentTids)) {
            // Do not consider the remaining table refs to prevent incorrect re-ordering
            // of tables across outer/semi/anti joins.
            break;
          }
        } else if (ref.getJoinOp().isCrossJoin()) {
          // Skip a cross join whose left table ref has not been joined yet.
          if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
        }

        PlanNode rhsPlan = entry.second;
        // Reset the analyzer's assigned conjuncts to those recorded on the current
        // root before building the candidate join.
        analyzer.setAssignedConjuncts(root.getAssignedConjuncts());

        boolean invertJoin = false;
        if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
          // Invert the join if doing so reduces the size of build-side hash table
          // (may also reduce network costs depending on the join strategy).
          // Only consider this optimization if both the lhs/rhs cardinalities are known.
          // The null-aware left anti-join operator is never considered for inversion
          // because we can't execute the null-aware right anti-join efficiently.
          long lhsCard = root.getCardinality();
          long rhsCard = rhsPlan.getCardinality();
          if (lhsCard != -1 && rhsCard != -1 &&
              lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
              !joinOp.isNullAwareLeftAntiJoin()) {
            invertJoin = true;
          }
        }
        PlanNode candidate = null;
        if (invertJoin) {
          // The join op of 'ref' is modified in place; rhsPlan becomes the outer input.
          ref.setJoinOp(ref.getJoinOp().invert());
          candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
          planHasInvertedJoin = true;
        } else {
          candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
        }
        if (candidate == null) continue;
        LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));

        // Use 'candidate' as the new root; don't consider any other table refs at this
        // position in the plan.
        if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
          newRoot = candidate;
          minEntry = entry;
          break;
        }

        // Always prefer Hash Join over Cross Join due to limited costing infrastructure.
        // Between two candidates of the same class, keep the one with the lower
        // cardinality.
        if (newRoot == null
            || (candidate.getClass().equals(newRoot.getClass())
                && candidate.getCardinality() < newRoot.getCardinality())
            || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
          newRoot = candidate;
          minEntry = entry;
        }
      }
      if (newRoot == null) {
        // Currently, it should not be possible to invert a join for a plan that turns
        // out to be non-executable because (1) the joins we consider for inversion are
        // barriers in the join order, and (2) the caller of this function only considers
        // other leftmost table refs if a plan turns out to be non-executable.
        // TODO: This preconditions check will need to be changed to undo the in-place
        // modifications made to table refs for join inversion, if the caller decides to
        // explore more leftmost table refs.
        Preconditions.checkState(!planHasInvertedJoin);
        return null;
      }

      // we need to insert every rhs row into the hash table and then look up
      // every lhs row
      long lhsCardinality = root.getCardinality();
      long rhsCardinality = minEntry.second.getCardinality();
      numOps += lhsCardinality + rhsCardinality;
      LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
          + " #lhs=" + Long.toString(lhsCardinality)
          + " #rhs=" + Long.toString(rhsCardinality)
          + " #ops=" + Long.toString(numOps));
      remainingRefs.remove(minEntry);
      joinedRefs.add(minEntry.first);
      root = newRoot;
      // assign id_ after running through the possible choices in order to end up
      // with a dense sequence of node ids
      root.setId(ctx_.getNextNodeId());
      analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
      ++i;
    }

    return root;
  }

至此我们已经大概介绍了 createSingleNodePlan 的过程。
现在让我们回到 createPlan() 函数,来看看创建分布式执行计划树,即 createPlanFragments 的过程。

DistributedPlanner.createPlanFragments()
(Planner.java)
方法为单点计划树生成多个片段。具体代码如下:

  /**
   * Creates the plan fragments for the given single-node plan and returns them as a
   * list. The list is filled in by the private recursive overload of this method.
   * Must not be called when the context is set to single-node execution.
   * NOTE(review): the original comment states that the fragment at position i of the
   * returned list may only consume the output of fragments j > i; confirm this
   * against the order in which the recursive overload appends fragments.
   *
   * TODO: take data partition of the plan fragments into account; in particular,
   * coordinate between hash partitioning for aggregation and hash partitioning for
   * analytic computation more generally than what createQueryPlan() does right now
   * (the coordination only happens if the same select block does both the aggregation
   * and analytic computation).
   */
  public ArrayList<PlanFragment> createPlanFragments(
      PlanNode singleNodePlan) throws ImpalaException {
    Preconditions.checkState(!ctx_.isSingleNodeExec());
    AnalysisContext.AnalysisResult analysis = ctx_.getAnalysisResult();
    QueryStmt stmt = ctx_.getQueryStmt();
    ArrayList<PlanFragment> result = Lists.newArrayList();

    // For inserts or CTAS, unless there is a limit, leave the root fragment
    // partitioned; otherwise merge everything into a single coordinator fragment
    // so the results can be passed back to the client.
    boolean writesTable =
        analysis.isInsertStmt() || analysis.isCreateTableAsSelectStmt();
    boolean keepRootPartitioned = writesTable && !singleNodePlan.hasLimit();
    if (keepRootPartitioned) Preconditions.checkState(!stmt.hasOffset());

    LOG.debug("create plan fragments");
    long memLimitPerNode = ctx_.getQueryOptions().mem_limit;
    LOG.debug("memlimit=" + Long.toString(memLimitPerNode));
    // delegate to the recursive overload, which appends to 'result'
    createPlanFragments(singleNodePlan, keepRootPartitioned, memLimitPerNode, result);
    return result;
  }

上面的方法调用私有成员方法 DistributedPlanner.createPlanFragments()
DistributedPlanner.java
该方法返回生成 root 结果的 fragments。具体代码如下:

	/**
   * Returns the fragment that produces the result of 'root'; recursively creates the
   * fragments for all of root's children first.
   * A newly created fragment is appended to 'fragments', and the fragment for 'root'
   * is moved to the end of the list after its inputs have been added.
   * If 'isPartitioned' is false and the fragment built for 'root' is partitioned, a
   * merge fragment is created on top of it, appended, and returned instead.
   * Throws InternalException for a node type that is not handled here.
   */
  private PlanFragment createPlanFragments(
      PlanNode root, boolean isPartitioned,
      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
      throws InternalException, NotImplementedException {
    // Build the input fragments first. A child fragment is allowed to be partitioned
    // unless the child has a limit (a limited result set has to be computed
    // centrally); merging happens later if needed.
    ArrayList<PlanFragment> inputs = Lists.newArrayList();
    for (PlanNode child: root.getChildren()) {
      inputs.add(
          createPlanFragments(child, !child.hasLimit(), perNodeMemLimit, fragments));
    }

    PlanFragment fragment =
        createFragmentForNode(root, inputs, perNodeMemLimit, fragments);
    // move the fragment to the end, it depends on all of its children
    fragments.remove(fragment);
    fragments.add(fragment);

    if (isPartitioned || !fragment.isPartitioned()) return fragment;
    // The caller asked for an unpartitioned result but the fragment is partitioned:
    // put a merge fragment on top.
    PlanFragment merged = createMergeFragment(fragment);
    fragments.add(merged);
    return merged;
  }

  /**
   * Creates the fragment for 'root' according to its node type, given the fragments
   * already built for its children (in child order).
   */
  private PlanFragment createFragmentForNode(
      PlanNode root, ArrayList<PlanFragment> childFragments,
      long perNodeMemLimit, ArrayList<PlanFragment> fragments)
      throws InternalException, NotImplementedException {
    if (root instanceof ScanNode) {
      PlanFragment scanFragment = createScanFragment(root);
      fragments.add(scanFragment);
      return scanFragment;
    }
    if (root instanceof HashJoinNode) {
      Preconditions.checkState(childFragments.size() == 2);
      return createHashJoinFragment(
          (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
          perNodeMemLimit, fragments);
    }
    if (root instanceof CrossJoinNode) {
      Preconditions.checkState(childFragments.size() == 2);
      return createCrossJoinFragment(
          (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
          perNodeMemLimit, fragments);
    }
    if (root instanceof SelectNode) {
      return createSelectNodeFragment((SelectNode) root, childFragments);
    }
    if (root instanceof UnionNode) {
      return createUnionNodeFragment((UnionNode) root, childFragments, fragments);
    }
    if (root instanceof AggregationNode) {
      return createAggregationFragment(
          (AggregationNode) root, childFragments.get(0), fragments);
    }
    if (root instanceof SortNode) {
      if (((SortNode) root).isAnalyticSort()) {
        // don't parallelize this like a regular SortNode
        return createAnalyticFragment(
            (SortNode) root, childFragments.get(0), fragments);
      }
      return createOrderByFragment(
          (SortNode) root, childFragments.get(0), fragments);
    }
    if (root instanceof AnalyticEvalNode) {
      return createAnalyticFragment(root, childFragments.get(0), fragments);
    }
    if (root instanceof EmptySetNode) {
      return new PlanFragment(
          ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
    }
    throw new InternalException(
        "Cannot create plan fragment for this node type: " + root.getExplainString());
  }

上面的方法调用了大量的 create*Fragment() 私有成员方法。这些成员方法的具体实现可以查看源文件:
DistributedPlanner.java

这些成员方法都返回了 PlanFragment 实例,关于该类的具体实现可以查看源代码:
PlanFragment.java

至此,我们大概介绍了 createPlanFragments 的过程。

由于 createSingleNodePlan 和 createPlanFragments 两个 createPlan 最重要的部分都已经介绍了,
createPlan 也就介绍到这里。现在让我们回到 frontend.createExecRequest()
继续来看剩下的内容。frontend.createExecRequest() 其余代码如下:

	/**
   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
   * The explain string of the generated plan is appended to 'explainString'.
   * NOTE: this listing is an excerpt. The opening statements of the method are
   * replaced by the dotted lines below; they define the names used afterwards
   * (fragments, fragmentIdx, scanNodes, planner, queryExecRequest, analysisResult,
   * result, timeline).
   */
  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
      throws ImpalaException {
    .
    .
    .
    .
    .

    // Set the destination fragment index for every fragment except the first one.
    for (int i = 1; i < fragments.size(); ++i) {
      PlanFragment dest = fragments.get(i).getDestFragment();
      Integer idx = fragmentIdx.get(dest);
      Preconditions.checkState(idx != null);
      queryExecRequest.addToDest_fragment_idx(idx.intValue());
    }

    // Set scan ranges/locations for scan nodes.
    // Also assemble list of tables names missing stats for assembling a warning message.
    LOG.debug("get scan range locations");
    Set<TTableName> tablesMissingStats = Sets.newTreeSet();
    for (ScanNode scanNode: scanNodes) {
      queryExecRequest.putToPer_node_scan_ranges(
          scanNode.getId().asInt(),
          scanNode.getScanRangeLocations());
      if (scanNode.isTableMissingStats()) {
        tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
      }
    }
    // Set the host list.
    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
    for (TTableName tableName: tablesMissingStats) {
      queryCtx.addToTables_missing_stats(tableName);
    }

    // Optionally disable spilling in the backend. Allow spilling if there are plan hints
    // or if all tables have stats.
    if (queryCtx.request.query_options.isDisable_unsafe_spills()
        && !tablesMissingStats.isEmpty()
        && !analysisResult.getAnalyzer().hasPlanHints()) {
      queryCtx.setDisable_spilling(true);
    }

    // Compute resource requirements after the scan range locations have been set,
    // because the cost estimates of scan nodes rely on them.
    try {
      planner.computeResourceReqs(fragments, true, queryExecRequest);
    } catch (Exception e) {
      // Log the failure instead of propagating it so that the query can still execute.
      LOG.error("Failed to compute resource requirements for query\n" +
          queryCtx.request.getStmt(), e);
    }

    // The fragments are fully populated at this point; serialize them to Thrift.
    for (PlanFragment fragment: fragments) {
      TPlanFragment thriftFragment = fragment.toThrift();
      queryExecRequest.addToFragments(thriftFragment);
    }

    // Use VERBOSE by default for all non-explain statements.
    TExplainLevel explainLevel = TExplainLevel.VERBOSE;
    // Use the query option for explain stmts and tests (e.g., planner tests).
    if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
      explainLevel = queryCtx.request.query_options.getExplain_level();
    }

    // Global query parameters to be set in each TPlanExecRequest.
    queryExecRequest.setQuery_ctx(queryCtx);

    explainString.append(
        planner.getExplainString(fragments, queryExecRequest, explainLevel));
    queryExecRequest.setQuery_plan(explainString.toString());
    queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());

    // Attach the lineage graph only if one was produced.
    String jsonLineageGraph = analysisResult.getJsonLineageGraph();
    if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
      queryExecRequest.setLineage_graph(jsonLineageGraph);
    }

    if (analysisResult.isExplainStmt()) {
      // Return the EXPLAIN request
      createExplainRequest(explainString.toString(), result);
      return result;
    }

    result.setQuery_exec_request(queryExecRequest);

    if (analysisResult.isQueryStmt()) {
      // Fill in the result set metadata: one column descriptor per column label.
      LOG.debug("create result set metadata");
      result.stmt_type = TStmtType.QUERY;
      result.query_exec_request.stmt_type = result.stmt_type;
      TResultSetMetadata metadata = new TResultSetMetadata();
      QueryStmt queryStmt = analysisResult.getQueryStmt();
      int colCnt = queryStmt.getColLabels().size();
      for (int i = 0; i < colCnt; ++i) {
        TColumn colDesc = new TColumn();
        colDesc.columnName = queryStmt.getColLabels().get(i);
        colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
        metadata.addToColumns(colDesc);
      }
      result.setResult_set_metadata(metadata);
    } else {
      Preconditions.checkState(analysisResult.isInsertStmt() ||
          analysisResult.isCreateTableAsSelectStmt());

      // For CTAS the overall TExecRequest statement type is DDL, but the
      // query_exec_request should be DML
      result.stmt_type =
          analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
      result.query_exec_request.stmt_type = TStmtType.DML;

      // create finalization params of insert stmt
      InsertStmt insertStmt = analysisResult.getInsertStmt();
      if (insertStmt.getTargetTable() instanceof HdfsTable) {
        TFinalizeParams finalizeParams = new TFinalizeParams();
        finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
        finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
        finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
        // Fall back to the session's database if the target table name has none.
        String db = insertStmt.getTargetTableName().getDb();
        finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
        HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
        finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
        finalizeParams.setStaging_dir(
            hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
        queryExecRequest.setFinalize_params(finalizeParams);
      }
    }

    validateTableIds(analysisResult.getAnalyzer(), result);

    timeline.markEvent("Planning finished");
    result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
    return result;
  }

至此,FE 结束,返回 TExecRequest 型的对象给 backend 执行。

由于笔者刚开始接触 Impala,分析可能存在某些谬误,有任何疑问或建议都欢迎讨论。


Reference:
Impala Main Page
Impala Github Project
Impala 简介

Tagged on:

3 thoughts on “Impala 源码分析-FE”

发表评论

电子邮件地址不会被公开。