Impala Quick Start: An Introduction to Impala, and Working with Impala from impala-shell, Java, and Python

Summary: Impala, JDBC, DBCP, impyla

Introduction to Impala

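Impala is a query engine from Cloudera, written in C++ and built on the MPP (massively parallel processing) model. It consists of several daemon processes running on the CDH cluster, and it integrates with the Hive metastore, so it shares database and table metadata with Hive.
Impala offers the following advantages: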

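  • Impala integrates automatically with the existing CDH components, so data can be shared by all of them
  • Supports SQL queries over HBase, HDFS, Kudu, and more
  • Impala returns results within seconds or, at most, minutes
  • Supports file formats such as Parquet, text, RCFile, and HFile

Using impala-shell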

    impala-shell -i cloudera01:21000 -l --auth_creds_ok_in_clear -u cdh01
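    -i: host:port of any impalad in the cluster to connect to
    -l: use LDAP authentication (the shell prompts for the user's password); because SSL is not used, --auth_creds_ok_in_clear must be added as well
    -u: the user to connect as
    Other options:
  • -B: turns off the pretty-printed (table) output, which reduces formatting overhead; use it together with --output_delimiter. The default delimiter is \t (tab); --output_delimiter=, sets a different one, here a comma
  • -o: writes the query results to a file, e.g. -o "./load_data_test.csv"; once the query finishes (and you exit), the CSV file is in the local directory. Combine it with -B --output_delimiter=, so the file is actually comma-separated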
  • -f: runs a SQL script from the command line, for example:
    [root@cloudera01 home]# cat query.sql
    use test_gp;
    insert into student_info values('5', 345);
    insert into student_info values('6', 345);
    insert into student_info values('7', 345);
    insert into student_info values('8', 345);
    insert into student_info values('9', 345);
    

    Run it from the command line:

    [root@cloudera01 pgeng]# impala-shell -i cloudera01:21000 -l --auth_creds_ok_in_clear -u cdh001 -f query.sql
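The options above can also be assembled from a script. The sketch below uses two hypothetical helpers (`build_impala_shell_args` and `parse_delimited_output` are illustrative names, not part of impala-shell): the first builds the argument list for a batch run with -f or -q, the second reads text produced with `-B --output_delimiter=...`. It only builds the command; actually running it (e.g. with `subprocess.run(args)`) assumes impala-shell is installed and the impalad host is reachable.

```python
import csv
import io
from typing import List, Optional


def build_impala_shell_args(host: str, user: str, *, ldap: bool = True,
                            script: Optional[str] = None,
                            query: Optional[str] = None,
                            delimiter: Optional[str] = None,
                            output_file: Optional[str] = None) -> List[str]:
    """Hypothetical helper: assemble an impala-shell argv from the options described above."""
    if (script is None) == (query is None):
        raise ValueError("pass exactly one of script (-f) or query (-q)")
    args = ["impala-shell", "-i", host, "-u", user]
    if ldap:
        # -l enables LDAP auth; without SSL the shell also requires this flag
        args += ["-l", "--auth_creds_ok_in_clear"]
    args += ["-f", script] if script is not None else ["-q", query]
    if delimiter is not None:
        # -B drops the table formatting; the delimiter would otherwise default to a tab
        args += ["-B", "--output_delimiter=" + delimiter]
    if output_file is not None:
        args += ["-o", output_file]
    return args


def parse_delimited_output(text: str, delimiter: str = "\t") -> List[List[str]]:
    """Split -B output into rows of column values, skipping empty lines."""
    return [row for row in csv.reader(io.StringIO(text), delimiter=delimiter) if row]


args = build_impala_shell_args("cloudera01:21000", "cdh01",
                               query="select * from test_gp.student_info",
                               delimiter=",", output_file="./load_data_test.csv")
print(" ".join(args))
print(parse_delimited_output("王帆,15\n李想,17\n", ","))
```

Returning a list rather than a single string lets the arguments be passed to `subprocess.run` without shell quoting problems, e.g. for the spaces inside the -q query.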
    
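    An interactive session that creates the test_gp database and the student_info table, inserts a few rows, and queries them:

    [cloudera01:21000] > create database test_gp;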
    Query: create database test_gp
    Fetched 0 row(s) in 5.71s
    [cloudera01:21000] > create table student_info(name String, age INTEGER);
    Query: create table student_info(name String, age INTEGER)
    Fetched 0 row(s) in 6.59s
    [cloudera01:21000] > insert into student_info values('王帆', 15);
    Query: insert into student_info values('王帆', 15)
    Query submitted at: 2020-11-11 16:46:21 (Coordinator: http://cloudera01:25000)
    Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=624958465fedb197:13ace98200000000
    Modified 1 row(s) in 6.14s
    [cloudera01:21000] > insert into student_info values('猪坚强', 16);
    Query: insert into student_info values('猪坚强', 16)
    Query submitted at: 2020-11-11 16:46:41 (Coordinator: http://cloudera01:25000)
    Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=27405bb44af85efa:49d42b600000000
    Modified 1 row(s) in 6.34s
    [cloudera01:21000] > insert into student_info values('李想', 17);
    Query: insert into student_info values('李想', 17)
    Query submitted at: 2020-11-11 16:46:56 (Coordinator: http://cloudera01:25000)
    Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=564d89653cdde3a2:8ed5550a00000000
    [cloudera01:21000] > select * from student_info;
    Query: select * from student_info
    Query submitted at: 2020-11-11 16:47:11 (Coordinator: http://cloudera01:25000)
    Query progress can be monitored at: http://cloudera01:25000/query_plan?query_id=1c4ba5816f72703e:490e41f000000000
    +--------+-----+
    | name   | age |
    +--------+-----+
    | 王帆   | 15  |
    | 李想   | 17  |
    | 猪坚强 | 16  |
    +--------+-----+
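In the session above each single-row INSERT took about 6 seconds, and for HDFS-backed tables every INSERT ... VALUES statement writes its own small data file. Impala also accepts several rows in one VALUES clause, so batching rows into one statement (for example in a script run with -f) is usually preferable. Below is a minimal sketch with hypothetical helpers `sql_literal` and `build_multi_row_insert`; the escaping rule (a backslash before `\` and `'`) is an assumption based on Impala's backslash escapes in string literals, so prefer driver-side parameter binding where one is available.

```python
from typing import Iterable, Sequence, Union

Value = Union[str, int, float, bool, None]


def sql_literal(value: Value) -> str:
    """Render a Python value as an Impala SQL literal (assumed escaping rules)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):          # check bool before int: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return repr(value)
    # escape backslashes first, then single quotes
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_multi_row_insert(table: str, columns: Sequence[str],
                           rows: Iterable[Sequence[Value]]) -> str:
    """Hypothetical helper: build one INSERT ... VALUES (...), (...) statement."""
    tuples = []
    for row in rows:
        if len(row) != len(columns):
            raise ValueError("row %r does not match columns %r" % (row, columns))
        tuples.append("(" + ", ".join(sql_literal(v) for v in row) + ")")
    if not tuples:
        raise ValueError("no rows to insert")
    return "insert into %s (%s) values %s" % (table, ", ".join(columns), ", ".join(tuples))


stmt = build_multi_row_insert("test_gp.student_info", ["name", "age"],
                              [("王帆", 15), ("猪坚强", 16), ("李想", 17)])
print(stmt)
```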
    

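    JDBC Connection

    Use hive-jdbc to connect to Impala; add the following dependency to the pom:

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.1.0-cdh5.15.2</version>
    </dependency>

    Test querying Impala and inserting data into it; set the driver, JDBC URL, username, and password: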

    import java.sql.*;
    import java.util.HashMap;
    import java.util.Map;

    public class ImpalaUtils {
        private Connection conn = null;
        // volatile is required for double-checked locking to be safe
        private static volatile ImpalaUtils instance = null;

        // private: callers must go through getInstance()
        private ImpalaUtils() {
            try {
                String driver = "org.apache.hive.jdbc.HiveDriver";
                // 21050 is the impalad port for HiveServer2 (JDBC) clients
                String JDBCUrl = "jdbc:hive2://192.168.60.104:21050/test_gp";
                String username = "cdh01";
                String password = "******";
                Class.forName(driver);
                conn = DriverManager.getConnection(JDBCUrl, username, password);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static ImpalaUtils getInstance() {
            if (instance == null) {
                synchronized (ImpalaUtils.class) {
                    if (instance == null) {
                        instance = new ImpalaUtils();
                    }
                }
            }
            return instance;
        }

        // Returns the age stored for the given name (0 if there is no match)
        public static int selectTest(String name) {
            int res = 0;
            // bind the name as a parameter instead of formatting it into the SQL (avoids SQL injection);
            // no trailing ';' is needed in a JDBC statement
            String sql = "select * from test_gp.student_info where name = ?";
            try (PreparedStatement statement = getInstance().conn.prepareStatement(sql)) {
                statement.setString(1, name);
                try (ResultSet result = statement.executeQuery()) {
                    while (result.next()) {
                        res = result.getInt("age");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return res;
        }

        // Inserts one row per map entry; try-with-resources closes the statement
        public static void insertTest(Map<String, Integer> map) {
            Connection conn = getInstance().conn;
            String sql = "insert into test_gp.student_info (name, age) values(?,?)";
            try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    preparedStatement.setString(1, entry.getKey());
                    preparedStatement.setInt(2, entry.getValue());
                    preparedStatement.execute();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            int res = ImpalaUtils.selectTest("王帆");
            System.out.println(res);
            Map<String, Integer> rows = new HashMap<>();
            rows.put("陈龙", 23);
            rows.put("小王", 55);
            rows.put("老陈", 12);
            insertTest(rows);
        }
    }
    
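`getInstance()` above uses double-checked locking: check without the lock, then check again while holding it, so concurrent first calls still create exactly one instance (in Java the field must be `volatile` for this to be safe). The same pattern in Python, as a sketch: `LazyConnection` is a hypothetical class, and `connect_fn` is an assumed factory standing in for `DriverManager.getConnection` (for instance a wrapper around impyla's `connect(**config)`).

```python
import threading
from typing import Any, Callable, Optional


class LazyConnection:
    """Create a shared connection on first use, exactly once, even under concurrency."""

    def __init__(self, connect_fn: Callable[[], Any]) -> None:
        self._connect_fn = connect_fn      # hypothetical factory that opens a real connection
        self._conn: Optional[Any] = None
        self._lock = threading.Lock()

    def get(self) -> Any:
        conn = self._conn
        if conn is None:                   # first check, without locking (fast path)
            with self._lock:
                if self._conn is None:     # second check, under the lock
                    self._conn = self._connect_fn()
                conn = self._conn
        return conn


calls = []

def fake_connect():
    calls.append(1)                        # count how many connections get opened
    return object()

lazy = LazyConnection(fake_connect)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))
```

Eight threads race on the first `get()`, but only the one that wins the lock calls the factory; the others see the stored connection on the second check.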

    Using a DBCP Connection Pool

    Query Impala through a DBCP connection pool (this example is written in Scala):

    import java.sql.{Connection, ResultSet, Statement}
    import java.util.Properties
    import java.util.concurrent.ConcurrentHashMap
    import org.apache.commons.dbcp.BasicDataSource

    object ImpalaScala {
      // one pool per data-source name; ConcurrentHashMap keeps the unlocked containsKey check safe
      private val msqlComnsMap = new ConcurrentHashMap[String, BasicDataSource]()

      def getConnPool(prop: Properties, name: String): BasicDataSource = {
        if (!msqlComnsMap.containsKey(name)) {
          this.synchronized {
            if (!msqlComnsMap.containsKey(name)) {
              val tmpCon = new BasicDataSource()
              tmpCon.setDriverClassName(prop.getProperty("jdbc.driver"))
              tmpCon.setUrl(prop.getProperty("jdbc." + name + ".url"))
              tmpCon.setUsername(prop.getProperty("jdbc." + name + ".user"))
              tmpCon.setPassword(prop.getProperty("jdbc." + name + ".passwd"))
              tmpCon.setMaxActive(4)                 // at most 4 connections in use at once
              tmpCon.setMaxIdle(4)
              tmpCon.setMinIdle(2)
              tmpCon.setInitialSize(4)
              tmpCon.setMaxWait(10000)               // wait up to 10000 ms for a free connection
              tmpCon.setTestWhileIdle(true)          // validate idle connections with the query below
              tmpCon.setValidationQuery("select 1")
              tmpCon.setValidationQueryTimeout(10)   // this value is in seconds, not milliseconds
              tmpCon.setTimeBetweenEvictionRunsMillis(10000)
              msqlComnsMap.put(name, tmpCon)
            }
          }
        }
        msqlComnsMap.get(name)
      }

      def main(args: Array[String]): Unit = {
        val prop = new Properties()
        prop.setProperty("jdbc.driver", "org.apache.hive.jdbc.HiveDriver")
        prop.setProperty("jdbc.impala.user", "cdh01")
        prop.setProperty("jdbc.impala.passwd", "******")
        prop.setProperty("jdbc.impala.url", "jdbc:hive2://192.168.60.104:21050/test_gp")
        val connPool: BasicDataSource = ImpalaScala.getConnPool(prop, "impala")
        if (connPool != null) {
          var conn: Connection = null
          var statement: Statement = null
          var res: ResultSet = null
          try {
            conn = connPool.getConnection()
            statement = conn.createStatement()
            res = statement.executeQuery("select * from test_gp.student_info where name = '王帆'")
            while (res.next()) {
              println("age => " + res.getString("age"))
            }
          } catch {
            case e: Exception => e.printStackTrace()
          } finally {
            // close in reverse order; closing a pooled connection returns it to the pool
            try {
              if (res != null) res.close()
              if (statement != null) statement.close()
              if (conn != null) conn.close()
            } catch {
              case e: Exception => e.printStackTrace()
            }
          }
        }
      }
    }
    
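The DBCP settings above bound the pool: `maxActive` caps how many connections can be in use at the same time, and `maxWait` is how long (in milliseconds, in DBCP) a caller waits for one before getting an exception. A minimal Python sketch of just those two behaviors, not DBCP itself: `BoundedPool`, `PoolExhausted`, and the `connect_fn` factory are hypothetical, and DBCP features such as idle validation and eviction are left out.

```python
import queue
import threading
from contextlib import contextmanager
from typing import Any, Callable, Iterator


class PoolExhausted(Exception):
    pass


class BoundedPool:
    def __init__(self, connect_fn: Callable[[], Any], max_active: int, max_wait_s: float) -> None:
        self._connect_fn = connect_fn                  # hypothetical connection factory
        self._idle: "queue.Queue[Any]" = queue.Queue()
        self._slots = threading.Semaphore(max_active)  # one permit per connection in use
        self._max_wait_s = max_wait_s

    @contextmanager
    def connection(self) -> Iterator[Any]:
        # block up to max_wait_s for a free slot, similar to DBCP's maxWait
        if not self._slots.acquire(timeout=self._max_wait_s):
            raise PoolExhausted("no connection available within %.1f s" % self._max_wait_s)
        try:
            try:
                conn = self._idle.get_nowait()         # reuse an idle connection if any
            except queue.Empty:
                conn = self._connect_fn()              # otherwise open a new one
        except BaseException:
            self._slots.release()
            raise
        try:
            yield conn
        finally:
            self._idle.put(conn)                       # return it to the pool instead of closing
            self._slots.release()


opened = []

def fake_connect():
    opened.append(1)
    return len(opened)                                 # stands in for a real connection object

pool = BoundedPool(fake_connect, max_active=2, max_wait_s=0.1)
timed_out = False
with pool.connection() as c1, pool.connection() as c2:
    try:
        with pool.connection():
            pass
    except PoolExhausted:
        timed_out = True                               # the third caller gave up after max_wait_s
with pool.connection() as c3:
    pass                                               # reuses an idle connection
print(timed_out, len(opened))
```

As with `conn.close()` on a DBCP connection, leaving the `with` block hands the connection back to the pool rather than closing it, so the last request opens nothing new.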

    Working with Impala from Python

    To work with Impala from Python, use the impyla library (as_pandas additionally requires pandas). Example:

    root@ubuntu:~/ # pip install impyla
    
    from impala.dbapi import connect
    from impala.util import as_pandas

    config = {
        "host": "192.20.3.55",
        "port": 21050,
        "database": "test",
        "user": "cdh_dev",
        "password": "123456",
        "auth_mechanism": "PLAIN"  # user/password (LDAP) authentication
    }

    def get_impala_data(query):
        conn = None
        cursor = None
        data = []
        try:
            conn = connect(**config)
            cursor = conn.cursor()
            cursor.execute(query)
            data = as_pandas(cursor)  # fetchone()/fetchall() can be used instead
        except Exception as e:
            print("error", e.args)
        finally:
            if cursor:
                cursor.close()
            if conn: