;
随后,创建server端的HiveMetaStore.HMSHandler,HMSHandler继承自IHMSHandler,而IHMSHandler又继承自ThriftHiveMetastore.Iface,在HMSHandler中实现了所有操作的对外方法:
public class ThriftHiveMetastore {
* This interface is live.
public interface Iface extends com.facebook.fb303.FacebookService.Iface {
public String getMetaConf(String key) throws MetaException, org.apache.thrift.TException;
public void setMetaConf(String key, String value) throws MetaException, org.apache.thrift.TException;
public void create_database(Database database) throws AlreadyExistsException, InvalidObjectException, MetaException, org.apache.thrift.TException;
public Database get_database(String name) throws NoSuchObjectException, MetaException, org.apache.thrift.TException;
public void drop_database(String name, boolean deleteData, boolean cascade) throws NoSuchObjectException, InvalidOperationException, MetaException, org.apache.thrift.TException;
public List<String> get_databases(String pattern) throws MetaException, org.apache.thrift.TException;
public List<String> get_all_databases() throws MetaException, org.apache.thrift.TException;
public void alter_database(String dbname, Database db) throws MetaException, NoSuchObjectException, org.apache.thrift.TException;
public Type get_type(String name) throws MetaException, NoSuchObjectException, org.apache.thrift.TException;
public boolean create_type(Type type) throws AlreadyExistsException, InvalidObjectException, MetaException, org.apache.thrift.TException;
public boolean drop_type(String type) throws MetaException, NoSuchObjectException, org.apache.thrift.TException;
......
在创建HiveMetaStore的init方法中,同时创建了三种Listener---MetaStorePreEventListener,MetaStoreEventListener,MetaStoreEndFunctionListener用于对每一步事件的监听。
1 initListeners = MetaStoreUtils.getMetaStoreListeners(
2 MetaStoreInitListener.class, hiveConf,
3 hiveConf.getVar(HiveConf.ConfVars.METASTORE_INIT_HOOKS));
4 for (MetaStoreInitListener singleInitListener: initListeners) {
5 MetaStoreInitContext context = new MetaStoreInitContext();
6 singleInitListener.onInit(context);
9 String alterHandlerName = hiveConf.get("hive.metastore.alter.impl",
10 HiveAlterHandler.class.getName());
11 alterHandler = (AlterHandler) ReflectionUtils.newInstance(MetaStoreUtils.getClass(
12 alterHandlerName), hiveConf);
13 wh = new Warehouse(hiveConf);
15 synchronized (HMSHandler.class) {
16 if (currentUrl == null || !currentUrl.equals(MetaStoreInit.getConnectionURL(hiveConf))) {
17 createDefaultDB();
18 createDefaultRoles();
19 addAdminUsers();
20 currentUrl = MetaStoreInit.getConnectionURL(hiveConf);
21 }
22 }
24 if (hiveConf.getBoolean("hive.metastore.metrics.enabled", false)) {
25 try {
26 Metrics.init();
27 } catch (Exception e) {
28 // log exception, but ignore inability to start
29 LOG.error("error in Metrics init: " + e.getClass().getName() + " "
30 + e.getMessage(), e);
31 }
32 }
34 preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
35 hiveConf,
36 hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
37 listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class, hiveConf,
38 hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
39 listeners.add(new SessionPropertiesListener(hiveConf));
40 endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
41 MetaStoreEndFunctionListener.class, hiveConf,
42 hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
同时创建了AlterHandler,它是HiveAlterHandler的接口,是将修改表和修改partition的操作抽离了出来单独实现(修改表很复杂的。。)。
1 public interface AlterHandler extends Configurable {
3 /**
4 * handles alter table
6 * @param msdb
7 * object to get metadata
8 * @param wh
9 * TODO
10 * @param dbname
11 * database of the table being altered
12 * @param name
13 * original name of the table being altered. same as
14 * <i>newTable.tableName</i> if alter op is not a rename.
15 * @param newTable
16 * new table object
17 * @throws InvalidOperationException
18 * thrown if the newTable object is invalid
19 * @throws MetaException
20 * thrown if there is any other error
21 */
22 public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname,
23 String name, Table newTable) throws InvalidOperationException,
24 MetaException;
26 /**
27 * handles alter table, the changes could be cascaded to partitions if applicable
28 *
29 * @param msdb
30 * object to get metadata
31 * @param wh
32 * Hive Warehouse where table data is stored
33 * @param dbname
34 * database of the table being altered
35 * @param name
36 * original name of the table being altered. same as
37 * <i>newTable.tableName</i> if alter op is not a rename.
38 * @param newTable
39 * new table object
40 * @param cascade
41 * if the changes will be cascaded to its partitions if applicable
42 * @throws InvalidOperationException
43 * thrown if the newTable object is invalid
44 * @throws MetaException
45 * thrown if there is any other error
46 */
47 public abstract void alterTable(RawStore msdb, Warehouse wh, String dbname,
48 String name, Table newTable, boolean cascade) throws InvalidOperationException,
49 MetaException;
最重要的是RawStore的创建。RawStore不光是定义了一套最终的物理操作,使用JDO将一个对象当作表进行存储。ObjectStore中的transaction机制也是通过JDO提供的transaction实现的。当commit失败时,将rollback所有操作。
1 @Override
2 public void createDatabase(Database db) throws InvalidObjectException, MetaException {
3 boolean commited = false;
4 MDatabase mdb = new MDatabase();
5 mdb.setName(db.getName().toLowerCase());
6 mdb.setLocationUri(db.getLocationUri());
7 mdb.setDescription(db.getDescription());
8 mdb.setParameters(db.getParameters());
9 mdb.setOwnerName(db.getOwnerName());
10 PrincipalType ownerType = db.getOwnerType();
11 mdb.setOwnerType((null == ownerType ? PrincipalType.USER.name() : ownerType.name()));
12 try {
13 openTransaction();
14 pm.makePersistent(mdb);
15 commited = commitTransaction();
16 } finally {
17 if (!commited) {
18 rollbackTransaction();
19 }
20 }
21 }
23 @SuppressWarnings("nls")
24 private MDatabase getMDatabase(String name) throws NoSuchObjectException {
25 MDatabase mdb = null;
26 boolean commited = false;
27 try {
28 openTransaction();
29 name = HiveStringUtils.normalizeIdentifier(name);
30 Query query = pm.newQuery(MDatabase.class, "name == dbname");
31 query.declareParameters("java.lang.String dbname");
32 query.setUnique(true);
33 mdb = (MDatabase) query.execute(name);
34 pm.retrieve(mdb);
35 commited = commitTransaction();
36 } finally {
37 if (!commited) {
38 rollbackTransaction();
39 }
40 }
41 if (mdb == null) {
42 throw new NoSuchObjectException("There is no database named " + name);
43 }
44 return mdb;
今晚就到这里。。。自己摸索,如有错误,还望指出谢谢~
spark 对于hive metastore的兼容性随笔--通过classloader实现
spark 对于hive metastore的兼容性随笔--通过classloader实现
hive metastore 3.0介绍
我们说到Hive 3.0.0版本开始,其单独提供了standalone metastore服务以作为像presto等处理引擎的元数据管理中心。