StompQueueManager.php

Summary
StompQueueManager.php
StompQueueManager
  • __construct - Class constructor
  • _connect - Lazy open a single connection to Stomp queue server.
  • _connectAll - Lazy open connections to all Stomp servers, if in manual failover mode.
  • _doConnect - Handles connecting to a single STOMP server
  • _doEnqueue - Saves a notice object reference into the queue item table on the given connection.
  • _reconnect - Attempt to manually reconnect to the STOMP server for the given slot.
  • multiSite - Tell the i/o master we only need a single instance to cover all sites running in this process.
  • sendControlSignal - Optional; ping any running queue handler daemons with a notification such as announcing a new site to handle or requesting clean shutdown.
  • enqueue - Saves an object into the queue item table.
  • isPersistent - Determine whether messages to this queue should be marked as persistent.
  • getSockets - Send any sockets we’re listening on to the IO manager to wait for input.
  • connectionFromSocket - Get the Stomp connection object associated with the given socket.
  • handleInput - We’ve got input to handle on our socket!
  • idle - Attempt to reconnect in background if we lost a connection.
  • start - Initialize our connection and subscribe to all the queues we’re going to need to handle...
  • finish - Close out any active connections.
  • doSubscribe - Set up all our raw queue subscriptions on the given connection
  • subscriptions - Grab a full list of stomp-side queue subscriptions.
  • handleItem - Handle and acknowledge an event that’s come in through a queue.
  • isDeadLetter - Check if a redelivered message has been run through enough that we’re going to give up on it.
  • incDeliveryCount - Update count of times we’ve re-encountered this message recently, triggered when we get a message marked as ‘redelivered’.
  • handleControlSignal - Process a control signal broadcast.
  • switchSite - Switch site, if necessary, and reset current handler assignments
  • updateSiteConfig - (Re)load runtime configuration for a given site by nickname, triggered by a broadcast to the ‘statusnet-control’ topic.
  • queueName - Combines the queue_basename from configuration with the group name for this queue.
  • breakoutMode - Get the breakout mode for the given queue on the current site.
  • begin
  • ack
  • commit - Commit a STOMP transaction
  • rollback - Rollback a STOMP transaction

StompQueueManager

Variables

  • protected $servers;
  • protected $username;
  • protected $password;
  • protected $base;
  • protected $control;
  • protected $useTransactions;
  • protected $useAcks;
  • protected $sites = array();
  • protected $subscriptions = array();
  • protected $cons = array(); // all open connections
  • protected $disconnect = array();
  • protected $transaction = array();
  • protected $transactionCount = array();
  • protected $defaultIdx = 0;

__construct

function __construct()

Class constructor

_connect

protected function _connect()

Lazy open a single connection to Stomp queue server.  If multiple servers are configured, we let the Stomp client library worry about finding a working connection among them.

_connectAll

protected function _connectAll()

Lazy open connections to all Stomp servers, if in manual failover mode.  This means the queue servers don’t speak to each other, so we have to listen to all of them to make sure we get all events.

_doConnect

protected function _doConnect($server)

Handles connecting to a single STOMP server

Parameters

  • $server

Returns

  • $con

Error State

  • Throws a ServerException if unable to connect

_doEnqueue

protected function _doEnqueue($object,  
$queue,  
$idx,  
$siteNickname = null)

Saves a notice object reference into the queue item table on the given connection.

Returns

  • boolean true on success

Error State

  • throws StompException on connection or send error

_reconnect

protected function _reconnect($idx)

Attempt to manually reconnect to the STOMP server for the given slot.  If successful, set up our subscriptions on it.

Parameters

  • $idx

Error State

  • If the connection attempt fails, the error is caught and logged.

multiSite

public static function multiSite()

Tell the i/o master we only need a single instance to cover all sites running in this process.

sendControlSignal

public function sendControlSignal($event,  
$param = '')

Optional; ping any running queue handler daemons with a notification such as announcing a new site to handle or requesting clean shutdown.  This avoids having to restart all the daemons manually to update configs and such.

Currently only relevant for multi-site queue managers such as STOMP.

Parameters

  • string $event - event key
  • string $param - optional parameter to append to key

Returns

  • boolean success
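
As a rough illustration of "event key plus optional parameter", the sketch below builds and parses such a message. The `:` separator, the function names, and the `shutdown` and `update` event names are assumptions made for the example; the page above does not state the wire format.

```php
<?php
// Sketch only: the ':' separator and these function names are assumptions,
// not taken from the StompQueueManager source.

/**
 * Build a control message from an event key and an optional parameter.
 */
function buildControlMessageSketch(string $event, string $param = ''): string
{
    return $param === '' ? $event : $event . ':' . $param;
}

/**
 * Split a control message back into array(event, param).
 */
function parseControlMessageSketch(string $message): array
{
    // A limit of 2 keeps any further ':' characters inside the parameter.
    return array_pad(explode(':', $message, 2), 2, '');
}
```

A receiving daemon could then branch on the first element and treat the second as, for example, a site nickname.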

enqueue

public function enqueue($object,  
$queue,  
$siteNickname = null)

Saves an object into the queue item table.

Parameters

  • mixed $object
  • string $queue
  • string $siteNickname optional override to drop into another site’s queue

Returns

  • boolean true on success

Error State

  • StompException on connection or send error

isPersistent

protected function isPersistent($queue)

Determine whether messages to this queue should be marked as persistent.  Actual persistent storage depends on the queue server’s configuration.

Parameters

  • string $queue

Returns

  • bool

getSockets

public function getSockets()

Send any sockets we’re listening on to the IO manager to wait for input.

Returns

  • array of resources

connectionFromSocket

protected function connectionFromSocket($socket)

Get the Stomp connection object associated with the given socket.

Parameters

  • resource $socket

Returns

  • int - index into connections list

Error State

  • Throws an exception if the socket doesn’t have an associated STOMP connection
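
A minimal sketch of the lookup described above, assuming the sockets of the open connections are available as a plain list in the same order as the connection slots. The function name and the exception type are hypothetical.

```php
<?php
// Sketch only: $sockets stands in for the sockets of the open connections;
// the function name and exception type are assumptions.

function connectionIndexFromSocketSketch(array $sockets, $socket): int
{
    // Strict search compares resources by identity, not by loose equality.
    $idx = array_search($socket, $sockets, true);
    if ($idx === false) {
        throw new RuntimeException('No STOMP connection for this socket');
    }
    return $idx;
}
```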

handleInput

public function handleInput($socket)

We’ve got input to handle on our socket!  Read any waiting Stomp frame(s) and process them.

Parameters

  • resource $socket

Returns

  • boolean ok on success

Error State

  • raises a StompException if the connection is lost

idle

function idle()

Attempt to reconnect in background if we lost a connection.

start

public function start($master)

Initialize our connection and subscribe to all the queues we’re going to need to handle...  If multiple queue servers are configured for failover, we’ll listen to all of them.

Side effects: in multi-site mode, may reset site configuration.

Parameters

  • IoMaster $master process/event controller

Returns

  • bool return false on failure

finish

public function finish()

Close out any active connections.

Returns

  • bool return false on failure

doSubscribe

protected function doSubscribe(LiberalStomp $con)

Set up all our raw queue subscriptions on the given connection

Parameters

  • LiberalStomp $con

subscriptions

protected function subscriptions()

Grab a full list of stomp-side queue subscriptions.  Will include:

  • control broadcast channel
  • shared group queues for active groups
  • per-handler and per-site breakouts from $config[‘queue’][‘breakout’] that are rooted in the active groups.

Returns

  • array of strings
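
The three kinds of entries listed above could be assembled roughly as follows. This is a sketch under assumptions: the function name, its parameters, and the rule that a breakout is "rooted in" a group when its first path segment equals the group name are inferred from the description, not confirmed by the source.

```php
<?php
// Sketch only: names and the "first path segment" rule are assumptions.

function subscriptionsSketch(
    string $control,
    string $base,
    array $activeGroups,
    array $breakout
): array {
    // 1. The control broadcast channel always comes first.
    $subs = array($control);

    // 2. One shared queue per active group.
    foreach ($activeGroups as $group) {
        $subs[] = $base . $group;
    }

    // 3. Breakout specs such as "main/distrib" or "xmpp/xmppout/site01",
    //    kept only when their leading segment is an active group.
    foreach ($breakout as $spec) {
        $parts = explode('/', $spec);
        if (in_array($parts[0], $activeGroups, true)) {
            $subs[] = $base . $spec;
        }
    }

    // Drop duplicates and reindex so the result is a plain list.
    return array_values(array_unique($subs));
}
```

With only the `main` group active, a breakout such as `xmpp/xmppout/site01` is skipped while `main/distrib` is included.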

handleItem

protected function handleItem($frame)

Handle and acknowledge an event that’s come in through a queue.

If the queue handler reports failure, the message is requeued for later.  Missing notices or handler classes will drop the message.

Side effects: in multi-site mode, may reset site configuration to match the site that queued the event.

Parameters

  • StompFrame $frame

Returns

  • bool success

isDeadLetter

protected function isDeadLetter($frame,
$message)

Check if a redelivered message has been run through enough that we’re going to give up on it.

Parameters

  • StompFrame $frame
  • array $message unserialized message body

Returns

  • boolean true if we should discard

incDeliveryCount

function incDeliveryCount($msgId)

Update count of times we’ve re-encountered this message recently, triggered when we get a message marked as ‘redelivered’.

Requires a CLI-friendly cache configuration.

Parameters

  • string $msgId message-id header from message

Returns

  • int number of retries recorded
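
The interplay between isDeadLetter and incDeliveryCount can be pictured with the sketch below. It is not the class's implementation: the in-memory counter stands in for the cache, and the `redelivered` header value of `'true'`, the retry limit parameter, and all names ending in `Sketch` are assumptions.

```php
<?php
// Sketch only: an in-memory stand-in for the cache-backed retry counter.

final class DeliveryCounterSketch
{
    /** @var array<string,int> retry counts keyed by message-id */
    private $counts = array();

    public function increment(string $msgId): int
    {
        if (!isset($this->counts[$msgId])) {
            $this->counts[$msgId] = 0;
        }
        return ++$this->counts[$msgId];
    }
}

/**
 * Decide whether a message should be discarded.
 * Assumes the broker marks redeliveries with a 'redelivered' => 'true' header.
 */
function isDeadLetterSketch(
    array $headers,
    DeliveryCounterSketch $counter,
    int $maxRetries
): bool {
    $redelivered = isset($headers['redelivered'])
        && $headers['redelivered'] === 'true';
    if (!$redelivered) {
        // First delivery: nothing to count yet.
        return false;
    }
    // Count this redelivery and give up once the limit is exceeded.
    return $counter->increment($headers['message-id']) > $maxRetries;
}
```

A real deployment would need the counter to survive across daemon processes, which is presumably why a CLI-friendly cache configuration is required.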

handleControlSignal

protected function handleControlSignal($idx,
$frame)

Process a control signal broadcast.

Parameters

  • int $idx connection index
  • array $frame Stomp frame

Returns

  • bool true to continue; false to stop further processing.

switchSite

function switchSite($site)

Switch site, if necessary, and reset current handler assignments

Parameters

  • string $site

updateSiteConfig

protected function updateSiteConfig($nickname)

(Re)load runtime configuration for a given site by nickname, triggered by a broadcast to the ‘statusnet-control’ topic.

Configuration changes in database should update, but config files might not.

Parameters

  • string $nickname - site nickname

Returns

  • bool true to continue; false to stop further processing.

queueName

protected function queueName($queue)

Combines the queue_basename from configuration with the group name for this queue to give, e.g.:

  • /queue/statusnet/main
  • /queue/statusnet/main/distrib
  • /queue/statusnet/xmpp/xmppout/site01

Parameters

  • string $queue

Returns

  • string
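
One way to read the three example names above is as shared, per-handler, and per-site variants of the same scheme. The sketch below reproduces them under that reading; the function name, its parameters, and the lookup order (most specific breakout first) are assumptions, not the documented behaviour.

```php
<?php
// Sketch only: parameter names and the lookup order are assumptions.

function queueNameSketch(
    string $base,
    string $group,
    string $queue,
    string $site,
    array $breakout
): string {
    // Try the most specific breakout first: per-site, then per-handler.
    foreach (array("$group/$queue/$site", "$group/$queue") as $spec) {
        if (in_array($spec, $breakout, true)) {
            return $base . $spec;
        }
    }
    // No breakout configured: fall back to the shared group queue.
    return $base . $group;
}
```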

breakoutMode

protected function breakoutMode($queue)

Get the breakout mode for the given queue on the current site.

Parameters

  • string $queue

Returns

  • string one of ‘shared’, ‘handler’, ‘site’

begin

protected function begin($idx)

ack

protected function ack($idx,
$frame)

commit

protected function commit($idx)

Commit a STOMP transaction

Parameters

  • int $idx - ID of transaction

Error State

  • An empty transaction state indicates we aren’t in a transaction and will raise an exception.

rollback

protected function rollback($idx)

Rollback a STOMP transaction

Parameters

  • int $idx - id of transaction

Error State

  • An empty transaction state indicates we aren’t in a transaction and will raise an exception.
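
The "empty transaction state" rule for commit and rollback can be sketched with per-connection bookkeeping similar to the $transaction and $transactionCount variables listed earlier. This is only an illustration: the transaction ID format, the exception type, and the class name are assumptions, and no STOMP frames are sent.

```php
<?php
// Sketch only: tracks transaction state per connection index without
// talking to a STOMP server. ID format and exception type are assumptions.

final class TransactionStateSketch
{
    private $transaction = array();      // current transaction ID per slot
    private $transactionCount = array(); // transactions started per slot

    public function begin(int $idx): string
    {
        if (!empty($this->transaction[$idx])) {
            throw new LogicException("Already in a transaction on connection $idx");
        }
        $count = ($this->transactionCount[$idx] ?? 0) + 1;
        $this->transactionCount[$idx] = $count;
        $this->transaction[$idx] = "tx-$idx-$count";
        return $this->transaction[$idx];
    }

    public function commit(int $idx): string
    {
        // A real implementation would send COMMIT for the returned ID.
        return $this->end($idx);
    }

    public function rollback(int $idx): string
    {
        // A real implementation would send ABORT for the returned ID.
        return $this->end($idx);
    }

    private function end(int $idx): string
    {
        if (empty($this->transaction[$idx])) {
            throw new LogicException("Not in a transaction on connection $idx");
        }
        $id = $this->transaction[$idx];
        $this->transaction[$idx] = null; // back to the empty state
        return $id;
    }
}
```

Calling commit or rollback twice in a row on the same slot raises the exception, since the first call clears the state.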