memcache消息队列的原理就是在key上做文章:用一个连续递增的数字加上前缀作为key,记录序列化以后的消息或者日志,然后通过定时程序将内容落地到文件或者数据库.
PHP实现消息队列的用处:比如发送大量邮件很费时间,这时就可以采用队列来处理.
方便实现队列的轻量级队列服务器是:
starling支持memcache协议的轻量级持久化服务器
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列
http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached来实现消息队列,代码如下:
- <?php
-
-
-
-
/**
 * Minimal FIFO message queue layered on top of memcache keys.
 *
 * For a queue named $key the following entries are used (all start with PREFIX):
 *   - PREFIX.$key.'W'  write counter: sequence number of the newest message
 *   - PREFIX.$key.'R'  read counter: sequence number of the next message to read
 *   - PREFIX.$key.$n   payload of message number $n (numbering starts at 1)
 *
 * The read/modify steps in output() are not atomic as a whole, so running
 * several consumers on the same queue at once is not safe.
 */
class QMC {
    const PREFIX = 'ASDFASDFFWQKE';

    /**
     * Returns the shared Memcache connection, creating it on first use.
     *
     * @return Memcache
     */
    static private function mc_init() {
        static $mc = null;
        if (is_null($mc)) {
            $mc = new Memcache;
            $mc->connect('127.0.0.1', 11211);
        }
        return $mc;
    }

    /**
     * Creates the counter if necessary and moves it by $offset.
     *
     * @param string $key    Full memcache key of the counter.
     * @param int    $offset Positive to increment, negative to decrement, 0 to just read.
     * @param int    $time   Expiry used when the counter has to be created.
     * @return int|false The counter value after the operation, false on failure.
     */
    static public function set_counter($key, $offset, $time = 0) {
        $mc = self::mc_init();
        $val = $mc->get($key);

        if (!is_numeric($val) || $val < 0) {
            if ($val === false) {
                // add() only succeeds when the key is absent, so a counter that
                // another process created in the meantime is not reset to 0.
                // The third argument is the flag; the expiry is the fourth.
                if ($mc->add($key, 0, 0, $time)) {
                    $val = 0;
                } else {
                    $val = $mc->get($key);
                    if (!is_numeric($val) || $val < 0) {
                        return false;
                    }
                }
            } else {
                // The key holds something that is not a usable counter: overwrite it.
                if (!$mc->set($key, 0, 0, $time)) {
                    return false;
                }
                $val = 0;
            }
        }

        $offset = intval($offset);
        if ($offset > 0) {
            return $mc->increment($key, $offset);
        }
        if ($offset < 0) {
            return $mc->decrement($key, -$offset);
        }
        return (int) $val;
    }

    /**
     * Appends a message to the queue.
     *
     * @param string $key   Queue name.
     * @param mixed  $value Message payload.
     * @return bool True when the message was stored.
     */
    static public function input($key, $value) {
        $mc = self::mc_init();
        $w_key = self::PREFIX . $key . 'W';

        $seq = self::set_counter($w_key, 1);
        if ($seq === false) {
            // Without a sequence number the payload would land on a key that
            // output() never reads.
            return false;
        }
        return $mc->set(self::PREFIX . $key . $seq, $value);
    }

    /**
     * Removes and returns up to $max pending messages, oldest first.
     *
     * Slots whose payload is no longer in memcache (expired or evicted) are
     * skipped instead of being returned as false.
     *
     * @param string $key Queue name.
     * @param int    $max Maximum number of slots to consume in this call.
     * @return array The messages that were read.
     */
    static public function output($key, $max = 100) {
        $out = array();
        $mc = self::mc_init();
        $r_key = self::PREFIX . $key . 'R';
        $w_key = self::PREFIX . $key . 'W';

        $r_p = self::set_counter($r_key, 0);
        $w_p = self::set_counter($w_key, 0);
        if ($r_p === false || $w_p === false) {
            return $out;
        }

        if ($r_p == 0) {
            // Message numbering starts at 1. Persist the initial read position
            // so that the stored counter and the local pointer agree; otherwise
            // the first increment below would hand back slot 1 a second time.
            $r_p = self::set_counter($r_key, 1);
            if ($r_p === false) {
                return $out;
            }
        }

        while ($w_p >= $r_p) {
            if (--$max < 0) {
                break;
            }
            $v_key = self::PREFIX . $key . $r_p;
            $next = self::set_counter($r_key, 1);

            $value = $mc->get($v_key);
            $mc->delete($v_key);
            if ($value !== false) {
                $out[] = $value;
            }

            if ($next === false) {
                break;
            }
            $r_p = $next;
        }
        return $out;
    }
}
-
-
-
-
-
- ?>
基于PHP共享内存实现的消息队列,代码如下:
- <?php
-
-
-
-
-
-
-
-
/**
 * Fixed-size circular queue stored in a System V shared memory segment.
 *
 * The segment is split into blocks of $blockSize bytes; $front and $rear are
 * byte offsets of the oldest element and of the next free block. One block is
 * always left unused so that "full" and "empty" can be told apart. The two
 * offsets are loaded from $filePtr on construction and written back on
 * destruction. A semaphore is acquired in the constructor and released in the
 * destructor, so it is held for the whole lifetime of the object.
 */
class ShmQueue
{
    private $maxQSize = 0;   // number of blocks in the segment

    private $front = 0;      // byte offset of the oldest element
    private $rear = 0;       // byte offset of the next free block

    private $blockSize = 256;
    private $memSize = 25600;
    private $shmId = 0;

    private $filePtr = './shmq.ptr';

    private $semId = 0;
    private $locked = false; // true once the semaphore has been acquired

    /**
     * Opens (or creates) the shared memory segment, takes the semaphore and
     * loads the saved queue pointers.
     *
     * @throws Exception When the segment or the semaphore cannot be obtained.
     */
    public function __construct()
    {
        $shmkey = ftok(__FILE__, 't');
        if ($shmkey === -1) {
            throw new Exception("ftok failed for " . __FILE__);
        }

        $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize);
        if ($this->shmId === false) {
            throw new Exception("unable to open shared memory segment");
        }
        $this->maxQSize = $this->memSize / $this->blockSize;

        $this->semId = sem_get($shmkey, 1);
        if ($this->semId === false || !sem_acquire($this->semId)) {
            throw new Exception("unable to acquire queue semaphore");
        }
        $this->locked = true;

        $this->init();
    }

    /**
     * Restores $front and $rear from the pointer file, if it holds two valid offsets.
     */
    private function init()
    {
        if (!file_exists($this->filePtr)) {
            return;
        }
        $contents = file_get_contents($this->filePtr);
        if ($contents === false) {
            return;
        }
        $data = explode('|', $contents);
        if (count($data) < 2) {
            return;
        }
        $front = (int) $data[0];
        $rear = (int) $data[1];
        // Ignore offsets that would read or write outside the segment or in
        // the middle of a block (e.g. a truncated or hand-edited file).
        if ($this->isValidPtr($front) && $this->isValidPtr($rear)) {
            $this->front = $front;
            $this->rear = $rear;
        }
    }

    /**
     * Tells whether $ptr is a block-aligned offset inside the segment.
     */
    private function isValidPtr($ptr)
    {
        return $ptr >= 0 && $ptr < $this->memSize && $ptr % $this->blockSize === 0;
    }

    /**
     * @return int Number of elements currently in the queue.
     */
    public function getLength()
    {
        return (($this->rear - $this->front + $this->memSize) % ($this->memSize)) / $this->blockSize;
    }

    /**
     * Appends a value to the queue.
     *
     * @param mixed $value Any serializable value.
     * @return bool False when the queue is full or the write failed.
     * @throws Exception When the serialized value cannot be stored in one block.
     */
    public function enQueue($value)
    {
        if ($this->ptrInc($this->rear) == $this->front) {
            return false;
        }

        $data = $this->encode($value);
        if (shmop_write($this->shmId, $data, $this->rear) === false) {
            return false;
        }
        $this->rear = $this->ptrInc($this->rear);
        return true;
    }

    /**
     * Removes and returns the oldest value.
     *
     * @return mixed The stored value, or false when the queue is empty.
     */
    public function deQueue()
    {
        if ($this->front == $this->rear) {
            return false;
        }
        $value = shmop_read($this->shmId, $this->front, $this->blockSize - 1);
        $this->front = $this->ptrInc($this->front);
        return $this->decode($value);
    }

    /**
     * Advances a byte offset by one block, wrapping at the end of the segment.
     */
    private function ptrInc($ptr)
    {
        return ($ptr + $this->blockSize) % ($this->memSize);
    }

    /**
     * Serializes a value and appends the "__eof" terminator.
     *
     * @throws Exception When the result does not fit into one block, or when
     *                   the serialized value itself contains the terminator
     *                   (decode() would cut it short).
     */
    private function encode($value)
    {
        $serialized = serialize($value);
        if (strpos($serialized, "__eof") !== false) {
            throw new Exception("value contains the reserved terminator __eof");
        }

        $data = $serialized . "__eof";
        if (strlen($data) > $this->blockSize - 1) {
            throw new Exception(strlen($data) . " is overload block size!");
        }
        return $data;
    }

    /**
     * Cuts a raw block at the first "__eof" terminator and unserializes it.
     * Anything after the terminator is leftover from earlier writes.
     */
    private function decode($value)
    {
        $data = explode("__eof", $value);
        return unserialize($data[0]);
    }

    /**
     * Saves the queue pointers and releases the semaphore.
     */
    public function __destruct()
    {
        if (!$this->locked) {
            // Nothing was acquired, so there is nothing to save or release.
            return;
        }
        $data = $this->front . '|' . $this->rear;
        file_put_contents($this->filePtr, $data);

        sem_release($this->semId);
        $this->locked = false;
    }
}
-
-
-
-
-
-
-
-
-
-
-
-
- ?>
对于一个很大的消息队列,频繁地对大量数据进行序列化和反序列化太耗费资源。下面是我用PHP实现的一个消息队列:在尾部插入数据时只需要操作尾部,不用对整个消息队列进行读取和操作。但是,这个消息队列不是线程安全的,我只是尽量避免了冲突的可能性。如果消息不是非常密集,比如几秒钟才一个,还是可以考虑这样使用的。
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作,下面是代码:
/**
 * Circular queue whose elements and bookkeeping both live in memcache.
 *
 * Only $memcache, $name and $prefix are real properties. "maxSize", "front",
 * "real" (the rear position) and "size" are not declared on purpose: reading
 * or assigning them goes through __get()/__set(), which map to memcache
 * get()/add() on the key prefix.name.property. Elements are stored under
 * prefix.name.position.
 *
 * The individual memcache calls are not grouped atomically, so concurrent
 * writers can still interleave.
 */
class Memcache_Queue
{
    private $memcache;
    private $name;
    private $prefix;

    /**
     * @param int    $maxSize  Capacity of the ring.
     * @param string $name     Queue name, part of every key.
     * @param object $memcache Connected memcache client.
     * @param string $prefix   Key prefix.
     * @throws Exception When no memcache client is given.
     */
    public function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")
    {
        if ($memcache == null) {
            throw new Exception("memcache object is null, new the object first.");
        }
        $this->memcache = $memcache;
        $this->name = $name;
        $this->prefix = $prefix;
        // These four go through __set() -> add(): they only take effect when
        // the queue does not exist in memcache yet.
        $this->maxSize = $maxSize;
        $this->front = 0;
        $this->real = 0;
        $this->size = 0;
    }

    /**
     * Reads an undeclared property from memcache.
     */
    public function __get($name)
    {
        return $this->get($name);
    }

    /**
     * Assigning an undeclared property stores it in memcache unless it already exists.
     */
    public function __set($name, $value)
    {
        $this->add($name, $value);
        return $this;
    }

    public function isEmpty()
    {
        return $this->size == 0;
    }

    public function isFull()
    {
        return $this->size == $this->maxSize;
    }

    /**
     * Appends an element at the rear position.
     *
     * @return $this
     * @throws Exception When the queue is full.
     */
    public function enQueue($data)
    {
        if ($this->isFull()) {
            throw new Exception("Queue is Full");
        }
        $this->increment("size");
        $this->set($this->real, $data);
        $this->set("real", ($this->real + 1) % $this->maxSize);
        return $this;
    }

    /**
     * Discards the front element (read it with getTop() first).
     *
     * @return $this
     * @throws Exception When the queue is empty.
     */
    public function deQueue()
    {
        if ($this->isEmpty()) {
            throw new Exception("Queue is Empty");
        }
        $this->decrement("size");
        $this->delete($this->front);
        $this->set("front", ($this->front + 1) % $this->maxSize);
        return $this;
    }

    /**
     * @return mixed The front element without removing it.
     */
    public function getTop()
    {
        return $this->get($this->front);
    }

    /**
     * @return array|null All elements, oldest first; null when the queue is empty.
     */
    public function getAll()
    {
        return $this->getPage();
    }

    /**
     * Returns a slice of the queue, oldest first.
     *
     * @param int $offset Number of elements to skip from the front.
     * @param int $limit  Maximum number of elements; 0 means no limit.
     * @return array|null The elements found, or null when the queue is empty,
     *                    the offset is out of range or the lookup failed.
     */
    public function getPage($offset = 0, $limit = 0)
    {
        $size = (int) $this->size;
        // offset == size already points past the last element.
        if ($size <= 0 || $offset < 0 || $offset >= $size) {
            return null;
        }

        $count = $size - $offset;
        if ($limit > 0 && $limit < $count) {
            $count = $limit;
        }

        $keys = $this->keysFrom($offset, $count);
        if (!$keys) {
            return null;
        }
        $found = $this->memcache->get($keys);
        if (!is_array($found)) {
            return null;
        }

        // Walk the requested keys so the result is in queue order no matter
        // how the backend orders its reply.
        $items = array();
        foreach ($keys as $key) {
            if (array_key_exists($key, $found)) {
                $items[] = $found[$key];
            }
        }
        return $items;
    }

    /**
     * Removes every element and resets the queue to its empty state.
     *
     * The counters are reset to 0 rather than deleted: with them gone,
     * isFull() would compare two missing values as equal and enQueue()
     * would refuse to work.
     */
    public function makeEmpty()
    {
        foreach ($this->getAllKeys() as $key) {
            // getAllKeys() yields complete memcache keys, so they must not
            // pass through delete(), which would add the prefix again.
            $this->memcache->delete($key);
        }
        $this->set("real", 0);
        $this->set("front", 0);
        $this->set("size", 0);
    }

    /**
     * @return array Memcache keys of all stored elements, oldest first.
     */
    private function getAllKeys()
    {
        if ($this->isEmpty()) {
            return array();
        }
        return $this->keysFrom(0, (int) $this->size);
    }

    /**
     * Builds the memcache keys of $count consecutive elements, starting
     * $offset elements behind the front and wrapping around the ring.
     */
    private function keysFrom($offset, $count)
    {
        $capacity = (int) $this->maxSize;
        if ($capacity <= 0) {
            return array();
        }
        $count = min($count, $capacity);
        $front = (int) $this->front;

        $keys = array();
        for ($i = 0; $i < $count; $i++) {
            $keys[] = $this->getKeyByPos(($front + $offset + $i) % $capacity);
        }
        return $keys;
    }

    private function add($pos, $data)
    {
        $this->memcache->add($this->getKeyByPos($pos), $data);
        return $this;
    }

    private function increment($pos)
    {
        return $this->memcache->increment($this->getKeyByPos($pos));
    }

    private function decrement($pos)
    {
        return $this->memcache->decrement($this->getKeyByPos($pos));
    }

    private function set($pos, $data)
    {
        $this->memcache->set($this->getKeyByPos($pos), $data);
        return $this;
    }

    private function get($pos)
    {
        return $this->memcache->get($this->getKeyByPos($pos));
    }

    private function delete($pos)
    {
        return $this->memcache->delete($this->getKeyByPos($pos));
    }

    /**
     * Maps a ring position or a bookkeeping name to its memcache key.
     */
    private function getKeyByPos($pos)
    {
        return $this->prefix . $this->name . $pos;
    }
}
|