CakePHP+Pheanstalk+Beanstalk+Supervisord

Written by James McDonald

December 28, 2016

UI Lockups due to long running tasks

I recently built a CakePHP 3 application that sends about 30-60 emails when a user triggers the ‘notify’ action. Each time, it took roughly 30-40 seconds for control to return to the user.

So I went looking for a way to fix this.

The basic fix is to quickly put the notification tasks (jobs, in beanstalk speak) into a queue and have a worker script watch the queue for new jobs. The worker then processes them asynchronously in the background at its leisure, instead of holding the user up while they complete.

What follows is a rough guide to how I got it all working, with the usual IANAP disclaimer.

Some references that I found most helpful:

This lady knows her PHP stuff:

Working with PHP and Beanstalkd

The makers of Pheanstalk: https://github.com/pda/pheanstalk#pheanstalk

If you want a really good overview of Pheanstalk and beanstalkd, check out this YouTube video. It was especially helpful in showing how to avoid hundreds of lines of procedural code by creating modular classes, one for each kind of job. It also lays out a suggested structure for the jobs you submit to the beanstalkd queueing server.
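The "one class per job type" idea can be sketched roughly as follows. This is a minimal illustration, not code from the video or from Pheanstalk: the JobHandler interface, the two handler classes and JobDispatcher are hypothetical names, and the handlers only check the payload instead of sending anything. The payload keys (job, notify_to, subject) follow the job structure used later in this post. It assumes PHP 7.1 or newer.

```php
<?php

// Hypothetical contract: one small class per job type.
interface JobHandler
{
    public function handle(array $payload): bool;
}

class SendEmailHandler implements JobHandler
{
    public function handle(array $payload): bool
    {
        // A real handler would send the mail here; this sketch only
        // checks that the fields it would need are present.
        return isset($payload['notify_to'], $payload['subject']);
    }
}

class SendSmsHandler implements JobHandler
{
    public function handle(array $payload): bool
    {
        return isset($payload['notify_to']);
    }
}

class JobDispatcher
{
    /** @var JobHandler[] handlers keyed by job name */
    private $handlers = [];

    public function register(string $jobName, JobHandler $handler): void
    {
        $this->handlers[$jobName] = $handler;
    }

    // Decodes a JSON job body and passes it to the registered handler.
    // Returns false for bad JSON, a missing job name or an unknown job.
    public function dispatch(string $json): bool
    {
        $payload = json_decode($json, true);
        if (!is_array($payload) || !isset($payload['job'])) {
            return false;
        }
        $name = $payload['job'];
        if (!is_string($name) || !isset($this->handlers[$name])) {
            return false;
        }
        return $this->handlers[$name]->handle($payload);
    }
}

$dispatcher = new JobDispatcher();
$dispatcher->register('send_email', new SendEmailHandler());
$dispatcher->register('send_sms', new SendSmsHandler());
```

Because only registered job names are dispatched, a malformed or unexpected job body cannot trigger arbitrary code in the worker.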

Calling the pheanstalk to submit a job

The code to submit jobs to the beanstalkd queueing software from within a Cake action lives in src/CustomClasses/SendJob.php:

<?php

namespace App\CustomClasses;

use Pheanstalk\Pheanstalk;

class SendJob {

    private $host = '127.0.0.1';
    private $port = 11300;
    private $pheanStalk;

    function __construct() {
        // Pheanstalk 3 takes the host and port as separate arguments
        $this->pheanStalk = new Pheanstalk($this->host, $this->port);
    }

    // Queues $data as JSON on the tube named in $data['q'].
    // Returns true on success, or an error message string on failure.
    function send($data) {
        if ($this->pheanStalk->getConnection()->isServiceListening()) {
            $send_payload = json_encode($data);
            $this->pheanStalk->useTube($data['q'])->put($send_payload);
            return true;
        } else {
            return 'The beanstalkd service is not available';
        }
    }

}

 

Sending Jobs to Beanstalk

From a controller or model you can send a job to beanstalk for queuing

<?php

namespace App\Controller;

use App\CustomClasses\SendJob;
use Cake\Controller\Controller;

class NotificationController extends Controller {
    // ...
    public function notify() {

        // 'notify_to' and 'msg' are the keys the worker's send_email() reads
        $data = [
            'job' => 'send_email',
            'q' => 'send_email',
            'subject' => 'My Test Subject',
            'msg' => 'A body',
            'notify_to' => 'james@jmits.com.au',
            'more_stuff' => '... Limited by your imagination & requirements'
        ];

        $pheanstalk = new SendJob;
        $pheanstalk->send($data);
        /* probably should have some error checking in here */

    }

}
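The error checking hinted at in the comment could look something like this. It is only a sketch: buildJobPayload() is a hypothetical helper, not part of CakePHP or Pheanstalk, and the default list of required keys is an assumption based on the fields the worker in this post reads (job, q, notify_to).

```php
<?php

/**
 * Hypothetical helper: validate job data and encode it as JSON
 * before it is handed to the queue.
 *
 * @throws InvalidArgumentException when a required key is missing or empty
 * @throws RuntimeException when the data cannot be JSON-encoded
 */
function buildJobPayload(array $data, array $required = ['job', 'q', 'notify_to'])
{
    foreach ($required as $key) {
        if (!isset($data[$key]) || $data[$key] === '') {
            throw new InvalidArgumentException("Missing job field: $key");
        }
    }

    // json_encode() returns false on failure, e.g. for malformed UTF-8
    $json = json_encode($data);
    if ($json === false) {
        throw new RuntimeException('Could not encode job: ' . json_last_error_msg());
    }

    return $json;
}

// Usage: catch the failure instead of queueing a job the worker cannot process.
try {
    $payload = buildJobPayload([
        'job' => 'send_email',
        'q' => 'send_email',
        'notify_to' => 'user@example.com',
    ]);
} catch (InvalidArgumentException $e) {
    $payload = null; // in a controller you might set a flash message here
}
```

Rejecting a bad job in the controller means the user hears about the problem straight away, rather than it surfacing later in the worker's log.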

Supervisor Configuration

This is the supervisord configuration for running the bin/cake worker shell (see below).

Put it in /etc/supervisord.conf, or edit /etc/supervisord.conf to add an [include] section that points to a supervisord config file kept in your Cake project's source tree.

CentOS 6.8 supervisor bug

I found that the version of supervisor installed by the package manager on CentOS 6.8 (supervisor-2.1-9.el6.noarch) has a bug: process_num is not expanded. To work around it I backed up /etc/init.d/supervisord, removed the package with yum remove supervisor, and then ran pip install supervisor, which pulled down the latest version without the process_num bug. Once it is installed via pip, copy the backed-up supervisord script back to /etc/init.d and then run chkconfig --add supervisord && chkconfig supervisord on.

# in /etc/supervisord.conf

[include]
files = /home/username/cakeproject/config/supervisor.d/*.ini

 

# put this in /home/username/cakeproject/config/supervisor.d/worker.ini
[program:bsw]
command=/home/username/cakeproject/bin/cake worker runbsw %(process_num)02d
directory=/home/username/cakeproject
process_name=%(program_name)s_%(process_num)02d
autostart=true
autorestart=true
numprocs=2
startretries=3
# stderr_logfile=/var/log/bsw.stderr
stdout_logfile=/var/log/bsw.log
redirect_stderr=true
stdout_logfile_backups=3
user=melbdc
# environment=PATH=/usr/local/bin:/usr/local/sbin:%(ENV_PATH)s

 

CakePHP Worker

The worker shell lives in src/Shell/WorkerShell.php. You can run it from the command line with bin/cake worker runbsw 01.

<?php

namespace App\Shell;

use App\MelbDC\BeanstalkdWorker;
use Cake\Console\Shell;
use Cake\Log\LogTrait;

class WorkerShell extends Shell {

    use LogTrait;

    public function initialize() {
        parent::initialize();
        # $this->loadModel('Deliveries');
        # you can do all sorts of cakian stuff using
        # the cake cli
    }

    public function getOptionParser() {
        $parser = parent::getOptionParser();
        $parser
                ->addArgument('runbsw', [
                    'help' => 'Run the pheanstalk worker'
                ])->description(['Runs the worker for the beanstalkd queueing system'])
                ->addArgument('procnum', [
                    'help' => 'Process number'
        ]);
        return $parser;
    }

    public function runbsw() {

        $this->log("Starting BeanstalkdWorker", 'info');

        $bsw = new BeanstalkdWorker;
        $bsw->run();

        $this->log("Stopping BeanstalkdWorker", 'info');
    }

}

WorkerShell calls Email or SMS send classes

This is the Pheanstalk-based class, called by WorkerShell, that reserves jobs from the queue and processes them:

<?php

namespace App\MelbDC;

use Pheanstalk\Pheanstalk;
use Cake\Mailer\Email;
use App\CustomClasses\SMSSendClass;
use Cake\Log\Log;
use Cake\ORM\TableRegistry;

class BeanstalkdWorker {

    private $pheanStalk;

    function __construct() {
        $this->pheanStalk = new Pheanstalk('127.0.0.1');
    }

    function run() {

        $this->pheanStalk->watch('send_email')
                ->watch('send_sms')
                ->ignore('default');

        Log::write('info', "Worker start", 'notification');

        $ctr = 0;
        while ($job = $this->pheanStalk->reserve()) {

            $job_data = $job->getData();
            Log::write('info', "Worker: start loop", 'notification');
            $ctr++;

            $received = json_decode($job_data, true);
            $received['counter'] = $ctr;
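            // Braces are needed so PHP 7 treats $received['job'] as the method name
            if ($this->{$received['job']}($received)) {
                $this->pheanStalk->delete($job);
                Log::info("run loop Success:", 'notification');
            } else {
                // Failed jobs are deleted too, so they are never retried
                $this->pheanStalk->delete($job);
                Log::error("run loop Fail:", 'notification');
            }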
            Log::write('info', "Worker: end loop", 'notification');
        }
    }

    function send_email($data) {
        try {
            $email = new Email();
            $email->addTo($data['notify_to']);
            $email->subject($data['subject']);
            if ($email->send($data['msg'])) {
                Log::write('info', "Mail success: " . $data['notify_to'], 'notification');

                return true;
            } else {
                // Log before returning, otherwise the log line is never reached
                Log::write('error', "Mail fail:" . $data['notify_to'], 'notification');
                return false;
            }
        } catch (\Exception $e) {
            // \Exception: an unqualified Exception would resolve to App\MelbDC\Exception
            Log::write('error', "Mail exception: " . $e->getMessage(), 'notification');
            return false;
        }
    }

    function send_sms($data) {

        $sms = new SMSSendClass;
        try {

            if ($sms->sendSMS($data)) {
                Log::write('info', "Worker send_sms success " . $data['notify_to'], 'notification');
                return true;
            } else {
                Log::write('error', "Worker send_sms fail " . $data['notify_to'], 'notification');
                return false;
            }
        } catch (\Exception $e) {
            // \Exception: an unqualified Exception would resolve to App\MelbDC\Exception
            Log::write('error', "SMS exception: " . $e->getMessage(), 'notification');
            return false;
        }
    }

}
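The run loop above deletes a job whether or not its handler succeeded, so a failed email is simply lost. One alternative is to retry a few times and then set the job aside. The sketch below shows only the decision logic as a plain function; nextAction() and its thresholds are hypothetical. In a real worker the result would be mapped to Pheanstalk's delete, release (with a delay) or bury calls, and the reserve count would come from the job's beanstalkd stats. Check the Pheanstalk documentation for your version for the exact signatures.

```php
<?php

/**
 * Hypothetical retry policy for a reserved job.
 *
 * @param bool $succeeded   whether the handler reported success
 * @param int  $reserves    how many times the job has been reserved so far
 * @param int  $maxAttempts give up after this many attempts
 * @return array ['action' => 'delete'|'release'|'bury', 'delay' => seconds]
 */
function nextAction($succeeded, $reserves, $maxAttempts = 3)
{
    if ($succeeded) {
        return ['action' => 'delete', 'delay' => 0];
    }

    if ($reserves >= $maxAttempts) {
        // Park the job for manual inspection instead of losing it
        return ['action' => 'bury', 'delay' => 0];
    }

    // Back off exponentially: 30s after the first attempt, then 60s, 120s, ...
    $delay = 30 * (2 ** max(0, $reserves - 1));

    return ['action' => 'release', 'delay' => $delay];
}
```

Buried jobs stay in beanstalkd until someone kicks or deletes them, which leaves a trail to follow when a notification never arrives.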
