Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
mujtabachohan edited this page Aug 26, 2013 · 18 revisions

Phoenix provides many different knobs and dials to configure and tune the system to run more optimally on your cluster. The configuration is done through a series of Phoenix-specific properties specified for the most part in your client-side hbase-site.xml file. In addition to these properties, there are of course all the HBase configuration properties with the most important ones documented here. This blog will focus on the Phoenix-specific properties and touch on some important considerations to maximize Phoenix and HBase performance.

The table below outlines the full set of Phoenix-specific configuration properties and their defaults. Of these, we'll talk in depth about some of the most important ones below.

Property Description Default
phoenix.query.timeoutMs Number of milliseconds after which a query will timeout on the client. Default is 10 min. 600000
phoenix.query.keepAliveMs When the number of threads is greater than the core in the client side thread pool executor, this is the maximum time in milliseconds that excess idle threads will wait for a new tasks before terminating. Default is 60 sec. 60000
phoenix.query.threadPoolSize Number of threads in client side thread pool executor. As the number of machines/cores in the cluster grows, this value should be increased. 128
phoenix.query.queueSize Max queue depth of the bounded round robin backing the client side thread pool executor, beyond which an attempt to queue additional work is rejected by throwing an exception. If zero, a SynchronousQueue is used instead of the bounded round robin queue. 500
phoenix.query.spoolThresholdBytes Threshold size in bytes after which results from parallelly executed aggregate query results are spooled to disk. Default is 20 mb. 20971520
phoenix.query.maxGlobalMemoryPercentage Percentage of total heap memory (i.e. Runtime.getRuntime().totalMemory()) that all threads may use. Only course grain memory usage is tracked, mainly accounting for memory usage in the intermediate map built during group by aggregation. When this limit is reached the clients block attempting to get more memory, essentially throttling memory usage. Defaults to 50% 50
phoenix.query.maxGlobalMemoryWaitMs Maximum amount of time that a client will block while waiting for more memory to become available. After this amount of time, an InsufficientMemoryException is thrown. Default is 10 sec. 10000
phoenix.query.maxTenantMemoryPercentage Maximum percentage of phoenix.query.maxGlobalMemoryPercentage that any one tenant is allowed to consume. After this percentage, an InsufficientMemoryException is thrown. Default is 100% 100
phoenix.query.targetConcurrency Target concurrent threads to use for a query. It serves as a soft limit on the number of scans into which a query may be split. The value should not exceed the hard limit imposed by phoenix.query.maxConcurrency. 32
phoenix.query.maxConcurrency Maximum concurrent threads to use for a query. It servers as a hard limit on the number of scans into which a query may be split. A soft limit is imposed by phoenix.query.targetConcurrency. 64
phoenix.query.dateFormat Default pattern to use for conversion of a date to/from a string, whether through the TO_CHAR(<date>) or TO_DATE(<date-string>) functions, or through resultSet.getString(<date-column>). yyyy-MM-dd HH:mm:ss
phoenix.query.statsUpdateFrequency The frequency in milliseconds at which the stats for each table will be updated. Default is 15 min. 900000
phoenix.query.maxStatsAge The maximum age of stats in milliseconds after which they will no longer be used (i.e. the stats were not able to be updated in this amount of time and thus are considered too old). Default is 1 day. 1
phoenix.mutate.maxSize The maximum number of rows that may be batched on the client before a commit or rollback must be called. 500000
phoenix.mutate.batchSize The number of rows that are batched together and automatically committed during the execution of an UPSERT SELECT or DELETE statement. This property may be overridden at connection time by specifying the UpsertBatchSize property value. Note that the connection property value does not affect the batch size used by the coprocessor when these statements are executed completely on the server side. 15000
phoenix.query.regionBoundaryCacheTTL The time-to-live in milliseconds of the region boundary cache used to guide the split points for query parallelization. Default is 60 sec. 60000
phoenix.query.maxIntraRegionParallelization The maximum number of threads that will be spawned to process data within a single region during query execution 64
phoenix.query.rowKeyOrderSaltedTable Whether or not a non aggregate query returns rows in row key order for salted tables. If this option is turned on, split points may not be specified at table create time, but instead the default splits on each salt bucket must be used. Default is false false

Parallelization

Phoenix breaks up aggregate queries into multiple scans and runs them in parallel through custom aggregating coprocessors to improve performance. Hari Kumar, from Ericsson Labs, did a good job of explaining the performance benefits of parallelization and coprocessors here. One of the most important factors in getting good query performance with Phoenix is to ensure that table splits are well balanced. This includes having regions of equal size as well as an even distribution across region servers. There are open source tools such as Hannibal that can help you monitor this. By having an even distribution of data, every thread spawned by the Phoenix client will have an equal amount of work to process, thus reducing the time it takes to get the results back.

The phoenix.query.targetConcurrency and phoenix.query.maxConcurrency control how a query is broken up into multiple scans on the client side. The idea for parallelization of queries is to align the scan boundaries with region boundaries. If rows are not evenly distributed across regions, using this scheme compensates for regions that have more rows than others, by applying tighter splits and therefore spawning off more scans over the overloaded regions.

The split points for parallelization are computed as follows. Let's suppose:
  • t is the target concurrency
  • m is the max concurrency
  • r is the number of regions we need to scan
if r >= t
   scan using regional boundaries
else if r/2 > t
   split each region in s splits such that: s = max(x) where s * x < m
else
   split each region in s splits such that:  s = max(x) where s * x < t

Depending on the number of cores in your client machine and the size of your cluster, the phoenix.query.threadPoolSize, phoenix.query.queueSize, phoenix.query.maxConcurrency, and phoenix.query.targetConcurrency may all be increased to allow more threads to process a query in parallel. This will allow Phoenix to divide up a query into more scans that may then be executed in parallel, thus reducing latency.

This approach is not without its limitations. The primary issue is that Phoenix does not have sufficient information to divide up a region into equal data sizes. If the query results span many regions of data, this is not a problem, since regions are more or less of equal size. However, if a query accesses only a few regions, this can be an issue. The best Phoenix can do is to divide up the key space between the start and end key evenly. If there's any skew in the data, then some scans are bound to bear the brunt of the work. You can adjust phoenix.query.maxIntraRegionParallelization to a smaller number to decrease the number of threads spawned per region if you find that throughput is suffering.

For example, let's say a row key is comprised of a five digit zip code in California, declared as a CHAR(5). Phoenix only knows that the column has 5 characters. In theory, the byte array could vary from five 0x01 bytes to five 0xff bytes (or what ever is the largest valid UTF-8 encoded single byte character). While in actuality, the range is from 90001 to 96162. Since Phoenix doesn't know this, it'll divide up the region based on the theoretical range and all of the work will end up being done by the single thread that has the range encompassing the actual data. The same thing will occur with a DATE column, since the theoretical range is from 1970 to 2038, while in actuality the date is probably +/- a year from the current date. Even if Phoenix uses better defaults for the start and end range rather than the theoretical min and max, it would not usually help - there's just too much variability across domains.

One solution to this problem is to maintain statistics for a table to feed into the parallelization process to ensure an even data distribution. This is the solution we're working on, as described in more detail in this issue.

Batching

An important HBase configuration property hbase.client.scanner.caching controls scanner caching, that is how many rows are returned from the server in a single round trip when a scan is performed. Although this is less important for aggregate queries, since the Phoenix coprocessors are performing the aggregation instead of returning all the data back to the client, it is important for non aggregate queries. If unset, Phoenix defaults this property to 1000.

On the DML side of the fence, performance may improve by turning the connection auto commit to on for multi-row mutations such as those that can occur with DELETE and UPSERT SELECT. In this case, if possible, the mutation will be performed completely on the server side without returning data back to the client. However, when performing single row mutations, such as UPSERT VALUES, the opposite is true: auto commit should be off and a reasonable number of rows should be batched together for a single commit to reduce RPC traffic.

Measuring Performance

One way to get a feeling for how to configure these properties is to use the performance.sh shell script provided in the bin directory of the installation tar.

Usage: performance.sh <zookeeper> <row count>
Example: performance.sh localhost 1000000

This will create a new table named performance_1000000 and upsert 1000000 rows. The schema and data generated is similar to examples/web_stat.sql and examples/web_stat.csv. On the console it will measure the time it takes to:
  • upsert these rows
  • run queries that perform COUNT, GROUP BY, and WHERE clause filters
For convenience, an hbase-site.xml file is included in the bin directory and pre-configured to already be on the classpath during script execution.

Here is a screenshot of the performance.sh script in action:

 Conclusion

Phoenix has many knobs and dials to tailor the system to your use case. From controlling the level of parallelization, to the size of batches, to the consumption of resource, there's a knob for that.  These controls are not without there limitations, however. There's still more work to be done and we'd love to hear your ideas on what you'd like to see made more configurable.

Clone this wiki locally