Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Evented IO #11

Open
RX14 opened this issue Sep 30, 2016 · 7 comments
Open

Evented IO #11

RX14 opened this issue Sep 30, 2016 · 7 comments

Comments

@RX14
Copy link

RX14 commented Sep 30, 2016

Does this library work with crystal's evented IO? i.e. Switching to other fibers when reading is blocked, or does it only provide Poll?

@hkochev
Copy link

hkochev commented Oct 1, 2016

If you review the Ruby library examples, they are implemented with Treads . I was not able to synchronize well with channels on more complex cases, so I decided to wait till Treading model " Crystalizes" in the language.

@washu
Copy link
Contributor

washu commented Feb 7, 2019

patch for evented io
require "zeromq"

Crystal Update for zmq sockets

module Crystal::EventLoop
def self.create_fd_write_event(sock : ZMQ::Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(ZMQ::Socket)
zmq_events = sock_ref.events
is_writable = zmq_events & ZMQ::POLLOUT
if is_writable && flags.includes?(LibEvent2::EventFlags::Write)
sock_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_write(timed_out: true)
end
end
event
end
def self.create_fd_read_event(sock : ZMQ::Socket, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET
event = @@eb.new_event(sock.fd, flags, sock) do |s, flags, data|
sock_ref = data.as(ZMQ::Socket)
zmq_events = sock_ref.events
is_readable = zmq_events & ZMQ::POLLIN
if is_readable && flags.includes?(LibEvent2::EventFlags::Read)
sock_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
sock_ref.resume_read(timed_out: true)
end
end
event
end

defined for signal handler to break a blocked service i.e. stop the loop, then ctx_terminate

def self.stopLoop
@@eb.loop_break
end
end
module ZMQ
class Socket
include IO::Syscall
@read_event : Crystal::Event?
@write_event : Crystal::Event?
def fd
get_socket_option(ZMQ::FD).as(Int32)
end

def events
  get_socket_option(ZMQ::EVENTS).as(Int32)
end

def close
@read_event.try &.free
@read_event = nil
@write_event.try &.free
@write_event = nil
@closed = true
LibZMQ.close @socket
end
# patches here for libevent support
private def add_read_event(timeout = @read_timeout)
event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self,true)
event.add timeout
nil
end

private def add_write_event(timeout = @write_timeout)
  event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self,true)
  event.add timeout
  nil
end

def send_message(message : AbstractMessage, flags = 0)
  # we always send in non block mode and add a wait writable
  loop do
    rc = LibZMQ.msg_send(message.address, @socket, flags | ZMQ::DONTWAIT) # NOTE: 0mq docs state that msg_send do not require message.close
    if rc == -1
      if Util.errno == Errno::EAGAIN
        wait_writable
      else
        raise Util.error_string
      end
    else
      return Util.resultcode_ok?(rc)
    end
  end
ensure
  if (writers = @writers) && !writers.empty?
    add_write_event
  end
end
def receive_strings(flags = 0)
  receive_messages(flags).map do |msg|
    str = msg.to_s
    msg.close()
    str
  end
end

def receive_message(flags = 0) : AbstractMessage
  loop do
    message = @message_type.new
    rc = LibZMQ.msg_recv(message.address, @socket, flags | ZMQ::DONTWAIT)
    if rc == -1
        if Util.errno == Errno::EAGAIN
          wait_readable
        else
          raise Util.error_string
        end
    else
      return message
    end
  end
ensure
  if (readers = @readers) && !readers.empty?
    add_read_event
  end
end
def receive_messages(flags = 0)
  loop do
    messages = [] of AbstractMessage
    message = @message_type.new
    rc = LibZMQ.msg_recv(message.address, @socket, flags | ZMQ::DONTWAIT)
    if rc == -1
      if Util.errno == Errno::EAGAIN
        wait_readable
      else
        raise Util.error_string
      end
    else
      messages << message
      if more_parts?
        loop do
          message = @message_type.new
          rc = LibZMQ.msg_recv(message.address, @socket, flags)
          if Util.resultcode_ok?(rc)
              messages << message
              return messages unless more_parts?
          else
            message.close
            messages.map(&.close)
            return messages.clear
          end
        end
      else
         return messages
      end
    end
  end
ensure
  if (readers = @readers) && !readers.empty?
    add_read_event
  end
end

end
end

If you see any obsvious issues let me know please. this is my first crystal patch

@RX14
Copy link
Author

RX14 commented Feb 15, 2019

@washu please use

```cr
code
```

to paste your code so it's copy-pastable

@washu
Copy link
Contributor

washu commented Feb 15, 2019

require "zeromq"

# Crystal Update for zmq sockets
module Crystal::EventLoop
  def self.create_fd_write_event(sock : ZMQ::Socket, edge_triggered : Bool = false)
      flags = LibEvent2::EventFlags::Write
      flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET #we always do this as ZMQ is edge triggers
      event = @@eb.new_event(sock.fd, flags, sock) do |_, sflags, data|
        sock_ref = data.as(ZMQ::Socket)
        zmq_events = sock_ref.events
        is_writable = zmq_events & ZMQ::POLLOUT
        if is_writable && sflags.includes?(LibEvent2::EventFlags::Write)
          sock_ref.resume_write
        elsif sflags.includes?(LibEvent2::EventFlags::Timeout)
          sock_ref.resume_write(timed_out: true)
        end
      end
      event
  end
  def self.create_fd_read_event(sock : ZMQ::Socket, edge_triggered : Bool = false)
      flags = LibEvent2::EventFlags::Read
      flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET #we always do this as ZMQ is edge triggers
      event = @@eb.new_event(sock.fd, flags, sock) do |_, sflags, data|
        sock_ref = data.as(ZMQ::Socket)
        zmq_events = sock_ref.events
        is_readable = zmq_events & ZMQ::POLLIN
        if is_readable && sflags.includes?(LibEvent2::EventFlags::Read)
          sock_ref.resume_read
        elsif sflags.includes?(LibEvent2::EventFlags::Timeout)
          sock_ref.resume_read(timed_out: true)
        end
      end
      event
  end
  def self.stop_loop
    @@eb.loop_break
  end
end
module ZMQ
  class Socket
    include IO::Syscall
    @read_event : Crystal::Event?
    @write_event : Crystal::Event?

    # file descriptor
    def fd
      get_socket_option(ZMQ::FD).as(Int32)
    end
    # event list
    def events
      get_socket_option(ZMQ::EVENTS).as(Int32)
    end
   def close
    @read_event.try &.free
    @read_event = nil
    @write_event.try &.free
    @write_event = nil
    @closed = true
     LibZMQ.close @socket
   end
    # patches here for libevent  support
    private def add_read_event(timeout = @read_timeout)
      event = @read_event ||= Crystal::EventLoop.create_fd_read_event(self,true)
      event.add timeout
      nil
    end

    private def add_write_event(timeout = @write_timeout)
      event = @write_event ||= Crystal::EventLoop.create_fd_write_event(self,true)
      event.add timeout
      nil
    end

    def send_message(message : AbstractMessage, flags = 0)
      # we always send in non block mode and add a wait writable, caller should close the message when done
      loop do
        rc = LibZMQ.msg_send(message.address, @socket, flags | ZMQ::DONTWAIT)
        if rc == -1
          if Util.errno == Errno::EAGAIN
            wait_writable
          else
            raise Util.error_string
          end
        else
          return Util.resultcode_ok?(rc)
        end
      end
    ensure
      if (writers = @writers) && !writers.empty?
        add_write_event
      end
    end
    def receive_strings(flags = 0)
      receive_messages(flags).map do |msg|
        str = msg.to_s
        msg.close()
        str
      end
    end

    def receive_message(flags = 0) : AbstractMessage
      loop do
        message = @message_type.new
        rc = LibZMQ.msg_recv(message.address, @socket, flags | ZMQ::DONTWAIT)
        if rc == -1
            if Util.errno == Errno::EAGAIN
              wait_readable
            else
              raise Util.error_string
            end
        else
          return message
        end
      end
    ensure
      if (readers = @readers) && !readers.empty?
        add_read_event
      end
    end
    def receive_messages(flags = 0)
      loop do
        messages = [] of AbstractMessage
        message = @message_type.new
        rc = LibZMQ.msg_recv(message.address, @socket, flags | ZMQ::DONTWAIT)
        if rc == -1
          if Util.errno == Errno::EAGAIN
            wait_readable
          else
            raise Util.error_string
          end
        else
          messages << message
          if more_parts?
            loop do
              message = @message_type.new
              rc = LibZMQ.msg_recv(message.address, @socket, flags)
              if Util.resultcode_ok?(rc)
                  messages << message
                  return messages unless more_parts?
              else
                message.close
                messages.map(&.close)
                return messages.clear
              end
            end
          else
             return messages
          end
        end
      end
    ensure
      if (readers = @readers) && !readers.empty?
        add_read_event
      end
    end
  end
end

@washu
Copy link
Contributor

washu commented Feb 15, 2019

i cleaned it up a little from ameba results for style and lint.

@vladfaust
Copy link

@washu could you open a PR please? I would love to see this shard properly implemented

@washu
Copy link
Contributor

washu commented Mar 12, 2019

I made a pr, however it fails to build due to project config for sodium it seems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants