Skip to content

Commit

Permalink
Add yampi
Browse files Browse the repository at this point in the history
  • Loading branch information
yitzchak committed Aug 6, 2023
1 parent e8da60f commit a0682f3
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 0 deletions.
56 changes: 56 additions & 0 deletions src/lisp/yampi/example.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
(ql:quickload :yampi)

;;; Start up several terminals. Then in one terminal do
;;; cando --load example.lisp --eval "(defvar ch (make-server \"~/connect.sexp\"))"
;;; In each of the other terminals do
;;; cando --load example.lisp --eval "(defvar ch (make-client \"~/connect.sexp\"))"
;;; In the clients you can do things like (yampi:send ch nil +job-request+)
;;; The server will respond with a job id.
;;; In the server you can also broadcast a stop message (yampi:send ch nil +stop+)

(defconstant +stop+ 0)
(defconstant +job-request+ 1)
(defconstant +job-done+ 2)
(defconstant +job-abort+ 3)

(defclass server (yampi:server)
((next-job-id :accessor next-job-id
:initform 0)))

(defmethod yampi:receive ((channel server) identity (code (eql +job-request+)) &rest parts)
(declare (ignore parts))
(format *debug-io* "~&Received job-request from ~s~%" identity)
(yampi:send channel identity code
(format nil "~a" (incf (next-job-id channel)))))

(defmethod yampi:receive ((channel server) identity (code (eql +job-done+)) &rest parts)
(declare (ignore parts))
(format *debug-io* "~&Received job-done from ~s~%" identity))

(defmethod yampi:receive ((channel server) identity (code (eql +job-abort+)) &rest parts)
(declare (ignore parts))
(format *debug-io* "~&Received job-abort from ~s~%" identity))

(defun make-server (connection-path)
(let ((channel (make-instance 'server)))
(yampi:start channel connection-path :endpoint "tcp://0.0.0.0:*")
channel))

(defclass client (yampi:client) ())

(defmethod yampi:receive ((channel client) identity (code (eql +job-request+)) &rest parts)
(format *debug-io* "~&Received job-request reply ~s~%" parts))

(defmethod yampi:receive ((channel client) identity (code (eql +stop+)) &rest parts)
(declare (ignore parts))
(format *debug-io* "~&Received stop~%")
(yampi:stop channel))

(defun make-client (connection-path)
(let ((channel (make-instance 'client)))
(yampi:start channel connection-path)
(yampi:subscribe channel +stop+)
channel))

(defmethod yampi:receive (channel identity code &rest parts)
(format *debug-io* "~&No handler for code ~a~%" code))
163 changes: 163 additions & 0 deletions src/lisp/yampi/message.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
(in-package #:yampi)

(defconstant +zmq-poll-timeout+ 500)

(defun read-binary-part (socket msg)
(pzmq:msg-recv msg socket)
(cffi:foreign-array-to-lisp (pzmq:msg-data msg)
(list :array :uint8 (pzmq:msg-size msg))
:element-type '(unsigned-byte 8)))

(defun read-string-part (socket msg)
(pzmq:msg-recv msg socket)
(handler-case
(cffi:foreign-string-to-lisp (pzmq:msg-data msg)
:count (pzmq:msg-size msg)
:encoding :utf-8)
(babel-encodings:character-decoding-error ()
"")))

(defun more-parts-p (msg)
(not (zerop (pzmq::%msg-more msg))))

(defgeneric start (channel connection-path &key endpoint))

(defgeneric stop (channel))

(defgeneric send (channel identity code &rest parts))

(defgeneric receive (channel identity code &rest parts))

(defclass channel ()
((context :reader context
:initform (pzmq:ctx-new))
(control :accessor control)
(broadcast :accessor broadcast)
(thread :accessor thread)))

(defmethod stop ((channel channel))
(bordeaux-threads:interrupt-thread (thread channel)
(lambda ()
(throw 'shutdown nil)))
(pzmq:close (control channel))
(pzmq:close (broadcast channel))
(pzmq:ctx-destroy (context channel)))

(defclass server (channel) ())

(defmethod initialize-instance :after ((instance server) &rest initargs &key)
(declare (ignore initargs))
(setf (control instance) (pzmq:socket (context instance) :router)
(broadcast instance) (pzmq:socket (context instance) :pub))
instance)

(defmethod start ((channel server) connection-path &key endpoint)
(with-accessors ((control control)
(broadcast broadcast)
(thread thread))
channel
(pzmq:bind control endpoint)
(pzmq:bind broadcast endpoint)
(with-open-file (stream connection-path :direction :output
:if-does-not-exist :create
:if-exists :supersede)
(with-standard-io-syntax
(write `(:control ,(pzmq:getsockopt control :last-endpoint)
:broadcast ,(pzmq:getsockopt broadcast :last-endpoint))
:stream stream)))
(setf thread
(bordeaux-threads:make-thread
(lambda ()
(pzmq:with-poll-items items ((control :pollin))
(catch 'shutdown
(loop with identity
with code
with parts
for poll = (pzmq:poll items +zmq-poll-timeout+)
unless (zerop poll)
do (pzmq:with-message msg
(setf identity (read-binary-part control msg))
(unless (more-parts-p msg)
(error "Need more parts"))
(setf code (read-binary-part control msg))
(unless (= (length code) 1)
(error "Code should only have length 1"))
(setf parts (loop while (more-parts-p msg)
collect (read-string-part control msg)))
(apply #'receive channel identity (aref code 0) parts))))))))))

(defmethod send ((channel server) (identity null) code &rest parts)
(let ((broadcast (broadcast channel)))
(pzmq:send broadcast
(make-array 1 :initial-element code :element-type '(unsigned-byte 8))
:sndmore (and parts t))
(loop for (part . remaining) on parts
do (pzmq:send broadcast part
:sndmore (and remaining t)))))

(defmethod send ((channel server) identity code &rest parts)
(let ((control (control channel)))
(pzmq:send control identity :sndmore t)
(pzmq:send control
(make-array 1 :initial-element code :element-type '(unsigned-byte 8))
:sndmore (and parts t))
(loop for (part . remaining) on parts
do (pzmq:send control part
:sndmore (and remaining t)))))

(defclass client (channel) ())

(defmethod initialize-instance :after ((instance client) &rest initargs &key)
(declare (ignore initargs))
(setf (control instance) (pzmq:socket (context instance) :dealer)
(broadcast instance) (pzmq:socket (context instance) :sub))
instance)

(defmethod start ((channel client) connection-path &key endpoint)
(with-accessors ((control control)
(broadcast broadcast)
(thread thread))
channel
(with-open-file (stream connection-path)
(let ((data (with-standard-io-syntax (read stream nil nil))))
(pzmq:connect control (getf data :control))
(pzmq:connect broadcast (getf data :broadcast))))
(setf thread
(bordeaux-threads:make-thread
(lambda ()
(flet ((recv (socket)
(pzmq:with-message msg
(setf code (read-binary-part socket msg))
(unless (= (length code) 1)
(error "Code should only have length 1"))
(setf parts (loop while (more-parts-p msg)
collect (read-string-part socket msg)))
(apply #'receive channel nil (aref code 0) parts))))
(pzmq:with-poll-items items ((control :pollin) (broadcast :pollin))
(catch 'shutdown
(loop with code
with parts
for poll = (pzmq:poll items +zmq-poll-timeout+)
when (pzmq:revents items 0)
do (recv control)
when (pzmq:revents items 1)
do (recv broadcast))))))))))

(defmethod send ((channel client) (identity null) code &rest parts)
(let ((control (control channel)))
(pzmq:send control
(make-array 1 :initial-element code :element-type '(unsigned-byte 8))
:sndmore (and parts t))
(loop for (part . remaining) on parts
do (pzmq:send control part
:sndmore (and remaining t)))))

(defun subscribe (channel code)
(pzmq:setsockopt (broadcast channel)
:subscribe
(make-array 1 :initial-element code :element-type '(unsigned-byte 8))))

(defun unsubscribe (channel code)
(pzmq:setsockopt (broadcast channel)
:unsubscribe
(make-array 1 :initial-element code :element-type '(unsigned-byte 8))))
10 changes: 10 additions & 0 deletions src/lisp/yampi/packages.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(defpackage #:yampi
(:use :cl)
(:export #:client
#:receive
#:send
#:server
#:start
#:stop
#:subscribe
#:unsubscribe))
12 changes: 12 additions & 0 deletions src/lisp/yampi/yampi.asd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(in-package :asdf-user)

(defsystem "yampi"
:description "Yet Another (Yitzi's Awesome) Message Passing Interface"
:version "0.1.0"
:author "Tarn W. Burton"
:licence "LGPL-3.0"
:depends-on ("bordeaux-threads"
"pzmq")
:serial t
:components ((:file "packages")
(:file "message")))

0 comments on commit a0682f3

Please sign in to comment.