Skip to content

Commit

Permalink
agent server
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjunfeng committed Jun 27, 2024
1 parent 64d2c83 commit 5a74fea
Show file tree
Hide file tree
Showing 136 changed files with 2,964 additions and 2,957 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static Ignite openMvStore(String fileName) {
}
Ignite mvStore = Ignition.start(fileName);
return mvStore;
}
}

@Override
protected MongoDatabase openOrCreateDatabase(String databaseName) {
Expand Down Expand Up @@ -102,7 +102,7 @@ protected Document getServerDescription(){
}
}
Document response = super.getServerDescription();
ObjectId processId = new ObjectId(admin.cluster().id().toString().replaceAll("-", "").substring(0,24));
//ObjectId processId = new ObjectId(admin.cluster().id().toString().replaceAll("-", "").substring(0,24));
//response.append("topologyVersion", processId);
//response.append("hosts", hostSet);
//response.put("primary",primary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public IgniteBinaryCollection(IgniteDatabase database, String collectionName, Co
if(collectionName.startsWith("igfs-internal-")) { // igfs
this.readOnly = true;
}
if(collectionName.startsWith("wc_")) { // web-console
if(collectionName.startsWith("readonly_")) { // web-console
this.readOnly = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static String indexName(List<IndexKey> keys) {

@Override
protected Iterable<String> listCollectionNamespaces() {
return mvStore.cacheNames().stream().filter(c-> !c.startsWith(INDEX_DB_PREFIX))
return mvStore.cacheNames().stream().filter(c-> !c.startsWith(INDEX_DB_PREFIX) && !c.startsWith("igfs-internal-"))
.map(n->databaseName+'.'+n)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
Expand All @@ -31,6 +32,7 @@
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryArray;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryFieldMetadata;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
Expand All @@ -48,7 +50,9 @@
import de.bwaldvogel.mongo.backend.Missing;
import de.bwaldvogel.mongo.backend.Utils;
import de.bwaldvogel.mongo.bson.BinData;
import de.bwaldvogel.mongo.bson.Bson;
import de.bwaldvogel.mongo.bson.Document;
import de.bwaldvogel.mongo.json.JsonConverter;
import de.bwaldvogel.mongo.wire.BsonConstants;
import de.bwaldvogel.mongo.wire.bson.BsonEncoder;

Expand Down Expand Up @@ -184,7 +188,7 @@ public static Object bsonObjectToJavaObject(Object $value){
if(item instanceof Number || item instanceof CharSequence) {

}
else if(false) {
else if(item instanceof Document) {
List<Object> list = new ArrayList<>($arr.size());
for(int i=0;i<$arr.size();i++) {
item = $arr.get(i);
Expand Down Expand Up @@ -284,14 +288,90 @@ public static Object toBinaryKey(Object key) {
}
return key;
}

/**
* Binary decoder value only support bson type
* @param key
* @param idField
* @return
*/
public static Object toBsonValue(Object $value) {
if($value!=null) {
if($value instanceof CharSequence || $value instanceof Number || $value instanceof UUID || $value instanceof Bson){
return $value;
}
else if($value.getClass().isArray()) {
if($value instanceof Object[]) {
Object [] arr = (Object[])$value;
if(arr.length>0) {
if(arr[0] instanceof BinaryObject) {
List<Object> $arr2 = new ArrayList<>(arr.length);
for(int i=0;i< arr.length;i++) {
$arr2.add(toBsonValue(arr[i]));
}
$value = $arr2;
}
}
}
return $value;
}
else if($value.getClass().isEnum()) {
return ((Enum)$value).name();
}
else if($value instanceof List){
List $arr = (List)$value;
List<Object> $arr2 = new ArrayList<>($arr.size());
for(int i=0;i<$arr.size();i++) {
Object $valueSlice = $arr.get(i);

$arr2.add(toBsonValue($valueSlice));
}
$value = $arr2;
}
else if($value instanceof Set){
Set $arr = (Set)$value;
Set<Object> $arr2 = new HashSet<>($arr.size());
Iterator<Object> it = $arr.iterator();
while(it.hasNext()) {
Object $valueSlice = it.next();
$arr2.add(toBsonValue($valueSlice));
}
$value = ($arr2);
}
else if($value instanceof Map){
Map<Object, Object> $arr = (Map)$value;
final Document docItem = new Document();
for(Map.Entry<Object, Object> ent: $arr.entrySet()) {
Object v = ent.getValue();
docItem.put(ent.getKey().toString(), toBsonValue(v));
}
$value = docItem;
}
else if($value instanceof BinaryObject){
BinaryObject $arrSlice = (BinaryObject)$value;
$value = binaryObjectToDocument($arrSlice);
}
else {
try {
byte t2 = BsonEncoder.determineType($value);
}
catch(Exception e) {
Document json = toKeyValuePairs($value);
json.append("_class", $value.getClass().getName());
$value = json;
}
}
}
return $value;
}

public static Object binaryObjectToDocument(BinaryObject bobj){
Collection<String> fields = null;
try {
if(bobj instanceof BinaryObjectImpl) {
BinaryObjectImpl bin = (BinaryObjectImpl)bobj;
if(!bin.hasSchema()) {
return bin.deserialize();
return toBsonValue(bin.deserialize());
}
}
else if(bobj instanceof BinaryArray) {
Expand All @@ -301,92 +381,39 @@ else if(bobj instanceof BinaryArray) {
}
return bin.deserialize();
}
else if(bobj instanceof BinaryEnumObjectImpl) {
BinaryEnumObjectImpl bin = (BinaryEnumObjectImpl)bobj;
return ((Enum)bin.deserialize()).name();
}

String typeName = bobj.type().typeName();
if(typeName.equals("Document") || typeName.equals("SerializationProxy")) {
return bobj.deserialize();
}

fields = bobj.type().fieldNames();
if(fields==null || fields.size()<=1) {
return bobj.deserialize();
return toBsonValue(bobj.deserialize());
}
}
catch(BinaryObjectException e) {
if(bobj instanceof BinaryEnumObjectImpl) {
BinaryEnumObjectImpl bin = (BinaryEnumObjectImpl)bobj;
return bin.enumName();
}
fields = bobj.type().fieldNames();
}

Document doc = new Document();
for(String field: fields){
String $key = field;
Object $value = bobj.field(field);
try {

if($value instanceof List){
List $arr = (List)$value;
List<Object> $arr2 = new ArrayList<>($arr.size());
for(int i=0;i<$arr.size();i++) {
Object $valueSlice = $arr.get(i);
if($valueSlice instanceof BinaryObject){
BinaryObject $arrSlice = (BinaryObject)$valueSlice;
$valueSlice = binaryObjectToDocument($arrSlice);
}
else if($valueSlice instanceof Map && $valueSlice.getClass()!=Document.class){
Map $map = (Map)$valueSlice;
$valueSlice = new Document($map);
}
$arr2.add($valueSlice);
}
$value = ($arr2);
}
else if($value instanceof Set){
Set $arr = (Set)$value;
Set $arr2 = new HashSet<>($arr.size());
Iterator it = $arr.iterator();
while(it.hasNext()) {
Object $valueSlice = it.next();
if($valueSlice instanceof BinaryObject){
BinaryObject $arrSlice = (BinaryObject)$valueSlice;
$valueSlice = binaryObjectToDocument($arrSlice);
}
$arr2.add($valueSlice);
}
$value = ($arr2);
}
else if($value instanceof Map){
Map<String, Object> $arr = (Map)$value;
final Document docItem = new Document($arr);
for(Map.Entry<String, Object> ent: $arr.entrySet()) {
Object v = ent.getValue();
if(v instanceof BinaryObject) {
BinaryObject $arrSlice = (BinaryObject)v;
v = binaryObjectToDocument($arrSlice);
ent.setValue(v);
}
else if(v instanceof Map && v.getClass()!=Document.class){
Map $map = (Map)v;
v = new Document($map);
ent.setValue(v);
}
}
$value = docItem;

}
else if($value instanceof BinaryObject){
BinaryObject $arr = (BinaryObject)$value;
$value = binaryObjectToDocument($arr);
try {
if($value!=null) {
doc.append($key, toBsonValue($value));
}

if($value instanceof Duration){
Duration $arr = (Duration)$value;
$value = $arr.toString();
}

if($value!=null) {
doc.append($key, $value);
}

} catch (Exception e) {
// TODO Auto-generated catch block
} catch (Exception e) {
e.printStackTrace();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import java.io.IOException;
import java.io.Serializable;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;

import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.plugin.ExtensionRegistry;
Expand All @@ -41,6 +44,7 @@
import de.bwaldvogel.mongo.MongoServer;
import de.bwaldvogel.mongo.backend.ignite.IgniteBackend;


/**
* Security processor provider for tests.
*/
Expand All @@ -49,6 +53,8 @@ public class MongoServerPluginProvider implements PluginProvider<MongoPluginConf
public static MongoServer mongoServer;
public static IgniteBackend backend;

private static int counter = 0;

private String databaseName;

/** Ignite logger. */
Expand All @@ -58,8 +64,6 @@ public class MongoServerPluginProvider implements PluginProvider<MongoPluginConf
private MongoPluginConfiguration cfg;

private MongoPlugin mongoPlugin = new MongoPlugin();

private int counter = 0;


/** {@inheritDoc} */
Expand Down Expand Up @@ -148,14 +152,21 @@ public class MongoServerPluginProvider implements PluginProvider<MongoPluginConf
/** {@inheritDoc} */
@Override public void onIgniteStart() {

// start mongodb singerton when admin grid start
// start mongodb singerton when admin grid start
if(cfg!=null && mongoServer==null) {
try {
synchronized(MongoServerPluginProvider.class) {
if(mongoServer==null) {
mongoServer = new MongoServer(backend);
mongoServer.bind(cfg.getHost(),cfg.getPort());
log.info("mongoServer","listern on "+cfg.getHost()+":"+cfg.getPort());
mongoServer = new MongoServer(backend);
IgniteEx ignite = (IgniteEx) Ignition.ignite(databaseName);
ExecutorService workerPool = ignite.context().pools().getRestExecutorService();
if (workerPool == null) {
workerPool = ignite.context().pools().getServiceExecutorService();
}
InetSocketAddress addr = new InetSocketAddress(cfg.getHost(), cfg.getPort());
mongoServer.bind(addr,1, 0, workerPool);

log.info("mongoServer","listern on "+cfg.getHost()+":"+cfg.getPort());
}
}

Expand Down
2 changes: 1 addition & 1 deletion ignite-extensions/modules/mqtt-java-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<relativePath>../../parent-internal/pom.xml</relativePath>
</parent>

<groupId>io.stuart</groupId>
<groupId>org.apache.ignite</groupId>
<artifactId>stuart</artifactId>
<version>0.1.0</version>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ private void ackPluginsInfo() {
else {
for (PluginProvider plugin : plugins.values()) {
U.quietAndInfo(log, " ^-- " + plugin.name() + " " + plugin.version());
U.quietAndInfo(log, " ^-- " + plugin.copyright());
if(plugin.copyright()!=null) {
U.quietAndInfo(log, "\t ^-- " + plugin.copyright());
}
U.quietAndInfo(log, "");
}
}
Expand Down
Loading

0 comments on commit 5a74fea

Please sign in to comment.