How Presto Connectors Are Implemented
Introduction
One of Presto's signature features is federated querying across multiple data sources, and multi-source support is built on its Connector mechanism. Presto ships with more than a dozen built-in connectors, including Hive, MySQL, MongoDB, and Kafka. Beyond that, its extension mechanism lets you write custom connectors to query bespoke data sources, so understanding how connectors work and how they are implemented is one of the keys to understanding Presto. This article walks through the source code, using MySQL as the example, to explain in detail how a Presto connector is implemented.
Presto Plugins
In Presto, user-defined functions (UDFs), event listeners, supported data types (Data Types and Parameter Types), access control, resource groups, and the connectors covered in this article are all implemented through the plugin mechanism.
Plugin Interface Definition
Presto exposes every supported plugin type through a single unified interface:
public interface Plugin {
    default Iterable<ConnectorFactory> getConnectorFactories() {
        return emptyList();
    }
    default Iterable<BlockEncoding> getBlockEncodings() {
        return emptyList();
    }
    default Iterable<Type> getTypes() {
        return emptyList();
    }
    default Iterable<ParametricType> getParametricTypes() {
        return emptyList();
    }
    default Set<Class<?>> getFunctions() {
        return emptySet();
    }
    default Iterable<SystemAccessControlFactory> getSystemAccessControlFactories() {
        return emptyList();
    }
    default Iterable<PasswordAuthenticatorFactory> getPasswordAuthenticatorFactories() {
        return emptyList();
    }
    default Iterable<EventListenerFactory> getEventListenerFactories() {
        return emptyList();
    }
    default Iterable<ResourceGroupConfigurationManagerFactory> getResourceGroupConfigurationManagerFactories() {
        return emptyList();
    }
    default Iterable<SessionPropertyConfigurationManagerFactory> getSessionPropertyConfigurationManagerFactories() {
        return emptyList();
    }
    default Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories() {
        return emptyList();
    }
}
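To make the contract concrete, here is a hypothetical sketch of a custom plugin. The class names ExamplePlugin and ExampleConnectorFactory are invented for illustration, and the SPI package locations are assumptions based on presto-spi; an implementation only overrides the methods for the plugin types it actually provides (the MySQL connector's real MySqlPlugin and JdbcConnectorFactory are examined later in this article).

import static java.util.Collections.singletonList;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.connector.ConnectorFactory;

// Hypothetical plugin that contributes a single connector factory.
// ExampleConnectorFactory stands in for a real ConnectorFactory implementation.
public class ExamplePlugin implements Plugin {
    @Override
    public Iterable<ConnectorFactory> getConnectorFactories() {
        return singletonList(new ExampleConnectorFactory());
    }
}

To be discovered at startup, the plugin jar would also need a META-INF/services/com.facebook.presto.spi.Plugin file naming ExamplePlugin; the SPI section below explains why.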
Plugin Loading Flow
When Presto starts, it loads plugins from the following paths (PluginManager.loadPlugins):
public void loadPlugins()
        throws Exception {
    for (File file : listFiles(installedPluginsDir)) {
        if (file.isDirectory()) {
            loadPlugin(file.getAbsolutePath());
        }
    }

    for (String plugin : plugins) {
        loadPlugin(plugin);
    }
}
installedPluginsDir is set by the following configuration property:
@Config("plugin.dir")
public PluginManagerConfig setInstalledPluginsDir(File installedPluginsDir)
this.installedPluginsDir = installedPluginsDir;
return this;
plugins is set by the following configuration property:
@Config("plugin.bundles")
public PluginManagerConfig setPlugins(String plugins)
if (plugins == null) {
this.plugins = null;
else {
this.plugins = ImmutableList.copyOf(Splitter.on(',').omitEmptyStrings().trimResults().split(plugins));
return this;
plugin.bundles is set in the config.properties file. In a development environment it defaults to the pom file paths of the individual plugin modules, so that the dependency jars can be loaded from the Maven repository. For a production deployment you normally just point Presto at a plugin directory: put each plugin's third-party dependencies under $installedPluginsDir/<pluginName> (where pluginName is the plugin's name) and its configuration file under $PrestoHome/etc/. The exact location of the configuration file varies slightly by plugin type, for example:
- For Event Listener plugins, the configuration file is fixed at $PrestoHome/etc/event-listener.properties;
- UDFs do not need a configuration file;
- For a connector plugin named mysql, the configuration file is $PrestoHome/etc/catalog/mysql.properties.

Because each plugin's dependencies live under their own path, loading each plugin with a separate class loader avoids dependency conflicts:
private void loadPlugin(String plugin)
        throws Exception {
    log.info("-- Loading plugin %s --", plugin);
    URLClassLoader pluginClassLoader = buildClassLoader(plugin);
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
        loadPlugin(pluginClassLoader);
    }
    log.info("-- Finished loading plugin %s --", plugin);
}
// The SPI mechanism loads the classes that implement the Plugin interface
private void loadPlugin(URLClassLoader pluginClassLoader) {
    ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
    List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

    if (plugins.isEmpty()) {
        log.warn("No service providers of type %s", Plugin.class.getName());
    }

    for (Plugin plugin : plugins) {
        log.info("Installing %s", plugin.getClass().getName());
        installPlugin(plugin);
    }
}
// Here the Plugin "dispatch" interface is used to enumerate and register every plugin type in turn
public void installPlugin(Plugin plugin) {
    for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {
        log.info("Registering connector %s", connectorFactory.getName());
        connectorManager.addConnectorFactory(connectorFactory);
    }

    for (EventListenerFactory eventListenerFactory : plugin.getEventListenerFactories()) {
        log.info("Registering event listener %s", eventListenerFactory.getName());
        eventListenerManager.addEventListenerFactory(eventListenerFactory);
    }
    // ... the other plugin types are registered the same way
}
Everything ultimately lands in installPlugin(Plugin plugin), which files every supported plugin, by type, into the factories of the corresponding manager: Connector plugins go to ConnectorManager, Event Listeners to EventListenerManager, and so on. These factories are really just in-memory maps, for example ConnectorManager's
private final ConcurrentMap<String, ConnectorFactory> connectorFactories = new ConcurrentHashMap<>();
and EventListenerManager's
private final Map<String, EventListenerFactory> eventListenerFactories = new ConcurrentHashMap<>();
Plugins and SPI
We have already seen this key line of code above:
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
It uses the Java SPI (Service Provider Interface). SPI is a service discovery mechanism built into the JDK: given an interface, it dynamically loads the available implementations of that interface. The familiar java.sql.Driver, for example, is an interface, and at run time the MySQL or PostgreSQL Driver is chosen depending on which implementation is provided. To use SPI you only need to create a META-INF/services directory under resources and, inside it, a file named after the fully qualified interface name whose content is the fully qualified name of the implementing class. For example, the file META-INF/services/com.facebook.presto.spi.Plugin contains:
com.facebook.presto.plugin.mysql.MySqlPlugin
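As a minimal, self-contained sketch of the same mechanism, here is a standalone example using the java.sql.Driver interface mentioned above rather than Presto's own classes: ServiceLoader simply scans META-INF/services on the classpath and instantiates every implementation it finds declared there.

import java.sql.Driver;
import java.util.ServiceLoader;

// Lists every JDBC driver declared via META-INF/services/java.sql.Driver on the classpath.
public class SpiDemo {
    public static void main(String[] args) {
        for (Driver driver : ServiceLoader.load(Driver.class)) {
            System.out.println("Found driver: " + driver.getClass().getName());
        }
    }
}

Presto's PluginManager does exactly this, only with the Plugin interface and a per-plugin class loader.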
Note: if you browse the source code of the mysql connector you will not find a META-INF/services directory or the interface declaration file. That is because Presto has an automatic plugin-discovery feature that creates the corresponding directory and file for you, for example in this PluginManager method:
private URLClassLoader buildClassLoaderFromPom(File pomFile)
        throws Exception {
    List<Artifact> artifacts = resolver.resolvePom(pomFile);
    URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());

    Artifact artifact = artifacts.get(0);
    Set<String> plugins = discoverPlugins(artifact, classLoader);
    if (!plugins.isEmpty()) {
        writePluginServices(plugins, artifact.getFile());
    }
    return classLoader;
}
The two methods it calls, discoverPlugins and writePluginServices, handle discovering the plugin declarations and writing them out, respectively.
Catalog Loading Flow
From the connector loading flow we know that every ConnectorFactory ends up registered in the in-memory connectorFactories map. But before a connector can actually be used, a catalog has to be loaded. A catalog identifies a concrete data source in Presto, and Presto addresses tables through the following three-level hierarchy:
catalog=>schema=>table
For example, each mysql instance is a catalog: each instance has its own host, port, login credentials, and so on, while a schema corresponds to a database inside that mysql instance (so a name like mysql.sales.orders would refer to the orders table in the sales database of the catalog named mysql). Different catalogs can therefore share the same Connector, and Connector instances are created by the ConnectorFactory. All catalog configuration files live under $PrestoHome/etc/catalog, and loading starts in StaticCatalogStore.loadCatalogs:
public void loadCatalogs()
        throws Exception {
    for (File file : listFiles(catalogConfigurationDir)) {
        if (file.isFile() && file.getName().endsWith(".properties")) {
            loadCatalog(file);
        }
    }
}

private void loadCatalog(File file)
        throws Exception {
    String catalogName = Files.getNameWithoutExtension(file.getName());
    if (disabledCatalogs.contains(catalogName)) {
        log.info("Skipping disabled catalog %s", catalogName);
        return;
    }

    log.info("-- Loading catalog %s --", file);
    Map<String, String> properties = new HashMap<>(loadProperties(file));

    String connectorName = properties.remove("connector.name");
    checkState(connectorName != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());

    connectorManager.createConnection(catalogName, connectorName, ImmutableMap.copyOf(properties));
    log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
}
public synchronized ConnectorId createConnection(String catalogName, String connectorName, Map<String, String> properties) {
    requireNonNull(connectorName, "connectorName is null");
    ConnectorFactory connectorFactory = connectorFactories.get(connectorName);
    checkArgument(connectorFactory != null, "No factory for connector %s", connectorName);
    return createConnection(catalogName, connectorFactory, properties);
}

private synchronized ConnectorId createConnection(String catalogName, ConnectorFactory connectorFactory, Map<String, String> properties) {
    checkState(!stopped.get(), "ConnectorManager is stopped");
    requireNonNull(catalogName, "catalogName is null");
    requireNonNull(properties, "properties is null");
    requireNonNull(connectorFactory, "connectorFactory is null");
    checkArgument(!catalogManager.getCatalog(catalogName).isPresent(), "A catalog already exists for %s", catalogName);

    ConnectorId connectorId = new ConnectorId(catalogName);
    checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);

    addCatalogConnector(catalogName, connectorId, connectorFactory, properties);

    return connectorId;
}
The flow is straightforward. It takes two main inputs, connectorName and catalogName, and produces the connectorId for the catalog. connectorName comes from the connector.name entry of the catalog configuration file, catalogName comes from the file name (minus the .properties extension) under the catalog directory, and the Connector itself is created through the ConnectorFactory registered earlier. The real work happens in addCatalogConnector.
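As a concrete illustration (host and credentials are placeholders), a catalog file $PrestoHome/etc/catalog/mysql.properties for the MySQL connector would look roughly like this; connector.name selects the registered ConnectorFactory, and the remaining entries are handed to that factory as the properties map:

connector.name=mysql
connection-url=jdbc:mysql://example-host:3306
connection-user=presto_user
connection-password=secret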
The Catalog Model
Before walking through addCatalogConnector, it is worth looking at the Catalog model first:
public class Catalog {
    private final String catalogName;
    private final ConnectorId connectorId;
    private final Connector connector;
    private final ConnectorId informationSchemaId;
    private final Connector informationSchema;
    private final ConnectorId systemTablesId;
    private final Connector systemTables;

    public Catalog(
            String catalogName,
            ConnectorId connectorId,
            Connector connector,
            ConnectorId informationSchemaId,
            Connector informationSchema,
            ConnectorId systemTablesId,
            Connector systemTables) {
        // constructor body omitted
    }
}
The Catalog definition shows that besides its own Connector, each data source gets two more connectors: informationSchema and systemTables. information_schema exposes the data source's metadata; in mysql, for example, information_schema stores information about all databases, such as database names, their tables, and the data types and access permissions of table columns. systemTables, on the other hand, is not something every data source provides. All three ConnectorIds are derived from the catalogName:
public final class ConnectorId {
    private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
    private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";

    private final String catalogName;

    @JsonCreator
    public ConnectorId(String catalogName) {
        this.catalogName = requireNonNull(catalogName, "catalogName is null");
        if (catalogName.isEmpty()) {
            throw new IllegalArgumentException("catalogName is empty");
        }
    }

    public static ConnectorId createInformationSchemaConnectorId(ConnectorId connectorId) {
        return new ConnectorId(INFORMATION_SCHEMA_CONNECTOR_PREFIX + connectorId.getCatalogName());
    }

    public static ConnectorId createSystemTablesConnectorId(ConnectorId connectorId) {
        return new ConnectorId(SYSTEM_TABLES_CONNECTOR_PREFIX + connectorId.getCatalogName());
    }
}
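To make the naming concrete, here is a small sketch showing the two auxiliary ids derived for a catalog named mysql. It assumes presto-spi is on the classpath; the package location of ConnectorId may differ across Presto versions.

import com.facebook.presto.spi.ConnectorId; // package may vary by Presto version

public class ConnectorIdDemo {
    public static void main(String[] args) {
        ConnectorId connectorId = new ConnectorId("mysql");
        // prints "$info_schema@mysql"
        System.out.println(ConnectorId.createInformationSchemaConnectorId(connectorId).getCatalogName());
        // prints "$system@mysql"
        System.out.println(ConnectorId.createSystemTablesConnectorId(connectorId).getCatalogName());
    }
}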
Catalog Registration
private synchronized void addCatalogConnector(String catalogName, ConnectorId connectorId, ConnectorFactory factory, Map<String, String> properties) {
    // create all connectors before adding, so a broken connector does not leave the system half updated
    MaterializedConnector connector = new MaterializedConnector(connectorId, createConnector(connectorId, factory, properties));

    MaterializedConnector informationSchemaConnector = new MaterializedConnector(
            createInformationSchemaConnectorId(connectorId),
            new InformationSchemaConnector(catalogName, nodeManager, metadataManager, accessControlManager, connector.getSessionProperties()));

    ConnectorId systemId = createSystemTablesConnectorId(connectorId);
    SystemTablesProvider systemTablesProvider;
    if (nodeManager.getCurrentNode().isCoordinator()) {
        systemTablesProvider = new DelegatingSystemTablesProvider(
                new StaticSystemTablesProvider(connector.getSystemTables()),
                new MetadataBasedSystemTablesProvider(metadataManager, catalogName));
    }
    else {
        systemTablesProvider = new StaticSystemTablesProvider(connector.getSystemTables());
    }

    MaterializedConnector systemConnector = new MaterializedConnector(systemId, new SystemConnector(
            systemId,
            nodeManager,
            systemTablesProvider,
            transactionId -> transactionManager.getConnectorTransaction(transactionId, connectorId),
            connector.getSessionProperties()));

    Catalog catalog = new Catalog(
            catalogName,
            connector.getConnectorId(),
            connector.getConnector(),
            informationSchemaConnector.getConnectorId(),
            informationSchemaConnector.getConnector(),
            systemConnector.getConnectorId(),
            systemConnector.getConnector());

    addConnectorInternal(connector);
    addConnectorInternal(informationSchemaConnector);
    addConnectorInternal(systemConnector);
    catalogManager.registerCatalog(catalog);
}
where MaterializedConnector is defined as:
private static class MaterializedConnector {
    private final ConnectorId connectorId;
    private final Connector connector;
    private final ConnectorSplitManager splitManager;
    private final Set<SystemTable> systemTables;
    private final Set<Procedure> procedures;
    private final ConnectorPageSourceProvider pageSourceProvider;
    private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
    private final Optional<ConnectorIndexProvider> indexProvider;
    private final Optional<ConnectorNodePartitioningProvider> partitioningProvider;
    private final Optional<ConnectorPlanOptimizerProvider> planOptimizerProvider;
    private final Optional<ConnectorAccessControl> accessControl;
    private final List<PropertyMetadata<?>> sessionProperties;
    private final List<PropertyMetadata<?>> tableProperties;
    private final List<PropertyMetadata<?>> schemaProperties;
    private final List<PropertyMetadata<?>> columnProperties;
    private final List<PropertyMetadata<?>> analyzeProperties;
}
MaterializedConnector wraps everything a Connector needs: the ConnectorSplitManager that drives split-based reads, the ConnectorIndexProvider for index reads, the ConnectorPageSourceProvider for page-at-a-time reads, the ConnectorPageSinkProvider for page-at-a-time writes, and the various PropertyMetadata property definitions. addConnectorInternal then registers each piece wrapped by MaterializedConnector with its corresponding manager: the ConnectorSplitManager with SplitManager, the ConnectorIndexProvider with IndexManager, the ConnectorPageSourceProvider with PageSourceManager, the ConnectorPageSinkProvider with PageSinkManager, and the various properties with MetadataManager, and so on. addConnectorInternal is implemented as follows:
private synchronized void addConnectorInternal(MaterializedConnector connector) {
    checkState(!stopped.get(), "ConnectorManager is stopped");

    ConnectorId connectorId = connector.getConnectorId();
    checkState(!connectors.containsKey(connectorId), "A connector %s already exists", connectorId);
    connectors.put(connectorId, connector);

    splitManager.addConnectorSplitManager(connectorId, connector.getSplitManager());
    pageSourceManager.addConnectorPageSourceProvider(connectorId, connector.getPageSourceProvider());

    connector.getPageSinkProvider()
            .ifPresent(pageSinkProvider -> pageSinkManager.addConnectorPageSinkProvider(connectorId, pageSinkProvider));

    connector.getIndexProvider()
            .ifPresent(indexProvider -> indexManager.addIndexProvider(connectorId, indexProvider));

    connector.getPartitioningProvider()
            .ifPresent(partitioningProvider -> partitioningProviderManager.addPartitioningProvider(connectorId, partitioningProvider));

    if (nodeManager.getCurrentNode().isCoordinator()) {
        connector.getPlanOptimizerProvider()
                .ifPresent(planOptimizerProvider -> connectorPlanOptimizerManager.addPlanOptimizerProvider(connectorId, planOptimizerProvider));
    }

    metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());

    connector.getAccessControl()
            .ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(connectorId, accessControl));

    metadataManager.getTablePropertyManager().addProperties(connectorId, connector.getTableProperties());
    metadataManager.getColumnPropertyManager().addProperties(connectorId, connector.getColumnProperties());
    metadataManager.getSchemaPropertyManager().addProperties(connectorId, connector.getSchemaProperties());
    metadataManager.getAnalyzePropertyManager().addProperties(connectorId, connector.getAnalyzeProperties());
    metadataManager.getSessionPropertyManager().addConnectorSessionProperties(connectorId, connector.getSessionProperties());
}
Only once this registration step is complete can the catalog actually serve requests: listing the catalog's schemas and other metadata, listing the functions it supports, and reading data from or writing data to the catalog.
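For instance, once the hypothetical mysql catalog above is registered, a client can see it right away. A minimal sketch using the standard JDBC API, assuming the Presto JDBC driver is on the classpath and a coordinator is listening on localhost:8080 (host, user, and catalog name are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Lists the schemas of the registered "mysql" catalog through a Presto coordinator.
public class CatalogSmokeTest {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:presto://localhost:8080/mysql", "test", null);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SHOW SCHEMAS FROM mysql")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}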
The Connector Model
Returning to the Plugin interface definition, look again at the method that deals with connectors:
public interface Plugin {
    default Iterable<ConnectorFactory> getConnectorFactories() {
        return emptyList();
    }
}
The method returns an iterable of ConnectorFactory, which is used to create the actual Connector. To dig into a ConnectorFactory implementation we again take MySQL as the example. The MySQL plugin declaration leads straight to JdbcPlugin:
@Override
public Iterable<ConnectorFactory> getConnectorFactories() {
    return ImmutableList.of(new JdbcConnectorFactory(name, module, getClassLoader()));
}

ConnectorFactory itself is a small interface:

public interface ConnectorFactory {
    String getName();

    ConnectorHandleResolver getHandleResolver();

    Connector create(String catalogName, Map<String, String> config, ConnectorContext context);
}
Implementing ConnectorFactory means implementing these three methods; the last two are both the core of the work and the hardest part. getHandleResolver returns a ConnectorHandleResolver, which tells Presto which concrete handle classes the connector uses to address the data source (table handles, column handles, splits, and so on). Its methods are defined as follows:
public interface ConnectorHandleResolver {
    Class<? extends ConnectorTableHandle> getTableHandleClass();

    Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass();

    Class<? extends ColumnHandle> getColumnHandleClass();

    Class<? extends ConnectorSplit> getSplitClass();

    default Class<? extends ConnectorIndexHandle> getIndexHandleClass() {
        throw new UnsupportedOperationException();
    }

    default Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass() {
        throw new UnsupportedOperationException();
    }

    default Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {
        throw new UnsupportedOperationException();
    }

    default Class<? extends ConnectorPartitioningHandle> getPartitioningHandleClass() {
        throw new UnsupportedOperationException();
    }

    default Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
        throw new UnsupportedOperationException();
    }
}
For MySQL, ConnectorHandleResolver is implemented by JdbcHandleResolver. The other important method of ConnectorFactory is create, which builds the Connector. Connector is itself an interface, defined as:
public interface Connector {
    ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly);

    /**
     * Guaranteed to be called at most once per transaction. The returned metadata will only be accessed
     * in a single threaded context.
     */
    ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle);

    ConnectorSplitManager getSplitManager();

    /** @throws UnsupportedOperationException if this connector does not support reading tables page at a time */
    default ConnectorPageSourceProvider getPageSourceProvider() {
        throw new UnsupportedOperationException();
    }

    /** @throws UnsupportedOperationException if this connector does not support reading tables record at a time */
    default ConnectorRecordSetProvider getRecordSetProvider() {
        throw new UnsupportedOperationException();
    }

    /** @throws UnsupportedOperationException if this connector does not support writing tables page at a time */
    default ConnectorPageSinkProvider getPageSinkProvider() {
        throw new UnsupportedOperationException();
    }

    /** @throws UnsupportedOperationException if this connector does not support indexes */
    default ConnectorIndexProvider getIndexProvider() {
        throw new UnsupportedOperationException();
    }

    /** @throws UnsupportedOperationException if this connector does not support partitioned table layouts */
    default ConnectorNodePartitioningProvider getNodePartitioningProvider() {
        throw new UnsupportedOperationException();
    }

    /** @throws UnsupportedOperationException if this connector does not need to optimize query plans */
    default ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() {
        throw new UnsupportedOperationException();
    }

    /** @return the set of system tables provided by this connector */
    default Set<SystemTable> getSystemTables() {
        return emptySet();
    }

    /** @return the set of procedures provided by this connector */
    default Set<Procedure> getProcedures() {
        return emptySet();
    }

    /** @return the system properties for this connector */
    default List<PropertyMetadata<?>> getSessionProperties() {
        return emptyList();
    }

    /** @return the schema properties for this connector */
    default List<PropertyMetadata<?>> getSchemaProperties() {
        return emptyList();
    }

    /** @return the analyze properties for this connector */
    default List<PropertyMetadata<?>> getAnalyzeProperties() {
        return emptyList();
    }

    /** @return the table properties for this connector */
    default List<PropertyMetadata<?>> getTableProperties() {
        return emptyList();
    }

    /** @return the column properties for this connector */
    default List<PropertyMetadata<?>> getColumnProperties() {
        return emptyList();
    }

    /** @throws UnsupportedOperationException if this connector does not have an access control */
    default ConnectorAccessControl getAccessControl() {
        throw new UnsupportedOperationException();
    }

    /**
     * Commit the transaction. Will be called at most once and will not be called if
     * {@link #rollback(ConnectorTransactionHandle)} is called.
     */
    default void commit(ConnectorTransactionHandle transactionHandle) {}

    /**
     * Rollback the transaction. Will be called at most once and will not be called if
     * {@link #commit(ConnectorTransactionHandle)} is called.
     * Note: calls to this method may race with calls to the ConnectorMetadata.
     */
    default void rollback(ConnectorTransactionHandle transactionHandle) {}

    /** True if the connector only supports write statements in independent transactions. */
    default boolean isSingleStatementWritesOnly() {
        return false;
    }

    /**
     * Shutdown the connector by releasing any held resources such as
     * threads, sockets, etc. This method will only be called when no
     * queries are using the connector. After this method is called,
     * no methods will be called on the connector or any objects that
     * have been returned from the connector.
     */
    default void shutdown() {}

    default Set<ConnectorCapabilities> getCapabilities() {
        return emptySet();
    }
}
The interface has quite a few methods, but broadly speaking they all deal with reading from and writing to the data source. Returning to MySQL, JdbcConnectorFactory.create assembles the connector through dependency injection (Airlift's Bootstrap and a Guice Injector):
@Override
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context) {
    requireNonNull(requiredConfig, "requiredConfig is null");
    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
        Bootstrap app = new Bootstrap(
                binder -> {
                    binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
                    binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
                    binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
                },
                new JdbcModule(catalogName),
                module);

        Injector injector = app
                .strictConfig()
                .doNotInitializeLogging()
                .setRequiredConfigurationProperties(requiredConfig)
                .initialize();

        return injector.getInstance(JdbcConnector.class);
    }
    // exception handling omitted here
}