Impala 简介

Impala 架构

Impala 是 Cloudera 在受到 Google 的 Dremel 启发下开发的实时交互 SQL 大数据查询工具,Impala 没有再使用缓慢的 Hive+MapReduce 批处理,而是通过使用与商用并行关系数据库中类似的分布式查询引擎(由 Query Planner、Query Coordinator 和 Query Exec Engine 三部分组成),可以直接从 HDFS 或 HBase 中用 SELECT、JOIN 和统计函数查询数据,从而大大降低了延迟。其架构如图 1所示,Impala 主要由 Impalad,State Store,Impala Catalog Service 和 CLI组成。

图 1

  • Impala Daemon: 与 DataNode 运行在同一节点上,由 Impalad 进程表示,它接收客户端的查询请求(接收查询请求的 Impalad 为 Coordinator,Coordinator 通过 JNI 调用 java 前端解释 SQL 查询语句,生成查询计划树,再通过调度器把执行计划分发给具有相应数据的其它 Impalad 进行执行),读写数据,并行执行查询,并把结果通过网络流式的传送回给 Coordinator,由 Coordinator 返回给客户端。同时 Impalad 也与 State Store 保持连接,用于确定哪个 Impalad 是健康和可以接受新的工作。在 Impalad 中启动一个 ImpalaServer 服务和三个 ThriftServer:
    1. beeswax_server: 连接客户端
    2. hs2_server: 借用Hive元数据
    3. be_server: Impalad内部使用
  • Impala State Store: 跟踪集群中的 Impalad 的健康状态及位置信息,由 statestored 进程表示,它通过创建多个线程来处理 Impalad 的注册订阅和与各 Impalad 保持心跳连接,各 Impalad 都会缓存一份 State Store 中的信息,当 State Store 离线后(Impalad 发现 State Store 处于离线时,会进入 recovery 模式,反复注册,当 State Store 重新加入集群后,自动恢复正常,更新缓存数据)因为 Impalad 有 State Store 的缓存仍然可以工作,但会因为有些 Impalad 失效了,而已缓存数据无法更新,导致把执行计划分配给了失效的 Impalad,导致查询失败。
  • Impala Catalog service:被称为 catalog service 的 Impala 组件中继 SQL 语句导致的元数据更改到集群中的所有节点。通常由物理进程 catalogd 表示。集群中只需要一个这样的节点。由于请求通过 statestored,因此 statestored 和 catalogd 可以运行在同一节点。
  • CLI:提供给用户查询使用的命令行工具(Impala Shell 使用 python 实现),同时 Impala 还提供了 Hue,JDBC, ODBC 使用接口。

与 Hive 的关系

Impala 与 Hive 都是构建在 Hadoop 之上的数据查询工具各有不同的侧重适应面,但从客户端使用来看 Impala 与 Hive 有很多的共同之处,如数据表元数据、ODBC/JDBC 驱动、SQL 语法、灵活的文件格式、存储资源池等。Impala 与 Hive 在 Hadoop 中的关系如图 2所示。Hive 适合于长时间的批处理查询分析,而 Impala 适合于实时交互式 SQL 查询,Impala 给数据分析人员提供了快速实验、验证想法的大数 据分析工具。可以先使用 hive 进行数据转换处理,之后使用 Impala 在 Hive 处理后的结果数据集上进行快速的数据分析。

图 2

Impala 的查询处理过程

Impalad 分为 Java 前端与 C++ 处理后端,接受客户端连接的 Impalad 即作为这次查询的 Coordinator,Coordinator 通过 JNI 调用 Java 前端对用户的查询 SQL 进行分析生成执行计划树,不同的操作对应不用的 PlanNode, 如:SelectNode, ScanNode, SortNode, AggregationNode, HashJoinNode 等等。

执行计划树的每个原子操作由一个 PlanFragment 表示,通常一条查询语句由多个 Plan Fragment 组成, Plan Fragment 0 表示执行树的根,汇聚结果返回给用户,执行树的叶子结点一般是 Scan 操作,分布式并行执行。

Java 前端产生的执行计划树以 Thrift 数据格式返回给 Impala C++ 后端(Coordinator)(执行计划分为多个阶段,每一个阶段叫做一个 PlanFragment,每一个 PlanFragment 在执行时可以由多个 Impalad 实例并行执行(有些 PlanFragment 只能由一个 Impalad 实例执行,如聚合操作),整个执行计划为一执行计划树),由 Coordinator 根据执行计划,数据存储信息(Impala 通过 libhdfs 与 HDFS 进行交互。通过 hdfsGetHosts 方法获得文件数据块所在节点的位置信息),通过调度器(现在只有 simple-scheduler, 使用 round-robin 算法)Coordinator::Exec 对生成的执行计划树分配给相应的后端执行器 Impalad 执行(查询会使用 LLVM 进行代码生成,编译,执行。

对于使用 LLVM 如何提高性能这里有说明),通过调用 GetNext()方法获取计算结果,如果是 insert 语句,则将计算结果通 过libhdfs 写回 HDFS 当所有输入数据被消耗光,执行结束,之后注销此次查询服务。

Impala 的查询处理流程大概如图3所示:

图 3

下面以一个 SQL 查询语句为例分析 Impala 的查询处理流程。如 select sum(id), count(id), avg(id) from customer_small group by id; 以此语句生成的计划为:

PLAN FRAGMENT 0
 PARTITION: UNPARTITIONED
4:EXCHANGE
 tuple ids: 1
PLAN FRAGMENT 1
 PARTITION: HASH_PARTITIONED: <slot 1>
STREAM DATA SINK
 EXCHANGE ID: 4
 UNPARTITIONED
3:AGGREGATE
 |  output: SUM(<slot 2>), SUM(<slot 3>)
 |  group by: <slot 1>
 |  tuple ids: 1
 |
 2:EXCHANGE
 tuple ids: 1
PLAN FRAGMENT 2
 PARTITION: RANDOM
STREAM DATA SINK
 EXCHANGE ID: 2
 HASH_PARTITIONED: <slot 1>
1:AGGREGATE
 |  output: SUM(id), COUNT(id)
 |  group by: id
 |  tuple ids: 1
 |
 0:SCAN HDFS
 table=default.customer_small #partitions=1 size=193B
 tuple ids: 0

执行行计划树如图 4所示,绿色的部分为可以分布式并行执行:


图 4

下面是 Impala 中一个查询流程的形象描述

Step 1 : 客户端发送一个查询到集群中的某一个 Impala Daemon

Impala Query Life Cycle

Step 2 : Impala Coordinator 在可用节点上初始化查询执行

Impala Query Life Cycle

Step 3: 每个工作 Daemon 收集本地结果发送回 coordinator 节点

Impala Query Life Cycle

Step 4 : 接收查询的 Impala Daemon 将查询结果返回给客户端

Impala Query Life Cycle

Impala 相对于 Hive 所使用的优化技术

  • 1、没有使用 MapReduce 进行并行计算,虽然 MapReduce 是非常好的并行计算框架,但它更多的面向批处理模式,而不是面向交互式的 SQL 执行。与 MapReduce 相比:Impala 把整个查询分成一执行计划树,而不是一连串的 MapReduce 任务,在分发执行计划后,Impala 使用拉式获取数据的方式获取结果,把结果数据组成按执行树流式传递汇集,减少的了把中间结果写入磁盘的步骤,再从磁盘读取数据的开销。Impala 使用服务的方式避免每次执行查询都需要启动的开销,即相比 Hive 没了 MapReduce 启动时间。
  • 2、使用 LLVM 产生运行代码,针对特定查询生成特定代码,同时使用 Inline 的方式减少函数调用的开销,加快执行效率。
  • 3、用 C++ 实现,做了很多有针对性的硬件优化,充分利用可用的硬件指令(SSE4.2)。
  • 4、使用了支持 Data locality 的 I/O 调度机制,尽可能地将数据和计算分配在同一台机器上进行,减少了网络开销。Impala 知道数据块所在的磁盘位置能够更好的利用多磁盘的优势,同时 Impala 支持直接数据块读取和本地代码计算 checksum。
  • 5、通过选择合适的数据存储格式可以得到最好的性能(Impala 支持多种存储格式)。
  • 6、最大使用内存,中间结果不写磁盘,及时通过网络以 stream 的方式传递。

Impala 与 Hive 的异同

  • 数据存储:使用相同的存储数据池都支持把数据存储于 HDFS, HBase。
  • 元数据:两者使用相同的元数据。
  • SQL解释处理:比较相似都是通过词法分析生成执行计划。

执行计划

  • Hive: 依赖于 MapReduce 执行框架,执行计划分成 map->shuffle->reduce->map->shuffle->reduce… 的模型。如果一个 Query 会被编译成多轮 MapReduce,则会有更多的写中间结果。由于 MapReduce 执行框架本身的特点,过多的中间过程会增加整个 Query 的执行时间。
  • Impala: 把执行计划表现为一棵完整的执行计划树,可以更自然地分发执行计划到各个 Impalad 执行查询,而不用像 Hive 那样把它组合成管道型的 map->reduce 模式,以此保证 Impala 有更好的并发性和避免不必要的中间 sort 与 shuffle。

数据流

  • Hive: 采用推的方式,每一个计算节点计算完成后将数据主动推给后续节点。
  • Impala: 采用拉的方式,后续节点通过 getNext 主动向前面节点要数据,以此方式数据可以流式的返回给客户端,且只要有1条数据被处理完,就可以立即展现出来,而不用等到全部处理完成,更符合 SQL 交互式查询使用。

内存使用

  • Hive: 在执行过程中如果内存放不下所有数据,则会使用外存,以保证 Query 能顺序执行完。每一轮 MapReduce 结束,中间结果也会写入 HDFS 中,同样由于 MapReduce 执行架构的特性,shuffle 过程也会有写本地磁盘的操作。
  • Impala: 在遇到内存放不下数据时,当前版本1.0.1是直接返回错误,而不会利用外存,以后版本应该会进行改进。这使用得 Impala 目前处理 Query 会受到一定的限制,最好还是与 Hive 配合使用。Impala 在多个阶段之间利用网络传输数据,在执行过程不会有写磁盘的操作(insert 除外)。

调度

  • Hive: 任务调度依赖于 Hadoop 的调度策略。
  • Impala: 调度由自己完成,目前只有一种调度器 simple-schedule,它会尽量满足数据的局部性,扫描数据的进程尽量靠近数据本身所在的物理机器。调度器目前还比较简单,在 SimpleScheduler::GetBackend 中可以看到,现在还没有考虑负载,网络 IO 状况等因素进行调度。但目前 Impala 已经有对执行过程的性能统计分析,应该以后版本会利用这些统计信息进行调度吧。

容错

  • Hive: 依赖于 Hadoop 的容错能力。
  • Impala: 在查询过程中,没有容错逻辑,如果在执行过程中发生故障,则直接返回错误(这与 Impala 的设计有关,因为 Impala 定位于实时查询,一次查询失败, 再查一次就好了,再查一次的成本很低)。但从整体来看,Impala 是能很好的容错,所有的 Impalad 是对等的结构,用户可以向任何一个 Impalad 提交查询,如果一个 Impalad 失效,其上正在运行的所有 Query 都将失败,但用户可以重新提交查询由其它 Impalad 代替执行,不 会影响服务。对于 State Store 目前只有一个,但当 State Store 失效,也不会影响服务,每个 Impalad 都缓存了 State Store 的信息,只是不能再更新集群状态,有可能会把执行任务分配给已经失效的 Impalad 执行,导致本次 Query 失败。

适用面

  • Hive: 复杂的批处理查询任务,数据转换任务。
  • Impala:实时数据分析,因为不支持 UDF,能处理的问题域有一定的限制,与 Hive 配合使用,对 Hive 的结果数据集进行实时分析。

Impala 的优缺点

优点

  1. 支持 SQL 查询,快速查询大数据。
  2. 可以对已有数据进行查询,减少数据的加载,转换。
  3. 多种存储格式可以选择(Parquet, Text, Avro, RCFile, SequeenceFile)。
  4. 可以与 Hive 配合使用。

缺点

  1. 不支持用户定义函数 UDF。
  2. 不支持 text 域的全文搜索。
  3. 不支持 Transforms。
  4. 不支持查询期的容错。
  5. 对内存要求高。

RPC

Component Service Port Access Requirement Comment
ImpalaDaemon Impala Daemon Backend Port 22000 Internal ImpalaBackendService export
Impala Daemon Frontend Port 21000 External ImpalaService export
Impala Daemon HTTP Server Port 25000 External Impala debug web server
StateStoreSubscriber Service Port 23000 Internal StateStoreSubscriberService
ImpalaStateStore Daemon StateStore HTTP Server Port 25010 External StateStore debug web server
StateStore Service Port 24000 Internal StateStoreService export

下面介绍三个组件之间的 Thrift RPC(“<->” 前面的表示 RPC client,“<->” 后面的表示 RPC server)

(1)Client <-> impalad(frontend)

BeeswaxService(beeswax.thrift): client 通过 query() 提交 SQL 请求,然后异步调用 get_state() 监听该 SQL 的查询进度,一旦完成,调用 fetch() 取回结果。

TCLIService(cli_service.thrift): client 提交 SQL 请求,功能和上面类似,更丰富的就是对 DDL 操作的支持,例如 GetTables() 返回指定 table 的元数据。

ImpalaService 和 ImpalaHiveServer2Service(ImpalaService.thrift) 分别是上面两个类的子类,各自丰富了点功能而已,核心功能没啥大变化。

(2)Impalad(backend) <-> statestored

StateStoreService(StateStoreService.thrift): statestored 保存整个系统所有 backend service 状态的全局数据库,这里是个单节点中央数据交换中心(该节点保存的状态是 soft state,一旦宕机,保存的状态信息就没了)。例如每个 impala backend 启动的时候会调用 StateStoreService.RegisterService() 向 statestored 注册自己(其实是通过跟这个 backend service 捆绑在一起的 StateStoreSubscriber 标识的),然后再调用 StateStoreService.RegisterSubscription()表明这个 StateStoreSubscriber 接收来自 statestored 的 update。

(3)Statestord <-> impalad(backend)

StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend 向 statestored 调用 RegisterSubscription 之后,statestored 就会定期向 backend 这边捆绑的 StateStoreSubscriber 发送该 backend 的状态更新信息。然后 backend 这边调用 StateStoreSubscriberService.UpdateState() 更新相关状态。同时这个 UpdateState() 调用在 impalad backend/StateStoreSubscriber 这端还会返回该 backend 的一些 update 信息给 statestored。

(4)Impalad(backend) <-> other impalad(backend) (这两个是互为 client/server 的)

ImpalaInternalService(ImpalaInternalService.thrift):某个 backend 的 coordinator 要向其他 backend 的 execute engine 发送执行某个 plan fragment 的请求(提交 ExecPlanFragment 并要求返回 ReportExecStatus)。这部分功能会在 backend 分析中详细讨论。

(5)Impalad backend <-> other frontend

ImpalaPlanService(ImpalaPlanService.thrift): 可以由其他形式的 frontend 生成 TExecRequest 然后交给 backend 执行。

另外,Impala frontend 是用 Java 写的,而 backend 使用 C++ 写的。Frontend 负责把输入的 SQL 解析,然后生成执行计划,之后通过 Thrift 的序列化/反序列化的方式传给backend。TExecRequest(frontend.thrift)是中间传输的数据结构,表示了一个 Query/DML/DDL 的查询请求,也是 SQL 执行过程中在 frontend 和 backend 之间的数据接口。所以我们可以把 impala-frontend 换掉,用其他的形式拼凑出这个 TExecRequest 就可以传给 backend 执行,这也就是前面说的 ImpalaPlanService 干的事。

impala 组件执行流程

1, impala-shell

client 就可以通过 Beeswax 和 HiveServer2 的 Thrift API 向 Impala 提交 query。这两种访问接口的作用是一样的(都是用于 client 提交 query,返回 query result)。

Impala_shell.py 是通过 Beeswax 方式访问 impala 的,下面我们看看 impala_shell.py 是怎么向 impalad 提交 query 的。

(1)通过 OptionParser() 解析命令行参数。如果参数中有 —query 或者 —query_file,则执行 execute_queries_non_interactive_mode(options),这是非交互查询(也就是就查询一个 SQL 或者一个写满 SQL 的文件);否则进入 ImpalaShell.cmdloop (intro) 循环。

(2)进入命令行循环后,一般是先 connect 某一个 impalad,输入 ”connect localhost:21000”,进入 do_connect(self, args) 函数。这个函数根据用户指定的host和port,生成与相应的 impalad 的 socket 连接。最重要的就是这行代码:

self.imp_service = ImpalaService.Client(protocol)

至此 imp_service 就是 client 端的代理了,所有请求都通过它提交。

(3)下面以 select 命令为例说明,如果 client 输入这样的命令 ”select col1, col2 from tbl”,则进入 do_select(self, args) 函数。在这个函数里首先生成 BeeswaxService.Query 对象,向这个对象填充 query statement 和 configuration。然后进入__query_with_result() 函数通过 imp_service.query(query) 提交 query。注意 ImpalaService 都是异步的,提交之后返回一个 QueryHandle,然后就是在一个 while 循环里不断 __get_query_state() 查询状态。如果发现这个 SQL 的状态是 FINISHED,那么就通过 fetch() RPC 获取结果。

2, statestored

Statestored 进程对外提供 StateStoreService RPC 服务,而 StateStoreSubscriberService RPC 服务是在 impalad 进程中提供的。StateStoreService 这个 RPC 的逻辑实现是在 StateStore 这个类里面实现的。

Statestored 收到 backend 发送的 RegisterService RPC 请求时,调用 StateStore::RegisterService() 处理,主要做两件事:

(1)根据 TRegisterServiceRequest 提供的 service_id 把该 service 加入 StateStore.service_instances_。

通常在整个 impala 集群只存在名为 “impala_backend_service” 这一个服务,所以 service_id=”impala_backend_service”。而每个 backend 捆绑的 <SubscriberId,impala::THostPort> 是不一样的,所以就形成了 service 和 backend 一对多的关系,这个关系存储在 StateStore.service_instances_ 组。

(2)Impalad backend 在向 statestored RegisterService 的时候,会把 subscriber_address 发送过去。在 statestored 端,会根据这个 subscriber_address 生成对应的 Subscriber 对象(表示与该 Subscriber 捆绑的 backend)。把与该 backend 绑定的 Subscriber 加入 StateStore.subscribers_ 这个 map 里。每个 Subscriber 有个唯一的 id,这样分布在集群内的 impala backend 就有了全局唯一 id 了。

这样如果以后某个 backend/StateStoreSubscriber fail 或者其中运行的 SQL 任务出了问题,在 statestored 这里就会有体现了,那么就会通知给其他相关的 backend。

那么每个 backend 是怎么 update 的呢?StateStore::UpdateLoop()负责定期向各个 backend 推送其所订阅的 service 的所有成员的更新,目前的更新策略是全量更新,未来会考虑增量更新。

3, impalad

Impalad 进程的服务被 wrapper 在 ImpalaServer 这个类中。ImpalaServer 包括 fe 和 be 的功能,实现了 ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2) 和 ImpalaInternelService API。

全局函数CreateImpalaServer()创建了一个ImpalaServer 其中包含了多个 ThriftServer:

(1)创建一个名为 beeswax_server 的 ThriftServer 对系统外提供 ImpalaService(Beeswax) 服务,主要服务于Query查询,是 fe/frontend 的核心服务,端口21000

(2)创建一个名为 hs2_server 的 ThriftServer 对系统外提供 ImpalaHiveServer2Service 服务,提供Query, DML, DDL相关操作,端口21050

(3)创建一个名为 be_server 的 ThriftServer 对系统内其他 impalad 提供 ImpalaInternalService,端口22000

(4)创建 ImpalaServer 对象,前面三个 ThriftServer 的 TProcessor 被赋值这个 ImpalaServer 对象,所以对前面三个 Thrift 服务的 RPC 请求都交由这个 ImpalaServer 对象处理。最典型的例子就是我们通过 Beeswax接口提交了一个BeeswaxService.query() 请求,在 impalad 端的处理逻辑是由 void ImpalaServer::query(QueryHandle& query_handle, const Query& query)这个函数(在impala-beeswax-server.cc中实现)完成的。

下面是impalad-main.cc的主函数:

int main(int argc, char** argv) {
  //参数解析,开启日志(基于Google gflags和glog)
  InitDaemon(argc, argv);

  LlvmCodeGen::InitializeLlvm();

  // Enable Kerberos security if requested.
  if (!FLAGS_principal.empty()) {
    EXIT_IF_ERROR(InitKerberos("Impalad"));
  }
  //因为frontend, HBase等相关组件是由Java开发的,所以下面这几行都是初始化JNI相关的reference和method id
  JniUtil::InitLibhdfs();
  EXIT_IF_ERROR(JniUtil::Init());
  EXIT_IF_ERROR(HBaseTableScanner::Init());
  EXIT_IF_ERROR(HBaseTableCache::Init());
  InitFeSupport();

  //ExecEnv类是impalad backend上Query/PlanFragment的执行环境。
  //生成SubscriptionManager, SimpleScheduler和各种Cache
  ExecEnv exec_env;
  //生成Beeswax, hive-server2和backend三种ThriftServer用于接收client请求,不过这三种服务的后端真正的处理逻辑都是ImpalaServer* server这个对象。
  ThriftServer* beeswax_server = NULL;
  ThriftServer* hs2_server = NULL;
  ThriftServer* be_server = NULL;
  ImpalaServer* server =
      CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port,
          &beeswax_server, &hs2_server, &be_server);
  //因为be_server是对系统内提供服务的,先启动它。
  be_server->Start();
  //这里面关键是启动了SubscriptionManager和Scheduler
  Status status = exec_env.StartServices();
  if (!status.ok()) {
    LOG(ERROR) << "Impalad services did not start correctly, exiting";
    ShutdownLogging();
    exit(1);
  }

  // register be service *after* starting the be server thread and after starting
  // the subscription mgr handler thread
  scoped_ptr cb;
  if (FLAGS_use_statestore) {
    THostPort host_port;
    host_port.port = FLAGS_be_port;
    host_port.ipaddress = FLAGS_ipaddress;
    host_port.hostname = FLAGS_hostname;
    //注册这个be服务到statestored,整个集群里所有的be服务组成一个group,这样以后来了Query请求就可以在各个backend之间dispatch了。
    Status status =
        exec_env.subscription_mgr()->RegisterService(IMPALA_SERVICE_ID, host_port);

    unordered_set services;
    services.insert(IMPALA_SERVICE_ID);
    //注册callback函数,每当StateStoreSubscriber接收到来自statestored的update之后调用该函数。
    cb.reset(new SubscriptionManager::UpdateCallback(
        bind(mem_fn(&ImpalaServer::MembershipCallback), server, _1)));
    exec_env.subscription_mgr()->RegisterSubscription(services, "impala.server",
        cb.get());

    if (!status.ok()) {
      LOG(ERROR) << "Could not register with state store service: "
                 << status.GetErrorMsg();
           ShutdownLogging();
           exit(1);
    }
  }   // this blocks until the beeswax and hs2 servers terminate   //前面对内服务的be_server已经成功启动,下面启动对外服务的beeswax_server和hs2_server   beeswax_server->Start();
  hs2_server->Start();
  beeswax_server->Join();
  hs2_server->Join();

  delete be_server;
  delete beeswax_server;
  delete hs2_server;
}

exec_env.StartServices() 调用 SubscriptionManager.Start(),进一步调用 StateStoreSubscriber.Start() 启动一个 ThriftServer。

StateStoreSubscriber 实现了 StateStoreSubscriberService(StateStoreSubscriberService.thrift 中定义),用于接收来自 statestored 的 update,并把与这个 StateStoreSubscriber 捆绑的 backend 的 update 反馈给 statestored。这样这个 backend 就可以对其他 backend 可见,这样就可以接受其他 impala backend 发来的任务更新了(当然,接收 backend 更新是通过 statestored 中转的)。


Reference:
Impala 与 Hive 的比较
Impala 架构和 RPC
Real Time Querying on Hadoop using Impala
Impala: A Modern, Open-Source SQL Engine for Hadoop
Meet Impala: Open Source Real-Time SQL Queries on Hadoop

Tagged on: , , ,

6 thoughts on “Impala 简介

  1. nfwufeng

    楼主你好, 我最近在看Impala,有点不明白的,想请教下。你上面说的是一个抽象的查询过程。如果具体到Plan Fragments,然后把PlanFragment分发下去,涉及到物理层的部分, 我觉得首先各个节点的数据不是所有的都由coordinator来收集,首先也要由查询树中的Exchangenode来收集和处理,然后最后再汇总。

    1. 木头小鬼 Post author

      coordinator 并不负责收集信息,而是由statestore跟踪;Coordinator 分发PlanFragment的时候根据statestore 提供的数据位置信息以及调度器实现具体的调度。上面的基本是我转载别人的,有些地方我也不理解,我才接触Impala几天,还是在熟悉呢。两篇源码分析是我自己看源码写的,也不一定对,欢迎一起交流学习哈。

      1. nfwufeng

        最近在纠结一个问题,带有exchangenode的fragment被分发给下去,每个主机都运行一个完整的fragment的话,那么岂不是会有多个exchangenode,对应的scannode怎么和这些exchangenode对应上,scannode怎么知道该把数据给哪个exchangenode?这个对应关系怎么处理?楼主怎么看?PS:我涉足大数据领域也没几天

        1. 木头小鬼 Post author

          ExchangeNode.java 里面有这样一段话:Receiver side of a 1:n data stream. Logically, an ExchangeNode consumes the data produced by its children. For each of the sending child nodes the actual data transmission is performed by the DataStreamSink of the PlanFragment housing that child node。可以看一下 common/thrift 文件夹里面定义的结构体文件,一个 PlanFragment 是由 list-PlanNode组成的,PlanFragment 中有个成员PlanNode planRoot_ 是该PlanFragment 执行 plan tree 的 root 节点,还有ExchangeNode destNode_ 成员,是该 PlanFragment 发送数据的目的地,实际数据传输由另一个成员DataSink sink_ 完成。DataSink 描述了一个 PlanFragment output rows 的 destination。

发表评论

电子邮件地址不会被公开。