Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

[REVIEW-ONLY] Python binding for high level apis #613

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

patilvikram
Copy link
Contributor

@patilvikram patilvikram commented Apr 27, 2017

@PramodSSImmaneni @chinmaykolhatkar

I have created this REVIEW-ONLY PR for extending Python support for extending High Level APIs. Currently this is in early stage so I will add documentations which will help you setup environment to launch python app from your machine. Also I will include simple example in this PR.

@@ -21,12 +21,13 @@
import java.io.IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change here? Can you rebase?

</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be redefined? Can it not be inherited from the parent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this one

python/pom.xml Outdated
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<!--<scope>provided</scope>-->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these not provided?

@@ -0,0 +1,14 @@

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this go into standard malhar/examples?

@pramodin
Copy link
Contributor

There are some binary files checked in, example py4j-0.10.4.jar, py4j-0.10.4-src.zip, py4j-0.10.4.tar.gz. Are these needed?

* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.python.operator;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Package path doesn't have stream. What is the benefit of having this class here instead of in python package.

int pythonServerStartAttempts = 5;
while (!this.py4jListener.isPythonServerStarted() && !this.pythonWorkerProxy.isFunctionEnabled() && pythonServerStartAttempts > 0) {
try {
Thread.sleep(5000L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timeout should be configurable

Thread.sleep(5000L);
LOG.debug("Waiting for Python Worker Registration");
--pythonServerStartAttempts;
} catch (InterruptedException var9) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable name.. sounds generated

this.pythonWorkerProxy.setFunction(this.operationType.getType());
}
if (!this.py4jListener.isPythonServerStarted()) {
LOG.error("Python server could not be started");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw an exception and fail

@@ -77,7 +78,7 @@
* @param <O> type of the output
* @return new stream of type O
*/
<O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts);
<O, STREAM extends ApexStream<O>> STREAM addOperator(Operator op, Operator.InputPort<T> inputPort, Operator.OutputPort<O> outputPort, Option... opts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid formatting change if there is no other change here.

* @param serializedFunction stores Serialized Function data
* @return new stream of type T
*/
<STREAM extends ApexStream<T>> STREAM map_func(byte[] serializedFunction, Option... opts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function name not in java style


}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Formatting change may be unnecessary

foundPort = serverSocket.getLocalPort();
serverSocket.close();
return foundPort;
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finally to close socket

Copy link
Contributor

@tushargosavi tushargosavi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are few binaries files present in the pull request, need to discuss with community about which types of binary files are acceptable in repository.


public class InMemoryDataInputOperator<T> implements InputOperator
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space


from pyapex import createApp
a=createApp('python_app').fromKafka08('localhost:2181','test_topic') \
.setFilter('filter_operator',f) \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer functions names such as map/filter/flatmap rather than setMap, setFilter, setFlatmap.


```

def f(a):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give meaningful names to functions in documentations. how about filterCondition?

@Override
public void beginWindow(long l)
{

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra space

pom.xml Outdated
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.apex</groupId>
<artifactId>apex</artifactId>
<version>3.4.0</version>
<version>3.6.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apache Apex dependency is already moved to 3.6, you can undo this change.

} else {
pythonEnvPath = py4jDependencyFile.getAbsolutePath();
}
LOG.debug("Final python environment path with Py4j depenency path: " + pythonEnvPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use {} in LOG statements


import java.util.Map;

public interface PythonWorker<T>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add java documentation for public interfaces / classes and methods

stream/pom.xml Outdated
@@ -94,6 +94,35 @@
<artifactId>cglib</artifactId>
<version>3.2.1</version>
</dependency>

<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these dependencies from stream package.

@@ -212,4 +210,6 @@
*/
WindowedStream<T> window(WindowOption windowOption, TriggerOption triggerOption, Duration allowLateness);


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra lines added

@@ -80,7 +81,7 @@ public void testAddOperator()
// Assert the stream is from first operator to second operator
Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName());
Assert.assertTrue(1 == stream.getSinks().size());
Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName());
Assert.assertEquals("second", ((List<LogicalPlan.InputPortMeta>)(stream.getSinks())).get(0).getOperatorMeta().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this part of your code change?

@tushargosavi
Copy link
Contributor

@patilvikram please make sure that jenkins / travis build passes.

@patilvikram patilvikram force-pushed the python_binding_for_review branch 5 times, most recently from 92a1f1b to cf79a17 Compare June 20, 2017 10:07
@patilvikram patilvikram force-pushed the python_binding_for_review branch 7 times, most recently from 5336cd4 to a043f96 Compare July 11, 2017 15:28
def __init__(self, trigger_type, count):
super(CountTrigger, self).__init__(trigger_type)
self.count = count

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove Extra lines from bottom of this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation to code

setup(name='pyapex',
version='0.0.4',
py_modules=['pyapex','pyapex.runtime','pyapex.functions'],
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove

fi
if [ -f "$MVN_GENERATED_PATH" ]; then
# development launch mode
DT_CORE_JAR="$BUILD_DIR/malhar-python-3.8.0-SNAPSHOT.jar"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this versions dyanmically stamped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check how I can include current artifact in the classpath files like MVN_GENERATED_PATH

#ln -sf $PYAPEX_HOME "$script_dir/pyapex"
if [ -z "$PYTHONPATH" ]
then
export PYTHONPATH="$PYAPEX_HOME/deps/pyapex-0.0.4-src.zip"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add versions dynamically here

{
private static final Logger LOG = LoggerFactory.getLogger(PythonGenericOperator.class);
protected byte[] serializedFunction = null;
private PythonServer server = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make Python Server transient

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this one

{

private static final Logger LOG = LoggerFactory.getLogger(PythonWindowedOperator.class);
private PythonServer server = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make PythonServer server transient

assert False, "Invalid Option"


if __name__ == "__main__":
Copy link
Contributor Author

@patilvikram patilvikram Jul 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update this one in pyshell to call apexcli commands directly for kill / shutdown or any other updates

@darrengarvey
Copy link

Did this branch die? Has it been superceded by another?

I got here from this ticket which was linked from the roadmap.

@patilvikram
Copy link
Contributor Author

Did this branch die? Has it been superceded by another?

I got here from this ticket which was linked from the roadmap.

@darrengarvey Right now its still at the dangling stage, the review is not yet completed. Let me know if you have any questions about this issue or PR.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants