Tuning
Phoenix provides many different knobs and dials to configure and tune the system to run more optimally on your cluster. The configuration is done through a series of Phoenix-specific properties specified for the most part in your client-side hbase-site.xml
file. In addition to these properties, there are of course all the HBase configuration properties with the most important ones documented here. This blog will focus on the Phoenix-specific properties and touch on some important considerations to maximize Phoenix and HBase performance.
The table below outlines the full set of Phoenix-specific configuration properties and their defaults. Of these, we'll talk in depth about some of the most important ones below.
Property | Description | Default |
---|---|---|
phoenix.query.timeoutMs | Number of milliseconds after which a query will timeout on the client. Default is 60 sec. | 60000 |
phoenix.query.keepAliveMs | When the number of threads is greater than the core pool size of the client-side thread pool executor, this is the maximum time in milliseconds that excess idle threads will wait for new tasks before terminating. Default is 1 sec. | 1000 |
phoenix.query.threadPoolSize | Number of threads in client side thread pool executor. As the number of machines/cores in the cluster grows, this value should be increased. | 20 |
phoenix.query.queueSize | Max queue depth of the bounded round robin queue backing the client-side thread pool executor, beyond which attempts to queue additional work cause the client to block. If zero, a SynchronousQueue is used instead of the bounded round robin queue. | 250 |
phoenix.query.spoolThresholdBytes | Threshold size in bytes after which the results of parallel-executed aggregate queries are spooled to disk. Default is 50 MB. | 52428800 |
phoenix.query.maxGlobalMemoryPercentage | Percentage of total heap memory (i.e. Runtime.getRuntime().totalMemory()) that all threads may use. Only coarse-grained memory usage is tracked, mainly accounting for memory used by the intermediate map built during group by aggregation. When this limit is reached, clients block while attempting to get more memory, essentially throttling memory usage. Defaults to 20%. | 20 |
phoenix.query.maxGlobalMemoryWaitMs | Maximum amount of time that a client will block while waiting for more memory to become available. After this amount of time, an InsufficientMemoryException is thrown. Default is 5 sec. | 5000 |
phoenix.query.maxTenantMemoryPercentage | Maximum percentage of phoenix.query.maxGlobalMemoryPercentage that any one tenant is allowed to consume. After this percentage, an InsufficientMemoryException is thrown. Default is 100%. | 100 |
phoenix.query.targetConcurrency | Target concurrent threads to use for a query. It serves as a soft limit on the number of scans into which a query may be split. The value should not exceed the hard limit imposed by phoenix.query.maxConcurrency. | 8 |
phoenix.query.maxConcurrency | Maximum concurrent threads to use for a query. It serves as a hard limit on the number of scans into which a query may be split. A soft limit is imposed by phoenix.query.targetConcurrency. | 12 |
phoenix.query.dateFormat | Default pattern to use for conversion of a date to/from a string, whether through the TO_CHAR(&lt;date&gt;) or TO_DATE(&lt;date-string&gt;) functions, or through resultSet.getString(&lt;date-column&gt;). | yyyy-MM-dd HH:mm:ss |
phoenix.query.statsUpdateFrequency | The frequency in milliseconds at which the stats for each table will be updated. Default is 15 min. | 900000 |
phoenix.query.maxStatsAge | The maximum age of stats in milliseconds after which they will no longer be used (i.e. the stats could not be updated within this amount of time and are thus considered too old). Default is 1 day. | 86400000 |
phoenix.mutate.maxSize | The maximum number of rows that may be batched on the client before a commit or rollback must be called. | 500000 |
phoenix.mutate.upsertBatchSize | The number of rows that are batched together and automatically committed during the execution of an UPSERT SELECT or DELETE statement. This property may be overridden at connection time by specifying the UpsertBatchSize property value. Note that overriding in this manner does not affect the batch size used by the coprocessor when these statements are executed completely on the server side. Default is 10,000 rows. | 10000 |
phoenix.query.regionBoundaryCacheTTL | The time-to-live in milliseconds of the region boundary cache used to guide the split points for query parallelization. Default is 60 sec. | 60000 |
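
Most of these properties are read from the client-side hbase-site.xml described above. As a rough illustration only, the sketch below passes a few of them as JDBC connection properties instead; whether a given property can be overridden per connection depends on your Phoenix version, and the driver class name, ZooKeeper quorum, and chosen values are assumptions for a local setup rather than recommendations.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public class TuningExample {
    public static void main(String[] args) throws Exception {
        // Driver class for this (pre-Apache) release of Phoenix; adjust if your version differs.
        Class.forName("com.salesforce.phoenix.jdbc.PhoenixDriver");

        // Hypothetical overrides of a few client-side properties from the table above.
        // The canonical place for these is the client's hbase-site.xml.
        Properties props = new Properties();
        props.setProperty("phoenix.query.timeoutMs", "120000");      // 2 minute query timeout
        props.setProperty("phoenix.query.threadPoolSize", "40");     // larger client thread pool
        props.setProperty("phoenix.query.targetConcurrency", "16");  // soft limit on scans per query
        props.setProperty("phoenix.query.maxConcurrency", "24");     // hard limit on scans per query

        // "localhost" stands in for the ZooKeeper quorum of your HBase cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
            // ... run queries against the tuned connection ...
        }
    }
}
```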
Phoenix breaks up aggregate queries into multiple scans and runs them in parallel through custom aggregating coprocessors to improve performance. Hari Kumar, from Ericsson Labs, did a good job of explaining the performance benefits of parallelization and coprocessors here. One of the most important factors in getting good query performance with Phoenix is to ensure that table splits are well balanced. This includes having regions of equal size as well as an even distribution across region servers. There are open source tools such as Hannibal that can help you monitor this. By having an even distribution of data, every thread spawned by the Phoenix client will have an equal amount of work to process, thus reducing the time it takes to get the results back.
The phoenix.query.targetConcurrency and phoenix.query.maxConcurrency properties control how a query is broken up into multiple scans on the client side. The idea behind query parallelization is to align the scan boundaries with region boundaries. If rows are not evenly distributed across regions, this scheme compensates for regions that have more rows than others by applying tighter splits, and therefore spawning more scans, over the overloaded regions. The split points for parallelization are computed as follows. Let's suppose:
- t is the target concurrency
- m is the max concurrency
- r is the number of regions we need to scan
```
if r >= t
    scan using regional boundaries
else if r/2 > t
    split each region in s splits such that:
    s = max(x) where s * x < m
else
    split each region in s splits such that:
    s = max(x) where s * x < t
```
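
To make the rule above concrete, here is a small hypothetical sketch of the per-region split count it implies. It mirrors the pseudocode rather than Phoenix's actual implementation, and it reads s = max(x) where s * x < m as "the largest number of splits per region such that the total number of scans stays under the limit", which is an assumption on my part.

```java
// Hypothetical helper mirroring the pseudocode above; not Phoenix's actual code.
public class SplitCalculator {

    // How many splits each region gets for a query touching r regions (r > 0),
    // given target concurrency t and max concurrency m.
    static int splitsPerRegion(int r, int t, int m) {
        if (r >= t) {
            return 1;                  // enough regions already: scan on region boundaries
        } else if (r / 2 > t) {
            return maxSplits(r, m);    // pack splits up to the hard limit
        } else {
            return maxSplits(r, t);    // pack splits up to the soft limit
        }
    }

    // Largest s such that s * r < limit, i.e. the total scan count stays under the limit.
    static int maxSplits(int r, int limit) {
        int s = 1;
        while ((s + 1) * r < limit) {
            s++;
        }
        return s;
    }

    public static void main(String[] args) {
        // With the defaults t = 8 and m = 12, a query over 3 regions gets
        // 2 splits per region, i.e. 6 scans in total.
        System.out.println(splitsPerRegion(3, 8, 12));
    }
}
```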
Depending on the number of cores in your client machine and the size of your cluster, phoenix.query.threadPoolSize, phoenix.query.queueSize, phoenix.query.maxConcurrency, and phoenix.query.targetConcurrency may all be increased to allow more threads to process a query in parallel. This allows Phoenix to divide up a query into more scans that may then be executed in parallel, thus reducing latency.

This approach is not without its limitations. The primary issue is that Phoenix does not have sufficient information to divide up a region into equal data sizes. If the query results span many regions of data, this is not a problem, since regions are more or less of equal size. However, if a query accesses only a few regions, this can be an issue. The best Phoenix can do is to divide up the key space between the start and end key evenly. If there's any skew in the data, then some scans are bound to bear the brunt of the work.
For example, let's say a row key is a five digit zip code in California, declared as a CHAR(5). Phoenix only knows that the column has five characters. In theory, the byte array could vary from five 0x01 bytes to five 0xff bytes (or whatever the largest valid single-byte UTF-8 encoded character is), while in actuality the range is only from 90001 to 96162. Since Phoenix doesn't know this, it will divide up the region based on the theoretical range, and all of the work will end up being done by the single thread whose range encompasses the actual data. The same thing occurs with a DATE column: the theoretical range is from 1970 to 2038, while in actuality the date is probably within +/- a year of the current date. Even if Phoenix used better defaults for the start and end range rather than the theoretical min and max, it would not usually help - there's just too much variability across domains.
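
As an illustration of the zip code example (not Phoenix code), the sketch below splits the theoretical range of the leading key byte evenly into chunks, with the chunk count standing in for the concurrency limit, and checks where a few California zip codes land.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: evenly splitting the theoretical key range leaves one
// chunk holding all the real data when the keys are skewed.
public class SkewDemo {
    public static void main(String[] args) {
        int low = 0x01, high = 0xff;   // theoretical range of the leading key byte
        int chunks = 12;               // stand-in for the concurrency limit
        double width = (double) (high - low + 1) / chunks;

        for (String zip : new String[] {"90001", "93110", "96162"}) {
            int firstByte = zip.getBytes(StandardCharsets.US_ASCII)[0] & 0xff;
            int chunk = (int) ((firstByte - low) / width);
            System.out.println(zip + " -> chunk " + chunk);
        }
        // Every California zip code starts with '9' (byte 0x39), so all of them
        // land in the same chunk and a single scan ends up doing all the work.
    }
}
```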
One solution to this problem is to maintain statistics for a table to feed into the parallelization process to ensure an even data distribution. This is the solution we're working on, as described in more detail in this issue.
An important HBase configuration property, hbase.client.scanner.caching, controls scanner caching, that is, how many rows are returned from the server in a single round trip when a scan is performed. Although this is less important for aggregate queries, since the Phoenix coprocessors perform the aggregation instead of returning all the data back to the client, it is important for non-aggregate queries. If unset, Phoenix defaults this property to 1000.

On the DML side of the fence, performance may improve by turning auto commit on for multi-row mutations such as those that can occur with DELETE and UPSERT SELECT. In this case, if possible, the mutation will be performed completely on the server side without returning data back to the client. However, when performing single row mutations, such as UPSERT VALUES, the opposite is true: auto commit should be off and a reasonable number of rows should be batched together for a single commit to reduce RPC traffic.
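
As a rough sketch of these two commit patterns (the metrics and metrics_backup tables, their columns, the batch size, and the localhost ZooKeeper quorum are all hypothetical):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class CommitPatterns {
    public static void main(String[] args) throws Exception {
        Class.forName("com.salesforce.phoenix.jdbc.PhoenixDriver"); // pre-Apache driver class
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {

            // Multi-row mutation (UPSERT SELECT / DELETE): leave auto commit ON so the
            // statement can run entirely on the server side where possible.
            conn.setAutoCommit(true);
            try (PreparedStatement stmt = conn.prepareStatement(
                    "UPSERT INTO metrics_backup SELECT * FROM metrics")) {
                stmt.executeUpdate();
            }

            // Single-row mutations (UPSERT VALUES): turn auto commit OFF and batch a
            // reasonable number of rows per commit to reduce RPC traffic.
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement(
                    "UPSERT INTO metrics(id, name, val) VALUES(?, ?, ?)")) {
                for (int i = 0; i < 100000; i++) {
                    stmt.setLong(1, i);
                    stmt.setString(2, "metric-" + i);
                    stmt.setLong(3, i % 100);
                    stmt.executeUpdate();
                    if ((i + 1) % 10000 == 0) {
                        conn.commit();   // commit in batches of 10,000 rows
                    }
                }
                conn.commit();           // commit any remainder
            }
        }
    }
}
```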
One way to get a feel for how to configure these properties is to use the performance.sh shell script provided in the bin directory of the installation tar.
```
Usage:    performance.sh <zookeeper> <row count>
Example:  performance.sh localhost 1000000
```
This will create a new table named performance_1000000 and upsert 1000000 rows. The schema and data generated are similar to examples/web_stat.sql and examples/web_stat.csv. On the console it will measure the time it takes to:

- upsert these rows
- run queries that perform COUNT, GROUP BY, and WHERE clause filters

An hbase-site.xml file is included in the bin directory and is pre-configured to already be on the classpath during script execution.

[Screenshot: the performance.sh script in action]
Phoenix has many knobs and dials to tailor the system to your use case. From controlling the level of parallelization, to the size of batches, to the consumption of resources, there's a knob for that. These controls are not without their limitations, however. There's still more work to be done, and we'd love to hear your ideas on what you'd like to see made more configurable.