Skip to content

Commit

Permalink
[ISSUE #4852] Support Jraft algorithm as meta storage (#4862)
Browse files Browse the repository at this point in the history
* init raft function

* init function

* finish this function.

* rename file and remove unnecessary plugin.

* properties modify

* properties modify

* properties modify
  • Loading branch information
karsonto authored May 30, 2024
1 parent 761366c commit 5a409fd
Show file tree
Hide file tree
Showing 19 changed files with 1,402 additions and 0 deletions.
55 changes: 55 additions & 0 deletions eventmesh-meta/eventmesh-meta-raft/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id 'com.google.protobuf' version '0.8.17'
}

def grpcVersion = '1.50.2' // CURRENT_GRPC_VERSION
def protobufVersion = '3.21.5'
def protocVersion = protobufVersion

dependencies {
implementation ("io.grpc:grpc-protobuf:${grpcVersion}") {
exclude group: "com.google.protobuf", module: "protobuf-java"
}
implementation("com.google.protobuf:protobuf-java:${protobufVersion}")
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"
implementation "javax.annotation:javax.annotation-api:1.3.2"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

implementation project(":eventmesh-meta:eventmesh-meta-api")
implementation project(":eventmesh-common")
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1'
implementation "com.alipay.sofa:jraft-core:1.3.14"
implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14"
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0'
}

protobuf {
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
19 changes: 19 additions & 0 deletions eventmesh-meta/eventmesh-meta-raft/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pluginType=metaStorage
pluginName=raft
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.meta.raft;

import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants;
import org.apache.eventmesh.meta.raft.rpc.RequestResponse;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;


public abstract class EventClosure implements Closure {

private CompletableFuture<RequestResponse> future;

private RequestResponse requestResponse;

private EventOperation eventOperation;

public static EventClosure createDefaultEventClosure() {
return new EventClosure() {

@Override
public void run(Status status) {

}
};
}

public void setFuture(CompletableFuture<RequestResponse> future) {
this.future = future;
}

public void setRequestResponse(RequestResponse requestResponse) {
this.requestResponse = requestResponse;
if (future != null) {
future.complete(getRequestResponse());
}
}

public RequestResponse getRequestResponse() {
return requestResponse;
}

public EventOperation getEventOperation() {
return eventOperation;
}

protected void failure(final String errorMsg, final String redirect) {
final RequestResponse response = RequestResponse.newBuilder().setSuccess(false).setErrorMsg(errorMsg)
.setRedirect(redirect).build();
setRequestResponse(response);
}

public void setEventOperation(EventOperation opreation) {
this.eventOperation = opreation;
}

protected void success(final Map<String, String> map) {

final RequestResponse response = RequestResponse.newBuilder().setValue(MetaRaftConstants.RESPONSE)
.setSuccess(true).putAllInfo(map).build();
setRequestResponse(response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.meta.raft;

import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants;
import org.apache.eventmesh.meta.raft.rpc.RequestResponse;

import java.io.Serializable;
import java.util.Map;


public class EventOperation implements Serializable {

private static final long serialVersionUID = -6597003954824547294L;

public static final byte PUT = 0x01;

public static final byte GET = 0x02;

public static final byte DELETE = 0x03;

private byte op;
private Map<String, String> data;

public static EventOperation createOpreation(RequestResponse response) {
if (response.getValue() == MetaRaftConstants.PUT) {
return new EventOperation(PUT, response.getInfoMap());
} else if (response.getValue() == MetaRaftConstants.GET) {
return new EventOperation(GET, response.getInfoMap());
} else if (response.getValue() == MetaRaftConstants.DELETE) {
return new EventOperation(DELETE, response.getInfoMap());

}
return null;
}

public EventOperation(byte op, Map<String, String> data) {
this.op = op;
this.data = data;
}

public byte getOp() {
return op;
}

public Map<String, String> getData() {
return data;
}

public boolean isReadOp() {
return GET == this.op;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.meta.raft;


import org.apache.eventmesh.meta.raft.rpc.RequestResponse;

/**
* MetaService.
*/
public interface JraftMetaService {

void handle(RequestResponse request, EventClosure closure);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.meta.raft;

import org.apache.eventmesh.meta.raft.rpc.RequestResponse;

import org.apache.commons.lang.StringUtils;

import java.nio.ByteBuffer;

import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;

public class JraftMetaServiceImpl implements JraftMetaService {

JraftServer server;


public JraftMetaServiceImpl(JraftServer server) {
this.server = server;
}

@Override
public void handle(RequestResponse request, EventClosure closure) {
applyOperation(EventOperation.createOpreation(request), closure);
}

public void applyOperation(EventOperation opreation, EventClosure closure) {
if (!isLeader()) {
handlerNotLeaderError(closure);
return;
}
try {
closure.setEventOperation(opreation);
final Task task = new Task();
task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(opreation)));
task.setDone(closure);
this.server.getNode().apply(task);
} catch (CodecException e) {
String errorMsg = "Fail to encode EventOperation";
closure.failure(errorMsg, StringUtils.EMPTY);
closure.run(new Status(RaftError.EINTERNAL, errorMsg));
}
}


private String getRedirect() {
return this.server.redirect().getRedirect();
}

private boolean isLeader() {
return this.server.getFsm().isLeader();
}


private void handlerNotLeaderError(final EventClosure closure) {
closure.failure("Not leader.", getRedirect());
closure.run(new Status(RaftError.EPERM, "Not leader"));
}
}
Loading

0 comments on commit 5a409fd

Please sign in to comment.