ElasticSearch(九)e代驾使用Elasticsearch流程设计(Yii1版本)
作者:互联网
一、控制器层的更新、添加、删除
class AddKnowledgeAction extends CAction { //add and update public function actionPost() { if ($_POST) { //如果是post操作 $res = array('code'=>0,'message'=>''); $kid = Yii::app()->request->getPost('kid'); //这里是知识主键id $cid = Yii::app()->request->getPost('cid'); $title = Yii::app()->request->getPost('title'); $content = Yii::app()->request->getPost('content'); $auth_group = Yii::app()->request->getPost('auth_group'); $end_time = Yii::app()->request->getPost('end_time'); $keywords = Yii::app()->request->getPost('keywords'); if (empty($kid)) { //$kid不存在则说明是走add操作,否则是update if ($kid = CallCenterService::addKnowledge($cid, $title, $content, $auth_group, $end_time, $keywords)) { //这里表示添加成功 } } else { if (CallCenterService::updateKnowledge($kid, $cid, $title, $content, $auth_group, $end_time, $keywords)) { //这里表示修改成功 } } } } //delete public function actionDelete (){ $kid = Yii::app()->request->getQuery('kid'); $action = Yii::app()->request->getQuery('action'); //action => ['delete' => '物理删除', 'Invalid' => '逻辑删除']if ($kid && $action) { if (CallCenterService::delKnowledge($kid, $action)) { //表示删除成功 } } } }
二、服务层的更新、添加、删除
//服务层 class CallCenterService { private static $instance; public static $auth_group = null; public static function getInstance() { if (empty(self::$instance)) { self::$instance = new CallCenterService(); } return self::$instance; } /** * 添加知识 */ public static function addKnowledge($cid, $title, $content, $auth_group, $end_time, $keywords) { $model = new Knowledgenew; $operator = Yii::app()->user->id; $created = date('Y-m-d H:i:s'); $model->attributes = array( 'cid' => $cid, 'title' => $title, 'content' => $content, 'operator' => $operator, 'created' => $created, 'auth_group' => $auth_group, 'end_time' => $end_time, 'keywords' => $keywords, 'updated' => $created ); if ($model->save()) { $id = $model->id; //异步添加索引到es Knowledgenew::onKnowledgeChanged('add', array('id' => $id)); return $id; } else { } return false; } /** * 编辑知识 */ public static function updateKnowledge($kid, $cid, $title, $content , $auth_group, $end_time, $keywords) { $knowledge = Knowledgenew::getKnowledge($kid); if ($knowledge) { $model = new Knowledgenew; $model->updateByPk($kid, array( 'cid' => $cid, 'title' => $title, 'content' => $content, 'auth_group' => $auth_group, 'end_time' => isset($end_time) && !empty($end_time) ? $end_time : null, 'keywords' => $keywords, 'updated' => date('Y-m-d H:i:s') ) ); Knowledgenew::onKnowledgeChanged('update', array('id' => $kid)); return true; } return false; } /**删除一条知识 * @param $kid * @param string $action Invalid => 逻辑删除 ,delete =>物理删除 * @return bool */ public static function delKnowledge($kid, $action = 'invalid') { $knowledge = Knowledgenew::getKnowledge($kid); if ($knowledge) { $model = new Knowledgenew; if ($action == 'delete') { $model->deleteByPk($kid); } else { $model->updateByPk($kid,array('status'=>Knowledgenew::STATUS_DEL)); } //更新es Knowledgenew::onKnowledgeChanged('delete', array('id' => $kid)); //删除收藏夹中的相关知识 KnowledgenewCollection::model()->deleteAll("kid = $kid"); return true; } return false; } }
三、Model层的更新点击浏览次数场景及异步任务更新ES信息
//model层 class Knowledgenew extends CActiveRecord { const STATUS_NORMAL = 1; const STATUS_DEL = 2; //Invalid public function tableName() { return '{{knowledgenew}}'; } public static function model($className=__CLASS__) { return parent::model($className); } /** * 增加浏览数 */ public static function addClickNum($kid) { $model = self::model(); $model->updateCounters(array('click_num'=>1),'id=:id',array(':id'=>$kid)); Knowledgenew::onKnowledgeChanged('update', array('id' => $kid)); return true; } //更新es信息 public static function onKnowledgeChanged($action, $param){ //echo '更新知识库索引action='.$action.PHP_EOL; EdjLog::info('更新知识库索引action='.$action); $base_param = array('es_source' => 'Knowledgenew', 'es_action' => $action); Queue::model()->putin( //异步 array( 'method'=>'synchronize_elasticsearch', 'params'=>array_merge($base_param, $param) ), 'synchronize_elasticsearch' ); } }
四、异步Job队列生产
<?php /** * 基于redis的queue队列 */ class Queue { private static $_models; public $queue_max_length = array( ); public static function model($className=__CLASS__) { $model=null; if (isset(self::$_models[$className])) $model=self::$_models[$className]; else { $model=self::$_models[$className]=new $className(null); } return $model; } //确定redis private function select_redis($type) { return QueuePool::model()->get_zone($type); } private function trim($queue_name) { $type = str_replace("queue_", "", $queue_name); $max = 0; if (isset($this->queue_max_length[$type])) { $max = intval($this->queue_max_length[$type]); } if ($max>0) { $zone = $this->select_redis($type); if($zone) { $zone['redis']->lTrim($queue_name, 0, $max-1); } else { EdjLog::error("can not find zone, queue name: " . $type); return; } } } /** * 放入队列,统一队列对外暴露方法,增加类型默认放task队列,指定了就放对应的队列,同时如果不在指定类型内的,也放默认队列 * * @param unknown_type $params * @param unknown_type $type * @return mixed */ public function putin($params=null, $type){ $type = empty($type) ? 'error' : strtolower($type); $base_qname = QNameManagerService::model()->get_base_qname($type); if(!empty($base_qname)) { $this->queue_name = 'queue_'.$base_qname; }else{ $this->queue_name = 'queue_error'; } if ($params===null) { return $this->get(); } else { return $this->add($params); //如果add替换为processTask方法,则同步 } } /** * 取一条队列数据,封装多个队列,统一调用方法 * @param string $type * @return array */ public function getit($type='default') { $base_qname = QNameManagerService::model()->get_base_qname($type); if(!empty($base_qname)) { $this->queue_name = 'queue_'.$base_qname; }else{ return array(); } $zone = $this->select_redis($type); if($zone) { if($zone['brpop']) { $json = ''; $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']); if(!empty($result) && isset($result[1])) { $json = $result[1]; } } else { $json = $zone['redis']->rPop($this->queue_name); } } else { EdjLog::error("can not find zone, queue name: " . $type); return array(); } return json_decode($json, true); } /** * 返回队列接收的类型列表 * @return array */ public function getQueueTypeList() { $list = QNameManager::model()->findall(); if($list) { return $list; } EdjLog::error("Error: get queue list from database"); return array(); } /** * 设置或者读取位置队列 * @param array $params * @return mixed */ public function position($params=null) { $this->queue_name='queue_position'; if ($params===null) { return $this->get(); } else { return $this->add($params); } } /** * 心跳队列 * @param string $params * @return mixed */ public function heartbeat($params=null) { $this->queue_name='queue_heartbeat'; if ($params===null) { return $this->get(); } else { return $this->add($params); } } /** * 最高优先级队列 * @param string $params * @return mixed */ public function task($params=null) { $this->queue_name='queue_task'; if ($params===null) { return $this->get(); } else { return $this->add($params); } } /** * 保存日志到数据库 * @param string $params * @return mixed */ public function dumplog($params=null) { $this->queue_name='queue_dumplog'; if ($params===null) { return $this->get(); } else { return $this->add($params); } } /** * 返回各个队列中的任务总数 */ public function length() { $queue = $this->getQueueTypeList(); $queue_length=array(); $reg = "/P[0-9]+$/"; foreach($queue as $item) { $base_qname = $item->base_qname; $zone = $this->select_redis($base_qname); $key = 'queue_'.$base_qname; if($zone) { $len = $zone['redis']->lLen($key); if(isset($item->max) && $len > $item->max) { $key = '!'.$key; } $pkey = ''; if(preg_match($reg, $zone['name'])) { $pkey = $key.'@'.$zone['name']; } else { $pkey = $key.'@'.$zone['name']."_P".$item->level; } $queue_length[$pkey] = $len; } else { EdjLog::error("can not find zone, queue name: " . $key); } } return $queue_length; } private function get() { $type = str_replace("queue_", "", $this->queue_name); $zone = $this->select_redis($type); if($zone) { if($zone['brpop']) { $json = ''; $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']); if(!empty($result) && isset($result[1])) { $json = $result[1]; } } else { $json = $zone['redis']->rPop($this->queue_name); } } else { EdjLog::error("can not find zone, queue name: " . $type); return array(); } return json_decode($json, true); } private function add($params) { $json=json_encode($params); $type = str_replace("queue_", "", $this->queue_name); $zone = $this->select_redis($type); $return = 0; if($zone) { try { $return = $zone['redis']->lPush($this->queue_name, $json); } catch (Exception $e) { EdjLog::error("write redis error,msg:".$e->getMessage()); //echo $e->getMessage(); } } else { EdjLog::error("can not find zone, queue name: " . $type); } return $return; } //如果add 替换为此方法,则同步 public function processTask($task) { if(!isset($task['method'], $task['params'])) { $task_content = json_encode($task); EdjLog::error("can not run task due to no 'method' or 'params' specified, task is $task_content"); return; } $method=$task['method']; $params=$task['params']; $class = isset($task['class']) ? $task['class'] : "QueueProcess"; EdjLog::info("REDIS_QUEUE_OUT CLASS:$class METHOD:$method PARAMS:".json_encode($params)); try { //throw new Exception("Value must be 1 or below"); $queue_process=new $class(); // check this method is exist, if not throw ReflectionException new ReflectionMethod($queue_process, $method); call_user_func_array(array($queue_process, $method), array($params)); } catch(Exception $e) { $errmsg = $e->getMessage(); EdjLog::error("execption queue_run method:$method err: $errmsg"); } } public function getLengthByType($type){ $type = empty($type) ? 'error' : strtolower($type); $base_qname = QNameManagerService::model()->get_base_qname($type); $zone = $this->select_redis($base_qname); $key = 'queue_'.$base_qname; $len = 0; if($zone) { $len = $zone['redis']->lLen($key); } else { EdjLog::error("can not find zone, queue name: " . $base_qname); } return $len; } }
五、异步Job队列消费
<?php /** * 队列处理 */ Yii::import("application.ecenter.service.HttpUtils"); class QueueProcess { private static $_models; private $message; public static function model($className=__CLASS__) { $model=null; if (isset(self::$_models[$className])) $model=self::$_models[$className]; else { $model=self::$_models[$className]=new $className(null); } return $model; } public function synchronize_elasticsearch($param) { if (empty($param) || !isset($param['es_source'], $param['es_action'])) { return false; } $class_name = $param['es_source'].'Synchronizer'; $method_name = $param['es_action']; if (class_exists($class_name) && method_exists($class_name, $method_name)) { unset($param['es_source']); unset($param['es_action']); call_user_func(array($class_name, $method_name), $param); } else { EdjLog::error('synchronize method does not exist. class name '.$class_name.' method name '.$method_name); } } }
六、ES信息处理操作服务层
<?php /** * Created by PhpStorm. */ class KnowledgenewSynchronizer { static public $index = 'knowledge_index'; static public $type = 'knowledge'; static public $filed = ' id, keywords, title, content, auth_group, cid, operator, click_num, status, created, updated '; static public function add($param) { if (empty($param) || !isset($param['id'])) { return false; } $id = $param['id']; $sql = "select".self::$filed."from t_knowledgenew where id=:id"; $doc = Yii::app()->db->CreateCommand($sql)->queryRow(true,array('id'=>$id)); if (empty($doc)) { EdjLog::error('cannot find knowledge with id: '.$id); return false; } return ElasticsearchSynchronizer::addDocument(self::$index, self::$type, $id, $doc); } static public function delete($param) { if (empty($param) || !isset($param['id'])) { return false; } $id = $param['id']; return ElasticsearchSynchronizer::deleteDocument(self::$index, self::$type, $id); } static public function update($param) { if (empty($param) || !isset($param['id'])) { return false; } $id = $param['id']; $sql = "select".self::$filed."from t_knowledgenew where id=:id"; $doc = Yii::app()->db->CreateCommand($sql)->queryRow(true,array('id'=>$id)); if (empty($doc)) { EdjLog::error('cannot find knowledge with id: '.$id); return false; } return ElasticsearchSynchronizer::updateDocument(self::$index, self::$type, $id, $doc); } }
七、ES信息处理操作Model层
<?php use Elastica\Client; use Elastica\Query\QueryString; use Elastica\Query; use Elastica\Document; Class ElasticsearchSynchronizer {//测试 //const ES_HOST='search.n.edaijia.cn'; //const ES_PORT=9200; static public function addDocument($index, $type, $id, $doc) { $client = new Client(array('host' => self::ES_HOST, self::ES_PORT)); $type = $client->getIndex($index)->getType($type); try { $response = $type->addDocument(new Document($id, $doc)); if ($response->isOk()) { EdjLog::info("add document $id succeeded"); return true; } else { EdjLog::info("add document $id failed"); return false; } } catch (Exception $e) { print_r($e); EdjLog::error("add document $id failed with exception ".$e->getMessage()); return false; } } static public function updateDocument($index, $type, $id, $doc) { $client = new Client(array('host' => self::ES_HOST, 'port' => self::ES_PORT)); try { $response = $client->getIndex($index) ->getType($type) ->updateDocument(new Document($id, $doc)); if ($response->isOk()) { EdjLog::info("update document $id succeeded"); return true; } else { EdjLog::info("update document $id failed"); return false; } } catch (Exception $e) { EdjLog::error("update document $id failed with exception ".$e->getMessage()); return false; } } static public function deleteDocument($index, $type, $id) { $client = new Client(array('host' => self::ES_HOST, 'port' => self::ES_PORT)); try { $response = $client->getIndex($index)->getType($type)->deleteById($id); if ($response->isOk()) { EdjLog::info("delete document $id succeeded"); return true; } else { EdjLog::info("delete document $id failed"); return false; } } catch (Exception $e) { EdjLog::error("delete document $id failed with exception ".$e->getMessage()); return false; } } }
八、查询
/** * @param $keyword * @param int $page * @param int $size * @param str $search_type * 搜索知识 * 搜索标题和内容,多个关键词是并且关系,空格分隔 */ public static function searchKnowledge($keyword, $page = 0, $size = 50, $search_type = 'default') { //对搜索关键词空格隔开 // $keywords = explode(' ',trim($keyword)); $start = $page * $size; $client = new \Elastica\Client(array('host' => ElasticsearchSynchronizer::ES_HOST, 'port' => ElasticsearchSynchronizer::ES_PORT));//更改成线上主机和端口 $search = new \Elastica\Search($client); $search ->addIndex(KnowledgenewSynchronizer::$index)->addType(KnowledgenewSynchronizer::$type); // $query = new \Elastica\Query\Bool(); $query = new \Elastica\Query(); //设置必要查询 // foreach($keywords as $word) { // if($word) { // $knowledge_query = new \Elastica\Query\QueryString(); // $knowledge_query->setFields(array('title', 'content')); // $knowledge_query->setQuery('"' . $word . '"'); // $query->addMust($knowledge_query); // } // } $MultiMatch_obj = new \Elastica\Query\MultiMatch(); $MultiMatch_obj->setQuery($keyword); if ($search_type == 'default') { $MultiMatch_obj->setFields(array('keywords')); } else { $MultiMatch_obj->setTieBreaker(0.3); $MultiMatch_obj->setType('best_fields'); $MultiMatch_obj->setFields(array('keywords^901209','content')); $MultiMatch_obj->setOperator('or'); //这里是字符串,在es的扩展目录下 setMinimumShouldMatch方法把转int去掉// //$this->setParam('minimum_should_match', (int)$minimumShouldMatch); $MultiMatch_obj->setMinimumShouldMatch('30%'); } $query->setQuery($MultiMatch_obj); //命中全部纪录 $query = \Elastica\Query::create($query); $query->setSource(["id","cid","updated", "title", 'keywords',"content",'auth_group','status']); // $query->setSort([ // 'click_num' => ['order' => 'desc'] // ]); //设置起始页 $query->setFrom($start); $query->setSize($size); //设置高亮显示 $query->setHighlight(array( 'pre_tags' => array('<span style="color: red">'), 'post_tags' => array('</span>'), 'fields' => array( 'title' => array( 'fragment_size' => 5,//包含高亮词语,最多展示多少个 'number_of_fragments' => 1,//分几段展示,这里一段就可以 ), 'keywords' => array( 'fragment_size' => 10,//包含高亮词语,最多展示多少个 'number_of_fragments' => 1,//分几段展示,这里一段就可以 ), 'content' => array( 'fragment_size' => 10,//包含高亮词语,最多展示多少个 'number_of_fragments' => 1,//分几段展示,这里一段就可以 ), ), )); $search->setQuery($query); $results = array(); $totalResults = 0; try { $resultSet = $search->search(); $results = $resultSet->getResults(); $totalResults = $resultSet->getTotalHits(); } catch (Exception $e) { EdjLog::error("query elasticsearch failed"); } if ($totalResults <= 0) { return; } $poi_result = array(); foreach ($results as $result) { $highlight = $result->getHighlights(); $title_hightlight = isset($highlight['title'][0])?$highlight['title'][0]:''; $content_hightlight = isset($highlight['content'][0])?$highlight['content'][0]:''; $keywords_highlight = isset($highlight['keywords'][0])?$highlight['keywords'][0]:''; $poi_result[] = array( 'id' => $result->id, 'cid' => $result->cid, 'title' => $result->title, 'keywords' => $result->keywords, 'content' => $result->content, 'auth_group' => $result->auth_group, 'title_highlight'=>$title_hightlight, //高亮展示标题搜索关键词 'keywords_highlight'=>$keywords_highlight, //高亮展示标题搜索关键词 'content_highlight'=>$content_hightlight,//高亮展示内容搜索关键词 'updated'=>$result->updated, 'status' => $result->status, ); } //这里过滤了用户非权限列表 $poi_result = self::filterNoAuthKnowledge($poi_result); $totalResults = count($poi_result) ; $info = array('totalNum'=>$totalResults,'data'=>$poi_result); return json_encode($info); }
九、源码包
链接:https://pan.baidu.com/s/1lVcrb50HSLrJh3zvBOdH5g 提取码:9c9c
标签:return,zone,queue,Elasticsearch,代驾,Yii1,array,type,id 来源: https://blog.51cto.com/u_15230485/2821696