在什么场合里,你会使用PHP消息队列呢
< 返回列表时间: 2020-06-27来源:OSCHINA
1. 什么是消息队列
消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式
2. 为什么使用消息队列
消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读出。通过消息队列,应用程序可独立地执行,它们不需要知道彼此的位置、或在继续执行前不需要等待接收程序接收此消息。
3. 什么场合使用消息队列
你首先需要弄清楚,消息队列与远程过程调用的区别,在很多读者咨询我的时候,我发现他们需要的是RPC(远程过程调用),而不是消息队列。
消息队列有同步或异步实现方式,通常我们采用异步方式使用消息队列,远程过程调用多采用同步方式。
MQ与RPC有什么不同? MQ通常传递无规则协议,这个协议由用户定义并且实现存储转发;而RPC通常是专用协议,调用过程返回结果。
4. 什么时候使用消息队列
同步需求,远程过程调用(PRC)更适合你。
异步需求,消息队列更适合你。
目前很多消息队列软件同时支持RPC功能,很多RPC系统也能异步调用。
消息队列用来实现下列需求 存储转发 分布式事务 发布订阅 基于内容的路由 点对点连接
以下是一个消息队列的运用实例
<?php /** * Created by PhpStorm. * User: lin * Date: 2017/6/9 * Time: 11:19 * 实现php共享内存消息队列 */ class ShmQueue { private $maxQSize = 0 ; //队列最大长度 private $front = 0 ; //队头指针 private $rear = 0 ; //队尾指针 private $blockSize = 256 ; // 块的大小(byte) private $memSize = 1280 ; // 最大共享内存(byte) private $shmId = 0 ; //根据这个id可以操作该共享内存片段 private $filePtr = APP_PATH . 'public/shmq.ptr' ; private $semId = 0 ; public function __construct () { $shmkey = ftok ( __FILE__ , 't' ); //产生系统id $this -> shmId = shmop_open ( $shmkey , "c" , 0644 , $this -> memSize ); //创建一个内存段 $this -> maxQSize = $this -> memSize / $this -> blockSize ; // 申請一个信号量 $this -> semId = sem_get ( $shmkey , 1 ); sem_acquire ( $this -> semId ); // 申请进入临界 $this -> init (); } private function init () { if ( file_exists ( $this -> filePtr ) ){ $contents = file_get_contents ( $this -> filePtr ); $data = explode ( '|' , $contents ); if ( isset ( $data [ 0 ]) && isset ( $data [ 1 ])){ $this -> front = ( int ) $data [ 0 ]; $this -> rear = ( int ) $data [ 1 ]; } } } public function getLength () { return (( $this -> rear - $this -> front + $this -> memSize ) % ( $this -> memSize ) ) / $this -> blockSize ; } public function enQueue ( $value ) { if ( $this -> ptrInc ( $this -> rear ) == $this -> front ){ // 队满 return false ; } //echo $this->front; $data = $this -> encode ( $value ); shmop_write ( $this -> shmId , $data , $this -> rear ); $this -> rear = $this -> ptrInc ( $this -> rear ); return $this -> decode ( $data ); } public function deQueue () { if ( $this -> front == $this -> rear ){ // 队空 throw new Exception ( " block size is null!" ); } $value = shmop_read ( $this -> shmId , $this -> front , $this -> blockSize - 1 ); $this -> front = $this -> ptrInc ( $this -> front ); return $this -> decode ( $value ); } private function ptrInc ( $ptr ) { return ( $ptr + $this -> blockSize ) % ( $this -> memSize ); } private function encode ( $value ) { $data = serialize ( $value ) . "__eof" ; //echo ''; //echo strlen($data); //echo ''; // echo $this->blockSize -1; // echo ''; if ( strlen ( $data ) > $this -> blockSize - 1 ){ throw new Exception ( strlen ( $data ) . " is overload block size!" ); } return $data ; } public function exist ( $value ){ //判断队头的数据 $data = shmop_read ( $this -> shmId , $this -> front , $this -> blockSize - 1 ); if ( $value == $this -> decode ( $data )){ return 1 ; } return 0 ; } private function decode ( $value ) { //return $value; $data = explode ( "__eof" , $value ); return unserialize ( $data [ 0 ]); } public function __destruct () { //保存队头和队尾指针 $data = $this -> front . '|' . $this -> rear ; file_put_contents ( $this -> filePtr , $data ); sem_release ( $this -> semId ); // 出临界区, 释放信号量 } }
如何调用
$shmq = new ShmQueue(); 入队: $data = 125; $shmq->enQueue($data); 出队: $shmq->deQueue();

热门排行