diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..8aceb3eb --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +*.kt linguist-language=Kotlin diff --git a/.github/workflows/android.yml b/.github/workflows/android.yml deleted file mode 100644 index 2a6ff2d6..00000000 --- a/.github/workflows/android.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: Android CI - -on: [push] - -jobs: - build: - - runs-on: ubuntu-18.04 - - steps: - - uses: actions/checkout@v1 - - name: set up JDK 1.8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Build with Gradle - run: ./gradlew build diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..fca34af8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,37 @@ +language: android +sudo: required +jdk: oraclejdk8 + +before_cache: + -rm -f $HOME/.gradle/caches/modules-2/modules-2.lock + -rm -fr $HOME/.gradle/caches/*/plugin-resolution/ +cache: + directories: + - $HOME/.gradle/caches/ + - $HOME/.gradle/wrapper/ + +env: + global: + - ANDROID_API=29 + - EMULATOR_API=29 + - ANDROID_BUILD_TOOLS=29.0.2 + - ADB_INSTALL_TIMEOUT=5 # minutes + +android: + components: + - tools + - platform-tools + - build-tools-$ANDROID_BUILD_TOOLS + - android-$ANDROID_API + - android-$EMULATOR_API_LEVEL + - extra-google-m2repository + - extra-android-m2repository # for design library + - addon-google_apis-google-19 # google play services + - sys-img-armeabi-v7a-addon-google_apis-google-$ANDROID_API_LEVEL + - sys-img-armeabi-v7a-addon-google_apis-google-$EMULATOR_API_LEVEL + licenses: + - android-sdk-preview-license-.+ + - android-sdk-license-.+ + - google-gdk-license-.+ + script: + - ./gradlew build diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8e3d3197..e6491bd4 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,13 +1,13 @@ # Contributing Guidelines -We would be happy ot have more volunterers for the android worker. This library constitutes and important component of on-device machine learning and you will be a part of this huge community -You can always join us on slack if you find an issue in which you are intertested. +We would be happy to have more volunterers for the android worker. This library constitutes and important component of on-device machine learning and you will be a part of this huge community +You can always join us on slack if you find an issue in which you are interested. Here are some important resources: * [Openmined.org](https://www.openmined.org/) describes the role and long term aim of the organisation, * [Pysyft](https://github.com/OpenMined/PySyft) repository is the hub of all encrypted algorithms and training protocols. The android worker acts as a client for it! - * [Syft.js](https://github.com/OpenMined/syft.js) is web interface fro Pysyft + * [Syft.js](https://github.com/OpenMined/syft.js) is the web interface for Pysyft ## Testing diff --git a/README.md b/README.md index 53513058..8c979f29 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,26 @@ -# KotlinSyft +# KotlinSyft +![License](https://img.shields.io/github/license/openmined/KotlinSyft) +![Language](https://img.shields.io/github/languages/top/openmined/KotlinSyft) +![Size](https://img.shields.io/github/repo-size/openmined/KotlinSyft) + + +## Introduction This is the android worker Library for [PySyft](https://github.com/OpenMined/PySyft) -## License -KotlinSyft is available under the Apache 2 license. See the LICENSE file for more info. +Of course, [PySyft](https://github.com/openmined/pysyft) has the ability to run in its own environment but the final training procedure needs to deployed on the mobile workers using Torchscript. + +**KotlinSyft employs P2P connectivity for realization of distributed pysyft protocols.** + +### Local Development + +1. Fork and clone +2. Open Android Studio and import project +4. Do your work. +5. Push to your fork +6. Submit a PR to openmined/KotlinSyft + +### Contributing + +Read [CONTRIBUTING.md](https://github.com/OpenMined/KotlinSyft/blob/master/CONTRIBUTING.md) + diff --git a/demo-app/build.gradle b/demo-app/build.gradle index bbf037d7..22e6e303 100644 --- a/demo-app/build.gradle +++ b/demo-app/build.gradle @@ -15,23 +15,31 @@ android { versionName "1.0" testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner" } + compileOptions { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } buildTypes { release { minifyEnabled false proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro' } } + packagingOptions { + exclude 'META-INF/LICENSE.md' + exclude 'META-INF/LICENSE-notice.md' + + } } dependencies { - implementation"org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" + implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" implementation 'androidx.appcompat:appcompat:1.1.0' implementation 'androidx.core:core-ktx:1.1.0' // TODO During the first stages of the project, include the library here. Later on we should use the library from the repository implementation project(path: ':syftlib') - testImplementation 'junit:junit:4.12' androidTestImplementation 'androidx.test.ext:junit:1.1.1' androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0' } diff --git a/logo/pysyft_android.png b/logo/pysyft_android.png new file mode 100644 index 00000000..86fcdb4d Binary files /dev/null and b/logo/pysyft_android.png differ diff --git a/settings.gradle b/settings.gradle index 7b91c1d0..18367a04 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,4 +1,4 @@ include ':demo-app' include ':syftlib' -rootProject.name='KotlinSyft' +rootProject.name = 'KotlinSyft' diff --git a/syftlib/build.gradle b/syftlib/build.gradle index b360d0d0..19f8531a 100644 --- a/syftlib/build.gradle +++ b/syftlib/build.gradle @@ -23,15 +23,32 @@ android { proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro' } } + compileOptions { + sourceCompatibility = 1.8 + targetCompatibility = 1.8 + } + + lintOptions { + lintConfig file("lint.xml") + } + + packagingOptions { + exclude 'META-INF/LICENSE.md' + exclude 'META-INF/LICENSE-notice.md' + } } dependencies { implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version" implementation 'androidx.appcompat:appcompat:1.1.0' implementation 'androidx.core:core-ktx:1.1.0' + implementation 'org.webrtc:google-webrtc:1.0.30039' - testImplementation 'junit:junit:4.12' + implementation 'com.google.code.gson:gson:2.8.5' + debugImplementation "org.mockito:mockito-junit-jupiter:3.2.4" androidTestImplementation 'androidx.test.ext:junit:1.1.1' androidTestImplementation 'androidx.test.espresso:espresso-core:3.2.0' + testImplementation 'org.junit.jupiter:junit-jupiter:5.5.2' + testImplementation "org.mockito:mockito-core:3.2.4" } diff --git a/syftlib/lint.xml b/syftlib/lint.xml new file mode 100644 index 00000000..46b01482 --- /dev/null +++ b/syftlib/lint.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/syftlib/src/main/AndroidManifest.xml b/syftlib/src/main/AndroidManifest.xml index 03c3ef20..7fa04dd8 100644 --- a/syftlib/src/main/AndroidManifest.xml +++ b/syftlib/src/main/AndroidManifest.xml @@ -1,2 +1 @@ - + diff --git a/syftlib/src/main/java/org/openmined/syft/Socket.kt b/syftlib/src/main/java/org/openmined/syft/Socket.kt new file mode 100644 index 00000000..279d5332 --- /dev/null +++ b/syftlib/src/main/java/org/openmined/syft/Socket.kt @@ -0,0 +1,18 @@ +package org.openmined.syft + +import java.net.Socket + + +class Socket(private val url: String, private val workerId: String, keepAliveTimeout: Int = 20000) { + + val socket = Socket(url, 888) + val timerId: String? = null + + fun send(type: String, data: String) { +//TODO decide okhttp vs scarlet for websocket connections + } + + fun stop() { + socket.close() + } +} \ No newline at end of file diff --git a/syftlib/src/main/java/org/openmined/syft/WebRTC.kt b/syftlib/src/main/java/org/openmined/syft/WebRTC.kt new file mode 100644 index 00000000..8d568f01 --- /dev/null +++ b/syftlib/src/main/java/org/openmined/syft/WebRTC.kt @@ -0,0 +1,311 @@ +package org.openmined.syft + +import android.util.Log +import org.webrtc.DataChannel +import org.webrtc.IceCandidate +import org.webrtc.MediaStream +import org.webrtc.PeerConnection +import org.webrtc.PeerConnectionFactory +import org.webrtc.RtpReceiver +import org.webrtc.SdpObserver +import org.webrtc.SessionDescription +import java.nio.ByteBuffer + + +typealias SDP_Type = SessionDescription.Type + +const val WEBRTC_JOIN_ROOM = "webrtc: join-room" +private const val TAG = "WebRTCClient" + +class WebRTCClient( + peerOptions: PeerConnectionFactory.Options, + private val peerConfig: PeerConnection.RTCConfiguration, + private val socket: Socket +) { + + private val peerConnectionFactory = PeerConnectionFactory.builder() + .setOptions(peerOptions) + .createPeerConnectionFactory() + + private val peers = HashMap() + private lateinit var workerId: String + private lateinit var scopeId: String + + fun start(workerId: String, scopeId: String) { + Log.d(TAG, "Joining room $scopeId") + + this.workerId = workerId + this.scopeId = scopeId + //TODO send as json {workerID,scopeId} + socket.send(WEBRTC_JOIN_ROOM, "{$workerId,$scopeId}") + } + + private fun createConnection(newWorkerId: String) { + Log.d(TAG, "Creating Connection as answer") + val pcObserver = PeerConnectionObserver(newWorkerId, SDP_Type.ANSWER) + val pc = peerConnectionFactory.createPeerConnection(peerConfig, pcObserver) + peers[newWorkerId] = Peer(pc, null, pcObserver, SDPObserver(newWorkerId, SDP_Type.ANSWER)) + } + + fun stop() { + Log.d(TAG, "WebRTC disconnecting from peers") + peers.forEach { (newWorkerId, peer) -> + if (peer.channel != null) + removePeer(newWorkerId) + } + } + + private fun removePeer(newWorkerId: String) { + if (!peers.keys.contains(newWorkerId)) return + + Log.d(TAG, "`Closing connection to $newWorkerId") + try { + peers[newWorkerId]!!.channel?.close() + //uncomment this if we need to close the connection as well + //peers[workerId]!!.connection.close() + } catch (e: Exception) { + Log.e(TAG, "error removing peer $newWorkerId", e) + } + peers.remove(newWorkerId) + } + + // Given a message, this function allows you to "broadcast" a message to all peers + // Alternatively, you may send a targeted message to one specific peer (specified by the "to" param) + fun sendMessage(message: String, to: String?) { + Log.d(TAG, "sending message $message") + + if (to != null && !to.contentEquals(workerId) && peers[to]?.channel != null) + send(peers[to]!!.channel!!, message) + else + peers.filterValues { it.channel != null } + .forEach { (_, peer) -> send(peer.channel!!, message) } + } + + private fun send(channel: DataChannel, msg: String) { + try { + channel.send(DataChannel.Buffer(ByteBuffer.wrap(msg.toByteArray()), false)) + } catch (e: Exception) { + Log.e(TAG, "error sending message", e) + } + } + + private fun sendInternalMessage(type: String, message: String?, to: String) { + //TODO implement this + } + + fun receiveNewPeer(newWorkerId: String) { + Log.d(TAG, "Adding new peer") + val pcObserver = PeerConnectionObserver(newWorkerId, SDP_Type.OFFER) + val pc = peerConnectionFactory.createPeerConnection(peerConfig, pcObserver) + peers[newWorkerId] = Peer(pc, null, pcObserver, SDPObserver(newWorkerId, SDP_Type.OFFER)) + // add DataChannel constraints in init if needed. Currently default initialization + peers[newWorkerId]?.apply { + channel = pc?.createDataChannel("dataChannel", DataChannel.Init()) + dataChannelObserver = DataChannelObserver(channel) + channel?.registerObserver(dataChannelObserver) + connection?.createOffer(sdpObserver, null) + } + } + + fun receiveInternalMessage(type: String, newWorkerId: String, sessionDescription: String) { + + when (type) { + "candidate" -> { + Log.d(TAG, "remote candidate received") + if (!peers.containsKey(newWorkerId)) + createConnection(newWorkerId) + peers[newWorkerId]?.connection?.addIceCandidate( + IceCandidate( + null, + -1, + sessionDescription + ) + ) + } + "offer" -> { + Log.d(TAG, "remote offer received") + if (!peers.containsKey(newWorkerId)) + createConnection(newWorkerId) + + peers[newWorkerId]?.apply { + connection?.setRemoteDescription( + sdpObserver, + SessionDescription(SessionDescription.Type.OFFER, sessionDescription) + ) + connection?.createAnswer(sdpObserver, null) + } + } + "answer" -> { + Log.d(TAG, "remote answer received") + peers[newWorkerId]?.apply { + connection?.setRemoteDescription( + sdpObserver, + SessionDescription(SessionDescription.Type.ANSWER, sessionDescription) + ) + } + } + } + + } + + inner class SDPObserver( + private val newWorkerId: String, + private val creatorType: SDP_Type + ) : SdpObserver { + + override fun onSetFailure(p0: String?) { + Log.d(TAG, "error setting description") + } + + override fun onSetSuccess() { + val connection = peers[newWorkerId]?.connection ?: return + val sendIce = { + peers[newWorkerId]!!.candidateQueue.forEach { + sendInternalMessage( + "candidate", + it.sdp, + newWorkerId + ) + } + } + + if (creatorType == SDP_Type.OFFER) { + // For offering peer connection we first create offer and set + // local SDP, then after receiving answer set remote SDP. + if (connection.remoteDescription == null) { + // We've just set our local SDP so time to send it + Log.d( + TAG, + "successfully finished setting ${creatorType.canonicalForm()} as Local description" + ) + Log.d(TAG, "sending ${creatorType.canonicalForm()} and stored ICE candidates") + sendInternalMessage( + creatorType.canonicalForm(), + connection.localDescription.description, + newWorkerId + ) + } else { + Log.d(TAG, "successfully set Remote description") + sendIce() + } + } else { + // For answering peer connection we set remote SDP and then + // create answer and set local SDP. + if (connection.localDescription != null) { + // We've just set our local SDP so time to send answer and local ICE candidates. + Log.d( + TAG, + "successfully finished setting ${creatorType.canonicalForm()} as Local description" + ) + Log.d(TAG, "sending ${creatorType.canonicalForm()} and stored ICE candidates") + sendInternalMessage( + creatorType.canonicalForm(), + connection.localDescription.description, + newWorkerId + ) + sendIce() + } else { + // We've just set remote SDP - do nothing for now - + // answer will be created soon. + Log.d(TAG, "successfully finished setting offer as Remote description") + } + + } + } + + override fun onCreateSuccess(sessionDescription: SessionDescription) { + // This is called when answer or offer is created + Log.d(TAG, "created") + if (peers[newWorkerId]?.connection != null && peers[newWorkerId]?.connection?.localDescription != null) { + Log.e(TAG, "multiple SDP creation") + return + } + peers[newWorkerId]?.connection?.setLocalDescription(this, sessionDescription) + } + + override fun onCreateFailure(error: String?) { + Log.e(TAG, "error creating : $error") + } + + } + + inner class PeerConnectionObserver( + private val newWorkerId: String, + private val creatorType: SDP_Type + ) : PeerConnection.Observer { + + override fun onIceCandidate(new_candidate: IceCandidate?) { + if (new_candidate != null) { + Log.d(TAG, "Saving new ICE Candidate") + peers[newWorkerId]?.candidateQueue?.add(new_candidate) + } + } + + override fun onDataChannel(dc: DataChannel) { + Log.d(TAG, "Calling onDataChannel ${dc.label()}") + dc.registerObserver(DataChannelObserver(dc)) + peers[newWorkerId]?.channel = dc + + } + + override fun onIceConnectionReceivingChange(p0: Boolean) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun onIceConnectionChange(p0: PeerConnection.IceConnectionState?) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun onIceGatheringChange(gatheringState: PeerConnection.IceGatheringState) {} + + override fun onSignalingChange(signalingState: PeerConnection.SignalingState) { + Log.d(TAG, "Signalling State: $signalingState") + } + + override fun onIceCandidatesRemoved(p0: Array?) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun onRenegotiationNeeded() { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. + } + + override fun onAddTrack(p0: RtpReceiver?, p1: Array?) {} + override fun onRemoveStream(p0: MediaStream?) {} + override fun onAddStream(p0: MediaStream?) {} + + } + + class DataChannelObserver(private val dataChannel: DataChannel?) : DataChannel.Observer { + override fun onMessage(buffer: DataChannel.Buffer) { + if (buffer.binary) { + Log.d(TAG, "Data channel received binary message at $dataChannel") + return + } + + Log.d(TAG, "Data channel message ${buffer.data}") + } + + override fun onBufferedAmountChange(previous_amount: Long) { + Log.d( + TAG, + "buffered amount changed from $previous_amount to ${dataChannel?.bufferedAmount()}" + ) + } + + override fun onStateChange() { + Log.d(TAG, "Data channel state changed to ${dataChannel?.state()}") + } + + } + + class Peer( + var connection: PeerConnection?, + var channel: DataChannel?, + val peerConnectionObserver: PeerConnectionObserver, + val sdpObserver: SDPObserver + ) { + val candidateQueue = mutableListOf() + lateinit var dataChannelObserver: DataChannelObserver + } +} \ No newline at end of file diff --git a/syftlib/src/test/java/syft/WebRTCClientTest.kt b/syftlib/src/test/java/syft/WebRTCClientTest.kt new file mode 100644 index 00000000..60ae52e6 --- /dev/null +++ b/syftlib/src/test/java/syft/WebRTCClientTest.kt @@ -0,0 +1,5 @@ +package org.openmined.syft + +private const val TAG = "WebRTC test" + +class WebRTCClientTest \ No newline at end of file