Skip to content

Commit

Permalink
dist agent support
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjunfeng committed Jul 19, 2024
1 parent 5a74fea commit 13933f6
Show file tree
Hide file tree
Showing 67 changed files with 2,407 additions and 985 deletions.
4 changes: 2 additions & 2 deletions ignite-extensions/modules/ignite-graph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
<okhttp.version>3.14.4</okhttp.version>
<scala.version>2.11.12</scala.version>
<spark.version>2.4.7</spark.version>
<tinkerpop.version>3.7.0</tinkerpop.version>
<gremlin.version>3.7.0</gremlin.version>
<tinkerpop.version>3.7.2</tinkerpop.version>
<gremlin.version>3.7.2</gremlin.version>

</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public CacheConfiguration<String, BinaryObject> createCacheCfg(final String tabl
final CacheConfiguration<String, BinaryObject> cfg = new CacheConfiguration<>();
cfg.setName(table);
cfg.setStoreKeepBinary(false);
cfg.setIndexedTypes(new Class[] { String.class, BinaryObject.class });

cfg.setCacheMode(cacheMode);
cfg.setQueryEntities(qes);
// add@byron
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,8 @@ public IgniteConnection(String namespace,String cfg) {
public IgniteAdmin getAdmin() {
return admin;
}


public String namespace() {
return admin.namespace();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected IgniteElement(IgniteGraph graph,
Long updatedAt,
Map<String, Object> properties,
boolean propertiesFullyLoaded) {
this.namespace = graph.getIgniteGraphConfiguration().getGraphNamespace();
this.namespace = graph.admin.namespace();
this.id = id;
this.label = label;
this.createdAt = createdAt;
Expand Down Expand Up @@ -124,13 +124,13 @@ public boolean arePropertiesFullyLoaded() {
return propertiesFullyLoaded;
}

public void copyFrom(IgniteElement element) {
public void copyFrom(IgniteElement element) {
if (element.label != null) this.label = element.label;
if (element.createdAt != null) this.createdAt = element.createdAt;
if (element.updatedAt != null) this.updatedAt = element.updatedAt;
if (element.properties != null
&& (element.propertiesFullyLoaded || this.properties == null)) {
this.properties = new ConcurrentHashMap<>(element.properties);
this.properties = element.properties;
this.propertiesFullyLoaded = element.propertiesFullyLoaded;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,20 @@ public class IgniteGraph implements Graph {
* This method is invoked by Gremlin's GraphFactory
* and defines the starting point for further tasks.
*/
public static IgniteGraph open(final Configuration properties) throws IgniteGraphException {
return new IgniteGraph(properties);
public static IgniteGraph open(String name,Configuration properties) throws IgniteGraphException {
return new IgniteGraph(name,properties);
}

public static IgniteGraph getGraph(String name) throws IgniteGraphException {
return graphInstances.get(name);
}

public IgniteGraph(Configuration properties) {
this(new IgniteGraphConfiguration(properties));
public IgniteGraph(String name,Configuration properties) {
this(new IgniteGraphConfiguration(name,properties));
}

public IgniteGraph(IgniteGraphConfiguration config) throws IgniteGraphException {
this(config, IgniteGraphUtils.getConnection(config));
this(config, IgniteGraphUtils.getConnection(config.getNamespace(),config));
}

public IgniteGraph(IgniteGraphConfiguration config, IgniteConnection connection) throws IgniteGraphException {
Expand Down Expand Up @@ -154,7 +154,7 @@ public IgniteGraph(IgniteGraphConfiguration config, IgniteConnection connection)
.expireAfterAccess(config.getRelationshipCacheTtlSecs(), TimeUnit.SECONDS)
.build();

graphInstances.put(config.getGraphNamespace(), this);
graphInstances.put(connection.namespace(), this);

}

Expand All @@ -176,7 +176,7 @@ private void init() throws IgniteGraphException {
IgniteGraphUtils.createTables(config, admin);

} catch (Exception e) {
logger.error("Failed to create table for graph "+ config.getGraphNamespace(), e);
logger.error("Failed to create table for graph "+ admin.namespace(), e);
throw new IgniteGraphException(e);
}

Expand Down Expand Up @@ -355,8 +355,7 @@ public Vertex vertex(Object id) {

public Vertex findOrCreateVertex(Object id) {
return findVertex(id, true);
}

}


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ public class IgniteGraphConfiguration extends AbstractConfiguration implements S
private static final long serialVersionUID = -7150699702127992270L;

private final PropertiesConfiguration conf;
private final String namespace;

public static final Class<? extends Graph> IGNITE_GRAPH_CLASS = IgniteGraph.class;

public static final String IGNITE_GRAPH_CLASSNAME = IGNITE_GRAPH_CLASS.getCanonicalName();

public static class Keys {
public static final String GRAPH_NAMESPACE = "gremlin.ignite.namespace";
public static class Keys {
public static final String GRAPH_CLASS = "gremlin.graph";
public static final String GRAPH_PROPERTY_TYPE = "gremlin.graph.propertyType";
public static final String GLOBAL_CACHE_MAX_SIZE = "gremlin.ignite.globalCacheMaxSize";
Expand All @@ -51,11 +51,13 @@ public static class Keys {
* A minimal configuration for the IgniteGraph
*/
public IgniteGraphConfiguration() {
this.namespace = "default";
conf = new PropertiesConfiguration();
conf.setProperty(Keys.GRAPH_CLASS, IGNITE_GRAPH_CLASSNAME);
}

public IgniteGraphConfiguration(Configuration config) {
public IgniteGraphConfiguration(String namespace,Configuration config) {
this.namespace = namespace;
conf = new PropertiesConfiguration();
conf.setProperty(Keys.GRAPH_CLASS, IGNITE_GRAPH_CLASSNAME);
if (config != null) {
Expand All @@ -64,20 +66,6 @@ public IgniteGraphConfiguration(Configuration config) {
}
}

public String getGraphNamespace() {
return conf.getString(Keys.GRAPH_NAMESPACE, "default");
}

public IgniteGraphConfiguration setGraphNamespace(String name) {
if (!isValidGraphName(name)) {
throw new IllegalArgumentException("Invalid graph namespace."
+ " Only alphanumerics and underscores are allowed");
}

conf.setProperty(Keys.GRAPH_NAMESPACE, name);
return this;
}

private static boolean isValidGraphName(String name) {
return name.matches("^[A-Za-z0-9_]+$");
}
Expand Down Expand Up @@ -133,4 +121,8 @@ protected void clearPropertyDirect(String key) {
conf.clearProperty(key);
}

public String getNamespace() {
return namespace;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,22 @@ public final class IgniteGraphUtils {

private static final Map<String, IgniteConnection> connections = new ConcurrentHashMap<>();

public static IgniteConnection getConnection(IgniteGraphConfiguration config) {

IgniteConnection conn;

String namespace = config.getGraphNamespace();
public static IgniteConnection getConnection(String namespace,IgniteGraphConfiguration config) {

/* Check whether connection already exists */
conn = connections.get(namespace);
IgniteConnection conn = connections.get(namespace);
if (conn != null) return conn;

String igniteCfg = config.getString("gremlin.graph.ignite.cfg");

conn = new IgniteConnection(namespace,igniteCfg);

connections.put(config.getGraphNamespace(), conn);
connections.put(namespace, conn);
return conn;
}

public static String getTableName(IgniteGraphConfiguration config, String name) {
String ns = config.getGraphNamespace();
String ns = config.getNamespace();
return ns + "_" + name;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ public Graph processResultGraphPersist(final GraphComputer.ResultGraph resultGra
this.addPropertiesToOriginalGraph();
return this.graph;
} else {
IgniteGraphConfiguration config = this.graph.configuration();
config.setGraphNamespace(resultGraph.name());
final IgniteGraph newGraph = IgniteGraph.open(config);
IgniteGraphConfiguration config = this.graph.configuration();
final IgniteGraph newGraph = IgniteGraph.open(resultGraph.name(),config);
this.graph.vertices().forEachRemaining(vertex -> {
final Vertex newVertex = newGraph.addVertex(T.id, vertex.id(), T.label, vertex.label());
vertex.properties().forEachRemaining(vertexProperty -> {
Expand All @@ -174,9 +173,8 @@ public Graph processResultGraphPersist(final GraphComputer.ResultGraph resultGra
this.addPropertiesToOriginalGraph();
return this.graph;
} else {
IgniteGraphConfiguration config = this.graph.configuration();
config.setGraphNamespace(resultGraph.name());
final IgniteGraph newGraph = IgniteGraph.open(config);
IgniteGraphConfiguration config = this.graph.configuration();
final IgniteGraph newGraph = IgniteGraph.open(resultGraph.name(),config);
this.graph.vertices().forEachRemaining(vertex -> {
final Vertex newVertex = newGraph.addVertex(T.id, vertex.id(), T.label, vertex.label());
vertex.properties().forEachRemaining(vertexProperty -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public void load(Edge edge, IgniteResult result) {
for (IgniteColumn column : result.getColumns()) {
String colName = column.getColName();
switch (colName) {
case IgniteConstants.ID_COL_NAME:
break;
case IgniteConstants.LABEL_COL_NAME:
label = column.getColValue().toString();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public void load(Vertex vertex, IgniteResult result) {
for (IgniteColumn column : result.getColumns()) {
String colName = column.getColName();
switch (colName) {
case IgniteConstants.ID_COL_NAME:
break;
case IgniteConstants.LABEL_COL_NAME:
label = column.getColValue().toString();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void addGraph(final String name, final String configurationFile) {
try {

Configurations configs = new Configurations();
final Graph newGraph = IgniteGraph.open(configs.properties(configurationFile));
final Graph newGraph = IgniteGraph.open(name,configs.properties(configurationFile));
putGraph(name, newGraph);

logger.info("Graph [{}] was successfully configured via [{}].", name, configurationFile);
Expand Down
Loading

0 comments on commit 13933f6

Please sign in to comment.