Skip to content
This repository has been archived by the owner on Aug 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from simonhamp/patch-laravel-5.5
Browse files Browse the repository at this point in the history
Update for Laravel 5.5
  • Loading branch information
simonhamp authored Nov 16, 2017
2 parents 110981c + d79f089 commit 049824c
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 86 deletions.
30 changes: 24 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
# laravel-zmq
# Laravel ZeroMQ

subscribe laravel worker to zmq notifications i.e to verify btcoind tx
A Laravel wrapper for `ext-zmq` that exposes a `zmq` broadcast driver to publish your Laravel events via ZeroMQ.

## Requirements
* zmq php ext
* laravel 5.2

- PHP 7.1
- Laravel 5.5
- ZeroMQ
- ext-zmq for PHP

## Installation
include the service provider into your laravel app

```bash
$ composer require pelim/laravel-zmq
```

`php artisan vendor:publish`
The service provider is loaded automatically in Laravel 5.5 using Package Autodiscovery.

Publish vendor files to create your `config/zmq.php` file

```bash
$ php artisan vendor:publish --provider="Pelim\ZmqServiceProvider"
```

Update your `config/zmq.php` with the appropriate socket details.

Set `BROADCASTING_DRIVER=zmq` in your `.env` and add the following ZeroMQ connection settings to your `config/broadcasting.php`:

```php
'connections' => [
'zmq' => [
'driver' => 'zmq',
],
]
```
19 changes: 13 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
{
"name": "pelim/laravel-zmq",
"description": "zmq support for Laravel5",
"description": "A ZeroMQ broadcast driver for Laravel 5",
"license": "MIT",
"authors": [
{
"name": "pelim",
"email": "[email protected]"
}
],

"require": {
"ext-zmq": "*",
"illuminate/broadcasting": "5.2.*",
"illuminate/contracts": "5.2.*"
"illuminate/broadcasting": "5.5.*",
"illuminate/contracts": "5.5.*",
"illuminate/http": "5.5.*"
},
"autoload": {
"psr-4": {
"Pelim\\LaravelZmq\\": "src/"
}
},
"minimum-stability": "dev",
"prefer-stable": false
"extra": {
"laravel": {
"providers": [
"Pelim\\LaravelZmq\\ZmqServiceProvider"
]
}
},
"minimum-stability": "dev",
"prefer-stable": false
}
8 changes: 4 additions & 4 deletions config/zmq.php
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
<?php


return [

'default' => 'publish',

'connections' => [

'publish' => [
'dsn' => 'tcp://127.0.0.1:5555',
'method' => \ZMQ::SOCKET_PUB,
],

'subscribe' => [
'dsn' => 'tcp://0.0.0.0:5555',
'method' => \ZMQ::SOCKET_SUB,
],

]
];
];
57 changes: 50 additions & 7 deletions src/Broadcasting/Broadcaster/ZmqBroadcaster.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

namespace Pelim\LaravelZmq\Broadcasting\Broadcaster;

use Illuminate\Contracts\Broadcasting\Broadcaster;
use Pelim\LaravelZmq\Connector\ZmqConnector;
use Pelim\LaravelZmq\Zmq;
use Illuminate\Http\Request;
use Pelim\LaravelZmq\Connector\ZmqConnector;
use Illuminate\Broadcasting\Broadcasters\Broadcaster;
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;

/**
* Class ZmqBroadcaster
* @package Pelim\LaravelZmq\Broadcasting\Broadcaster
*/
class ZmqBroadcaster implements Broadcaster
class ZmqBroadcaster extends Broadcaster
{

/**
* @var Zmq
*/
protected $zmq;


/**
* ZmqBroadcaster constructor.
Expand All @@ -27,12 +27,55 @@ public function __construct(Zmq $zmq)
{
$this->zmq = $zmq;
}

/**
* {@inheritdoc}
*/
public function broadcast(array $channels, $event, array $payload = [])
{
$this->zmq->publish($channels, $event, $payload, 'publish');
}
}

/**
* Authenticate the incoming request for a given channel.
*
* @param \Illuminate\Http\Request $request
* @return mixed
* @throws \Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException
*/
public function auth($request)
{
if (Str::startsWith($request->channel_name, ['private-', 'presence-']) &&
! $request->user()) {
throw new AccessDeniedHttpException;
}

$channelName = Str::startsWith($request->channel_name, 'private-')
? Str::replaceFirst('private-', '', $request->channel_name)
: Str::replaceFirst('presence-', '', $request->channel_name);

return parent::verifyUserCanAccessChannel(
$request,
$channelName
);
}

/**
* Return the valid authentication response.
*
* @param \Illuminate\Http\Request $request
* @param mixed $result
* @return mixed
*/
public function validAuthenticationResponse($request, $result)
{
if (is_bool($result)) {
return json_encode($result);
}

return json_encode(['channel_data' => [
'user_id' => $request->user()->getAuthIdentifier(),
'user_info' => $result,
]]);
}
}
6 changes: 3 additions & 3 deletions src/Connector/ZmqConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Class ZmqConnector
* @package Pelim\LaravelZmq\Connector
*/
abstract class ZmqConnector {

abstract class ZmqConnector
{
protected $connection;

public function __construct($connection)
Expand All @@ -24,4 +24,4 @@ protected function dsn()
{
return \Config::get(sprintf('zmq.connections.%s.dsn', $this->connection), 'tcp://127.0.0.1:5555');
}
}
}
13 changes: 7 additions & 6 deletions src/Connector/ZmqPublish.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Class ZmqConnector
* @package Pelim\LaravelZmq\Connector
*/
class ZmqPublish extends ZmqConnector {
class ZmqPublish extends ZmqConnector
{
/**
* ZmqPublish constructor.
* @param string $connection
Expand All @@ -18,17 +18,18 @@ public function __construct($connection = 'publish')
}

/**
* Connect to the socket for publishing.
* @return \ZMQSocket
*/
public function connect()
public function connect()
{
$context = new \ZMQContext();
$socket = new \ZMQSocket($context, \ZMQ::SOCKET_PUB);
$socket_method = \Config::get(sprintf('zmq.connections.%s.method', $this->connection), \ZMQ::SOCKET_PUB);
$socket = $context->getSocket($socket_method);
$socket->connect($this->dsn());

// @need some sleep at :-(
usleep(500);

return $socket;
}
}
}
16 changes: 11 additions & 5 deletions src/Connector/ZmqSubscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
* Class ZmqConnector
* @package Pelim\LaravelZmq\Connector
*/
class ZmqSubscribe extends ZmqConnector {

class ZmqSubscribe extends ZmqConnector
{
/**
* ZmqPublish constructor.
* @param string $connection
Expand All @@ -17,12 +17,18 @@ public function __construct($connection = 'subscribe')
parent::__construct($connection);
}

/**
* Connect to the socket for subscribing.
*
* @return \ZMQSocket
*/
public function connect()
{
$context = new \ZMQContext();
$socket = new \ZMQSocket($context, \ZMQ::SOCKET_SUB);
$socket_method = \Config::get(sprintf('zmq.connections.%s.method', $this->connection), \ZMQ::SOCKET_SUB);
$socket = $context->getSocket($socket_method);
$socket->bind($this->dsn());

return $socket;
}
}
}
21 changes: 7 additions & 14 deletions src/Zmq.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public function __construct(array $connections = [])
public function connection($connection = null)
{

if(!$connection) {
$connection = \Config::get('zmg.default');
if (! $connection) {
$connection = \Config::get('zmq.default');
}

return \App::make(sprintf('zmq.connection.%s', $connection))->connect();
}

Expand All @@ -48,7 +48,6 @@ public function connection($connection = null)
*/
public function subscribe(array $channels, \Closure $callback, $connection = 'subscribe')
{

$connection = $this->connection($connection);

foreach ($channels as $channel) {
Expand All @@ -60,7 +59,6 @@ public function subscribe(array $channels, \Closure $callback, $connection = 'su
}

while (true) {

$channel = $connection->recv();
$payload = $connection->recv();

Expand All @@ -70,11 +68,9 @@ public function subscribe(array $channels, \Closure $callback, $connection = 'su
$payload = [$payload];
}


call_user_func($callback, $payload, $channel);

usleep(10);

}
}

Expand All @@ -88,22 +84,19 @@ public function publish(array $channels, $event, $payload = [], $connection = 'p
{
$connection = $this->connection($connection);

if($payload) {
if ($payload) {
$payload = json_encode(['event' => $event, 'payload' => $payload]);
} else {
$payload = $event;
}


foreach($channels as $channel) {

foreach ($channels as $channel) {
\Log::debug('zmq.publish', [
'channel' => $channel,
'channel' => $channel->name,
'payload' => $payload
]);

$connection->send($channel, \ZMQ::MODE_SNDMORE)->send($payload);
}
}

}
}
Loading

0 comments on commit 049824c

Please sign in to comment.