The Apache Rya rya.pcj.fluo.app
project contains an Apache Fluo Incremental
Join Maintenance Application (PCJ Updater).
The Rya Shell Interface provides a command line utility for the registration of new persisted queries within the Rya-Fluo incremental join maintenance application. This section provides instructions on setting up the maintenance application on a distributed Apache Hadoop YARN execution environment with Apache Accumulo.
There are a number of steps required to ensure that both Fluo and the Rya PCJ Updater Application are configured correctly for the target execution environment.
To install the rya.pcj.fluo.app
, it is necessary to download the Apache Fluo
1.0.0-incubating release.
wget https://www.apache.org/dist/incubator/fluo/fluo/1.0.0-incubating/fluo-1.0.0-incubating-bin.tar.gz
tar xzvf fluo-1.0.0-incubating-bin.tar.gz
Below is an abridged version of instructions for configuring Fluo to work with Rya. For complete installation instructions, see the Apache Fluo 1.0.0-incubating Documentation.
cd fluo-1.0.0-incubating
# copy the example properties to the conf directory
cp conf/examples/* conf/
# edit the base fluo properties file which is used for new applications
vi conf/fluo.properties
The following properties in the conf/fluo.properties
file should be
uncommented and populated with appropriate values for your
Accumulo/Hadoop (YARN)/Zookeeper execution environment:
fluo.client.zookeeper.connect=${fluo.client.accumulo.zookeepers}/fluo
fluo.client.accumulo.instance=<accumulo instance name>
fluo.client.accumulo.user=<accumulo user name>
fluo.client.accumulo.password=<accumulo user password>
fluo.client.accumulo.zookeepers=<your zookeeper connect string>
fluo.admin.hdfs.root=hdfs://<your hdfs host name>:8020
Fluo defers realization of dependencies until as late as possible. You can
either download dependencies from the internet, or install on a system that
already has the dependencies installed on it. Regardless of approach taken,
the fluo-1.0.0-incubating/conf/fluo-env.sh
file will need to be tailored to
your execution environment. See the
Apache Fluo 1.0.0-incubating Install Instructions for more information.
The following instructions go through the steps of downloading dependencies from
the internet. Note, you will still need a system with the correct version of
hadoop installed on it as bin/fluo
requires the $HADOOP_PREFIX/bin/hdfs
command to be available.
# If using a vendor's distribution of hadoop, edit the lib/ahz/pom.xml to specify the vendor's maven repo.
vi lib/ahz/pom.xml
<repositories>
<repository>
<id>vendor</id>
<url>https://repository.vendor.com/content/repositories/releases/</url>
</repository>
</repositories>
./lib/fetch.sh ahz -Daccumulo.version=1.7.3 -Dhadoop.version=2.6.0-vendor5.8.5 -Dzookeeper.version=3.4.5-vendor5.8.5
# Otherwise fetch the desired the apache release versions for accumulo, hadoop and zookeeper
./lib/fetch.sh ahz -Daccumulo.version=1.7.3 -Dhadoop.version=2.6.5 -Dzookeeper.version=3.4.6
# Then fetch the remaining Fluo dependencies
./lib/fetch.sh extra
Next it is necessary to update the fluo-1.0.0-incubating/conf/fluo-env.sh
file
to use the locally downloaded libraries.
vi conf/fluo-env.sh
The listing below highlights a few modifications that may need to be made to the
fluo-env.sh
to adapt it to your system:
- Define a value for the environmental variable
HADOOP_PREFIX
if it is not already set. The correct value depends on your system configuration and could be/usr
,/usr/lib/hadoop
, or perhaps another path. - Depending on the value used for
HADOOP_PREFIX
, which may or may not include a directory for$HADOOP_PREFIX/etc/hadoop
, it may be necessary to modify the shell variableCLASSPATH
to include the hadoop configuration directory. In the following listing, we append the directory/etc/hadoop/conf
to theCLASSPATH
. - Uncomment the
setupClasspathFromLib
function and comment thesetupClasspathFromSystem
.
# Sets HADOOP_PREFIX if it is not already set. Please modify the
# export statement to use the correct directory. Remove the test
# statement to override any previously set environment.
#test -z "$HADOOP_PREFIX" && export HADOOP_PREFIX=/path/to/hadoop
test -z "$HADOOP_PREFIX" && export HADOOP_PREFIX=/usr
#
# ...
#
# This function obtains Accumulo, Hadoop, and Zookeeper jars from
# $FLUO_HOME/lib/ahz/. Before using this function, make sure you run
# `./lib/fetch.sh ahz` to download dependencies to this directory.
setupClasspathFromLib(){
#CLASSPATH="$FLUO_HOME/lib/*:$FLUO_HOME/lib/logback/*:$FLUO_HOME/lib/ahz/*"
CLASSPATH="$FLUO_HOME/lib/*:$FLUO_HOME/lib/logback/*:$FLUO_HOME/lib/ahz/*:/etc/hadoop/conf"
}
# Call one of the following functions to setup the classpath or write your own
# bash code to setup the classpath for Fluo. You must also run the command
# `./lib/fetch.sh extra` to download extra Fluo dependencies before using Fluo.
#setupClasspathFromSystem
setupClasspathFromLib
As discussed above, Fluo requires some hadoop configuration files to be
accessible, either in the $HADOOP_PREFIX/etc/hadoop
directory, or on the
classpath. The requirements for these configuration files are system specific,
and it is recommended that they be copied from the target system. However, if
configuring manually, the required files core-site.xml
and
yarn-site.xml
should have at a minimum the following properties configured.
In the file core-site.xml
:
<property>
<name>fs.defaultFS</name>
<value>hdfs://[your hdfs host name]:8020</value>
</property>
In the file yarn-site.xml
:
<property>
<name>yarn.resourcemanager.hostname</name>
<value>[your yarn resourcemanager hostname]</value>
</property>
Now that Fluo has been configured to work with your target Accumulo/Hadoop/Zookeeper execution environment, it is time to specify a Fluo App definition for the Rya Incremental Join Maintenance Application (PCJ Updater).
Note, in this documentation we will refer to this Fluo App with the fluoApplicationId
rya_pcj_updater
, but the current convention is for the fluoApplicationId to be
a completion of a rya instance name. For example, if the Rya instance is
my_rya_instance_
then the recommended corresponding fluoApplicationID would be my_rya_instance_pcj_updater
.
The bin/fluo new <fluoApplicationId>
command uses the base
fluo-1.0.0-incubating/conf/fluo.properties
file that was configured earlier in
this guide as a template for this Fluo Application.
# Create the new Fluo Application
bin/fluo new rya_pcj_updater
# Edit the Fluo Application Configuration
vi apps/rya_pcj_updater/conf/fluo.properties
Add the following entries under Observer properties in the
apps/rya_pcj_updater/conf/fluo.properties
file.
# Observer properties
# -------------------
# Specifies observers
# fluo.observer.0=com.foo.Observer1
# Can optionally have configuration key values
# fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2
fluo.observer.0=org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver
fluo.observer.1=org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver
fluo.observer.2=org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver
fluo.observer.3=org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver
fluo.observer.4=org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver
fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver
fluo.observer.6=org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver
fluo.observer.7=org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver
#fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver
fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver,pcj.fluo.export.rya.enabled=true,pcj.fluo.export.rya.ryaInstanceName=rya_,pcj.fluo.export.rya.fluo.application.name=rya_pcj_updater,pcj.fluo.export.rya.accumuloInstanceName=myAccumuloInstance,pcj.fluo.export.rya.zookeeperServers=zoo1;zoo2;zoo3;zoo4;zoo5,pcj.fluo.export.rya.exporterUsername=myUserName,pcj.fluo.export.rya.exporterPassword=myPassword,pcj.fluo.export.rya.bindingset.enabled=true,pcj.fluo.export.periodic.bindingset.enabled=true,pcj.fluo.export.kafka.subgraph.enabled=true,pcj.fluo.export.kafka.bindingset.enabled=true,bootstrap.servers=kafka1:9092
Description of configuration keys for the
org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver
:
Key | Description |
---|---|
pcj.fluo.export.rya.enabled | If true, pcj.fluo.export.rya.* prefixed properties will be used for exporting query results to Rya. If false, they are ignored and can be omitted. |
pcj.fluo.export.rya.ryaInstanceName | The Rya Instance (ie, my_rya_instance_ ) this PCJ Updater app should be exporting to. |
pcj.fluo.export.rya.accumuloInstanceName | The Accumulo instance that is hosting the specified Rya Instance. |
pcj.fluo.export.rya.zookeeperServers | The Zookeeper connect string for the Zookeepers that are used by the Accumulo instance that is hosting the specified Rya Instance. Note, the host:port values are separated by semi-colons instead of the traditional commas. |
pcj.fluo.export.rya.exporterUsername | The Accumulo username to be used for the Rya Export operation. |
pcj.fluo.export.rya.exporterPassword | The Accumulo password to be used for the Rya Export operation. |
pcj.fluo.export.kafka.enabled | If true, the bootstrap.servers , key.serializer , and value.serializer properties will be used for exporting query results to Kafka. If false, they are ignored and can be omitted. |
bootstrap.servers | A hostname:port string specifying a kafka broker. Note, multiple bootstrap servers are not currently supported. |
key.serializer | The Kafka serializer class that should be used for keys published to the query result topic. Default value: org.apache.kafka.common.serialization.ByteArraySerializer . |
value.serializer | The Kafka serializer class that should be used for values published to the query result topic. Default value: org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer . |
Depending on the workload, it may be necessary to increase the resources of a
Fluo worker's YARN container, or to distribute the Observers defined in the
listing above into multiple Fluo workers that are located in multiple YARN
containers to scale performance. The following table contains descriptions of
relevant properties in the YARN properties
section of the fluo.properties
file that can be tailored.
Key | Description |
---|---|
fluo.yarn.worker.instances | Defines the number of YARN containers used for executing Observers. Allows for scaling out. |
fluo.yarn.worker.max.memory.mb | Defines the amount of memory in Megabytes that should be allocated to a worker's YARN container. Allows for scaling up. |
fluo.yarn.worker.num.cores | Defines the number of CPUs that should be allocated to a worker's YARN container. Allows for scaling up. |
The RYA PCJ Updater Fluo App jar is in a special uber jar that contains a subset of dependencies.
This jar is represented by the maven coordinate
org.apache.rya:rya.pcj.fluo.app:<version>:fluo-app
and when Rya is
built from source, it can be found here:
rya/extras/rya.pcj.fluo/pcj.fluo.app/target/rya.pcj.fluo.app-<version>-fluo-app.jar
.
The Rya fluo-app jar needs to be copied to Fluo here:
fluo-1.0.0-incubating/apps/rya_pcj_updater/lib/rya.pcj.fluo.app-<version>-fluo-app.jar
The initialization step creates entries in the Zookeeper cluster for this Fluo application
This step also copies the Fluo jars over to HDFS so Accumulo tablet servers can access custom Fluo iterators.
bin/fluo init rya_pcj_updater
The Rya Shell Interface provides an interface to create Rya instances. See this documentation for more information on the shell.
To create and connect to a Rya instance that is configured to use a PCJ Updater, use the following commands in the rya shell:
$ rya
_____ _____ _ _ _
| __ \ / ____| | | | |
| |__) | _ __ _ | (___ | |__ ___| | |
| _ / | | |/ _` | \___ \| '_ \ / _ \ | |
| | \ \ |_| | (_| | ____) | | | | __/ | |
|_| \_\__, |\__,_| |_____/|_| |_|\___|_|_|
__/ |
|___/
<version>
Welcome to the Rya Shell.
Execute one of the connect commands to start interacting with an instance of Rya.
You may press tab at any time to see which of the commands are available.
rya>
rya> connect-accumulo --username myUserName --instanceName myAccumuloInstance --zookeepers zoo1,zoo2,zoo3
Password: *********
Connected. You must select a Rya instance to interact with next.
rya/myAccumuloInstance> install-with-parameters --instanceName rya_ --enablePcjIndex --fluoPcjAppName rya_pcj_updater
A Rya instance will be installed using the following values:
Instance Name: rya_
Use Shard Balancing: false
Use Entity Centric Indexing: false
Use Free Text Indexing: false
Use Geospatial Indexing: false
Use Temporal Indexing: false
Use Precomputed Join Indexing: true
PCJ Updater Fluo Application Name: rya_pcj_updater
Continue with the install? (y/n) y
The Rya instance named 'rya_' has been installed.
rya/myAccumuloInstance> connect-rya --instance rya_
rya/myAccumuloInstance:rya_>
Now that the Rya instance has been created, to start the app, issue the following command to start the Rya PCJ Updater on YARN:
bin/fluo start rya_pcj_updater
Once the PCJ Updater app has been started, it is now possible to register and
unregister SPARQL Queries with it using the create-pcj
and delete-pcj
Rya
shell commands. It is possible to see details on registered PCJ Queries using
the print-instance-details
Rya shell command. See the
Rya Shell Interface documentation for more information on this step.
To stop the Rya PCJ Updater on YARN, issue the following command:
bin/fluo stop rya_pcj_updater
Fluo employs a scan backoff that dynamically adjusts the scan interval between a minimum and maximum delay to reduce the amount of scanning overhead if the database becomes idle with no modifications. This reduced overhead comes with a cost of increased latency for an initial notification on an idle database.
There are two internal fluo properties (fluo.implScanTask.minSleep
and
fluo.implScanTask.maxSleep
, both in milliseconds) that can be modified to
tailor the scanning overhead and maximum initial notification latency for your
use case.
For the scenario where a database is tends to be active and frequently modified,
scan latency will largely be influenced by the property
fluo.implScanTask.minSleep
which has a default value of 5 seconds.
For the scenario where a database is tends to be idle and infrequently modified,
scan latency will largely be influenced by the property
fluo.implScanTask.maxSleep
which has a default value of 5 minutes.
To configure these settings, modify your Fluo Application's
fluo-1.0.0-incubating/apps/rya_pcj_updater/conf/fluo.properties
file to
contain the the following section and tailor the values for your use case:
# Fluo Internal Implementation Properties (Not part of public API)
------------------------------------------------------------------
# fluo.implScanTask.minSleep default value is 5000ms (5 seconds)
fluo.implScanTask.minSleep = 5000
# fluo.implScanTask.maxSleep default value is 300000ms (5 minutes)
fluo.implScanTask.maxSleep = 300000
Accumulo may generate warnings that the Apache Commons VFS classloader cannot find Fluo jars on HDFS, or that Accumulo is unable to find Fluo iterators. There are typically two reasons why this occurs: HDFS Accessibility or the Accumulo VFS Cache Dir.
The Fluo Jars fluo-api-1.0.0-incubating.jar
and
fluo-accumulo-1.0.0-incubating.jar
are not copied to HDFS or they have been
copied with permissions that make then inaccessible by the Accumulo Tablet
servers. Verify the property fluo.admin.accumulo.classpath
in
fluo-1.0.0-incubating/apps/rya_pcj_updater/conf/fluo.properties
is correct.
The default value is typically adequate:
fluo.admin.accumulo.classpath=${fluo.admin.hdfs.root}/fluo/lib/fluo-api-1.0.0-incubating.jar,${fluo.admin.hdfs.root}/fluo/lib/fluo-accumulo-1.0.0-incubating.jar`.
It is possible to verify that the correct Fluo iterators are installed for the
table by running this command in the Accumulo shell:
config -t rya_pcj_updater -f iterators
.
The configuration of accumulo/conf/accumulo-site.xml
needs to be updated to
explicitly include a definition for the property general.vfs.cache.dir
. The
Accumulo tablet servers need to be restarted to get the new property.
Depending on system configuration, /tmp
or /var/lib/accumulo
may be
appropriate. An example entry is listed below:
<property>
<name>general.vfs.cache.dir</name>
<value>/var/lib/accumulo</value>
<description>Directory to use for the vfs cache. The cache will keep a soft
reference to all of the classes loaded in the VM. This should be on local disk on
each node with sufficient space. It defaults to /tmp and will use a directory with the
format "accumulo-vfs-cache-" + System.getProperty("user.name","nouser")</description>
</property>
If the YARN NodeManagers in your cluster have firewalls enabled, it will be necessary to specify and open a dedicated port for the Fluo Oracle YARN container. The Oracle is a mandatory component of every Fluo Application.
To specify the port, modify your Fluo Application's
fluo-1.0.0-incubating/apps/rya_pcj_updater/conf/fluo.properties
file to contain
the the following section:
# Fluo Internal Implementation Properties (Not part of public API)
------------------------------------------------------------------
# The Fluo Oracle uses a random free port by default. Specify a port
# here and open it on the firewall of all potential YARN NodeManagers.
fluo.impl.oracle.port=[port number]
Fluo's underlying Apache Twill version does not support assignment of a port or
port range to the Resource Manager's Tracking URL. As a result, it is always
assigned to a random free port on a NodeManager. This makes it impossible to
use some of Fluo's administrative functionality
(for example, bin/fluo stop rya_pcj_updater
) on a cluster where firewalls are
enabled on the NodeManagers. Even with this limitation, it is still possible to
successfully launch the Rya PCJ Updater app and terminate it when desired.
If your target execution environment has firewalls enabled, the following issues may occur while starting and stopping.
It is likely that the command bin/fluo start rya_pcj_updater
will timeout while waiting for a ResourceReport from the Twill TrackerService,
or you may throw a series of java.net.NoRouteToHostException
exceptions
like in the following listing:
...
15:57:39.802 [main] INFO o.a.f.cluster.runner.YarnAppRunner - Waiting for ResourceReport from Twill. Elapsed time = 10000 ms
15:57:45.913 [ STARTING] INFO o.a.h.y.c.api.impl.YarnClientImpl - Submitted application application_1496425295778_0015
15:57:49.838 [main] INFO o.a.f.cluster.runner.YarnAppRunner - Waiting for ResourceReport from Twill. Elapsed time = 20000 ms
15:57:53.434 [main] ERROR o.a.twill.yarn.ResourceReportClient - Exception getting resource report from http://<my-application-master-host>:<random-port>/resources.
java.net.NoRouteToHostException: No route to host
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0_102]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_102]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_102]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_102]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_102]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_102]
at java.net.Socket.connect(Socket.java:538) ~[na:1.8.0_102]
at sun.net.NetworkClient.doConnect(NetworkClient.java:180) ~[na:1.8.0_102]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) ~[na:1.8.0_102]
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) ~[na:1.8.0_102]
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) ~[na:1.8.0_102]
at sun.net.www.http.HttpClient.New(HttpClient.java:308) ~[na:1.8.0_102]
at sun.net.www.http.HttpClient.New(HttpClient.java:326) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513) ~[na:1.8.0_102]
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441) ~[na:1.8.0_102]
at java.net.URL.openStream(URL.java:1045) ~[na:1.8.0_102]
at org.apache.twill.yarn.ResourceReportClient.get(ResourceReportClient.java:52) ~[twill-yarn-0.6.0-incubating.jar:0.6.0-incubating]
at org.apache.twill.yarn.YarnTwillController.getResourceReport(YarnTwillController.java:303) [twill-yarn-0.6.0-incubating.jar:0.6.0-incubating]
at org.apache.fluo.cluster.runner.YarnAppRunner.getResourceReport(YarnAppRunner.java:302) [fluo-cluster-1.0.0-incubating.jar:1.0.0-incubating]
at org.apache.fluo.cluster.runner.YarnAppRunner.start(YarnAppRunner.java:232) [fluo-cluster-1.0.0-incubating.jar:1.0.0-incubating]
at org.apache.fluo.cluster.command.FluoCommand.main(FluoCommand.java:74) [fluo-cluster-1.0.0-incubating.jar:1.0.0-incubating]
...
As long as the application is submitted and is shown to be running in the Hadoop YARN UI for running applications, the Rya PCJ Updater app has likely been started correctly. To verify, look at the YARN container log files to ensure that no unexpected errors occurred.
It is likely that the command bin/fluo stop rya_pcj_updater
will fail. If that occurs, look up the YARN Application-Id in the YARN UI,
or with the command yarn application -list
and then kill it with a command
similar to: yarn application -kill application_1503402439867_0009
.
There are a number of optimizations that can boost the performance of the Rya PCJ Updater. The main bottleneck that will prevent an instance of the PCJ Updater from scaling is the load that the application places on the Accumulo Tablet Servers. In an effort to mitigate this, there are a number of things that can be done to lighten the scan load on each Tablet Server and cut down on the number of scans overall.
The PCJ Updater uses metadata associated with each query node to route and process intermediate query results.
New queries can be dynamically added to and deleted from the Rya PCJ Updater, but for the most part this data
is static and can be cached. So the PCJ Updater aggressively caches whatever metadata it can to avoid lookups. In addition,
each time the Updater processes new statements, it must match the new triples with StatementPatterns registered with the
PCJ Updater. The ids of these StatementPatterns are also cached to avoid costly scans for new StatementPatterns. All
metadata caches are active and utilized by default.
Another important optimization that can drastically boost performance is sharding. Sharding ensures that the processing and scanning load is equally distributed among all Tablet Servers. By default, the PCJ Updater shards its rows. However, to take advantage of this optimization, the user must pre-split the Fluo table after initializing the application. To do this, add the following properties to the fluo.properties file for the application before initializing:
# RowHasher Split Properties
# -------------------
fluo.app.recipes.optimizations.SP=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
fluo.app.recipes.optimizations.J=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
fluo.app.recipes.optimizations.A=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
fluo.app.recipes.optimizations.PR=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
fluo.app.recipes.optimizations.Q=org.apache.fluo.recipes.core.data.RowHasher$Optimizer
fluo.app.recipes.rowHasher.SP.numTablets=8
fluo.app.recipes.rowHasher.J.numTablets=8
fluo.app.recipes.rowHasher.A.numTablets=8
fluo.app.recipes.rowHasher.PR.numTablets=8
fluo.app.recipes.rowHasher.Q.numTablets=8
The above properties are used by the Fluo RowHasher recipe, which splits the Fluo table along each specified prefix. The properties above indicate that the Fluo table will be split across 8 tablets along each of the prefixes SP, J, A, PR, and Q. Each of these prefixes correspond to prefixes of distinct result ranges stored in the Rya PCJ Updater. The choice of 8 tablets is arbitrary and should be based on available resources. After adding the above properties and initializing the application, execute the following command:
./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
This command generates and applies the splits indicated by above properties. See the Fluo row hash prefix recipe for more details.
The PCJ Updater application retains processed notifications and triples that are marked for deletion until a minor or major compaction runs and triggers the Fluo Garbage Collection iterator. Because new Triples are processed by the TripleObserver and then immediately marked as deleted, it's quite possible that a large number of "deleted" triples could build up before they are actually removed from the table. Similarly, old notifications that have already been processed and marked as deleted can pile up as well. As these entries build up in the Fluo table, Tablet Servers have to scan over these entries when the Fluo NotificationIterator is run, creating extra work for the Tablet Servers. To count the number of "DELETED" notifications and triples that are in the table, run the following commands:
#counts number of old notifications
./bin/fluo scan <app_name> --raw -c ntfy | grep -c 'DELETE'
#counts number of deleted triples
./bin/fluo scan <app_name> --raw -c triples | grep -c 'DELETE'
It's good practice to monitor how these quantities grow after starting the application to get a sense of whether or not Accumulo compactions are being executed frequently enough. One possible optimization to increase the compaction rate is to adjust the compaction ratio through the Accumulo shell. After initializing the PCJ Updater application, execute the following command in the Accumulo shell:
config -t <app_table_name> -s table.compaction.major.ratio=1.0
This sets the major compaction ratio of the Fluo table to 1.0, where the lower the ratio, the more frequently major compactions will occur. Another approach is to compact on a specified Range. Fluo supports periodically compacting on specified ranges. To do this, add any of the following hex ranges to the fluo.properties file before initializing the PCJ Updater.
#Rya PCJ Updater Compaction ranges
#Compact all triples
fluo.app.recipes.transientRange.triples=543C3C3A3E3E:543C3C3A3E3EFF
#Compact all statement pattern results
fluo.app.recipes.transientRange.statementPattern=53503C3C3A3E3E:53503C3C3A3E3EFF
#Compact all join results
fluo.app.recipes.transientRange.join=4A3C3C3A3E3E:4A3C3C3A3E3EFF
#Compact all aggregation results
fluo.app.recipes.transientRange.aggregation=413C3C3A3E3E:413C3C3A3E3EFF
#Compact all projection results
fluo.app.recipes.transientRange.projection=50523C3C3A3E3E:50523C3C3A3E3EFF
#Compact full table
fluo.app.recipes.transientRange.fullTable=:FF
To apply the above transient range properties, execute the following Fluo command:
./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier>
The above command executes a compaction over each transient range that is included in the fluo.properties file. These compactions will run indefinitely and execute every time the compaction period (in ms) elapses. The multiplier is an optional parameter that indicates how much the periodic compaction script will throttle compactions if they begin taking too long. See the Fluo transient data recipe for more information.
If running compactions proves to be extremely costly and begins to affect application performance, it's best to only do a range compaction on the triples. This will clean up old, deleted triples and any notifications related to triples. If the Tablet Servers can handle the additional load, try adding additional ranges to clean up old data. Note that when the full table range is added, there is no need to include any of the other ranges.
To apply the above optimizations,
- Stop the Rya PCJ Updater app if it is running by executing the following command
./bin/fluo stop <app_name>
-
Once the application is stopped, add any optimization related properties to the fluo.properties file. These properties include the row sharding properties and the range compaction properties outlined above. Note that it is best to add all of the row shard properties to ensure an even data distribution among all of the Tablet Servers, and start off by adding only the triple transient range property for Range compactions.
-
Initialize the Rya PCJ Updater application by executing the following command in Fluo
./bin/fluo init <app_name>
- Apply the row shard splits by executing
./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.OptimizeTable
-
Check the PCJ Updater table in the Accumulo UI to ensure that the table has the correct number of Tablet Servers
-
Start the application by executing the following command
./bin/fluo start <app_name>
- Apply the transient range compactions by executing the following command
./bin/fluo exec <app_name> org.apache.fluo.recipes.accumulo.cmds.CompactTransient <compaction_period> <multiplier>
Once the application is running, there are a number ways to assess its performance. The primary method is to track the number of outstanding notifications that are queued and waiting to be processed. This can be done by executing the Fluo wait command
./bin/fluo wait <app_name>
When this script executes, it polls Fluo every ten seconds to determine how many unprocessed notifications are queued up. If this number grows over time then the application needs more workers to handle the ingest load.
Before deploying more workers, first verify that the Tablet Servers can handle an increased scan load by checking the Accumulo UI to see if the Tablet Servers have any queued scans for the PCJ Updater table. If there are a large number of queued scans for the table, adding more workers might not be possible. It may be necessary to lower the ingest rate. If the Tablet Servers are keeping up, deploy additional workers by stopping the application, updating the fluo.properties to use more workers, and re-initializing and starting the application.
Other ways to assess the performance of the application is to monitor the scan rate through the Accumulo UI. If the scan rate is staying approximately constant over time (or increasing very slowly), then the application is performing as expected. In general, the scan rate increases because
- The number of intermediate results increases over time
- The number of "deleted" notifications that need to flushed increases over time
- The number of "deleted" triples that need to be flushed increases over time
The first item is unavoidable if there is no age off policy for the PCJ Updater application (which is currently the case).
However, 2 and 3 are avoidable by applying the range compaction optimizations discussed above. So it's important to monitor the
scan rate (and the number of old triples and notifications as discussed above) to assess the health of the application.
Finally an additional item to monitor is which iterators are running in the Fluo table. This can be done by regularly running the listscans command in the Accumulo shell. This gives a sense of how the Tablet Servers are being used. It helps determine whether they are spending most of their time finding new notifications (internal Fluo work), or issuing scans that are specific to Rya PCJ Updater Observers.