RedisQueue.php

Summary

  • RedisQueue.php
  • Redis Queue - Brand shiny new queue handler using Redis
  • HostPort - Contains information about the connection used to connect to Redis if we’re using a TCP connection.
  • RedisLock - A class to abstract a redis lock
      • __construct - Class constructor
      • lock - Set a redis lock with given expiration and timeout
      • unlock - Remove a redis lock if one exists
  • RedisTimeout - Exception for when we time out waiting for Redis
      • __construct - Exception class constructor
  • RedisContainer - Object to contain the queue item being passed to Redis
      • __construct - Constructor for the class
  • RedisQueueItem - Class representation of a queue item as needed for Redis
      • __construct - Class constructor
      • age - How old is this queue item?
  • RedisQueue - Abstraction for the actual Redis queue
      • __construct - Class constructor
      • close - Close the Redis connection
      • sync - Sync the Redis queue using the script
      • fromTCP - Create a Redis connection and associated queue from a TCP connection
      • fromUnix - Create a Redis connection and associated queue from a Unix socket
      • nextIdLock - Create a Redis lock
      • nextId - Find the next queue item’s ID (locking it while we process)
      • put - Add an item to the queue
      • get - Retrieve an item from the queue
      • markComplete - Mark a queue item as having been completed
      • markIncomplete - Mark a queue item as having not been yet completed.
      • scrub - Cleanup the Redis queue
      • itemsBeingProcessed - Returns how many items are currently being processed.

Redis Queue

Brand shiny new queue handler using Redis

postActiv

the micro-blogging software

Copyright

Copyright © 2016, Maiyannah Bishop

Derived from code copyright various sources

  • GNU Social © 2013-2016, Free Software Foundation, Inc
  • StatusNet © 2008-2012, StatusNet, Inc

License

This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License along with this program.  If not, see http://www.gnu.org/licenses/.

https://www.gnu.org/licenses/agpl.html

About

Simple-minded queue manager for storing items in the database

PHP version

Tested with PHP 5.6

File Authors

File Copyright

  • 2016 Neil E. Hodges

Web

HostPort

Contains information about the connection used to connect to Redis if we’re using a TCP connection.

Variables

  • host: IP address of host
  • port: port of host we’re connecting to

RedisLock

A class to abstract a redis lock

Defines

  • REDLOCK_UNLOCK - redis script to remove a lock

__construct

function __construct($redis, $name)

Class constructor

Parameters

  • redis - the Redis connection object this lock operates on
  • name - a name to assign the lock

lock

function lock($expiration, $timeout = 1)

Set a redis lock with given expiration and timeout

Parameters

  • expiration
  • timeout (default 1)

Error State

  • If timeout <= 0, an UnexpectedValueException is thrown
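
The locking behaviour described above can be illustrated with a minimal sketch. It is not the RedisLock implementation: FakeStore is a hypothetical in-memory stand-in for a Redis "set if absent, with expiry" operation, SketchLock is an illustrative name, and the sketch assumes that both values are in seconds and that lock() reports failure by returning false (the real class may behave differently). It also omits any ownership check on unlock.

```php
<?php
// Hypothetical in-memory stand-in for a Redis key store; not a real client.
class FakeStore
{
    private $keys = [];

    // Mimics "set only if absent, with expiry": returns true when the key was set.
    public function setIfAbsent($key, $value, $ttl, $now)
    {
        if (isset($this->keys[$key]) && $this->keys[$key]['expires'] > $now) {
            return false; // still held and not yet expired
        }
        $this->keys[$key] = ['value' => $value, 'expires' => $now + $ttl];
        return true;
    }

    public function delete($key)
    {
        unset($this->keys[$key]);
    }
}

// Illustrative lock with the same method names as documented above.
class SketchLock
{
    private $store;
    private $name;

    public function __construct($store, $name)
    {
        $this->store = $store;
        $this->name = $name;
    }

    // Assumption: returns true if acquired within $timeout seconds, else false.
    public function lock($expiration, $timeout = 1)
    {
        if ($timeout <= 0) {
            throw new UnexpectedValueException('timeout must be greater than 0');
        }
        $deadline = microtime(true) + $timeout;
        do {
            if ($this->store->setIfAbsent($this->name, 1, $expiration, microtime(true))) {
                return true;
            }
            usleep(10000); // wait 10 ms before retrying
        } while (microtime(true) < $deadline);
        return false;
    }

    public function unlock()
    {
        $this->store->delete($this->name);
    }
}
```

The expiration bounds how long a crashed holder can block others, while the timeout bounds how long a caller waits before giving up.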

unlock

function unlock()

Remove a redis lock if one exists

RedisTimeout

Exception for when we time out waiting for Redis

Variables

  • processing_id_count - what it says on the tin

__construct

public function __construct($processing_id_count)

Exception class constructor

RedisContainer

Object to contain the queue item being passed to Redis

Variables

  • item - thing we’re containing
  • created - timestamp of when the item was created

__construct

public function __construct($item, $created = null)

Constructor for the class

Parameters

  • item - thing we’re tossing in a container
  • created - time we created the container (if null, sets to now)
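
As a rough sketch of the "if null, sets to now" default described above, the class below uses the illustrative name SketchContainer and assumes the timestamp is a Unix time in seconds; the real RedisContainer may store it differently.

```php
<?php
// Illustrative container; the name and the use of time() are assumptions.
class SketchContainer
{
    public $item;
    public $created;

    public function __construct($item, $created = null)
    {
        $this->item = $item;
        // Fall back to the current Unix timestamp when no creation time is given.
        $this->created = ($created === null) ? time() : $created;
    }
}
```

Accepting an explicit timestamp lets an item that is re-queued keep its original creation time.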

RedisQueueItem

Class representation of a queue item as needed for Redis

Variables

  • id - uid of the item
  • tries - how many times we’ve attempted to process the item
  • created - creation timestamp
  • item - actual item the queueitem represents (notice posted, etc)
  • processing_id_count - pidc for Redis

__construct

public function __construct($id, $tries, $created, $item, $processing_id_count = 0)

Class constructor

age

public function age($now = null)

How old is this queue item?
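
One plausible reading of age() is the difference between a reference time and the item’s creation timestamp. The sketch below assumes exactly that, with both values as Unix timestamps in seconds and $now defaulting to the current time; SketchQueueItem is an illustrative name, not the documented class.

```php
<?php
// Illustrative item carrying only the field needed to compute an age.
class SketchQueueItem
{
    public $created;

    public function __construct($created)
    {
        $this->created = $created;
    }

    // Assumption: age is "now minus created", in seconds.
    public function age($now = null)
    {
        if ($now === null) {
            $now = time();
        }
        return $now - $this->created;
    }
}
```

Passing $now explicitly makes the calculation repeatable, which is handy when comparing several items against the same instant.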

RedisQueue

Abstraction for the actual Redis queue

Defines

  • LOCK_TIMEOUT - 3 seconds
  • LOCK_EXPIRATION - 1 minute
  • PROCESSING_TIMEOUT - 5 minutes
  • REDIS_SYNC - Redis script to sync queue items

Variables

  • redis - object for the Redis connection

__construct

function __construct($address, $namespace, $expiration)

Class constructor

Parameters

  • address - location of the Redis server we’re connecting to: either a string containing the Unix socket location, or a HostPort object with TCP connection information
  • namespace - namespace for queue items
  • expiration - Expiration of the queue items

Error State

  • Failing to connect to Redis at the given address will raise an exception
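
Because address can be either a socket path or a HostPort object, the constructor has to branch on its type. The following is a sketch of that dispatch only, under stated assumptions: SketchHostPort and connectionArgs() are hypothetical names, and the returned array stands for the arguments that would be passed to a client’s connect call (phpredis, for example, accepts either a host and port or a socket path).

```php
<?php
// Hypothetical stand-in for the documented HostPort class.
class SketchHostPort
{
    public $host;
    public $port;

    public function __construct($host, $port)
    {
        $this->host = $host;
        $this->port = $port;
    }
}

// Turn an $address into the argument list a connect() call would receive.
function connectionArgs($address)
{
    if ($address instanceof SketchHostPort) {
        return [$address->host, (int) $address->port]; // TCP: host and port
    }
    if (is_string($address) && $address !== '') {
        return [$address];                             // Unix socket: path only
    }
    throw new InvalidArgumentException('address must be a socket path or a HostPort');
}
```

Rejecting anything else up front keeps a bad configuration from surfacing later as an obscure connection failure.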

close

public function close($sync = false)

Close the Redis connection

Parameters

  • sync - whether or not to sync the queue before closing the connection (default false)

sync

public function sync($disk = false)

Sync the Redis queue using the script

Parameters

  • disk - whether to also save to disk (true/false, default false)

Error State

  • will throw an exception if the connection has been closed

fromTCP

public static function fromTCP($host, $port, $namespace, $expiration)

Create a Redis connection and associated queue from a TCP connection

Parameters

  • host - URI of the Redis host
  • port - TCP port on host to connect to
  • namespace - namespace of this queue’s items
  • expiration - expiration of queue items

fromUnix

public static function fromUnix($location, $namespace, $expiration)

Create a Redis connection and associated queue from a Unix socket

Parameters

  • location - location of the Unix socket
  • namespace - namespace of this queue’s items
  • expiration - expiration of queue items

nextIdLock

protected function nextIdLock()

Create a Redis lock

nextId

protected function nextId()

Find the next queue item’s ID (locking it while we process)

Error State

  • will throw an exception if the connection has been closed

put

public function put($item)

Add an item to the queue

Parameter

  • item - item to be added

Error State

  • If the function fails to add an item to the queue, it will raise an exception

get

public function get($timeout, $tries = 0)

Retrieve an item from the queue

Parameters

  • timeout - how long to wait for retrieval before considering retrieval to have failed
  • tries - how many times have we tried to retrieve this item?  (default 0)

Error States

  • Will throw an exception if the connection has been closed
  • If processing time exceeds $timeout, it will throw a timeout exception

markComplete

public function markComplete($item_id)

Mark a queue item as having been completed

Parameter

  • item_id - id of the item to mark

Error State

  • will throw an exception if the connection has been closed

markIncomplete

public function markIncomplete($item_id)

Mark a queue item as not yet completed.

Parameter

  • item_id - id of the item to mark

Error State

  • will throw an exception if the connection has been closed
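
The way put, get, markComplete and markIncomplete fit together can be sketched as a small worker loop. Everything here is hypothetical: SketchQueue is an in-memory stand-in that borrows the documented method names but takes no timeout, its get() is assumed to return an [id, item] pair or null, and drain() with its $maxTries parameter is an illustrative helper rather than part of RedisQueue.

```php
<?php
// Hypothetical in-memory queue borrowing the documented method names.
class SketchQueue
{
    private $pending = [];
    private $processing = [];
    private $nextId = 1;

    public function put($item)
    {
        $this->pending[$this->nextId++] = $item;
    }

    // Assumption: returns [id, item], or null when nothing is waiting.
    public function get()
    {
        if (empty($this->pending)) {
            return null;
        }
        reset($this->pending);
        $id = key($this->pending);
        $this->processing[$id] = $this->pending[$id];
        unset($this->pending[$id]);
        return [$id, $this->processing[$id]];
    }

    public function markComplete($item_id)
    {
        unset($this->processing[$item_id]);
    }

    public function markIncomplete($item_id)
    {
        // Move the item back to the end of the pending list for a later retry.
        $this->pending[$item_id] = $this->processing[$item_id];
        unset($this->processing[$item_id]);
    }

    public function itemsBeingProcessed()
    {
        return count($this->processing);
    }
}

// Process items until the queue is empty, giving each at most $maxTries attempts.
function drain($queue, $handler, $maxTries = 3)
{
    $tries = [];
    $done = [];
    while (($entry = $queue->get()) !== null) {
        list($id, $item) = $entry;
        $tries[$id] = isset($tries[$id]) ? $tries[$id] + 1 : 1;
        if ($handler($item)) {
            $queue->markComplete($id);
            $done[] = $item;
        } elseif ($tries[$id] < $maxTries) {
            $queue->markIncomplete($id); // leave it for a later attempt
        } else {
            $queue->markComplete($id);   // give up after $maxTries attempts
        }
    }
    return $done;
}
```

Marking a failed item incomplete rather than dropping it means a transient error costs a retry instead of losing the item, while the attempt cap keeps a permanently failing item from looping forever.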

scrub

public function scrub()

Cleanup the Redis queue

Used for when we’re testing the queue, not meant for use in production

Error State

  • will throw an exception if the connection has been closed

itemsBeingProcessed

public function itemsBeingProcessed()

Returns how many items are currently being processed.  This isn’t a precise count, but rather a metric used for determining when to call sync()

Error State

  • will throw an exception if the connection has been closed