Distributed SQL: DDL and DML

This guide covers writing SQL that is optimized for MemSQL’s distributed system. Even in a distributed environment, MemSQL makes it easy to query your data, with a well-understood set of performance tradeoffs that come down to Data Definition Language (DDL) and Data Manipulation Language (DML) query design.

Most of the traditional tradeoffs still apply: indexes still speed up seeks and sorts, distinct aggregates are slower than simple aggregates, and so on. However, in a sharded distributed system, an advanced SQL user must also take data partitioning into account. This guide will walk you through how to tune your distributed DDL and DML to perform well in MemSQL’s distributed system. It is not about clustering features or high availability (see Managing High Availability) but instead focuses on query execution.

Query Execution Architecture

MemSQL is a two-tiered architecture consisting of aggregators and leaves. Aggregators are cluster-aware query routers that act as a gateway into the distributed system. The only data they store is cluster metadata. The leaves function as storage and compute nodes.

As a user, you interact with the aggregator as if it were a simple, single-box system. Under the hood, an aggregator queries the leaves, aggregates together intermediate results, and sends a final result back to the client. All of the communication between aggregators and leaves for query execution is implemented as SQL statements.

Data is sharded across the leaves into partitions. Each partition is a database on a leaf (named <dbname>_N) with a slice of each table you’ve created on the aggregator. By default, MemSQL will create 8 partitions per leaf, but this number is configurable when you create a database (CREATE DATABASE <dbname> PARTITIONS N). In MemSQL, a partition is an indivisible slice of data that can be replicated and moved around the cluster.

In the context of query execution, a partition is the granular unit of query parallelism. In other words, every parallel query is run with a level of parallelism equal to the number of partitions. You can view the partitions in a database with SHOW PARTITIONS ON <db>.
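
For example, a minimal sketch of creating a database with an explicit partition count and then inspecting its partitions (the database name and partition count below are illustrative):

-- Create a database with an explicit partition count, then list its partitions.
CREATE DATABASE mydb PARTITIONS 16;
SHOW PARTITIONS ON mydb;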

MemSQL also has the notion of reference tables. These tables are present in full on every machine in the cluster (every aggregator and every leaf) and are assumed to be relatively small. The ubiquity of reference tables makes them extremely cheap to join against. See Reference Tables for more details on their implementation. Distributed DML below discusses how to use them in query execution.

This diagram visually demonstrates the layout of data in the distributed system:

[Figure: data-distribution.png, showing the layout of data in the single-box and distributed systems]

The >_ prompt represents the endpoint through which the MemSQL client interacts with the server. The red table is a reference table, and the black table is a large, partitionable table. On the single-box system, all three objects live on the same machine: the server that you connect to (>_) contains, in full, all of the tables that you can query.

In the distributed system, these three objects are split differently. First note that the reference table is present everywhere in the system: on both the aggregators and the leaves. Furthermore, the client interacts only with the aggregator machine, not the leaves. Finally, the large table is split up across the leaves into partitions.

Distributed DDL

Traditionally, a schema designer must consider how to lay out columns, types, and indexes in a table. Many of these considerations still apply to a distributed system, along with a few new concepts.

Every distributed table has exactly one shard key that determines which partition a given row belongs to. When you run an INSERT query against a MemSQL aggregator, the aggregator computes the hash value of this key and directs the INSERT query to a table in the appropriate partition on a leaf machine.

The only guarantee that you have about the physical location of data in the system is that any two rows with the same shard key value are guaranteed to be on the same partition. Because of this guarantee, MemSQL enforces that any PRIMARY or UNIQUE index must be a superset of the shard key. This way, any two rows which might conflict as duplicate keys are guaranteed to map to the same partition.
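
For example, a hypothetical table definition like the following would be rejected, because its unique key does not contain the shard key (the table and column names here are illustrative):

-- Rejected: the UNIQUE key (z) is not a superset of the shard key (x, y),
-- so duplicate z values could land on different partitions.
CREATE TABLE bad_example (
    x INT,
    y INT,
    z INT,
    SHARD KEY (x, y),
    UNIQUE KEY (z)
);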

MemSQL’s distributed query optimizer leverages shard keys to determine how a read query should be executed. For example, queries that fully match the shard key can be routed directly to the leaf node with the matching partition. Queries that are guaranteed to be non-overlapping can be streamed without any processing on the aggregator. These optimizations are discussed in depth in Distributed DML.

MemSQL supports three types of shard keys:

Primary Key as the Shard Key

This is the default and also safest option for creating a shard key in MemSQL. You do not need to introduce any distributed-specific SQL syntax to shard by the PRIMARY KEY. Since every row is guaranteed to have a unique shard key, this technique also avoids the possibility of introducing data skew by adding repeated rows with the same shard key value.
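
For example, a table like the following (the same shape as the users table used later in this guide) is sharded on its primary key, id:

-- No SHARD KEY clause: rows are distributed by the hash of the PRIMARY KEY (id).
CREATE TABLE users (
    id BIGINT AUTO_INCREMENT,
    user_name VARCHAR(1000),
    PRIMARY KEY (id)
);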

Non-Unique Shard Key

The syntax for a non-unique shard key is just SHARD KEY (col1, col2, ...). For example:

CREATE TABLE clicks (
    click_id BIGINT AUTO_INCREMENT,
    user_id INT,
    page_id INT,
    ts TIMESTAMP,
    SHARD KEY (user_id),
    PRIMARY KEY (click_id, user_id)
);

Now, any two clicks by the same user will be guaranteed to be on the same partition. You can take advantage of this property in query execution for something like a COUNT(DISTINCT user_id) query, which knows that any two equal (non-distinct) user_id values will never be on different partitions.
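
A sketch of such a query:

-- Because user_id is the shard key, equal user_id values never span partitions,
-- so each leaf can count its distinct users independently.
SELECT COUNT(DISTINCT user_id) FROM clicks;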

You must take care not to introduce too much skew in the frequency distribution of shard key values. MemSQL assumes that each partition holds a roughly equal number of rows, so a particular leaf can quickly run out of memory if too many rows share the same shard key value. In this example, if every row in clicks has the same user_id, every row will map to a single partition and your data capacity will be limited by the capacity of a single leaf machine.

Another thing to note about this table is that even though click_id will be unique, we still had to include user_id in the primary key. MemSQL enforces that every unique key (a PRIMARY KEY is unique) must be a superset of the shard key. In this example, MemSQL can only detect that two rows share the same click_id if they map to the same partition; to guarantee that, the rows must also share the shard key value, which is (user_id).

Foreign Shard Key

Foreign shard keys enable you to align rows across distributed tables so that they can be joined efficiently. For any row in the parent table, rows in child tables with the same shard key value are guaranteed to be on the same partition. Here is an example of the syntax:

CREATE TABLE users (
    id BIGINT AUTO_INCREMENT,
    user_name VARCHAR(1000),
    PRIMARY KEY (id)
);

CREATE TABLE clicks (
    click_id BIGINT AUTO_INCREMENT,
    user_id INT,
    page_id INT,
    ts TIMESTAMP,
    FOREIGN SHARD KEY (user_id) REFERENCES users (id),
    PRIMARY KEY (click_id, user_id)
);

In this example, id must be the shard key of the users table: the foreign shard key on a child table must exactly match the shard key of its parent table. The shard key on the parent table can be any type of shard key (primary, non-unique, or foreign).

MemSQL can leverage a foreign shard key relationship to join two distributed tables across a foreign key. Since every relevant row is present on a single partition, the entire join operation runs in parallel across the cluster on the leaves. As a query writer, you must ensure that the join condition contains an equality between every foreign column and its parent column. For example, you could run the following join query:

-- These queries work
SELECT * FROM users INNER JOIN clicks ON users.id = clicks.user_id WHERE clicks.page_id = 10;
SELECT avg(c1.ts - c2.ts) FROM clicks c1 INNER JOIN clicks c2 ON c1.user_id = c2.user_id WHERE c1.page_id > c2.page_id;

-- These queries do not work
SELECT * FROM users INNER JOIN clicks ON users.id = clicks.click_id;
SELECT * FROM users INNER JOIN clicks ON users.id > clicks.user_id;

If you identify your data layout and join patterns in advance, this technique can be an extremely effective way to run performant joins between distributed tables.

Distributed DML

How a table is partitioned also affects the performance of certain kinds of SELECT queries. In this section we’ll look at common query patterns and how they’re executed through the distributed system. You can also use the EXPLAIN SELECT ... syntax to examine a query’s aggregator-level and leaf-level query plans. Keep in mind throughout this section how queries are executed in MemSQL: a client sends a query to an aggregator; the aggregator transforms the query and sends the resulting SQL to one or many leaves (one SQL query per partition); the leaves send results back to the aggregator; the aggregator processes and merges those results; and finally the aggregator sends the merged result back to the client.

These queries assume the following schema:

CREATE TABLE a (
    a1 int,
    a2 int,
    a3 int,
    SHARD KEY (a1, a2),
    KEY (a3)
);

CREATE TABLE b (
    b1 int,
    b2 int,
    b3 int,
    FOREIGN SHARD KEY (b1, b2) REFERENCES a (a1, a2)
);

CREATE REFERENCE TABLE r (
    r1 int,
    r2 int,
    PRIMARY KEY (r1),
    KEY (r2)
);

Index Matching

Matching the Shard Key. If you specify an equality on every column in the shard key, then the aggregator will direct that query to exactly one partition. Most queries do not fall into this pattern; instead, the aggregator must send queries to every partition in the cluster for intermediate results and then stitch them together. Queries that match the shard key, however, can be directed to exactly one partition and their results streamed back to the client without any processing. Because of their simplicity, these queries are the most efficiently executed in the distributed system.

-- These queries match
SELECT * FROM a WHERE a1 = 4 AND a2 = 10;
SELECT a3, count(*) FROM a WHERE a1 = 4 AND a2 = 10 GROUP BY a3;

-- These queries do not match
SELECT * FROM a WHERE a1 = 4;
SELECT * FROM a WHERE a1 = 4 OR a2 = 10;
SELECT * FROM a WHERE a1 IN (4, 5) AND a2 IN (10);

Secondary Index Matching. If you match a secondary index, then the aggregator must send the query to every partition in the cluster. Locally, each partition’s table will use its secondary index to speed up the query. While the overall performance of the query is dictated by the seek and scan time of these indexes, the fact that the query must be sent everywhere in the cluster can increase the variance (and therefore overall latency) of the query. Furthermore, sending several concurrent instances of these queries can flood the network, slowing down the system as a whole. As a general principle, these queries are very performant (especially for analytical queries) but are not as suited for high-concurrency transactional-processing workloads as queries that fully match the shard key.

-- This query matches the secondary index on a3
SELECT * FROM a WHERE a3 = 5;

No Index Matching. Queries that do not match any index perform a full table scan on each partition. From the perspective of the aggregator, these queries are the same as queries that match a secondary index.
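
For example, a rough sketch of a query that cannot use the index on a3, assuming (as in most MySQL-compatible systems) that wrapping the column in an expression prevents index use:

-- The filter is an expression over a3, so the index on a3 cannot be used;
-- every partition performs a full scan of its slice of a.
SELECT * FROM a WHERE a3 * 2 = 10;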

Aggregator Merging

Most queries that don’t involve aggregates, group bys, or order bys don’t require any further processing on the aggregator. These queries are forwarded verbatim to one or many partitions, and the partition’s results are streamed back directly to the client. More complex queries do require merge processing on the aggregator. You can examine the processing that a query involves by running EXPLAIN SELECT ....
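
For example (the exact plan output depends on your MemSQL version):

-- Show the aggregator-level and leaf-level plans for a filtered, sorted query.
EXPLAIN SELECT * FROM a WHERE a3 = 5 ORDER BY a1;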

Order By. ORDER BY queries that don’t involve aggregates or group bys can sort rows on the leaves and then merge the sorted intermediate results on the aggregator. For example, a query like SELECT * FROM a WHERE a3 = 5 ORDER BY a1 will follow this pattern. These queries leverage distributed (leaf) processing to do the majority of filtering and sorting, which makes them scalable with the amount of data in the system.

Aggregates. Queries with aggregates compute aggregate values on the leaves and then use aggregate merging on the aggregator to compute a final result. Each aggregate is either assumed to be associative or converted into an expression that is associative. For example, AVG(expr) is converted to SUM(expr)/COUNT(expr) automatically by the aggregator.
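
As a sketch of this rewrite:

-- Roughly, each partition computes SUM(a3) and COUNT(a3); the aggregator adds up
-- the partial sums and counts and then divides to produce the final average.
SELECT AVG(a3) FROM a;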

Distinct Aggregates. Distinct aggregates like COUNT(DISTINCT ...) are not as efficient as simple aggregates like COUNT(*). Distinct values must be resolved across partition boundaries (you could have a3=10 on two different partitions in SELECT COUNT(DISTINCT a3) FROM a), so each partition must send every distinct value it has back to the aggregator. Queries with distinct aggregates ship one row per distinct value per partition back to the aggregator and can therefore be expensive if there are a lot of distinct values.

There is an exception to this rule: if you run a DISTINCT aggregate which exactly matches the shard key, distinct values can be resolved on the leaves and the aggregator can merge aggregate values as it would with simple aggregates. An example of such a query is SELECT COUNT(DISTINCT a1, a2) FROM a.
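
The two cases side by side:

-- Each partition must ship every distinct a3 value back to the aggregator.
SELECT COUNT(DISTINCT a3) FROM a;

-- The distinct expression exactly matches the shard key (a1, a2), so distinct
-- values can be resolved on the leaves and only per-partition counts are merged.
SELECT COUNT(DISTINCT a1, a2) FROM a;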

Group By. GROUP BY queries are spread very efficiently across the leaves. The aggregator sends the GROUP BY construct to the leaves so that the leaves process data down to the size of the final, grouped result set. The aggregator then merges together these grouped results (combining aggregates along the way) and sends the final result back to the client. The efficiency of a distributed GROUP BY query is inversely proportional to the number of rows in the final result set, since the traffic through the system is roughly the number of partitions multiplied by the size of the grouped result set.
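
For example, a sketch of a grouped aggregate:

-- Each partition groups its rows by a3 and computes a partial COUNT(*);
-- the aggregator merges the groups and sums the partial counts.
SELECT a3, COUNT(*) FROM a GROUP BY a3;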

Having. HAVING clauses are processed entirely on the aggregator since they perform filtering after the GROUP BY operation is complete.
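
For example, a sketch:

-- The GROUP BY and COUNT(*) are computed and merged as above; the HAVING filter
-- is then applied on the aggregator to the merged groups.
SELECT a3, COUNT(*) FROM a GROUP BY a3 HAVING COUNT(*) > 100;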

Joins and Subqueries

MemSQL exposes patterns of joins and subqueries that can be performed efficiently and disallows most queries that cannot yet be run efficiently. MemSQL currently does not have a “catch-all” execution model that supports queries outside of these patterns. Each pattern is designed to solve common problems encountered in customers’ applications.

Reference Joins. The general rule is that MemSQL will efficiently execute any join query with a single sharded table and as many reference tables as you’d like. Since reference tables are fully present on every machine in the cluster, leaves can efficiently join against their local copies of reference tables.

-- These queries leverage reference joins
SELECT * FROM a INNER JOIN r ON a.a1 = r.r1;
SELECT * FROM r LEFT JOIN a ON a.a1 = r.r1;
SELECT * FROM a, r r1, r r2, r r3;
SELECT * FROM a INNER JOIN
    (SELECT DISTINCT r1 FROM r) x
    ON a.a1 = x.r1;

Foreign Shard Key Joins. These joins are introduced above in Foreign Shard Key. MemSQL allows any number of distributed tables to be joined together as long as (a) every table is fully joined on a foreign shard key and (b) every table has a common foreign shard key ancestor.

-- These queries are allowed
SELECT * FROM a INNER JOIN b ON a.a1 = b.b1 AND a.a2 = b.b2;
SELECT * FROM a LEFT JOIN b ON a.a1 = b.b1 AND a.a2 = b.b2;
SELECT * FROM a, b, r WHERE a.a1 = b.b1 AND a.a2 = b.b2;
SELECT * FROM b INNER JOIN b t ON b.b1 = t.b1 AND b.b2 = t.b2;

-- These queries are not allowed (every table must be fully joined on a foreign shard key)
SELECT * FROM a INNER JOIN b ON a.a1 = b.b1;
SELECT * FROM a INNER JOIN b ON a.a1 = b.b1 OR a.a2 = b.b2;

-- Assume these tables also exist:
CREATE TABLE c (
    c1 int,
    c2 int,
    c3 int,
    FOREIGN SHARD KEY (c1, c2) REFERENCES a (a1, a2)
);

CREATE TABLE d (
    d1 int,
    d2 int,
    d3 int,
    SHARD KEY (d1, d2)
);

-- These queries work
SELECT * FROM a, b, c WHERE a.a1 = b.b1 AND a.a2 = b.b2 AND a.a1 = c.c1 AND a.a2 = c.c2;
SELECT * FROM b INNER JOIN c ON b.b1 = c.c1 AND b.b2 = c.c2;

-- These queries are not allowed (every table must have a common foreign shard key ancestor)
SELECT * FROM a INNER JOIN d ON a.a1 = d.d1 AND a.a2 = d.d2;
SELECT * FROM b, d WHERE b.b1 = d.d1 AND b.b2 = d.d2;

Subqueries. Subqueries that contain distributed tables are executed independently and then joined together on the aggregator. The result of each subquery is stored in a temporary table on the aggregator, so you must take care not to pull too many rows back to the aggregator. For example, SELECT * FROM (SELECT * FROM a) a1, (SELECT * FROM b) b2 would pull the entire a table and b table into temporary tables on the aggregator and then join them together. This kind of query would almost always result in an out-of-memory error on the aggregator. However, this property can be used effectively for several query shapes:

-- The group by is done mostly on the leaves and the AVG(x) operation
-- is done on the aggregator.
SELECT AVG(x) FROM (SELECT a1, COUNT(*) x FROM a GROUP BY a1) t;

-- Each subquery (over a and over b) is done on the leaves and the
-- results stored in the aggregator. We are expecting each subquery to
-- return a relatively small number of rows. You could use a LIMIT clause
-- in each subquery to enforce this.
SELECT * FROM (SELECT * FROM a WHERE a1 = 5 AND a2 = 5) ax
    INNER JOIN (SELECT * FROM b WHERE b1 = 10 AND b2 = 10) bx
    ON ax.a1 = bx.b1;

Write Queries

Updates and Deletes. The WHERE clause in an UPDATE or DELETE query is optimized the same way as the WHERE clause in a SELECT query. If the predicate exactly matches the shard key, the query is routed to a single partition; otherwise it follows the same index-matching rules described above.
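
For example, rough sketches under the schema above:

-- Matches the shard key (a1, a2) exactly, so the delete is routed to one partition.
DELETE FROM a WHERE a1 = 4 AND a2 = 10;

-- Does not match the shard key, so the update is sent to every partition
-- (each partition uses its local index on a3).
UPDATE a SET a3 = a3 + 1 WHERE a3 = 5;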

Inserts. MemSQL executes INSERT queries by analyzing the insert values relevant to the shard key and routing the query to the corresponding partition. For example, INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3) would compute the hash value of (1, 2) and map this value to the appropriate partition.

If you’re bulk inserting data with INSERT queries, then you should take advantage of the multi-insert syntax: INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3), (2, 3, 4), .... The aggregator will chop up the multi-insert into single insert queries and run them in parallel across the cluster. This technique enables your application to combat the inherent latency of running in a distributed system.
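
For example, a sketch of a multi-insert:

-- The aggregator splits this statement into single-row inserts and routes each one
-- to the partition matching the hash of its (a1, a2) value, running them in parallel.
INSERT INTO a (a1, a2, a3) VALUES
    (1, 2, 3),
    (2, 3, 4),
    (3, 4, 5);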

Currently, in the event of a rollback (duplicate key error, out-of-memory, etc.), the aggregator will not roll back tuples that were successfully inserted by the multi-insert query. This is in contrast to the single-box system, which will treat a multi-insert as one large transaction.

Note

If your data is in a flat-file format, MemSQL supports LOAD DATA to facilitate efficient data loading.
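
A minimal sketch (the file path and delimiter below are illustrative):

-- Load rows from a delimited file into table a; the file path is hypothetical.
LOAD DATA INFILE '/path/to/data.csv'
INTO TABLE a
FIELDS TERMINATED BY ',';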
