Amazon Redshift - Fundamentals

Jef Claes

When I joined my current employer early 2014, I was handed ownership of a dashboarding and reporting solution. In the state the software was in, dashboards and reports alike would query the live transactional database directly. This approach was extremely simple and worked pretty well when there were only a handful of people using the system. Things started to go awry when the website started serving more and more actual paying customers. The dashboards and reports would take a long time to load, or would even time out. To make things even worse, because of business owners frantically refreshing their dashboards, customers would start experiencing failing transactions.

With production on fire, your first instinct is to extinguish the fire as quickly as possible. We hastily disabled features that were not mission-critical, rewrote problematic queries, fine-tuned indexes and added some caching on top. This was a small investment, but had a big immediate impact on the performance of the dashboards. This hardly brought any relief for the reports though. They queried a much larger data set, and noticeably got slower day by day. Here, we introduced an intermediary layer that ran nightly roll-ups so that we could query an already aggregated, highly compressed data set.

These changes had a big pay-off and kept things stable for a good while. However, we still had a set of problems. When you're doing business 24/7, even scheduled nightly roll-ups still impact a small set of customers.

Having to tune schemas and indexes to serve both transactional and dashboarding workloads makes things more complex and fragile than they should be. Dashboarding queries had to be implemented and monitored carefully, since a single dashboarding query going rogue could have a negative effect on transactional workloads.

With some big and non-trivial features in the pipeline, we set aside some time to port the existing functionality onto infrastructure that could better handle our ever-growing demands. We settled on Amazon Redshift quite quickly.

Since we already run parts of our infrastructure on AWS, it is more often than not the first place we look. Low operational requirements, low cost and room to scale up to petabytes of data significantly reduce the buy-in needed to build a proof of concept.

In comparison to other Big Data products, Redshift supports Standard SQL. SQL is not always the most elegant language, but its declarative nature does a good job hiding most of the complexities of running massive parallel queries on a cluster of machines. It is ubiquitous in the developer world and well understood by most.

All these parameters combined allowed us to gain enough confidence to validate our decision to go with Redshift in just a couple of weeks.

We got decent results very early on, but failing to fully understand all of Redshift's core concepts left room for improvement. Putting in the work to get a solid grasp on the fundamentals early on would definitely be a high-return investment. Shortly after committing to this goal, I started working on this piece of investigative writing - writing down what we learned, tuning our own setup along the way. By the time I finished writing the last chapter, we had ported most of the functionality. This took us less than two months. Except for a handful of important schema design decisions, Redshift offered surprisingly little resistance. We were even able to drop the whole intermediary roll-up layer, saving a few thousand lines of code and removing another operational concern. Complexity dropped back to the level of the early days, where we could just query the raw data set directly and in its completeness, without having to worry about things being too slow or affecting customer transactions.

My text editor counts almost 10000 words in this document, making it something the average reader can comfortably work through in less than an hour. I hope it saves you from diving into a 400-page book, while telling a more cohesive and compact story than the technical documentation.

The path I followed takes you from the storage layer all the way up to query processing and workload management - layering one concept on top of the other. By the time you finish reading, you should have a solid understanding of the high-level architecture of a Redshift cluster and a good idea of how to make it work in the real world. If you're a technical decision maker, this text should allow you to make a well-informed decision on whether Amazon Redshift might be a sane addition to your stack.

I assume most people interested in this content have some experience working with a relational transactional database. Although the text contains some SQL queries you can play with on your own cluster, setting up a cluster, making provisioning decisions, monitoring overall health and connecting to the cluster are not covered. The AWS console does quite a good job of guiding you through this process. I'd advise you to go through the process of setting up a cluster, and to destroy it again right away, before you start reading. It should only take minutes.

I hope that this essay can teach you as much about Amazon Redshift as it has taught me.

In the first chapter, we'll have a look at how Redshift stores data on disk: how columnar storage is different from row-based storage, and how it makes certain workloads run much faster than they would on other databases.

Columnar

A lot of the tools we use to store, manipulate and visualize data have conditioned us to reason about data in a row-based fashion. It starts even before we ever touch a computer. Most of us learn to write left-to-right, top-to-bottom, appending line after line. It takes a bit of will to learn to reason about data in dimensions we're not used to.

Let’s look at this table that contains a bunch of data on individual order items. The table is highly denormalized. A single row contains more than 30 columns.

id | placed_at      | ... | price | tax   | currency
1  | 2014-1-1 00:01 | ... | 50.00 | 10.50 | EUR
2  | 2014-1-1 00:02 | ... | 20.00 | 4.20  | EUR
3  | 2014-1-1 00:03 | ... | 10.00 | 0.00  | USD

To compare apples to apples, we’ll first load this data in a row-based database. The database is set up to store each table in a dedicated file. Each file is chopped up into fixed-size blocks. Depending on the row size, multiple rows might fit in a single block, or a single row might need to be spread over multiple blocks. All rows we persist are stored in the same file, conveniently named table_orderitems.data.

| row 1 | row 2 | row 3 | row .. |

After loading the order items for the last ten years, the file has grown to take up 79GB on disk. We can now start questioning the data. We’ll query the sum of all items sold, grouped by the currency.
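
As a minimal sketch, assuming an order_items table with the price and currency columns used throughout this text, such a query could look like this.

SELECT currency, SUM(price) AS total_sold
FROM order_items
GROUP BY currency;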

Since the smallest unit of IO the database can work with is a block, the database will have to read all the blocks in the table_orderitems.data file when executing the query for the first time, even though we only really care about the price and currency columns.

Columnar databases optimize for this type of workload by laying out data differently. If we load the order items data set in a columnar database, we also end up with a table_orderitems.data file, but the blocks contain column values instead of full rows. Instead of having to read the full 79GB from disk, our query now only needs to read the blocks that contain values of the price and currency columns - only a fraction of the data.

| column 0 | column 0 | column 0 | column .. |

This makes columnar databases extremely efficient for read workloads that only require a small set of columns, but inefficient for those that read full rows. Efficient for the type of queries you often find in reporting, business intelligence or analytics - or OLAP (Online analytical processing) in short. Inefficient for those you find in a transaction processing application - OLTP (Online transaction processing). When your queries require a large set of columns you will often be better off sticking to a row-based database.

Redshift stores each table in its own file in the data directory. However, the file won't be named table_orderitems.data. Redshift identifies each file by an internally assigned filenode number. You can query the catalog tables to find out the file name, but Redshift won't give you access to the underlying storage to verify whether the file really exists on disk.
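
A minimal sketch of such a lookup, assuming the pg_class catalog exposes relfilenode the way it does in stock PostgreSQL:

SELECT relname, relfilenode
FROM pg_class
WHERE relname = 'order_items';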

order_items | 174738

As we learned earlier, files are divided up into blocks, and blocks contain column values. We can peek inside a file and see how many blocks are used per column.
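
One way to do that, sketched against the stv_blocklist system table and the filenode number we found above:

SELECT col, COUNT(*) AS blocks
FROM stv_blocklist
WHERE tbl = 174738
GROUP BY col
ORDER BY col;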

col | blocks
0   | 624
1   | 1672
2   | 1304
30  | 28
31  | 28
32  | 4940

The result shows 33 columns, 3 more than expected. The last three columns are hidden columns used by Redshift: insert_xid, delete_xid and row_id.

Now that we know the number of blocks used to store a single column, we're still missing one piece of vital information: the block size. Redshift's user interaction layer is based on PostgreSQL. So if you have experience tweaking and running a PostgreSQL installation, you might remember the show block_size command off the top of your head. However, executing this command in Redshift - even when we're logged in as root - returns an error saying we have insufficient permissions. A good reminder that Redshift is a piece of proprietary technology running on rented infrastructure. Fortunately, the Redshift docs state more than once that the block size is set to 1MB.

In databases designed for OLTP workloads you will find far smaller block sizes (2KB, 4KB, 8KB and up). Smaller block sizes yield good results when you read and write small pieces of data and do random IO. Large block sizes perform better when you write and read large pieces of data in a sequential fashion. They waste less space storing block metadata and reduce the number of IO operations.

If you multiply the number of blocks by the block size, you end up with the actual space on disk being used to store a column or a table. A block size of 1MB makes the calculation very simple.

When Redshift writes blocks, it stores a bit of metadata for each block. A vital piece of information stored is the minimum and maximum value in each block, creating zone maps. Zone maps can speed up queries drastically by allowing Redshift to skip whole blocks while scanning for data.

As an example, let’s say we want to query how many order items were placed in 2018.

We can query the block metadata for the placed_at column.
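
A hedged sketch of that lookup, reusing the stv_blocklist table from before and assuming placed_at is the column with index 1:

SELECT blocknum, minvalue, maxvalue
FROM stv_blocklist
WHERE tbl = 174738
  AND col = 1
ORDER BY blocknum;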

After translating the min and max value, the result set looks something like this.

block | min       | max
0     | 2014-1-1  | 2014-2-3
1     | 2014-2-3  | 2014-3-8
1500  | 2018-1-1  | 2018-1-11
1501  | 2018-1-11 | 2018-1-25

Having this piece of information allows Redshift to skip all blocks except for block 1500 and 1501. Admittedly, because the values of the placed_at column are sequential, the efficiency of this query is phenomenal.

If we look at another column that has no sequential order, like price, the use of zone maps becomes way less efficient.

block | min | max
0     | 1   | 3499
1     | 1   | 2999
400   | 1   | 2875
401   | 1   | 3889

If we query the order_items table for items that cost less than 100 EUR, we can’t skip any blocks and will have to scan each block individually. If we were interested in items that cost more than 3000 EUR, we would be able to skip roughly 50% of all blocks.

Sort keys

In a row-based database, indexes are used to improve query performance. An index is a data structure that allows for faster data retrieval. A clustered index defines in which order rows are stored on disk, while a non-clustered index is a separate data structure (generally a tree) sitting next to a table. It stores a small set of individual columns together with a pointer to the full row in the table. This means the database can scan this small tree structure instead of the complete table.

For example, 7 values of the price column might translate to this small tree structure which is faster to scan than a list of 7 values.

            50
          /    \
        20      72
       /  \    /  \
     10    30 60   100

While indexes can make all the difference in a row-based database, you can't just make a row-based database behave like a columnar database by adding an index to each column. Indexes need to be maintained with each write to a table. The cost, both in IO time and in storage, increases with each index added.

Redshift doesn't implement non-clustered indexes, since each column already acts much like its own index. However, as the placed_at range query earlier showed, the order of column values inside blocks makes a big difference in query performance by making zone maps extremely efficient. Redshift allows you to define a sort key, similar to a clustered index, which decides the order in which column values are written to disk. The order_items sort key was created on the placed_at column, since we tend to look at a lot of our data month by month.

The sort key can either be a single column or a composition of multiple columns.
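
A minimal sketch of defining a single-column sort key, assuming a simplified order_items schema:

CREATE TABLE order_items (
  id        BIGINT,
  placed_at TIMESTAMP,
  price     DECIMAL(12,2),
  currency  CHAR(3)
)
SORTKEY (placed_at);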

There are two styles of sort keys: compound and interleaved. To have a benchmark, we’ll use a query that counts the number of items sold where the customer_id = 1 and the product_id = 2.

Compound

Since there is more than one column in the predicate, we’ll compose a compound sort key containing both customer_id and product_id. The compound key looks just like the result of an order by on two columns.

block | customer_id | product_id
1     | 1           | 1
1     | 1           | 2
1     | 1           | 3
1     | 1           | 4
2     | 2           | 1
2     | 2           | 2
2     | 2           | 3
2     | 2           | 4
3     | 3           | 1
3     | 3           | 2
3     | 3           | 3
3     | 3           | 4
4     | 4           | 1
4     | 4           | 2
4     | 4           | 3
4     | 4           | 4

Using this sorting style, Redshift only needs to read a single block; the first one. The zone maps contain enough data to quickly eliminate the other blocks.

However, if we only counted the rows where product_id equals 3, the sort key becomes way less efficient. Redshift is forced to scan all blocks when the sort key prefix is not involved in the predicate.

Interleaved

When you want to assign each column in the sort key an equal weight, you can use an interleaved sort key instead of a compound one. Redshift will reduce multiple columns to one dimension, while preserving locality of the data points. Sorting and querying an interleaved key can be visualized as laying values out over a multi-dimensional graph, looking at the data points as coordinates of a map, chopping up the map in blocks.

 ProductId
    |              |
  4 | (1,4) (2,4)  | (3,4) (4,4)
    |              |
  3 | (1,3) (2,3)  | (3,3) (4,3)
    |--------------|--------------
  2 | (1,2) (2,2)  | (3,2) (4,2)
    |              |
  1 | (1,1) (2,1)  | (3,1) (4,1)
    |_ _ _ _ _ _ _ | _ _ _ _ _ _ _  CustomerId
       1     2        3     4
block | customer_id | product_id
1     | 1           | 1
1     | 1           | 2
1     | 2           | 1
1     | 2           | 2
2     | 1           | 3
2     | 1           | 4
2     | 2           | 3
2     | 2           | 4
3     | 3           | 1
3     | 3           | 2
3     | 4           | 1
3     | 4           | 2
4     | 3           | 3
4     | 3           | 4
4     | 4           | 3
4     | 4           | 4

If we now query for a specific customer_id, Redshift will have to read two blocks. When we query a specific product_id, Redshift will also have to read two blocks. However, when we query a specific customer_id for a specific product_id, Redshift will only need to read one block. When all the columns in the sort key are specified, we can pinpoint the exact location of the data. The fewer sort key columns we specify in the predicate, the harder it becomes to know the exact location of the data. Once again, it's very much like a spatial search. If I gave you a partial coordinate of (0, ?), you would know the location has to be somewhere on the equator, but you wouldn't be able to pinpoint it exactly.
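
For reference, a sketch of how both styles are declared, using the same simplified schema as before:

CREATE TABLE order_items_compound (
  customer_id BIGINT,
  product_id  BIGINT,
  price       DECIMAL(12,2)
)
COMPOUND SORTKEY (customer_id, product_id);

CREATE TABLE order_items_interleaved (
  customer_id BIGINT,
  product_id  BIGINT,
  price       DECIMAL(12,2)
)
INTERLEAVED SORTKEY (customer_id, product_id);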

Compression

When we use compression, we choose to pay a higher price of compute in order to reduce the cost of storing and shuffling data around by making our data smaller. When you consider your application to be IO bound, making use of compression might have a big impact. Whether or not compression will bring the improvements you hope for, depends on whether the data suits the chosen compression algorithm.

Row-based databases often support compression at the row or block level. One way to compress a sample data set containing 4 rows is by building a dictionary that stores each unique value together with an index, and replacing the values inside the rows with their index.

Uncompressed

50.00  | 10.00 | EUR
50.00  | 10.00 | USD
50.00  | 10.00 | EUR
100.00 | 20.00 | EUR

Dictionary

0 | 50.00
1 | 10.00
2 | 100.00
3 | 20.00
4 | EUR
5 | USD

Compressed

0 | 1 | 4
0 | 1 | 5
0 | 1 | 4
2 | 3 | 4

The dictionary will be stored on the same block as the rows as a part of the header. Keeping the size of the dictionary in mind, this algorithm compresses the contents of the block by approximately 35%.

To improve the compression ratio, we need to make the dictionary more efficient. We can consider the dictionary more efficient when it is smaller relative to the size of the data set. This can be achieved by feeding the algorithm data that’s more homogeneous and contains fewer distinct values. These are two requirements that columnar storage can easily satisfy.

Because columnar databases group column values together, the values share the same type, making the data set to compress homogeneous. When the data set is guaranteed to be of single type, we can - instead of a one-size-fits-all algorithm - choose a compression algorithm that best suits the data.

There are a few obvious examples of this. Timestamps are highly unique, but if we store the differences between each timestamp, we can save a lot of space. Long varchars, like the body of a tweet or a blog post, can benefit from a compression technique that stores individual words in a separate dictionary. For types like a boolean, you might want to avoid compression as a whole, the values are not going to get smaller than a single bit.

Looking back to the dictionary approach, there’s one other obvious change we can make to make it more efficient: make the blocks larger. The more data we compress, the better the odds of it containing the same value more than once. Redshift uses a block size of 1MB, yielding a much higher compression ratio than your day-to-day OLTP row-based database.

The following compression encodings are supported:

  • RAW
  • Byte-Dictionary
  • Delta
  • LZO
  • Mostly
  • Runlength
  • Text255 and Text32k
  • Zstandard

This gives us quite a few options to choose from, which might feel like a time-consuming task. However, Redshift doesn’t really need us to get that intimate with our data set. Out of the box, there are tools available that help select the right compression encoding.

If you already have a table that contains a usable amount of data, Redshift can analyze the columns for you and advise an optimal encoding.
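
A minimal example, run against the order_items table:

ANALYZE COMPRESSION order_items;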

table       | column      | encoding | est_reduction_pct
order_items | id          | delta    | 0.00
order_items | placed_at   | zstd     | 39.83
order_items | customer_id | zstd     | 44.39

Pay attention though. The ANALYZE COMPRESSION command acquires an exclusive lock, preventing concurrent reads and writes against the table.

When you bulk import data into a table using the COPY command - which is the recommended way - Redshift will automatically apply compression if all conditions are met. For automatic compression to be applied, the destination table needs to be empty, all columns need to have RAW or no encoding applied, and there needs to be enough sample data in the source for the analysis to be reliable.

There is a notable difference between using the ANALYZE COMPRESSION command and having Redshift automatically apply compression during the COPY operation. The ANALYZE COMPRESSION command looks for minimum space usage and will frequently recommend ZStandard. The COPY command on the other hand looks for the most time-efficient encoding, which will most likely be LZO.

Now that we have a basic understanding of how Redshift stores data on disk, we’ll have a look at the composition of a Redshift cluster.

Nodes

A Redshift cluster contains one or multiple nodes. Each cluster contains exactly one leader and one or more compute nodes. If you have a cluster with only one node in it, this node assumes both roles: leader and compute.

To a client application, the composition of the cluster is transparent. You will always only talk to the leader node. The leader node is responsible for the intake and coordination of queries. When the leader node receives a query, it first parses the query and turns it into an optimized execution plan. These steps are then compiled into code and distributed to the compute nodes. When the compute nodes return intermediate result sets, the leader merges these together and replies to the client application.

              Client
                |
                |
              Leader
   -----------------------------
  /             |              \
| Compute  | Compute  | Compute  |

Slices

Redshift divides the system into even smaller computation units. Each compute node is chopped into slices. Each slice is allocated a portion of the machine's disk, memory and CPU. Workloads are then distributed over these slices, giving us even more isolated individual units working in parallel to come up with results fast.

              Client
                |
                |
              Leader
          -------------
         /      |      \
| Compute  | Compute  | Compute  |
| -------  | -------  | -------- |
| Slice 0  | Slice 2  | Slice 4  |
| Slice 1  | Slice 3  | Slice 5  |

The number of nodes and slices assigned to your cluster can be found in the stv_slices table.
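
A minimal query against that table:

SELECT node, slice
FROM stv_slices
ORDER BY node, slice;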

node | slice
0    | 0
0    | 1
1    | 2
1    | 3

Inspecting the result, you can see I’m using a small cluster with 2 nodes that only contain 2 slices per node.

Distribution style

When you insert data into a table, the leader node decides how it is distributed over the slices in the Redshift cluster. When you create a table, you can choose between three distribution styles: even, key and all. The way a table is distributed over the cluster has a big impact on performance. Ideally, you want to distribute data evenly across the cluster while collocating related data. When a slice executing a query requires data that lives somewhere else, Redshift is forced to talk to other nodes in its network and copy some of their data over. In any distributed application, you want to distribute work in a way that allows each node to focus on the work at hand by keeping the overhead of communication to a bare minimum.

Even distribution

Using this style, the leader node will distribute the table over all the slices in the cluster in a round-robin fashion.

The denormalized order_items table we used earlier is a good candidate for an even distribution style. If we write a query that counts all the order items that were sold for 50 EUR or higher, all the slices would be put to work, achieving maximum parallelism. Even distribution generally works very well when there is no need to join the table.

When you don’t specify a style when creating a table, this is the default.

Key distribution

Using key distribution, a single column decides how the table is distributed over the slices. Try to choose a key that distributes the data evenly over the slices, while considering which tables it will often be joined with.

If, instead of a denormalized order_items, we would model it as a star schema, we might end up with the tables orders, order_items, customers, products etc. With this model, we know that we would need to join the tables orders and order_items almost always. Here it makes sense to define orders.id and order_items.order_id respectively as the distribution key, ensuring that there is no data redistribution required before being able to perform the join.

Making these decisions, you want to be informed about the queries to expect and the data distribution of the tables. The first one requires you to actually talk to business owners, the second one can be acquired by querying existing data.

152828
587828
200127

When you start searching for a good distribution of the data in certain columns, you will most likely find some interesting insights.

For example, looking at the order items per currency, we can see that one currency dominates sales. Fair enough, we only started selling outside of Europe in the last few months. Choosing the currency as the distribution key distributes the data unevenly, introducing skew and turning specific slices into hot-spots that do more work than others.

currency | order_items
EUR      | 81201057
USD      | 32481
JPY      | 15887

Another example would be the tax column. Looking into the data, we learned that some regions are tax exempt, making this column a disastrous distribution key. Every nullable column should raise a red flag when shopping for a distribution key.

Another anti-pattern to look out for is having a date or timestamp as the distribution key. A query that aggregates data in a small date range might end up being evaluated on a single slice, instead of distributing the work over multiple slices.

The distribution key is defined when creating the table.
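
A minimal sketch, assuming a simplified order_items schema distributed on order_id:

CREATE TABLE order_items (
  order_id   BIGINT,
  product_id BIGINT,
  price      DECIMAL(12,2)
)
DISTSTYLE KEY
DISTKEY (order_id);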

When working with an existing data set, or when testing out different models, we can check the system table svv_table_info for any skew that looks off. The skew_rows column tells us the ratio of the number of rows in the slice with the most rows to the number of rows in the slice with the least rows.
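
For example, a quick look at the most skewed tables:

SELECT "table", skew_rows
FROM svv_table_info
ORDER BY skew_rows DESC;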

Assume we run a single node with two slices. If we only had 3 customers and made customer_id the distribution key, we would end up with a ratio of 2 to 1.

All distribution

In a star schema, you’ll notice that it’s more often than not impossible to optimize for all cases. The all distribution can come in handy here. With this distribution style, a table is distributed to all nodes. This works really well for smaller tables that don’t change much. Distributing a table to each node will improve query performance when this table is often included in a join, but importing the data will also take much longer because it needs to be distributed to each node.

Note that the tables are distributed to each node, not each slice. So load times of tables using the all distribution will be very different on a cluster with 16 small nodes compared to a cluster with 2 big nodes, even though both configurations add up to the same number of slices.

The all distribution style is defined when creating the table.
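
A minimal sketch, assuming a small products dimension table:

CREATE TABLE products (
  id   BIGINT,
  name VARCHAR(255)
)
DISTSTYLE ALL;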

Durability and fault tolerance

So far we’ve seen that Redshift uses a cluster of nodes as a means to distribute data and to parallelize computation. In general, with each component added to a cluster of machines, the odds of failure increase. Each failure has the potential to bring the whole system to a halt, or even worse, to lose data.

Redshift has two effective measures in place to prevent data loss, ensuring durability. When you load data, Redshift synchronously replicates this data to other disks in the cluster. Next to that, data is also automatically replicated to S3 to provide continuous and incremental backups. Note that synchronous replication is not supported in a single node cluster - there’s nowhere to replicate to. It’s recommended to at least run two nodes in production.

A Redshift cluster is actively monitored for disk and node failures. In case of a disk failing on a single node, the node automatically starts using an in-cluster replica of the failing drive, while Redshift starts preparing a new healthy drive. When a node dies, Redshift stops serving queries until the cluster is healthy again. It will automatically provision and configure a new node, resuming operations when data is consistent again.

In theory, you can expect an unhealthy cluster to heal itself - without any intervention. Depending on the gravity of the failure, some downtime is to be expected. Redshift favors consistency over availability.

There are two ways to load data into Redshift. You can use an INSERT statement, or you can use the COPY command. The INSERT statement inserts a single row, while the COPY command loads data in bulk. Both can participate in a transaction.

Multiversion Concurrency Control

There are design decisions Redshift made that will make you always want to opt for bulk operations if possible. Blocks are immutable by design; this means that for each statement you execute on a single row, the database needs to clone a whole 1MB block. Immutable blocks allow Redshift to keep serving consistent reads while writes are happening. This is a technique called Multiversion Concurrency Control, or MVCC in short - used by many modern storage engines. For each write, the modified block is copied, incrementing its version number, while the old block is marked for deletion. Even though the old block is marked for deletion, queries in flight can continue to read the block without seeing inconsistent data or having to retry. Queries that are issued after the last write will query the block with the highest version. Compared to other data warehouse solutions, this makes bulk loading data into Redshift an online operation. You can just write batches of data without causing blocked or inconsistent reads.

The COPY command supports importing data from a handful of sources: Amazon S3 (Simple Storage Service), Amazon EMR (Elastic MapReduce) and Amazon DynamoDB.

Copy from S3

When you’re copying data from a relational database into Redshift, the best option is to use file based imports using Amazon S3. You read the data from the source database, write it to a file, compress the file and upload it to your private S3 bucket attaching a bit of useful metadata like file type, source etc. On the receiving end, you subscribe to an S3 event and import the file. Ideally, you persist the path of the uploaded file with its metadata to a small database and checkpoint the files you’ve imported. Having this secondary index into your private S3 bucket together with a checkpoint allows imports to pick up where they left off and to import the whole lot again later on.
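
A hedged sketch of the import side, assuming a hypothetical bucket and IAM role, and a gzipped CSV export:

COPY order_items
FROM 's3://my-bucket/exports/order_items/2018-01-15.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
GZIP
DELIMITER ','
TIMEFORMAT 'auto';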

Appending

Immutable data makes shoveling data around so much easier in general. If the source database contains concepts like clicks, events or transactions, we can read the rows since our last checkpoint and write those to a file. To add the data to the destination Redshift database, it’s just a single COPY command.

When you’re dealing with temporal data, such as clicks or transactions, chances are high the timestamp column is defined as the sort key. When the source data is also sorted by timestamp, Redshift won’t need to change any blocks, resulting in an extremely efficient copy operation.

Deep Copy

When the source database contains tables where rows change, and you can afford to have somewhat stale data or the data doesn’t change often, you can consider replacing the destination table as a whole.

When a new snapshot of the source table is uploaded, you can truncate the destination table and copy the file into the table.

This has one caveat though. The TRUNCATE command commits immediately and cannot be rolled back. Between truncating the destination table and copying the new data from the source file, the table is empty, which could result in inconsistent results.

By introducing a staging table and some intermediary steps, it’s possible to replace the table’s contents as one single atomic operation. This ensures queries never get to see an empty table.

  • Truncate the staging table
  • Begin a new transaction
  • Copy the data to the staging table
  • Rename the destination table, by postfixing it with old
  • Rename the staging table to the destination table by removing the staging postfix
  • Rename the old table, by changing the old postfix to staging
  • Commit the transaction
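
Putting those steps together, a minimal sketch in SQL, assuming hypothetical table names and the same bucket and role as before:

-- TRUNCATE commits immediately, so it happens outside the transaction.
TRUNCATE order_items_staging;

BEGIN;

COPY order_items_staging
FROM 's3://my-bucket/snapshots/order_items.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
GZIP;

-- Swap the staging table in with an atomic rename dance.
ALTER TABLE order_items RENAME TO order_items_old;
ALTER TABLE order_items_staging RENAME TO order_items;
ALTER TABLE order_items_old RENAME TO order_items_staging;

COMMIT;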

Constraints

As you've already noticed in the examples used in the previous chapters, defining the schema of a table in Redshift is not much different from how it would be in any other relational database. If you're used to other relational databases and have always been disciplined about defining constraints as part of your schema, you might be in for a surprise.

Although Redshift allows you to define foreign keys and unique constraints, they are not enforced when modifying data. Can you imagine how expensive a load operation would be in a distributed database like Redshift? Each load operation would require all the slices to freeze time and to talk to one another before allowing a write to happen.

This doesn’t mean that you shouldn’t define constraints though. Redshift makes use of the schema metadata to optimize query plans.

Unsorted data

When loading new data into a table with a sort key, Redshift does not resort and rewrite the existing blocks already on disk.

To query the percent of unsorted rows in a table, we can use the svv_table_info.
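
A minimal sketch of that query, assuming we're interested in the users table:

SELECT "table", unsorted
FROM svv_table_info
WHERE "table" = 'users';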

The result shows that 17.11% of the users table is unsorted.

When the table contains a sort key, you can try to load the data in sort key order to avoid introducing unsorted data. For example, if you sort the users table on registered_at or user_id, newly appended rows will not overlap with the existing zone maps. If you were to sort on the country_code, newly added rows will most likely overlap with existing zone maps. Zone maps are most effective when the min-max ranges don’t overlap.

Here's an example of healthy looking zone maps containing numbers.

block | min | max
1     | 1   | 100
2     | 101 | 200
3     | 201 | 300
4     | 301 | 400

If Redshift needs to count all rows with a value of 151, it can - based on the zone maps - exclude all blocks except for block 2. Redshift only needs to read a single block.

When zone maps overlap, they could look like this.

block | min | max
1     | 1   | 400
2     | 50  | 200
3     | 201 | 300
4     | 100 | 400

Based on these min-max ranges, Redshift can only exclude one block, block 3. This forces Redshift to read all three blocks, searching for the value 151.

Deleted data

When you’re updating or deleting data, the data structures on disk will get fragmented. Because of Redshift’s implementation of MVCC, blocks are marked for deletion when they’re changed, but still linger around. The deleted blocks take up space on disk, but even worse, the query processor needs to know the block with the highest version, forcing a scan.

To see how bad things are, you can query the system table stv_tbl_perm.
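
A rough sketch of one way to compare what's on disk with what's visible to queries, assuming an order_items table (stv_tbl_perm stores one row per slice, hence the SUM):

SELECT SUM(rows) AS rows_on_disk
FROM stv_tbl_perm
WHERE TRIM(name) = 'order_items';

SELECT COUNT(*) AS rows_visible
FROM order_items;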

rows on disk | queryable | marked for deletion
1000         | 100       | 900

The result of this query shows that of the 1000 rows on disk, 100 are queryable and 900 are marked for deletion.

Avoiding dead data as a whole is not that simple. You can try to avoid modifying existing blocks by reducing the number of (small) inserts, updates and deletes. Copy data in big batches as much as you can afford.

Vacuuming

It's impossible to keep data that's in use completely free of fragmentation. That's why Redshift, like PostgreSQL, supports VACUUM. The VACUUM command gets rid of the blocks marked for deletion, reclaims space and restores the sort order.

It’s a good idea to clean house on a regular basis, but to not overdo it either. VACUUM is an expensive operation that will temporarily degrade cluster performance. It should be scheduled outside business hours as much as possible or after you’ve really made a mess of things (after a nightly batch of imports for example).

Redshift helps you avoid being too eager when doing maintenance. By default, VACUUM will skip the sort phase for any table where more than 95% of the table's rows are already sorted. You can tweak this value on a per-query basis by adding TO threshold PERCENT to your query.

The VACUUM command can be performed to a single table or the whole database. A handful of modes are supported: The FULL mode reclaims disk space and resorts unsorted rows. The SORT ONLY mode only resorts unsorted rows, without reclaiming disk space. The DELETE ONLY mode reclaims disk space, without resorting.
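
A few sketches of what those invocations look like, using the order_items table:

VACUUM FULL order_items;                -- reclaim space and resort rows
VACUUM SORT ONLY order_items;           -- resort rows only
VACUUM DELETE ONLY order_items;         -- reclaim space only
VACUUM FULL order_items TO 99 PERCENT;  -- override the default 95% sort threshold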

Make use of the statistics available in the system tables to decide whether table maintenance is really necessary and which VACUUM mode is best suited for the job. When your cluster hardly serves any queries outside business hours, it does not hurt to be thorough and to use that idle time to simply vacuum the whole database using sane thresholds.

The COPY command makes bulk loading data into Redshift trivial. Its counterpart is the UNLOAD command. The UNLOAD command enables you to export the results of a query to S3. This is useful to export large data sets fast, without having to write too much code.

The UNLOAD operation writes one or more files (or parts) per slice in parallel. By disabling parallelization, you can force Redshift to only write a single file. Files have a maximum file size of 6.2GB. The file names are postfixed with a slice number and a part number.
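
A hedged sketch of an UNLOAD, assuming the same hypothetical bucket and role used earlier:

UNLOAD ('SELECT * FROM order_items WHERE placed_at >= ''2018-01-01''')
TO 's3://my-bucket/exports/orderitems'
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-unload'
GZIP;
-- Add PARALLEL OFF to force a single output file (up to the 6.2GB limit).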

orderitems0000_part_00
orderitems0000_part_01
orderitems0001_part_00

Query plans

When a client application sends a query to Redshift, the leader node first parses the query, then looks at statistics available and computes an optimized query plan. The query plan then gets sent to each compute node, where the query plan is executed and the results are returned to the leader node.

Although query plans are executed by machines, with a bit of practice their textual representation can be interpreted by humans as well. The EXPLAIN command returns a human-readable version of the query plan for any given query.

Let’s first look at a simple COUNT.
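
Something like the following, using the table name as it appears in the plan below:

EXPLAIN
SELECT COUNT(*)
FROM orderitems;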

XN Aggregate (cost=8887738.40..8887738.40 rows=1 width=0)
  -> XN Seq Scan on orderitems (cost=0.00..7110190.72 rows=711019072 width=0)

The result represents a tree structure. Indentation and arrows are used to show the different nodes and child nodes of the tree. Lines are read bottom to top, each line representing a simplified operation. The details (between the brackets) show how much data is processed during the operation and what the relative cost is compared to the whole query plan. The cost consists of two values divided by dots (..). The first value is the cost of returning the first row, the second value is the cost of completing the whole operation. The amount of data processed and its cost are based on statistics maintained by the database and do not necessarily reflect reality.

In this example, the whole table needs to be scanned, since each row is counted. Once the rows have been counted, the result is aggregated into a single row. The costs are cumulative as you go up the plan. In this example, the cost of the Aggregate operation contains the cost of the Seq Scan on OrderItems. Between operations, you might notice a gap in the cost, this is due to the estimated start up cost of the operation.

Now here’s another example that’s a bit more complex. We’ll count the order items placed each year, sorted by the year they were placed in.
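
A sketch of that query, using the column name as it appears in the plan below:

EXPLAIN
SELECT DATE_TRUNC('year', placedat), COUNT(*)
FROM orderitems
GROUP BY 1
ORDER BY 1;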

XN Merge (cost=1000012509454.50..1000012509478.50 rows=9600 width=8)
  Merge Key: date_trunc('year'::text, placedat)
  -> XN Network (cost=1000012509454.50..1000012509478.50 rows=9600 width=8)
     Send to leader
     -> XN Sort (cost=1000012509454.50..1000012509478.50 rows=9600 width=8)
        Sort Key: date_trunc('year'::text, placedat)
        -> XN HashAggregate (cost=12508771.52..12508819.52 rows=9600 width=8)
           -> XN Seq Scan on orderitems (cost=0.00..8934836.80 rows=714786944 width=8)

This query plan is a bit longer, but the extra operations are pretty easy to map to the new query. Bottom up again, Redshift scans the order items, hashing the year and counting the items, to finally sort the result. This being a parallel operation, each slice sends its results to the leader node, for the leader node to merge them to a single result.

In the next example, we query more than a single table. With this query we want to get a feel of the demographics of our user base. We look at which countries user logins are originating from and we join that with the language users have the website content served in.
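
A sketch of such a query; the country and language column names are assumptions, while the join columns match the plan below:

EXPLAIN
SELECT l.country, u.language, COUNT(*)
FROM logins l
JOIN users u ON l.userid = u.id
GROUP BY l.country, u.language;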

XN HashAggregate (cost=23048307962.53..23048307965.14 rows=1044 width=14)
  -> XN Hash Join DS_BCAST_INNER (cost=1440.51..23048287989.65 rows=2663051 width=14)
     Hash Cond: ("outer".userid = "inner".id)
     -> XN Seq Scan on logins (cost=0.00..26630.50 rows=2663050 width=13)
     -> XN Hash (cost=1152.41..1152.41 rows=115241 width=9)
        -> XN Seq Scan on users (cost=0.00..1152.41 rows=115241 width=9)

This example shows a JOIN in action. First the users table gets scanned, hashing the relevant columns used to join and to aggregate. Then the logins are scanned, hashing the relevant columns once more. A prerequisite to joining two sets of data is that they are on the same node: one table is first transformed into a hash table living in memory, and that hash table is then probed with rows of the second table. Since neither the users table nor the logins table was created with a suitable distribution style, Redshift is forced to use the DS_BCAST_INNER operation to broadcast the whole inner table over the network to each node to complete the join operation. Even though Redshift tries its best to limit network traffic by broadcasting the smallest table possible, this remains a super expensive operation.

When data needs to be joined together, you should aim to put it on the same slice. In this example, we could change the distribution style of both tables to distribute over the user_id, or we could distribute one table to all nodes. In this case, keeping other queries in mind, it could make sense to distribute the users to all nodes. This really depends on how often the table changes, how many rows the table contains and how wide the rows are.

Changing the users table distribution style to ALL, changes the query plan immediately. Distribution of data is no longer needed to perform the HASH JOIN operation.

XN HashAggregate (cost=2328288.07..2328317.07 rows=11600 width=22)
  -> XN Hash Join DS_DIST_ALL_NONE (cost=10.75..2326245.42 rows=272354 width=22)
     Hash Cond: ("outer".userid = "inner".id)
     -> XN Seq Scan on logins (cost=0.00..26630.50 rows=2663050 width=13)
     -> XN Hash (cost=8.60..8.60 rows=860 width=17)
        -> XN Seq Scan on users (cost=0.00..8.60 rows=860 width=17)

Looking at the query plan after you write a query is a good habit to get into. Seeing how the database works underneath the abstraction teaches you to think as one. Over time you grow a pretty good intuition of what the database will be doing for any given query. Once you’re confident you can reason about data flowing through the system without peeking at the plan, the query plan turns into a tool to verify your assumptions. With Redshift being a distributed database, I’m even more frantic about avoiding costly mistakes.

Statistics

Let's say I ask you to write a small program that joins two tables. Table A lives on node A, while table B lives on node B. Although the nodes can communicate with each other, your code is only allowed to run on node A or node B. Your objective is to make the code run as fast as possible. Being new to database development, you start by researching the most efficient algorithms to perform a JOIN operation. Quickly you learn that whether you implement a NESTED LOOP, a HASH JOIN or a MERGE JOIN heavily depends on how the data is laid out. Is the data sorted or is it unsorted? Another prerequisite to performing a JOIN operation is that the data needs to reside on the same node. Surely, since your code can only be deployed to one node, you want it to be deployed to the node that stores the largest table, so that you can copy the smaller table over the network. As is, this is just guesswork. What you need is statistical metadata about the data, or statistics.

In Redshift, a component called the query planner makes these decisions for you. The query planner makes use of statistics to compute an optimal plan to execute any given query. Once a plan has been generated, Redshift might decide to cache the result for subsequent similar queries. This is why you shouldn't evaluate query performance the first time you run a query.

For the query planner to compute the most efficient plan, it needs statistics that are up-to-date. Redshift doesn’t maintain statistics autonomously, it expects the user to run the ANALYZE command periodically.

You can query the svv_table_info to see which tables have stale statistics.
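
A minimal query, listing the tables with the most outdated statistics first:

SELECT "table", stats_off
FROM svv_table_info
ORDER BY stats_off DESC;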

When the stats_off value equals 0, the statistics are current; a value of 100 means they are completely out of date.

If you still want to go a bit deeper, you can query the pg_statistic_indicator table to see how many rows were inserted, updated or deleted since the last time the ANALYZE command was executed.

The ANALYZE command can analyze the complete database, a single table or even a collection of columns inside a table.

Analyzing is an expensive operation and should be performed only when necessary. The smaller the scope of the operation, the better. You can either use the system tables to come up with your own balanced heuristics on when to re-analyze, or you can make use of the built-in analyze_threshold_percent variable. The value of this variable makes Redshift skip the ANALYZE operation if the percentage of rows that have changed since the last time the command ran is lower than the threshold. You can configure the threshold yourself.
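
A minimal sketch of tweaking the threshold for the current session and re-analyzing a single table:

SET analyze_threshold_percent TO 10;
ANALYZE order_items;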

When loading big batches of data using the COPY command, you can use the STATUPDATE option to immediately recompute the statistics.

A Redshift cluster has finite resources, making workload and queue management a crucial component of the execution engine. After Redshift has computed a query plan, the plan gets queued for execution. The queue puts an upper bound on parallelism, reducing contention for the cluster's resources. Each queue has a reserved amount of memory assigned to it and a limited number of slots available to execute queries. Once a query completes, its slot is released and the next query in line starts executing.

In the default configuration, Redshift uses a single queue with only 5 slots. For a lot of workloads, the default configuration will do just fine for a good while, especially when you show some mechanical sympathy by coordinating ETL jobs and caching reads on top. But let's say you don't implement any light infrastructure to support coordination and caching. Big loads are happening throughout the day, while you have a small army of concurrent users hungry for fresh dashboard widgets and reports. It's not too hard to imagine one of those smaller queries getting stuck in the queue, waiting for a slot behind a batch of slow imports.

This is where Workload Management or WLM in short comes into play. With WLM you can tweak how Redshift makes use of its resources when executing queries. WLM allows you to create custom queues, configure which queries are routed to them and tune them for specific types of workloads. Per queue you can set the concurrency (number of available slots) and reserve a percentage of the memory. The memory that’s assigned to a queue is assigned to each slot of the queue evenly.

In the use case of having big loads and small and big queries contending for resources, we could benefit from creating a queue that has 30% of the memory assigned to it, with only one slot available, reserved for copy operations. Another queue could be assigned 35% of the memory, with two slots available, used to generate those large reports. We only assign it two slots because we know these queries are memory hungry, and we want them to do as much of their work in memory as possible instead of spilling onto disk. The default queue can't be removed and will be used for the remaining queries. Since these queries are fast, only querying a small subset of the data, we change its concurrency level to 12.

Routing of queries to queues can be configured in the AWS console per queue. The first option is to use user groups or query groups.

You can create a user group and assign it a list of database users.

Or you can set a query group, which is basically not much more than a label you set when executing a query.
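
A minimal sketch of both options; the user names and the label are hypothetical:

-- Route queries from specific database users via a user group.
CREATE GROUP reporting WITH USER alice, bob;

-- Or label individual queries with a query group.
SET query_group TO 'reports';
SELECT COUNT(*) FROM order_items;
RESET query_group;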

There's also the option to make use of a small rules engine that can use predicates such as return row count, CPU usage, etc. to decide which queue a query is routed to. This is configured from the Redshift console as well.

You can stitch the stv_wlm_* system tables together to get an overview of the configured queues, their properties and the queries they’re executing.

0 | (super use.. | 1  | 476 | 0 | 0 | 0 | 0
1 | (query:any)  | 12 | 447 | 0 | 0 | 3 | 1511

This result shows that there are two queues in the system. The first one is the Redshift super user queue, with one slot and 476MB of memory, and no queries in the queue. The second one is the default queue, which has been configured to have 12 slots with 447MB of memory per slot. No queries are waiting to be executed, while there are 3 executing.

Another query on the stv_wlm_query_state table reveals which queries have been assigned to which queue and what their state is.

739590 | 1 | .. | running
739562 | 1 | .. | running
739592 | 1 | .. | running

Configuring your own queues can be a bit intimidating. A feature that can introduce a quick win - without requiring you to have a deep understanding of WLM and each query that runs on your cluster - is Short Query Acceleration. When you enable this flag in the console, Redshift creates a dedicated queue to run short queries, allowing them to bypass slow queries or ETL jobs. It then uses machine learning to analyze your queries, predict their execution time and classify them accordingly. The longer SQA is enabled, the more it can learn and the better its predictions will be.

The Author

Jef Claes is a software engineer working and living in Belgium. In the past he worked in domains related to public safety and banking. These days he is employed by Green Island, building software for the online gambling industry.

Outside business hours, he's an active member of Domain Driven Design Belgium, writes on his personal blog jefclaes.be and speaks at the occasional conference.

The best way to get in touch is through Twitter.

Acknowledgements

Thanks to Joe Harris, Redshift Database Engineer at AWS. He was kind enough to reach out on Twitter willing to help. He took the time to do a technical review and provide me with crucial improvements and corrections. I had never expected someone actually working on Redshift to help out in a big way like this.

Kristien, my partner, encouraged me to write something a bit longer for a change. She was happy to help when I asked her to review the text for any obvious spelling and grammar mistakes. What a second pair of eyes can do!

Shout out to Antonios Klimis, Stijn Volders and Ward Bellen for thoroughly reviewing the text and pointing out topics that were hard to understand or missing.
