Skip to content

External Sort Operator

Paul Rogers edited this page Nov 2, 2016 · 12 revisions

External Sort Operator

The external operator works like other Drill operators in it's use of an incoming batch, the next( ) call hierarchy, and that the operator is a record batch for its "output" (downstream, up-the-call-stack) operator. The external sort operator is interesting for two key reasons:

  1. It buffers all the incoming rows so that they can be sorted together, and
  2. It optionally spills data to disk due to memory pressure.

General Flow

Like all operators, the external sort (xsort) operator does the following:

  • Holds an incoming operator (incoming).
  • Calls incoming.next( ) to obtain a record batch.
  • Supports an incoming batch with an associated selection vector. (?)

States

Like all operators, the xsort operator maintains a set of internal states, and performs state transitions based on responses from the incoming batch.

Incoming events are defined by the RecordBatch.IterOutcome enum:

  • NONE: Normal completion of batch.
  • OK: Zero or more records with schema same as the previous batch. Sort the individual batch and ...
  • OK_NEW_SCHEMA: Zero or more records with a schema different than the previous batch. Handles "trivial" schema changes (see below), then handles records as for OK.
  • STOP: Abnormal termination. The xsort operator simply forwards this event downstream.
  • NOT_YET: No data. Not supported by the xsort operator. (Throws an UnsupportedOperationException exception.)
  • OUT_OF_MEMORY: Non-fatal OOM. The xsort operator attempts to recover by spilling to disk.
  • Exception: Not a enum event, but all methods can throw an exception.

Sort Implementation

Sorting is done via an indirection vector. That is, the data in the vectors does not move during the sort. Instead, the xsort operator uses a int-based selection vector (SelectionVector4 sv4) to hold indexes to the data. During sort, the indexes are reordered, not the vector data.

Batches are sorted individually using a SingleBatchSorter instance (sorter). The innerNext( ) method sorts each batch using a SelectionVector2 in response to the OK and OK_NEW_SCHEMA events:

sorter.setup(context, sv2, convertedBatch);
sorter.sort(sv2);

Batches are then buffered. (LinkedList<BatchGroup> batchGroups):

batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), oContext));

Next, xsort considers a number of conditions to determine if it should spill to disk:

  • Based on the expected memory needs of MSorter.
  • Based on the total number of batches (must be less than the maximum value of a short.)
  • If the memory used exceeds 95% of the allocated memory budget.
  • If enough new batches have accumulated (more than SPILL_BATCH_GROUP_SIZE batches.)

If spilling is needed, then:

  • The set of batches to be spilled are merged into a BatchGroup

  • The merged BatchGroup is spilled.

    final BatchGroup merged = mergeAndSpill(spilledBatchGroups);

Schema Change Handling

The xsort operator does not support schema changes (though some parts of the code seem to be written in anticipation of such support.) The operator ignores "trivial" schema changes (where the new schema is the same as the previous.) However, if the schema really does change, the operator throws a SchemaChangeException.

PriorityQueueCopier

Spilling

The operator spills to disk (when)? batch group size? threshold?

Memory Management

The life of a fragment is roughly:

  • Planner creates the plan
  • Planner creates a PhysicalOperator (operator definition) using PhysicalPlanCreator.
  • PhysicalPlanCreator sets a variety of properties. For our purposes, the key properties are the query cost (set by setCost() and the memory limits. (The interesting bits is that the PhysicalPlanCreator does not actually set the limits... See below.)
  • The PhysicalOperator is sent across the wire to each Drillbit.
  • FragmentExecutor on each Drillbit calls ImplCreator to create the FragmentRoot instance for the fragment.
  • Each operator has an OperatorContext (implementation: OperatorContextImpl) that creates the allocator for the fragment.
  • The OperatorContext uses the PhysicalOperator which has the initial and maximum allocations for the operator:

The code:

this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
    popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());

The interesting bit here is that the planner does not actually set the memory limits. Instead, they seam to always be set to the defaults defined in AbstractBase of 1MB, maximum of 10 GB:

public abstract class AbstractBase implements PhysicalOperator{
  protected long initialAllocation = 1_000_000L;
  protected long maxAllocation = 10_000_000_000L;

A quick search did not reveal how the default values might be customized per fragment or per operator... Thus, it appears that each operator has a memory allocation of (1MB, 10GB). It is not (yet) clear how a leaf operator's allocation "bubbles up" to increase the parent allocator, and how the sum of allocations bubbles up to the root fragment allocator.

Spill Conditions

The xsort operator spills when one of two memory conditions are hit:

  • The buffered data exceeds 95% of the memory reserved for buffering, or
  • An upstream operator reports that it ran out of memory.

Memory is managed using two allocators:

BufferAllocator oAllocator;      // The allocator for the xsort operator itself
BufferAllocator copierAllocator; // The allocator for the record copier

The xsort allocator is set from the context during fragment creation. The process is generic to all operators:

The copier allocator is a child of the operator allocator, meaning that the two share the same reservation. The copier is given a fixed allocation of 10 MB to 20 MB (decimal) bytes:

public static final long INITIAL_ALLOCATION = 10_000_000;
public static final long MAX_ALLOCATION = 20_000_000;

copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
    PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);

Disk Management

Disk management is integrated into the xsort operator class itself. The algorithm is:

  • Use the HDFS FileSystem class, referenced by fs to manage spill files. Allows files to be written into the DFS file system in addition to local disk. This is done when cluster nodes are configured with limited local storage, and so large spill files are written to DFS instead.
  • A list of spill directories (dirs) to which spill files are written (in round robin manner?)

Individual spill files have the following name:

<query-id>_majorfragment<major-fragment-id>_minorfragment<minor-fragment-id>_operator<operator-id>/<spill-id>

Where the items in <...> are parameters for the particular operator.

The first part of the name is a directory created on each temp directory in turn, marked as delete on exit.

Merging seems to be done by a code-generated implementation of PriorityQueueCopier. Spilling is done by BatchGroup. Writing is done in several steps. Given a collection of batches:

  • Combines the linked list of batches into an array list (with batches in inverse order.)
  • Compute the maximum number of records as COPIER_BATCH_MEM_LIMIT / estimatedRecordSize. (That is, estimate row width, then use that to compute the maximum records that can be written.)
  • Loop to write records in batches of the computed record limit.
  • Use a PriorityQueueCopier to merge batches to produce an outgoing batch of the desired size.
  • Add the resulting (merged) batch to the BatchGroup.
  • Gather a list of buffers by looping over all value vectors in the batch, and each buffer for the vectors.
  • Write the buffers to disk.

Inputs

Configuration Properties

  • drill.exec.sort.external.spill.group.size (member SPILL_BATCH_GROUP_SIZE)
  • drill.exec.sort.external.spill.threshold (member EXTERNAL_SORT_SPILL_THRESHOLD)
  • drill.exec.sort.external.spill.directories (member dirs)

Open Questions

  • MappingSet
  • Dir algorithm
Clone this wiki locally