alistairphillips.com

I’m : a web and mobile developer based in the Australia.


Queueing with PHP and Zend_Queue

As your site or application grows you'll often find the need to queueing, a classic example of this would be email members of a mailing list. It's a fact of life that scripts timeout so trying to send 10 000 emails from within your browser is not a good idea. A queue system would solve this, simply add new items to the end of the queue and process each in turn. Zend_Queue will do this for you and is new as of ZF1.9

The premise is that you create a queue with new Zend_Queue() and then 'send' and 'receive' from the queue. When you configure the queue you say how what backend store you'd like to use. These include database (via Zend_Db), arrays, memcache etc. The sample code creates an array based queue called 'barnyard' and then adds some animals in.

Code sample

    // Create our queue
    $options = array( 'name' => 'barnyard' );
    $queue = new Zend_Queue( 'Array', $options );

    // Add an item
    $queue->send( serialize( ( array( 'animal' => 'dog' ) ) ) );
    echo 'queue count = ' . count( $queue ) . '<br />';

    // Retrieve upto a maximum of 5 messages
    $messages = $queue->receive( 5 );

    foreach( $messages as $key => $val ) {
        echo 'message_body = ' . $val->body . '<br />';
        //$queue->deleteMessage( $val );

        // Now let's add a new message and and iterate over again
        $queue->send( serialize( ( array( 'animal' => 'cat' ) ) ) );
        $m2 = $queue->receive ( 5 );
        foreach( $m2 as $key2 => $val2 ) {
            echo 'message_body = ' . $val2->body . '<br />';
        }
    }

    echo 'queue count = ' . count( $queue ) . '<br />';

And the best part about this system is that once you retrieve a message it's flagged (in the appropriate backend store) so that if another process were to run it won't be returned as a list of awaiting messages. That's what adding the cat inside the loop was showing. The body of the message can't be anything more complex that a string but you can serialise an array or fill it up with JSON.

This SQL can be used to determine if any of the queue items are currently active. For example your main progressing script might execute this as the first thing it does. If it comes back with a count of 0 then grab some work items and processs away. Anything other than 0 means that another script is busy.

SELECT COUNT(*) queue_messages_in_progress FROM message WHERE handle IS NOT NULL OR timeout IS NOT NULL

MSSQL Tables

CREATE TABLE [dbo].[queue] (
    [queue_id] [int] IDENTITY (1, 1) NOT NULL ,
    [queue_name] [varchar] (100) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL ,
    [timeout] [smallint] NOT NULL
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[queue] WITH NOCHECK ADD
    CONSTRAINT [pk_queue_id] PRIMARY KEY  CLUSTERED
    (
        [queue_id]
    )  ON [PRIMARY]
GO

CREATE TABLE [dbo].[message] (
    [message_id] [bigint] IDENTITY (1, 1) NOT NULL ,
    [queue_id] [int] NOT NULL ,
    [handle] [varchar] (32) COLLATE SQL_Latin1_General_CP1_CI_AS NULL ,
    [body] [varchar] (8000) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL ,
    [md5] [char] (32) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL ,
    [timeout] [decimal](14, 4) NULL ,
    [created] [int] NOT NULL
) ON [PRIMARY]
GO

ALTER TABLE [dbo].[message] WITH NOCHECK ADD
    CONSTRAINT [pk_message_id] PRIMARY KEY  CLUSTERED
    (
        [message_id]
    )  ON [PRIMARY]
GO

ALTER TABLE [dbo].[message] ADD
    CONSTRAINT [fk_queueid_queue] FOREIGN KEY
    (
        [queue_id]
    ) REFERENCES [dbo].[queue] (
        [queue_id]
    ) ON DELETE CASCADE  ON UPDATE CASCADE
GO