Skip to content

Commit

Permalink
add tests for publishserver/publishclient
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxderp committed Jul 30, 2024
1 parent d3df62d commit 4d1692c
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 11 deletions.
92 changes: 86 additions & 6 deletions tests/pytests/functional/transport/tcp/test_pub_server.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,104 @@
import os
import stat
import time

import tornado.gen

import salt.transport.tcp


async def test_pub_channel_ipc(master_opts, minion_opts, io_loop):
def presence_callback(client):
pass

def remove_presence_callback(client):
pass

master_opts = master_opts.copy()
master_opts.update(transport="tcp")
minion_opts.update(transport="tcp")

pub_path = os.path.join(master_opts["sock_dir"], "master_event_pub.ipc")
pull_path = os.path.join(master_opts["sock_dir"], "master_event_pull.ipc")

if os.path.exists(pub_path):
os.path.unlink(pub_path)

server = salt.transport.tcp.PublishServer(
master_opts,
pub_path=pub_path,
pull_path=pull_path,
)

client = salt.transport.tcp.PublishClient(
minion_opts,
io_loop,
path=pub_path,
)

payloads = []

publishes = []

async def publish_payload(payload, callback):
await server.publish_payload(payload)
payloads.append(payload)

async def on_recv(message):
publishes.append(message)

io_loop.add_callback(
server.publisher,
publish_payload,
presence_callback,
remove_presence_callback,
io_loop=io_loop,
)

# Wait for socket to bind.
await tornado.gen.sleep(3)

perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
assert (os.stat(pub_path).st_mode & perms) == perms

await client.connect()
client.on_recv(on_recv)

await server.publish({"meh": "bah"})

start = time.monotonic()
try:
while not publishes:
await tornado.gen.sleep(0.3)
if time.monotonic() - start > 30:
assert False, "Message not published after 30 seconds"
assert payloads
finally:
server.close()
server.pub_server.close()
client.close()


async def test_pub_channel(master_opts, minion_opts, io_loop):
def presence_callback(client):
pass

def remove_presence_callback(client):
pass

master_opts["transport"] = "tcp"
master_opts = master_opts.copy()
master_opts.update(transport="tcp", ipc_mode="tcp")
minion_opts.update(master_ip="127.0.0.1", transport="tcp")

server = salt.transport.tcp.TCPPublishServer(
server = salt.transport.tcp.PublishServer(
master_opts,
pub_host="127.0.0.1",
pub_port=master_opts["publish_port"],
pull_path=os.path.join(master_opts["sock_dir"], "publish_pull.ipc"),
pull_host="127.0.0.1",
pull_port=master_opts["tcp_master_publish_pull"],
)

client = salt.transport.tcp.TCPPubClient(
client = salt.transport.tcp.PublishClient(
minion_opts,
io_loop,
host="127.0.0.1",
Expand All @@ -42,13 +117,17 @@ async def on_recv(message):
publishes.append(message)

io_loop.add_callback(
server.publisher, publish_payload, presence_callback, remove_presence_callback
server.publisher,
publish_payload,
presence_callback,
remove_presence_callback,
io_loop=io_loop,
)

# Wait for socket to bind.
await tornado.gen.sleep(3)

await client.connect(master_opts["publish_port"])
await client.connect()
client.on_recv(on_recv)

await server.publish({"meh": "bah"})
Expand All @@ -59,6 +138,7 @@ async def on_recv(message):
await tornado.gen.sleep(0.3)
if time.monotonic() - start > 30:
assert False, "Message not published after 30 seconds"
assert payloads
finally:
server.close()
server.pub_server.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import random
import stat
import time
from contextlib import contextmanager

Expand Down Expand Up @@ -246,21 +247,41 @@ def test_zeromq_filtering_broadcast(salt_master, salt_minion):
), f"{len(results)}, != {len(expect)}, difference: {expect.difference(results)} {results}"


async def test_pub_channel(master_opts, io_loop):
async def test_pub_channel_ipc(master_opts, minion_opts, io_loop):

master_opts = master_opts.copy()
master_opts.update(transport="zeromq")
minion_opts.update(transport="zeromq")

pub_path = os.path.join(master_opts["sock_dir"], "master_event_pub.ipc")
pull_path = os.path.join(master_opts["sock_dir"], "master_event_pull.ipc")

if os.path.exists(pub_path):
os.path.unlink(pub_path)

server = salt.transport.zeromq.PublishServer(
master_opts,
pub_host="127.0.0.1",
pub_port=4506,
pull_path=os.path.join(master_opts["sock_dir"], "publish_pull.ipc"),
pub_path=pub_path,
pull_path=pull_path,
)

client = salt.transport.zeromq.PublishClient(
minion_opts,
io_loop,
path=pub_path,
)

payloads = []

publishes = []

async def publish_payload(payload):
await server.publish_payload(payload)
payloads.append(payload)

async def on_recv(message):
publishes.append(message)

io_loop.add_callback(
server.publisher,
publish_payload,
Expand All @@ -269,18 +290,81 @@ async def publish_payload(payload):

await asyncio.sleep(3)

perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
assert (os.stat(pub_path).st_mode & perms) == perms

await client.connect()
client.on_recv(on_recv)

await server.publish(salt.payload.dumps({"meh": "bah"}))

start = time.monotonic()
try:
while not publishes:
await asyncio.sleep(0.3)
if time.monotonic() - start > 30:
assert False, "No message received after 30 seconds"
assert payloads
finally:
server.close()
client.close()


async def test_pub_channel(master_opts, minion_opts, io_loop):

master_opts = master_opts.copy()
master_opts.update(transport="zeromq", ipc_mode="tcp")
minion_opts.update(master_ip="127.0.0.1", transport="zeromq")

server = salt.transport.zeromq.PublishServer(
master_opts,
pub_host="127.0.0.1",
pub_port=master_opts["publish_port"],
pull_path="127.0.0.1",
pull_port=master_opts["tcp_master_publish_pull"],
)

client = salt.transport.zeromq.PublishClient(
minion_opts,
io_loop,
host="127.0.0.1",
port=master_opts["publish_port"],
)

payloads = []

publishes = []

async def publish_payload(payload):
await server.publish_payload(payload)
payloads.append(payload)

async def on_recv(message):
publishes.append(message)

io_loop.add_callback(
server.publisher,
publish_payload,
ioloop=io_loop,
)

await asyncio.sleep(3)

await client.connect()
client.on_recv(on_recv)

await server.publish(salt.payload.dumps({"meh": "bah"}))

start = time.monotonic()
try:
while not payloads:
while not publishes:
await asyncio.sleep(0.3)
if time.monotonic() - start > 30:
assert False, "No message received after 30 seconds"
assert payloads
finally:
server.close()
client.close()


async def test_pub_channel_filtering(master_opts, io_loop):
Expand Down

0 comments on commit 4d1692c

Please sign in to comment.