Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not log the exception before attempting an non-blocking dial #114

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ _nng_py.c
Release
_nng.c
*.pyd
__pycache__
**/__pycache__/
*.swp
.pytest_cache
dist
Expand Down
6 changes: 3 additions & 3 deletions build_pynng.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
# e.g.: python setup.py build_ext -I<inc_path> -L<lib_path> -l<lib>
#elif True:
# incdirs = None
# libraries = ['pthread' 'mbedtls' 'nng']
# libraries = ['pthread', 'mbedtls', 'nng']
# objects = None
else:
incdirs = ['nng/include']
objects = ['./nng/build/libnng.a', "./mbedtls/prefix/lib/libmbedtls.a",
"./mbedtls/prefix/lib/libmbedx509.a", "./mbedtls/prefix/lib/libmbedcrypto.a"]
objects = ['./nng/build/libnng.a', "./mbedtls/prefix/lib64/libmbedtls.a",
"./mbedtls/prefix/lib64/libmbedx509.a", "./mbedtls/prefix/lib64/libmbedcrypto.a"]
libraries = ['pthread']
machine = os.uname().machine
# this is a pretty heuristic... but let's go with it anyway.
Expand Down
115 changes: 26 additions & 89 deletions nng_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,16 @@ struct nng_sockaddr_inproc {
uint16_t sa_family;
char sa_name[(128)];
};
typedef struct nng_sockaddr_inproc nng_sockaddr_inproc;
struct nng_sockaddr_path {
uint16_t sa_family;
char sa_path[(128)];
};
typedef struct nng_sockaddr_path nng_sockaddr_path;
typedef struct nng_sockaddr_path nng_sockaddr_ipc;
struct nng_sockaddr_in6 {
uint16_t sa_family;
uint16_t sa_port;
uint8_t sa_addr[16];
uint32_t sa_scope;
};
typedef struct nng_sockaddr_in6 nng_sockaddr_in6;
typedef struct nng_sockaddr_in6 nng_sockaddr_udp6;
typedef struct nng_sockaddr_in6 nng_sockaddr_tcp6;
struct nng_sockaddr_in {
uint16_t sa_family;
uint16_t sa_port;
Expand All @@ -48,25 +43,41 @@ struct nng_sockaddr_zt {
uint64_t sa_nodeid;
uint32_t sa_port;
};
struct nng_sockaddr_abstract {
uint16_t sa_family;
uint16_t sa_len;
uint8_t sa_name[107];
};
struct nng_sockaddr_storage {
uint16_t sa_family;
uint64_t sa_pad[16];
};
typedef struct nng_sockaddr_inproc nng_sockaddr_inproc;
typedef struct nng_sockaddr_path nng_sockaddr_path;
typedef struct nng_sockaddr_path nng_sockaddr_ipc;
typedef struct nng_sockaddr_in nng_sockaddr_in;
typedef struct nng_sockaddr_in nng_sockaddr_udp;
typedef struct nng_sockaddr_in nng_sockaddr_tcp;
typedef struct nng_sockaddr_in6 nng_sockaddr_in6;
typedef struct nng_sockaddr_zt nng_sockaddr_zt;
typedef struct nng_sockaddr_abstract nng_sockaddr_abstract;
typedef struct nng_sockaddr_storage nng_sockaddr_storage;
typedef union nng_sockaddr {
uint16_t s_family;
nng_sockaddr_ipc s_ipc;
nng_sockaddr_inproc s_inproc;
nng_sockaddr_in6 s_in6;
nng_sockaddr_in s_in;
nng_sockaddr_zt s_zt;
nng_sockaddr_abstract s_abstract;
nng_sockaddr_storage s_storage;
} nng_sockaddr;
enum nng_sockaddr_family {
NNG_AF_UNSPEC = 0,
NNG_AF_INPROC = 1,
NNG_AF_IPC = 2,
NNG_AF_INET = 3,
NNG_AF_INET6 = 4,
NNG_AF_ZT = 5
NNG_AF_ZT = 5,
NNG_AF_ABSTRACT = 6
};
typedef struct nng_iov {
void * iov_buf;
Expand All @@ -75,23 +86,6 @@ typedef struct nng_iov {
extern void nng_fini(void);
extern int nng_close(nng_socket);
extern int nng_socket_id(nng_socket);
extern void nng_closeall(void);
extern int nng_setopt(nng_socket, const char *, const void *, size_t);
extern int nng_setopt_bool(nng_socket, const char *, bool);
extern int nng_setopt_int(nng_socket, const char *, int);
extern int nng_setopt_ms(nng_socket, const char *, nng_duration);
extern int nng_setopt_size(nng_socket, const char *, size_t);
extern int nng_setopt_uint64(nng_socket, const char *, uint64_t);
extern int nng_setopt_string(nng_socket, const char *, const char *);
extern int nng_setopt_ptr(nng_socket, const char *, void *);
extern int nng_getopt(nng_socket, const char *, void *, size_t *);
extern int nng_getopt_bool(nng_socket, const char *, bool *);
extern int nng_getopt_int(nng_socket, const char *, int *);
extern int nng_getopt_ms(nng_socket, const char *, nng_duration *);
extern int nng_getopt_size(nng_socket, const char *, size_t *);
extern int nng_getopt_uint64(nng_socket, const char *, uint64_t *);
extern int nng_getopt_ptr(nng_socket, const char *, void **);
extern int nng_getopt_string(nng_socket, const char *, char **);
extern int nng_socket_set(nng_socket, const char *, const void *, size_t);
extern int nng_socket_set_bool(nng_socket, const char *, bool);
extern int nng_socket_set_int(nng_socket, const char *, int);
Expand Down Expand Up @@ -129,24 +123,6 @@ extern int nng_dialer_close(nng_dialer);
extern int nng_listener_close(nng_listener);
extern int nng_dialer_id(nng_dialer);
extern int nng_listener_id(nng_listener);
extern int nng_dialer_setopt(nng_dialer, const char *, const void *, size_t);
extern int nng_dialer_setopt_bool(nng_dialer, const char *, bool);
extern int nng_dialer_setopt_int(nng_dialer, const char *, int);
extern int nng_dialer_setopt_ms(nng_dialer, const char *, nng_duration);
extern int nng_dialer_setopt_size(nng_dialer, const char *, size_t);
extern int nng_dialer_setopt_uint64(nng_dialer, const char *, uint64_t);
extern int nng_dialer_setopt_ptr(nng_dialer, const char *, void *);
extern int nng_dialer_setopt_string(nng_dialer, const char *, const char *);
extern int nng_dialer_getopt(nng_dialer, const char *, void *, size_t *);
extern int nng_dialer_getopt_bool(nng_dialer, const char *, bool *);
extern int nng_dialer_getopt_int(nng_dialer, const char *, int *);
extern int nng_dialer_getopt_ms(nng_dialer, const char *, nng_duration *);
extern int nng_dialer_getopt_size(nng_dialer, const char *, size_t *);
extern int nng_dialer_getopt_sockaddr(
nng_dialer, const char *, nng_sockaddr *);
extern int nng_dialer_getopt_uint64(nng_dialer, const char *, uint64_t *);
extern int nng_dialer_getopt_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_getopt_string(nng_dialer, const char *, char **);
extern int nng_dialer_set(nng_dialer, const char *, const void *, size_t);
extern int nng_dialer_set_bool(nng_dialer, const char *, bool);
extern int nng_dialer_set_int(nng_dialer, const char *, int);
Expand All @@ -166,28 +142,6 @@ extern int nng_dialer_get_string(nng_dialer, const char *, char **);
extern int nng_dialer_get_ptr(nng_dialer, const char *, void **);
extern int nng_dialer_get_ms(nng_dialer, const char *, nng_duration *);
extern int nng_dialer_get_addr(nng_dialer, const char *, nng_sockaddr *);
extern int nng_listener_setopt(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_setopt_bool(nng_listener, const char *, bool);
extern int nng_listener_setopt_int(nng_listener, const char *, int);
extern int nng_listener_setopt_ms(nng_listener, const char *, nng_duration);
extern int nng_listener_setopt_size(nng_listener, const char *, size_t);
extern int nng_listener_setopt_uint64(nng_listener, const char *, uint64_t);
extern int nng_listener_setopt_ptr(nng_listener, const char *, void *);
extern int nng_listener_setopt_string(
nng_listener, const char *, const char *);
extern int nng_listener_getopt(nng_listener, const char *, void *, size_t *);
extern int nng_listener_getopt_bool(nng_listener, const char *, bool *);
extern int nng_listener_getopt_int(nng_listener, const char *, int *);
extern int nng_listener_getopt_ms(
nng_listener, const char *, nng_duration *);
extern int nng_listener_getopt_size(nng_listener, const char *, size_t *);
extern int nng_listener_getopt_sockaddr(
nng_listener, const char *, nng_sockaddr *);
extern int nng_listener_getopt_uint64(
nng_listener, const char *, uint64_t *);
extern int nng_listener_getopt_ptr(nng_listener, const char *, void **);
extern int nng_listener_getopt_string(nng_listener, const char *, char **);
extern int nng_listener_set(
nng_listener, const char *, const void *, size_t);
extern int nng_listener_set_bool(nng_listener, const char *, bool);
Expand Down Expand Up @@ -220,16 +174,6 @@ extern int nng_ctx_close(nng_ctx);
extern int nng_ctx_id(nng_ctx);
extern void nng_ctx_recv(nng_ctx, nng_aio *);
extern void nng_ctx_send(nng_ctx, nng_aio *);
extern int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *);
extern int nng_ctx_getopt_bool(nng_ctx, const char *, bool *);
extern int nng_ctx_getopt_int(nng_ctx, const char *, int *);
extern int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *);
extern int nng_ctx_getopt_size(nng_ctx, const char *, size_t *);
extern int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t);
extern int nng_ctx_setopt_bool(nng_ctx, const char *, bool);
extern int nng_ctx_setopt_int(nng_ctx, const char *, int);
extern int nng_ctx_setopt_ms(nng_ctx, const char *, nng_duration);
extern int nng_ctx_setopt_size(nng_ctx, const char *, size_t);
extern int nng_ctx_get(nng_ctx, const char *, void *, size_t *);
extern int nng_ctx_get_bool(nng_ctx, const char *, bool *);
extern int nng_ctx_get_int(nng_ctx, const char *, int *);
Expand All @@ -254,6 +198,7 @@ extern char *nng_strdup(const char *);
extern void nng_strfree(char *);
extern int nng_aio_alloc(nng_aio **, void (*)(void *), void *);
extern void nng_aio_free(nng_aio *);
extern void nng_aio_reap(nng_aio *);
extern void nng_aio_stop(nng_aio *);
extern int nng_aio_result(nng_aio *);
extern size_t nng_aio_count(nng_aio *);
Expand All @@ -276,6 +221,8 @@ extern void nng_sleep_aio(nng_duration, nng_aio *);
extern int nng_msg_alloc(nng_msg **, size_t);
extern void nng_msg_free(nng_msg *);
extern int nng_msg_realloc(nng_msg *, size_t);
extern int nng_msg_reserve(nng_msg *, size_t);
extern size_t nng_msg_capacity(nng_msg *);
extern void * nng_msg_header(nng_msg *);
extern size_t nng_msg_header_len(const nng_msg *);
extern void * nng_msg_body(nng_msg *);
Expand Down Expand Up @@ -317,16 +264,6 @@ extern void nng_msg_clear(nng_msg *);
extern void nng_msg_header_clear(nng_msg *);
extern void nng_msg_set_pipe(nng_msg *, nng_pipe);
extern nng_pipe nng_msg_get_pipe(const nng_msg *);
extern int nng_msg_getopt(nng_msg *, int, void *, size_t *);
extern int nng_pipe_getopt(nng_pipe, const char *, void *, size_t *);
extern int nng_pipe_getopt_bool(nng_pipe, const char *, bool *);
extern int nng_pipe_getopt_int(nng_pipe, const char *, int *);
extern int nng_pipe_getopt_ms(nng_pipe, const char *, nng_duration *);
extern int nng_pipe_getopt_size(nng_pipe, const char *, size_t *);
extern int nng_pipe_getopt_sockaddr(nng_pipe, const char *, nng_sockaddr *);
extern int nng_pipe_getopt_uint64(nng_pipe, const char *, uint64_t *);
extern int nng_pipe_getopt_ptr(nng_pipe, const char *, void **);
extern int nng_pipe_getopt_string(nng_pipe, const char *, char **);
extern int nng_pipe_get(nng_pipe, const char *, void *, size_t *);
extern int nng_pipe_get_bool(nng_pipe, const char *, bool *);
extern int nng_pipe_get_int(nng_pipe, const char *, int *);
Expand Down Expand Up @@ -369,6 +306,7 @@ enum nng_unit_enum {
NNG_UNIT_EVENTS = 4
};
extern uint64_t nng_stat_value(nng_stat *);
extern bool nng_stat_bool(nng_stat *);
extern const char *nng_stat_string(nng_stat *);
extern const char *nng_stat_desc(nng_stat *);
extern uint64_t nng_stat_timestamp(nng_stat *);
Expand Down Expand Up @@ -591,9 +529,8 @@ int nng_tls_config_version(
const char *nng_tls_engine_name(void);
const char *nng_tls_engine_description(void);
bool nng_tls_engine_fips_mode(void);
int nng_tls_register(void);
#define NNG_FLAG_ALLOC 1u // Recv to allocate receive buffer
#define NNG_FLAG_NONBLOCK 2u // Non-blocking operations
#define NNG_MAJOR_VERSION 1
#define NNG_MINOR_VERSION 4
#define NNG_PATCH_VERSION 0
#define NNG_MINOR_VERSION 5
#define NNG_PATCH_VERSION 2
4 changes: 2 additions & 2 deletions pynng/_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ async def wait_for_aio():
except curio.CancelledError:
if fut.cancelled():
lib.nng_aio_cancel(aio.aio)

err = lib.nng_aio_result(aio.aio)
if err == lib.NNG_ECANCELED:
raise curio.CancelledError()
check_err(err)

def callback():
if not fut.cancelled():
fut.set_result(True)
Expand Down
66 changes: 60 additions & 6 deletions pynng/nng.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ def __set__(self, instance, value):
self.__class__._setter(instance, self.option, value)


class ArbitraryOption(_NNGOption):
"""Descriptor for getting/setting arbitrary options"""
_getter = options._getopt_arbitrary
_setter = options._setopt_arbitrary


class IntOption(_NNGOption):
"""Descriptor for getting/setting integer options"""
_getter = options._getopt_int
Expand Down Expand Up @@ -279,7 +285,10 @@ class Socket:
tcp_nodelay = BooleanOption('tcp-nodelay')
tcp_keepalive = BooleanOption('tcp-keepalive')

tls_config = PointerOption('tls-config')
#tls_config = PointerOption('tls-config')
# Sockets are Transport agnostic.
# The tls-config is transport specific and has to be set on the listener/dialer
tls_config = None

def __init__(self, *,
dial=None,
Expand Down Expand Up @@ -373,8 +382,6 @@ def dial(self, address, *, block=None):
try:
return self.dial(address, block=True)
except pynng.ConnectionRefused:
msg = 'Synchronous dial failed; attempting asynchronous now'
logger.exception(msg)
return self.dial(address, block=False)
else:
return self._dial(address, flags=lib.NNG_FLAG_NONBLOCK)
Expand All @@ -386,11 +393,21 @@ def _dial(self, address, flags=0):

"""
dialer = ffi.new('nng_dialer *')
ret = lib.nng_dial(self.socket, to_char(address), dialer, flags)
if self.tls_config:
ret = lib.nng_dialer_create(dialer, self.socket, to_char(address))
else:
ret = lib.nng_dial(self.socket, to_char(address), dialer, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
d_id = lib.nng_dialer_id(dialer[0])
py_dialer = Dialer(dialer, self)
if self.tls_config:
py_dialer.tls_config = self.tls_config
lib.nng_dialer_start(dialer[0], flags)
# FIXME: Set the tls_config to None here
# If one wants another dialer with the same tls_config it
# has to be set again which might be confusing
self.tls_config = None
self._dialers[d_id] = py_dialer
return py_dialer

Expand All @@ -401,11 +418,21 @@ def listen(self, address, flags=0):

"""
listener = ffi.new('nng_listener *')
ret = lib.nng_listen(self.socket, to_char(address), listener, flags)
if self.tls_config:
ret = lib.nng_listener_create(listener, self.socket, to_char(address))
else:
ret = lib.nng_listen(self.socket, to_char(address), listener, flags)
check_err(ret)
# we can only get here if check_err doesn't raise
l_id = lib.nng_listener_id(listener[0])
py_listener = Listener(listener, self)
if self.tls_config:
py_listener.tls_config = self.tls_config
lib.nng_listener_start(listener[0], flags)
# FIXME: Set the tls_config to None here
# If one wants another listener with the same tls_config it
# has to be set again which might be confusing
self.tls_config = None
self._listeners[l_id] = py_listener
return py_listener

Expand Down Expand Up @@ -886,11 +913,38 @@ def subscribe(self, topic):
desired behavior, just pass :class:`bytes` in as the topic.

"""
options._setopt_string(self, b'sub:subscribe', topic)
options._setopt_arbitrary(self, b'sub:subscribe', topic)

def unsubscribe(self, topic):
"""Unsubscribe to the specified topic.

.. Note::

If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.

"""
options._setopt_arbitrary(self, b'sub:unsubscribe', topic)

def subscribe_string(self, topic):
"""Subscribe to the specified topic.

Topics are matched by looking at the first bytes of any received
message.

.. Note::

If you pass a :class:`str` as the ``topic``, it will be
automatically encoded with :meth:`str.encode`. If this is not the
desired behavior, just pass :class:`bytes` in as the topic.

"""
options._setopt_string(self, b'sub:subscribe', topic)

def unsubscribe_string(self, topic):
"""Unsubscribe to the specified topic.

.. Note::

If you pass a :class:`str` as the ``topic``, it will be
Expand Down
Loading