In Chapter 3, you configured a catalog to use a connector to access a data source in Presto—specifically, the TPC-H benchmark data—and then learned a bit about how to query that data with SQL.
Catalogs are an important aspect of using Presto. They define the connection to the underlying data source and storage system, and use concepts such as connector, schema, and table. These fundamental concepts are described in Chapter 4, and their use with SQL is discussed in more detail in Chapter 8.
A connector translates the query and storage concepts of an underlying data source, such as a relational database management system (RDBMS), object storage, or a key-value store, to the SQL and Presto concepts of tables, columns, rows, and data types. These can be simple SQL-to-SQL translations and mappings but also much more complicated translations from SQL to object storage or NoSQL systems. These can also be user defined.
You can think of a connector the same way you think of a driver for a database. It translates the user input into operations that the underlying database can execute. Every connector implements the Presto service provider interface (SPI). This enables Presto to allow you to use the same SQL tooling to work with whatever underlying data source the connector exposes and makes Presto a SQL-on-Anything system.
Query performance is also influenced by the connector implementation. The most basic connector makes a single connection to the data source and provides the data to Presto. However, a more advanced connector can break a statement into multiple connections, performing operations in parallel for better performance. Another advanced feature of a connector is to provide table statistics, which the cost-based optimizer can then use to create highly performant query plans. Such a connector is, however, more complex to implement.
Presto provides numerous connectors:
Connectors for RDBMS systems such as PostgreSQL or MySQL—see “RDBMS Connector Example PostgreSQL”
A Hive connector suitable for querying systems by using the Hadoop Distributed File System (HDFS) and similar object storage systems—see “Hive Connector for Distributed Storage Data Sources”
Numerous connectors to nonrelational data sources—see “Non-Relational Data Sources”
tpch and tpcds connectors designed to serve TPC benchmark data—see “Presto TPC-H and TPC-DS Connectors”
A connector for Java Management Extensions, or JMX—see “Presto JMX Connector”
In this chapter, you learn more about some of these connectors, available from the Presto project. More than two dozen connectors are shipped in Presto today, and more are created by the Presto team and the user community. Commercial, proprietary connectors are also available to further extend the reach and performance of Presto. Finally, if you have a custom data source, or one for which no connector exists yet, you can implement your own connector by implementing the necessary SPI calls and dropping it into the plug-ins directory in Presto.
One important aspect of the catalog and connector usage is that all of them become available to SQL statements and queries in Presto at the same time. This means that you can create queries that span data sources. For example, you can combine data from a relational database with the data in files stored in your object storage backend. These federated queries are discussed in more detail in “Query Federation in Presto”.
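For example, a federated query might join customer records from an RDBMS catalog with clickstream data from an object storage catalog (the catalog, schema, table, and column names here are made up for illustration):

```sql
SELECT c.name, count(v.user_id) AS visits
FROM postgresql.crm.customers c
LEFT JOIN hive.web.visits v
  ON c.customer_id = v.customer_id
GROUP BY c.name;
```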
As discussed in “Adding a Data Source”, every data source you want to access needs to be configured as a catalog by creating a catalog file. The name of the file determines the name of the catalog when writing queries.
The mandatory property connector.name
indicates which connector is used for
the catalog. The same connector can be used multiple times in different
catalogs; for example, to access different RDBMS server instances with different
databases all using the same technology such as PostgreSQL. Or if you have two
Hive clusters, you can configure two catalogs in a single Presto cluster that
both use the Hive connector, allowing you to query data from both Hive clusters.
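For example, two catalogs using the same PostgreSQL connector might be configured like this (the hostnames, database names, and filenames are made up for illustration):

```properties
# etc/catalog/postgresql-hr.properties
connector.name=postgresql
connection-url=jdbc:postgresql://hr.example.com:5432/hr

# etc/catalog/postgresql-sales.properties
connector.name=postgresql
connection-url=jdbc:postgresql://sales.example.com:5432/sales
```

As usual, the filenames determine the catalog names, so queries can reference postgresql-hr and postgresql-sales side by side.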
Presto contains connectors to both open source and proprietary RDBMSs, including MySQL, PostgreSQL, AWS Redshift, and Microsoft SQL Server. Presto queries these data sources with the connectors by using each system’s respective JDBC drivers.
Let’s look at a simple example using PostgreSQL. A PostgreSQL instance may consist of several databases. Each database contains schemas, which contain objects such as tables and views. When configuring Presto with PostgreSQL, you choose the database that is exposed as a catalog in Presto.
After creating a simple catalog file pointing at a specific database on the server, etc/catalog/postgresql.properties as shown next, and restarting Presto, you can find out more information. You can also see that the postgresql connector is configured with the required connector.name property:
```properties
connector.name=postgresql
connection-url=jdbc:postgresql://db.example.com:5432/database
connection-user=root
connection-password=secret
```
The user and password in the catalog properties file determine the access rights to the underlying data source. This can be used, for example, to restrict access to read-only operations or to restrict the available tables.
You can list all catalogs to confirm that the new catalog is available, and inspect details with the Presto CLI, or a database management tool using the JDBC driver (as explained in “Presto Command-Line Interface” and “Presto JDBC Driver”):
```sql
SHOW CATALOGS;
  Catalog
------------
 system
 postgresql
(2 rows)
```
```sql
SHOW SCHEMAS IN postgresql;
   Schema
------------
 public
 airline
(2 rows)
```
```sql
USE postgresql.airline;
SHOW TABLES;
  Table
---------
 airport
 carrier
(2 rows)
```
In this example, you see we connected to a PostgreSQL database that
contains two schemas: public
and airline
. And then within the airline
schema are two tables, airport
and carrier
. Let’s try running a
query. In this example, we issue a SQL query to Presto, where the table exists in
a PostgreSQL database. Using the PostgreSQL connector, Presto is able to
retrieve the data for processing, returning the results to the user:
```sql
SELECT code, name FROM airport WHERE code = 'ORD';
 code |             name
------+------------------------------
 ORD  | Chicago O'Hare International
(1 row)
```
As displayed in Figure 6-1, the client submits the query to the Presto coordinator. It offloads the work to a worker, which sends the entire SQL query statement to PostgreSQL using JDBC. The PostgreSQL JDBC driver is contained within the PostgreSQL connector. PostgreSQL processes the query and returns the results over JDBC. The connector reads the results and writes them to the Presto internal data format. Presto continues the processing on the worker, provides it to the coordinator, and then returns the results to the user.
As we saw in the previous example, Presto is able to offload processing by pushing the SQL statement down into the underlying data source. This is known as query pushdown, or SQL pushdown. This is advantageous, as the underlying system can reduce the amount of data returned to Presto, avoiding unnecessary memory, CPU, and network costs. Furthermore, systems like PostgreSQL typically have indexes on certain filter columns, allowing for faster processing. However, it is not always possible to push the entire SQL statement down into the data source. Currently, the Presto Connector SPI limits the types of operations that can be pushed down to filter and column projections:
```sql
SELECT state, count(*)
FROM airport
WHERE country = 'US'
GROUP BY state;
```
Given the preceding Presto query, the PostgreSQL connector constructs the SQL query to push down to PostgreSQL:
```sql
SELECT state
FROM airport
WHERE country = 'US';
```
There are two important places to look when queries are pushed down by an RDBMS connector. The columns in the SELECT list are set to specifically what is needed by Presto. In this case, we need only the state column for processing the GROUP BY in Presto. We also push down the filter country = 'US', which means we do not need to perform further processing of the country column in Presto.
You notice that the aggregations are not pushed down to PostgreSQL. This is
because Presto is not able to push any other form of queries down, and the
aggregations must be performed in Presto. This can be advantageous because Presto
is a distributed query processing engine, whereas PostgreSQL is not.
If you do want to push additional processing down to the underlying RDBMS source, you can accomplish this by using views. If you encapsulate the processing in a view in PostgreSQL, it is exposed as a table to Presto, and the processing occurs within PostgreSQL. For example, let’s say you create the view in PostgreSQL:
```sql
CREATE VIEW airline.airports_per_us_state AS
SELECT state, count(*) AS count_star
FROM airline.airport
WHERE country = 'US'
GROUP BY state;
```
When you run SHOW TABLES
in Presto, you see this view:
```sql
SHOW TABLES IN postgresql.airline;
         Table
-----------------------
 airport
 carrier
 airports_per_us_state
(3 rows)
```
Now you can just query the view, and all processing is pushed down to PostgreSQL, since the view appears as an ordinary table to Presto:
```sql
SELECT * FROM airports_per_us_state;
```
Currently, all RDBMS connectors use JDBC to make a single connection to the underlying data source. The data is not read in parallel, even if the underlying data source is a parallel system. For parallel systems, like Teradata or Vertica, you have to write parallel connectors that can take advantage of how those systems store the data in a distributed fashion.
When accessing multiple tables from the same RDBMS, a JDBC connection is created and used for each table in the query. For example, if the query is performing a join between two tables in PostgreSQL, Presto creates two different connections over JDBC to retrieve the data, as displayed in Figure 6-2. They run in parallel, send their results back, and then the join is performed in Presto.
As with aggregations, joins cannot be pushed down. However, if you want to take advantage of the performance enhancements possible in the underlying PostgreSQL system, you can create a view in PostgreSQL, or even add native indices for further improvements.
Currently, the Presto open source project has four RDBMS connectors. The MySQL, PostgreSQL, AWS Redshift, and Microsoft SQL Server connectors are already included in the plug-ins directory of Presto and ready to be configured. If you have multiple servers, or want to separate access, you can configure multiple catalogs in Presto for each instance. You just need to name the *.properties file differently. As usual, the name of the properties file determines the name of the catalog:
```sql
SHOW CATALOGS;
  Catalog
------------
 system
 mysql-dev
 mysql-prod
 mysql-site
(4 rows)
```
Nuances exist among different RDBMSs. Let’s take a look at how each is configured in their catalog configuration files.
In MySQL, there is no difference between a database and a schema, and the catalog file and the JDBC connection string basically point at the specific MySQL server instance:
```properties
connector.name=mysql
connection-url=jdbc:mysql://example.net:3306
connection-user=root
connection-password=secret
```
PostgreSQL makes a clear distinction, and an instance can contain multiple databases that contain schemas. The JDBC connection points at a specific database:
```properties
connector.name=postgresql
connection-url=jdbc:postgresql://example.net:5432/database
connection-user=root
connection-password=secret
```
The AWS Redshift catalog looks similar to PostgreSQL’s. In fact, Redshift uses the PostgreSQL JDBC driver, since it is based on the open source PostgreSQL code and the JDBC driver is compatible and can be used:
```properties
connector.name=redshift
connection-url=jdbc:postgresql://example.net:5439/database
connection-user=root
connection-password=secret
```
Microsoft SQL Server connection strings look similar to the MySQL string. However, SQL Server does have the notion of databases and schemas, and the example simply connects to the default database:
```properties
connector.name=sqlserver
connection-url=jdbc:sqlserver://example.net:1433
connection-user=root
connection-password=secret
```
Using a different database, like sales, has to be configured with a property in the connection string:

```properties
connection-url=jdbc:sqlserver://example.net:1433;databaseName=sales
```
Currently, the only way to authenticate for RDBMS connectors is by storing the username and password in the catalog configuration file. Since the machines in the Presto cluster are designed to be a trusted system, this should be sufficient for most uses. To keep Presto and the connected data sources secure, it’s important to secure access to the machines and configuration files. The configuration file should be treated the same way as a private key. All users of Presto use the same connection to the RDBMS.
If you do not want to store a password in cleartext, there are ways to pass through the username and password from the Presto client. We discuss this further in Chapter 10.
In conclusion, using Presto with RDBMSs is easy and allows you to expose all the systems in one place and query them all at the same time. This usage alone is already providing a significant benefit of Presto usage. Of course, it gets much more interesting when you add more data sources with other connectors. So let’s continue to learn more.
You have already discovered the TPC-H connector usage in Chapter 2. Let’s have a closer look.
The TPC-H and the TPC-DS connector are built into Presto and provide a set of schemas to support the TPC Benchmark H (TPC-H) and the TPC Benchmark DS (TPC-DS). These database benchmark suites from the Transaction Processing Performance Council are industry standard benchmarks for database systems, used to measure the performance of highly complex decision support databases.
The connectors can be used to test the capabilities and query syntax of Presto without configuring access to an external data source. When you query a TPC-H or TPC-DS schema, the connector generates some data on the fly by using a deterministic algorithm.
Create a catalog properties file, etc/catalog/tpch.properties, to configure the TPC-H connector:
```properties
connector.name=tpch
```
Configuration is similar for the TPC-DS connector; for example, with etc/catalog/tpcds.properties:
```properties
connector.name=tpcds
```
Both connectors expose schemas that include the same data sets in terms of structure:
```sql
SHOW SCHEMAS FROM tpch;
       Schema
--------------------
 information_schema
 sf1
 sf100
 sf1000
 sf10000
 sf100000
 sf300
 sf3000
 sf30000
 tiny
(10 rows)
```
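With a tpch catalog configured as shown earlier, you can query the generated data directly; for example, counting the rows of the orders table in the tiny schema:

```sql
SELECT count(*) FROM tpch.tiny.orders;
```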
Table 6-1 shows how the different schemas contain increasingly larger numbers of records in
transactional tables such as orders
.
| Schema | Count |
|---|---|
| tiny | 15000 |
| sf1 | 1500000 |
| sf3 | 4500000 |
| sf100 | 150000000 |
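The counts in Table 6-1 grow linearly with the scale factor encoded in the schema name, where the tiny schema corresponds to scale factor 0.01. A minimal sketch of that relationship, assuming strict linear scaling from the 1,500,000 orders rows at scale factor 1:

```python
def orders_row_count(scale_factor):
    """Approximate row count of the orders table for a TPC-H scale factor.

    TPC-H defines 1,500,000 orders rows at scale factor 1, and the row
    count scales linearly with the scale factor.
    """
    return round(scale_factor * 1_500_000)

# The tiny schema (scale factor 0.01) yields 15000 rows,
# sf1 yields 1500000, and sf100 yields 150000000.
```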
You can use these data sets for learning more about SQL supported by Presto, as discussed in Chapter 8 and Chapter 9, without the need to connect another database.
Another important use case of these connectors is the simple availability of the data. You can use the connectors for development and testing, or even on a production Presto deployment. With the huge data amounts easily available, you can build queries that put a significant load on your Presto cluster. This allows you to better understand the performance of your cluster, tune and optimize it, and ensure that it performs over time and across version updates and other changes.
As you learned in “A Brief History of Presto”, Presto is designed to run fast queries at Facebook scale. Given that Facebook had massive storage in its Hive data warehouse, it is only natural that the Hive connector was one of the first connectors developed for Presto.
Before you read about the Hive connector, and its suitability for numerous object storage formats, you need to brush up on your knowledge about Apache Hadoop and Apache Hive a bit.
If you’re not that familiar with Hadoop and Hive and want to learn more, we recommend the official websites of the projects, videos and other resources on the web, and some of the great books available. For example, Programming Hive by Edward Capriolo et al. (O’Reilly) has proven to be a great guide to us.
For now, we need to discuss certain Hadoop and Hive concepts to provide enough context for the Presto usage.
At its very core, Hadoop consists of the Hadoop Distributed File System (HDFS) and application software, such as Hadoop MapReduce, to interact with the data stored in HDFS. Apache YARN is used to manage the resources needed by Hadoop applications. Hadoop is the leading system for distributed processing of large data sets across clusters of computers. It is capable of scaling the system while maintaining a highly available service on top of a cluster of computers.
Originally, data processing was performed by writing MapReduce programs. They followed a specific programming model that enables the data processing to be naturally distributed across the cluster. This model works well and is robust. However, writing MapReduce programs for analytical questions is cumbersome. It also does not transfer well for existing infrastructure, tooling, and users that rely on SQL and data warehousing.
Hive provides an alternative to the usage of MapReduce. It was created as a method to provide a SQL layer of abstraction on top of Hadoop to interact with data in HDFS using a SQL-like syntax. Now the large set of users who know and understand SQL can interact with data stored in HDFS.
Hive data is stored as files, often referred to as objects, in HDFS. These files use various formats, such as ORC, Parquet, and others. The files are stored using a particular directory and file layout that Hive understands; for example, partitioned and bucketed tables. We refer to the layout as a Hive-style table format.
Hive metadata describes how data stored in HDFS maps to schemas, tables, and columns to be queried via SQL. This metadata information is persisted in a database such as MySQL or PostgreSQL and is accessible via the Hive Metastore Service (HMS).
The Hive runtime provides the SQL-like query language and distributed execution layer to execute the queries. The Hive runtime translates the query into a set of MapReduce programs that can run on a Hadoop cluster. Over time, Hive has evolved to provide other execution engines such as Apache Tez and Spark that the query is translated to.
Hadoop and Hive are widely used across the industry. With their use, the Hive-style table format has become a supported format for many other distributed storage systems such as Amazon S3 and S3-compatible stores, Azure Data Lake Storage, Azure Blob Storage, Google Cloud Storage, and many others.
The Hive connector for Presto allows you to connect to an HDFS object storage cluster. It leverages the metadata in HMS and queries and processes the data stored in HDFS.
Probably the most common use case of Presto is to leverage the Hive connector to read data from distributed storage such as HDFS or cloud storage.
Presto and the Presto Hive connector do not use the Hive runtime at all. Presto is a replacement for it and is suitable for running interactive queries.
The Hive connector allows Presto to read and write from distributed storage such as HDFS. However, it is not constrained to HDFS but designed to work with distributed storage in general. Currently, you can configure the Hive connector to work with HDFS, AWS S3, Azure Blob Storage, Azure Data Lake Storage, Google Cloud Storage, and S3-compatible storage. S3-compatible storage may include MinIO, Ceph, IBM Cloud Object Storage, SwiftStack, Cloudian, Riak CS, LeoFS, OpenIO, and others. A variety of these compatible stores exist. As long as they implement the S3 API and behave the same way, Presto does not need to know the difference for the most part.
Because of the widespread use of Hadoop and other compatible systems using HDFS and the expanded feature set of the Hive connector to support them, it can be considered the main connector for querying object storage with Presto and is therefore of critical importance for many, if not most, Presto users.
Architecturally, the Hive connector is a bit different from RDBMS and other connectors since it does not use the Hive engine itself at all. It therefore cannot push SQL processing to Hive. Instead, it simply uses the metadata in HMS and accesses the data directly on HDFS, using the HDFS client provided with the Hadoop project. It also assumes the Hive table format in the way the data is organized in the distributed storage.
In all cases, the schema information is accessed from HMS, and the data layout is the same as with a Hive data warehouse. The concepts are the same, but the data is simply stored in a location other than HDFS. However, unlike Hadoop, these non-Hadoop distributed filesystems do not always have an HMS equivalent to store the metadata for use by Presto. To leverage the Hive-style table format, you must configure Presto to use HMS from an existing Hadoop cluster or to your own standalone HMS. This can mean that you use a replacement for HMS such as AWS Glue or run a minimal Hadoop deployment with HMS only.
Using HMS to describe data in blob storage other than HDFS allows the Hive connector to be used to query these storage systems. This unlocks the data stored in these systems to all the power of SQL usage via Presto and any tool capable of using SQL.
Configuring a catalog to use the Hive connector requires you to create a catalog properties file with the desired name, for example, etc/catalog/s3.properties, etc/catalog/gcs.properties, etc/catalog/minio.properties, or even just etc/catalog/hdfs.properties or etc/catalog/objectstorage.properties. In the following, we assume the use of etc/catalog/hive.properties. At a minimum, you need to configure the connector name and the URL for HMS:
```properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://example.net:9083
```
Numerous other configuration properties apply for different use cases, some of which you learn more about soon. When in doubt, always make sure to check out the documentation; see “Documentation”. Let’s get into some of those details next.
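As an illustration of such additional properties, a catalog for an S3-compatible store such as MinIO might look roughly like this (the endpoint and credentials are placeholders, and you should verify the exact property names against the Hive connector documentation for your version):

```properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://example.net:9083
hive.s3.endpoint=http://minio.example.net:9000
hive.s3.path-style-access=true
hive.s3.aws-access-key=access-key
hive.s3.aws-secret-key=secret-key
```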
Once the connector is configured, you can create a schema from Presto; for example, in HDFS:
```sql
CREATE SCHEMA hive.web
WITH (location = 'hdfs://starburst-oreilly/web')
```
The schema, sometimes still called a database, can contain multiple tables. You can read more about them in the next section. The schema creation typically creates only the metadata about the schema in the HMS:
```
...
hdfs://starburst-oreilly/web/customers
hdfs://starburst-oreilly/web/clicks
hdfs://starburst-oreilly/web/sessions
...
```
Using Amazon S3 is not much different. You just use a different protocol string:
```sql
CREATE SCHEMA hive.web
WITH (location = 's3://example-org/web')
```
```
...
s3://example-org/web/customers
s3://example-org/web/clicks
s3://example-org/web/sessions
...
```
After the schema, we need to learn more about the content in the schema, organized as tables. Hive distinguishes between managed tables and external tables. A managed table is managed by Hive, and therefore also by Presto potentially, and is created along with its data under the location of the database’s directory. An external table is not managed by Hive and explicitly points to another location outside the directory that Hive manages.
The main difference between a managed table and an external table is that Hive, and therefore Presto, owns the data in the managed table. If you drop a managed table, the metadata in the HMS and the data are deleted. If you drop an external table, the data remains, and only the metadata about the table is deleted.
The types of tables you use really comes down to the way you plan to use Presto. You may be using Presto for data federation, your data warehouse or data lake, both, or some other mix. You need to decide who owns the data. It could be Presto working with the HMS, or it could be Hadoop and HMS, or Spark, or other tools in an ETL pipeline. In all cases, the metadata is managed in HMS.
The decision about which system owns and manages HMS and the data is typically based on your data architecture. In the early use cases of Presto, Hadoop often controls the data life cycle. But as more use cases leverage Presto as the central tool, many users shift their pattern, and Presto takes over control.
Some new Presto users start by querying an existing Hadoop deployment. In this case, it starts off as more of a data federation use, and Hadoop owns the data. You then configure the Hive connector to expose the existing tables in Hadoop to Presto for querying. You use external tables. Typically, you do not allow Presto to write to these tables in this case.
Other Presto users may start to migrate away from Hadoop and completely toward Presto, or they may start new with another object storage system, specifically, often a cloud-based system. In this case, it is best to start creating the data definition language (DDL) via Presto to let Presto own the data.
Let’s consider the following DDL for a Presto table:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date,
    country varchar
)
```
In this example, the table page_views
stores data under a directory also named
page_views. This page_views directory is either a subdirectory under the
directory defined by hive.metastore.warehouse.dir
or is a different directory
if you defined the schema location when creating the schema.
```
hdfs:/user/hive/warehouse/web/page_views/...
```
And here’s an Amazon S3 example:
```
s3://example-org/web/page_views/...
```
Next, let’s consider DDL for a Presto table that points to existing data. This data is created and managed by another means, such as by Spark or an ETL process where the data lands in storage. In this case, you may create an external table via Presto pointing to the external location of this data:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date,
    country varchar
)
WITH (
    external_location = 's3://starburst-external/page_views'
)
```
This inserts the metadata about the table into the HMS, including the external path and a flag that signals to Presto and HMS that the table is external and therefore managed by another system.
As a result, the data, located in s3://starburst-external/page_views, may already exist. Once the table is created in Presto, you can start querying it. When you configure the Hive connector to an existing Hive warehouse, you see the existing tables and are able to query from them immediately.
Alternatively, you could create the table in an empty directory and expect the data to be loaded later, either by Presto or by an external source. In either case, Presto expects that the directory structure is already created; otherwise, the DDL errors. The most common case for creating an external table is when data is shared with other tools.
So far, you have learned how the data for a table, whether managed or external, is stored as one or more files in a directory. Data partitioning is an extension of this and is a technique used to horizontally divide a logical table into smaller pieces of data known as partitions.
The concept itself derives from partitioning schemes in RDBMSs. Hive introduced this technique for data in HDFS as a way to achieve better query performance and manageability of the data.
Partitioning is now a standard data organization strategy in distributed filesystems, such as HDFS, and in object storage, such as S3.
Let’s use this table example to demonstrate partitioning:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date
)
WITH (
    partitioned_by = ARRAY['view_date']
)
```
The columns listed in the partitioned_by
clause
must be the last columns as defined in the DDL. Otherwise, you get an error from
Presto.
As with nonpartitioned tables, the data for the page_views
table is located
within …/page_views. Using partitioning changes the way the table layout is
structured. With partitioned tables, additional subdirectories are added within
the table subdirectory. In the following example, you see the directory
structure as defined by the partition keys:
```
...
.../page_views/view_date=2019-01-14/...
.../page_views/view_date=2019-01-15/...
.../page_views/view_date=2019-01-16/...
...
```
Presto uses this same Hive-style table format. Additionally, you can choose to partition on multiple columns:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date,
    country varchar
)
WITH (
    partitioned_by = ARRAY['view_date', 'country']
)
```
When choosing multiple partitioned columns, Presto creates a hierarchical directory structure:
```
...
.../page_views/view_date=2019-01-15/country=US...
.../page_views/view_date=2019-01-15/country=PL...
.../page_views/view_date=2019-01-15/country=UA...
.../page_views/view_date=2019-01-16/country=US...
.../page_views/view_date=2019-01-17/country=AR...
...
```
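The mapping from partition column values to this directory layout can be sketched in a few lines (a simplified illustration, not Presto’s actual implementation):

```python
def partition_path(table_dir, partition_cols, values):
    """Build a Hive-style partition directory path.

    Each partition column becomes a 'column=value' subdirectory,
    nested in the order the columns are declared in the DDL.
    """
    parts = [f"{col}={val}" for col, val in zip(partition_cols, values)]
    return "/".join([table_dir] + parts)

# partition_path("page_views", ["view_date", "country"], ["2019-01-15", "US"])
# -> "page_views/view_date=2019-01-15/country=US"
```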
Partitioning gives you improved query performance, especially as your data grows in size. For example, let’s take the following query:
```sql
SELECT DISTINCT user_id
FROM page_views
WHERE view_date = DATE '2019-01-15'
  AND country = 'US';
```
When this query is submitted, Presto recognizes the partition columns in the
WHERE
clause and uses the associated value to read only the
view_date=2019-01-15/country=US subdirectory. By reading only the partition
you need, potentially large performance savings can result. If your data is small
today, the performance gain might not be noticeable. But as your data grows, the
improved performance is significant.
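Conceptually, the partition pruning described above works like this (a simplified sketch in Python, not Presto’s actual implementation):

```python
def prune_partitions(partition_dirs, predicates):
    """Return only the partition directories matching the predicates.

    partition_dirs: Hive-style paths like 'page_views/view_date=2019-01-15/country=US'
    predicates: dict mapping partition column name -> required value
    """
    def matches(path):
        # Parse 'column=value' segments of the path into a dict
        kv = dict(part.split("=", 1) for part in path.split("/") if "=" in part)
        return all(kv.get(col) == val for col, val in predicates.items())

    return [d for d in partition_dirs if matches(d)]
```

Only the directories that survive pruning are read, which is where the performance gain on large tables comes from.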
So far, you’ve learned about the Hive-style table format, including partitioned data. How do you get the data into the tables? It really depends on who owns the data. Let’s start under the assumption that you are creating the tables in Presto and loading the data with Presto:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date,
    country varchar
)
WITH (
    partitioned_by = ARRAY['view_date', 'country']
)
```
To load data via Presto, Presto supports INSERT INTO ... VALUES, INSERT INTO ... SELECT, and CREATE TABLE AS SELECT. Although INSERT INTO ... VALUES exists, it has limited use, since it creates a single file and a single row for each statement. It is, however, often useful as you learn Presto.
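For example, a single-row insert into the page_views table defined earlier could look like this (the values are made up for illustration):

```sql
INSERT INTO hive.web.page_views
VALUES (TIMESTAMP '2019-01-15 10:00:00', 123, 'https://example.com/',
        DATE '2019-01-15', 'US');
```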
INSERT SELECT
and CREATE TABLE AS
perform the same function. Which one you use is a
matter of whether you want to load into an existing table or create the table
as you’re loading. Let’s take, for example, INSERT SELECT
where you may be
querying data from a nonpartitioned external table and loading into a
partitioned table in Presto:
```sql
presto:web> INSERT INTO page_views_ext SELECT * FROM page_views;
INSERT: 16 rows
```
The preceding example shows inserting new data into an external table. By default,
Presto disallows writing to an external table. To enable it, you need to set
hive.non-managed-table-writes-enabled
to true
in your catalog configuration
file.
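For example, the catalog file from earlier with writes to non-managed tables enabled:

```properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://example.net:9083
hive.non-managed-table-writes-enabled=true
```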
If you’re familiar with Hive, Presto does what is known as dynamic partitioning: the partitioned directory structure is created the first time Presto detects a partition column value that doesn’t have a directory.
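The grouping behavior of dynamic partitioning can be sketched as follows (a simplified illustration in Python; Presto’s actual implementation writes data files into the partition directories):

```python
def partition_rows(rows, partition_col):
    """Group rows into Hive-style partitions.

    A partition key (and, on disk, its directory) is created the first
    time a new value of the partition column is seen -- the essence of
    dynamic partitioning.
    """
    partitions = {}
    for row in rows:
        key = f"{partition_col}={row[partition_col]}"
        partitions.setdefault(key, []).append(row)
    return partitions
```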
You can also create an external partitioned table in Presto. Say a directory structure with data in S3 is as follows:
```
...
s3://example-org/page_views/view_date=2019-01-14/...
s3://example-org/page_views/view_date=2019-01-15/...
s3://example-org/page_views/view_date=2019-01-16/...
...
```
We create the external table definition:
```sql
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    view_date date
)
WITH (
    partitioned_by = ARRAY['view_date']
)
```
Now let’s query from it:
```sql
presto:web> SELECT * FROM page_views;
 view_time | user_id | page_url | view_date
-----------+---------+----------+-----------
(0 rows)
```
What happened? Even though we know there is data in it, the Hive Metastore Service (HMS) does not recognize the partitions. If you're familiar with Hive, you know about the MSCK REPAIR TABLE command to autodiscover all the partitions. Fortunately, Presto has its own command to autodiscover and add the partitions:
CALL system.sync_partition_metadata('web', 'page_views', 'FULL')
...
Now that you have added the partitions, let’s try again:
presto:web> SELECT * FROM page_views;
        view_time        | user_id | page_url | view_date
-------------------------+---------+----------+------------
 2019-01-25 02:39:09.987 |     123 | ...      | 2019-01-14
...
 2019-01-25 02:39:11.807 |     123 | ...      | 2019-01-15
...
Alternatively, Presto provides the ability to create partitions manually. This is often cumbersome because you have to use the command to define each partition separately:
CALL system.create_empty_partition(
    'web',
    'page_views',
    ARRAY['view_date'],
    ARRAY['2019-01-14']
)
...
Adding empty partitions is useful when you want to create the partitions outside Presto via an ETL process and then want to expose the new data to Presto.
Presto also supports dropping partitions simply by specifying the partition
column value in the WHERE
clause of a DELETE
statement. And in this example,
the data stays intact because it is an external table:
DELETE FROM hive.web.page_views
WHERE view_date = DATE '2019-01-14'
It is important to emphasize that you do not have to manage your tables and data with Presto, but you can if desired. Many users leverage Hive, or other tools, to create and manipulate data, and use Presto only to query the data.
Presto supports many of the common file formats used in Hadoop/HDFS, including the following:
ORC
PARQUET
AVRO
JSON
TEXTFILE
RCTEXT
RCBINARY
CSV
SEQUENCEFILE
The three most commonly used file formats in Presto are ORC, Parquet, and Avro. The readers for the ORC, Parquet, RC Text, and RC Binary formats are heavily optimized in Presto for performance.
The metadata in the HMS contains the file format information, so Presto knows which reader to use when reading the data files. When creating a table in Presto, the default storage format is ORC. However, the default can be overridden in the CREATE TABLE statement as part of the WITH properties:
CREATE TABLE hive.web.page_views (
    view_time timestamp,
    user_id bigint,
    page_url varchar,
    ds date,
    country varchar
)
WITH (
    format = 'ORC'
)
The default storage format for all tables in the catalog can be set with the
hive.storage-format
configuration in the catalog properties file.
By default, Presto uses the GZIP compression codec when writing files. You can change the codec to SNAPPY or NONE by setting the hive.compression-codec configuration in the catalog properties file.
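Both defaults can be set together in the catalog properties file. A minimal sketch for a Hive catalog (the connector name and property values are examples; adjust them to your deployment):

```properties
connector.name=hive-hadoop2
hive.storage-format=ORC
hive.compression-codec=SNAPPY
```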
MinIO is an S3-compatible, lightweight distributed storage system you can use with Presto and the Hive connector. If you want to explore its use in more detail, you can check out our example project.
If your HDFS is secured with Kerberos, you can learn more about configuring the Hive connector in “Kerberos Authentication with the Hive Connector”.
Presto includes connectors to query variants of nonrelational data sources. These data sources are often referred to as NoSQL systems and can be key-value stores, column stores, stream processing systems, document stores, and other systems.
Some of these data sources provide SQL-like query languages such as CQL for Apache Cassandra. Others provide only specific tools or APIs to access the data or include entirely different query languages such as the Query Domain Specific Language used in Elasticsearch. The completeness of these languages is often limited and not standardized.
Presto connectors for these NoSQL data sources allow you to run SQL queries for these systems as if they were relational. This allows you to use applications such as business intelligence tools or allow those who know SQL to query these data sources. This includes use of joins, aggregations, subqueries, and other advanced SQL capabilities against these data sources.
In the next chapter, you learn more about some of these connectors:
NoSQL system such as Elasticsearch or MongoDB—“Document Store Connector Example: Elasticsearch”
Streaming systems such as Apache Kafka—“Streaming System Connector Example: Kafka”
Key-value store systems such as Apache Accumulo—“Key-Value Store Connector Example: Accumulo” and Apache Cassandra—“Apache Cassandra Connector”
Apache HBase with Apache Phoenix connector—“Connecting to HBase with Phoenix”
Let’s skip over these for now and talk about some simpler connectors and related aspects first.
The JMX connector can easily be configured for use in the catalog properties file etc/catalog/jmx.properties:
connector.name=jmx
The JMX connector exposes runtime information about the JVMs running the Presto coordinator and workers. It uses Java Management Extensions (JMX) and allows you to use SQL in Presto to access the available information. It is especially useful for monitoring and troubleshooting purposes.
The connector exposes a history schema for historic, aggregate data, a current schema with up-to-date information, and the information_schema schema for metadata.
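The history schema is only populated for metrics that you configure for periodic collection in the catalog file. A hedged sketch, assuming the connector's jmx.dump-tables, jmx.dump-period, and jmx.max-entries properties and two example MBeans:

```properties
connector.name=jmx
jmx.dump-tables=java.lang:type=Runtime,java.lang:type=Memory
jmx.dump-period=10s
jmx.max-entries=86400
```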
The easiest way to learn more is to use Presto statements to investigate the available tables:
SHOW TABLES FROM jmx.current;

                              Table
------------------------------------------------------------------
 com.sun.management:type=diagnosticcommand
 com.sun.management:type=hotspotdiagnostic
 io.airlift.discovery.client:name=announcer
 io.airlift.discovery.client:name=serviceinventory
 io.airlift.discovery.store:name=dynamic,type=distributedstore
 io.airlift.discovery.store:name=dynamic,type=httpremotestore
....
As you can see, the table names use the Java classpath of the metrics-emitting classes and their parameters. This means that you need to use quotes when referring to the table names in SQL statements. Typically, it is useful to find out about the available columns in a table:
DESCRIBE jmx.current."java.lang:type=runtime";

         Column         |  Type   | Extra | Comment
------------------------+---------+-------+---------
 bootclasspath          | varchar |       |
 bootclasspathsupported | boolean |       |
 classpath              | varchar |       |
 inputarguments         | varchar |       |
 librarypath            | varchar |       |
 managementspecversion  | varchar |       |
 name                   | varchar |       |
 objectname             | varchar |       |
 specname               | varchar |       |
 specvendor             | varchar |       |
 specversion            | varchar |       |
 starttime              | bigint  |       |
 systemproperties       | varchar |       |
 uptime                 | bigint  |       |
 vmname                 | varchar |       |
 vmvendor               | varchar |       |
 vmversion              | varchar |       |
 node                   | varchar |       |
 object_name            | varchar |       |
(19 rows)
This allows you to get information nicely formatted:
SELECT vmname, uptime, node
FROM jmx.current."java.lang:type=runtime";

          vmname          | uptime  |     node
--------------------------+---------+--------------
 OpenJDK 64-Bit Server VM | 1579140 | ffffffff-ffff
(1 row)
Notice that only one node is returned in this query since this is a simple installation of a single coordinator/worker node, as described in Chapter 2.
The JMX connector exposes a lot of information about the JVM in general, including Presto-specific aspects. You can start exploring the available information by looking at the tables whose names start with presto; for example, with DESCRIBE jmx.current."presto.execution:name=queryexecution";.
Here are a few other describe statements worth checking out:
DESCRIBE jmx.current."presto.execution:name=querymanager";
DESCRIBE jmx.current."presto.memory:name=clustermemorymanager";
DESCRIBE jmx.current."presto.failuredetector:name=heartbeatfailuredetector";
To learn more about monitoring Presto by using the Web UI and other related aspects, you can head over to Chapter 12.
The black hole connector can easily be configured for use in a catalog properties file such as etc/catalog/blackhole.properties:
connector.name=blackhole
It acts as a sink for any data, similar to the null device in Unix operating systems, /dev/null. This allows you to use it as a target for any insert queries reading from other catalogs. Since it does not actually write anything, you can use this to measure read performance from those catalogs.
For example, you can create a test schema in blackhole
and create a table from
the tpch.tiny
data set. Then you read a data set from the tpch.sf3
data and
insert it into the blackhole
catalog:
CREATE SCHEMA blackhole.test;

CREATE TABLE blackhole.test.orders AS
SELECT * FROM tpch.tiny.orders;

INSERT INTO blackhole.test.orders
SELECT * FROM tpch.sf3.orders;
This operation essentially measures read performance from the tpch catalog, since it reads about 4.5 million order records from sf3 and then sends them to blackhole. Using other schemas, like tpch.sf100, increases the data-set size. This allows you to assess the performance of your Presto deployment.
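Beyond acting as a sink, the black hole connector can also generate synthetic rows for read-side and write-side testing. The following is a sketch, assuming the connector's split_count, pages_per_split, and rows_per_page table properties; the table and column names are illustrative:

```sql
CREATE TABLE blackhole.test.generated (dummy bigint)
WITH (
    split_count = 4,
    pages_per_split = 100,
    rows_per_page = 1000
);

-- Scanning the table produces 4 * 100 * 1000 generated rows
SELECT count(*) FROM blackhole.test.generated;
```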
A similar query with an RDBMS, object storage, or a key-value store catalog can be helpful for query development, performance testing, and improvements.
The memory connector can be configured for use in a catalog properties file; for example, etc/catalog/memory.properties:
connector.name=memory
You can use the memory connector like a temporary database. All data is stored in memory in the cluster. Stopping the cluster destroys the data. Of course, you can also actively use SQL statements to remove data in a table or even drop the table altogether.
Using the memory connector is useful for testing queries or temporary storage. For example, we use it as a simple replacement for the need to have an external data source configured when using the Iris data set; see “Iris Data Set”.
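For example, a short session against a hypothetical memory.default.sensors table looks like this; the schema, table, and values are illustrative:

```sql
-- Create and populate a temporary in-memory table
CREATE TABLE memory.default.sensors (id bigint, reading double);
INSERT INTO memory.default.sensors VALUES (1, 22.5), (2, 19.0);

-- Query it like any other table, then clean up
SELECT * FROM memory.default.sensors;
DROP TABLE memory.default.sensors;
```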
While useful for testing and small tasks, the memory connector is not suitable for large data sets and production usage, especially when distributed across a cluster. For example, the data might be distributed across different worker nodes, and a crash of a worker results in loss of that data. Use the memory connector only for temporary data.
As you now know, the Presto project includes many connectors, yet sometimes you end up in a situation where you need just one more connector for that one specific data source.
The good news is that you are not stuck. The Presto team, and the larger Presto community, are constantly expanding the list of available connectors, so by the time you read this book, the list is probably longer than it is now.
Connectors are also available from parties outside the Presto project itself. This includes other community members and users of Presto, who wrote their own connectors and have not yet contributed them back, or cannot contribute for one reason or another.
Connectors are also available from commercial vendors of database systems, so asking the owner or creator of the system you want to query is a good idea. And the Presto community includes commercial vendors, such as Starburst, which bundle Presto with support and extensions, including additional or improved connectors.
Last, but not least, you have to keep in mind that Presto is a welcoming community around the open source project. So you can, and are encouraged to, look at the code of the existing connectors, and create new connectors as desired. Ideally, you can even work with the project and contribute a connector back to the project to enable simple maintenance and usage going forward.
Now you know a lot more about the power of Presto to access a large variety of data sources. No matter what data source you access, Presto makes the data available to you for querying with SQL and SQL-powered tools. In particular, you learned about the crucial Hive connector, used to query distributed storage such as HDFS or cloud storage systems. In the next chapter, Chapter 7, you can learn more details about a few other connectors that are widely in use.
Detailed documentation for all the connectors is available on the Presto website; see “Website”. And if you do not find what you are looking for, you can even work with the community to create your own connector or enhance existing connectors.