Commit 955bf7da by Carsten Brandt

basic CRUD for elastic search WIP

parent a94886fa
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
*/ */
namespace yii\elasticsearch; namespace yii\elasticsearch;
use Guzzle\Http\Client;
use Guzzle\Http\Exception\MultiTransferException;
use yii\base\NotSupportedException; use yii\base\NotSupportedException;
use yii\db\Exception; use yii\db\Exception;
use yii\helpers\Json; use yii\helpers\Json;
...@@ -78,14 +80,20 @@ class ActiveQuery extends \yii\base\Component ...@@ -78,14 +80,20 @@ class ActiveQuery extends \yii\base\Component
*/ */
public $asArray; public $asArray;
/** /**
* @var array the columns being selected. For example, `array('id', 'name')`.
* This is used to construct the SELECT clause in a SQL statement. If not set, if means selecting all columns.
* @see select()
*/
public $select;
/**
* @var array the query condition. * @var array the query condition.
* @see where() * @see where()
*/ */
public $where; public $where;
/** /**
* @var integer maximum number of records to be returned. If not set or less than 0, it means no limit. * @var integer maximum number of records to be returned. If not set or less than 0, it means no limit. TODO infinite possible in ES?
*/ */
public $limit; public $limit = 10;
/** /**
* @var integer zero-based offset from where the records are to be returned. * @var integer zero-based offset from where the records are to be returned.
* If not set, it means starting from the beginning. * If not set, it means starting from the beginning.
...@@ -128,12 +136,10 @@ class ActiveQuery extends \yii\base\Component ...@@ -128,12 +136,10 @@ class ActiveQuery extends \yii\base\Component
// TODO add support for orderBy // TODO add support for orderBy
$data = $this->executeScript('All'); $data = $this->executeScript('All');
$rows = array(); $rows = array();
print_r($data);
foreach($data as $dataRow) { foreach($data as $dataRow) {
$row = array(); $row = $dataRow['_source'];
$c = count($dataRow); $row['id'] = $dataRow['_id'];
for($i = 0; $i < $c; ) {
$row[$dataRow[$i++]] = $dataRow[$i++];
}
$rows[] = $row; $rows[] = $row;
} }
if (!empty($rows)) { if (!empty($rows)) {
...@@ -157,14 +163,11 @@ class ActiveQuery extends \yii\base\Component ...@@ -157,14 +163,11 @@ class ActiveQuery extends \yii\base\Component
{ {
// TODO add support for orderBy // TODO add support for orderBy
$data = $this->executeScript('One'); $data = $this->executeScript('One');
if ($data === array()) { if (!isset($data['_source'])) {
return null; return null;
} }
$row = array(); $row = $data['_source'];
$c = count($data); $row['id'] = $data['_id'];
for($i = 0; $i < $c; ) {
$row[$data[$i++]] = $data[$i++];
}
if ($this->asArray) { if ($this->asArray) {
$model = $row; $model = $row;
} else { } else {
...@@ -284,12 +287,13 @@ class ActiveQuery extends \yii\base\Component ...@@ -284,12 +287,13 @@ class ActiveQuery extends \yii\base\Component
{ {
if (($data = $this->findByPk($type)) === false) { if (($data = $this->findByPk($type)) === false) {
$modelClass = $this->modelClass; $modelClass = $this->modelClass;
/** @var Connection $db */ $http = $modelClass::getDb()->http();
$db = $modelClass::getDb();
$method = 'build' . $type; $url = '/' . $modelClass::indexName() . '/' . $modelClass::indexType() . '/_search';
$script = $db->getLuaScriptBuilder()->$method($this, $columnName); $query = $modelClass::getDb()->getQueryBuilder()->build($this);
return $db->executeCommand('EVAL', array($script, 0)); $response = $http->post($url, null, Json::encode($query))->send();
$data = Json::decode($response->getBody(true));
return $data['hits']['hits'];
} }
return $data; return $data;
} }
...@@ -301,46 +305,47 @@ class ActiveQuery extends \yii\base\Component ...@@ -301,46 +305,47 @@ class ActiveQuery extends \yii\base\Component
{ {
$modelClass = $this->modelClass; $modelClass = $this->modelClass;
if (is_array($this->where) && !isset($this->where[0]) && $modelClass::isPrimaryKey(array_keys($this->where))) { if (is_array($this->where) && !isset($this->where[0]) && $modelClass::isPrimaryKey(array_keys($this->where))) {
/** @var Connection $db */ /** @var Client $http */
$db = $modelClass::getDb(); $http = $modelClass::getDb()->http();
$pks = (array) reset($this->where); $pks = (array) reset($this->where);
$start = $this->offset === null ? 0 : $this->offset; $query = array('docs' => array());
$i = 0;
$data = array();
$url = '/' . $modelClass::indexName() . '/' . $modelClass::indexType() . '/';
foreach($pks as $pk) { foreach($pks as $pk) {
if (++$i > $start && ($this->limit === null || $i <= $start + $this->limit)) { $doc = array('_id' => $pk);
$request = $db->http()->get($url . $pk); if (!empty($this->select)) {
$response = $request->send(); $doc['fields'] = $this->select;
if ($response->getStatusCode() == 404) {
// ignore?
} else {
$data[] = Json::decode($response->getBody(true));
if ($type === 'One' && $this->orderBy === null) {
break;
}
}
} }
$query['docs'][] = $doc;
} }
$url = '/' . $modelClass::indexName() . '/' . $modelClass::indexType() . '/_mget';
$response = $http->post($url, null, Json::encode($query))->send();
$data = Json::decode($response->getBody(true));
$start = $this->offset === null ? 0 : $this->offset;
$data = array_slice($data['docs'], $start, $this->limit);
// TODO support orderBy // TODO support orderBy
switch($type) { switch($type) {
case 'All': case 'All':
return $data; return $data;
case 'One': case 'One':
return reset($data); return empty($data) ? null : reset($data);
case 'Column': case 'Column':
// TODO support indexBy
$column = array(); $column = array();
foreach($data as $dataRow) { foreach($data as $row) {
$row = array(); $row['_source']['id'] = $row['_id'];
$c = count($dataRow); if ($this->indexBy === null) {
for($i = 0; $i < $c; ) { $column[] = $row['_source'][$columnName];
$row[$dataRow[$i++]] = $dataRow[$i++]; } else {
if (is_string($this->indexBy)) {
$key = $row['_source'][$this->indexBy];
} else {
$key = call_user_func($this->indexBy, $row['_source']);
}
$models[$key] = $row;
} }
$column[] = $row[$columnName];
} }
return $column; return $column;
case 'Count': case 'Count':
...@@ -414,6 +419,24 @@ class ActiveQuery extends \yii\base\Component ...@@ -414,6 +419,24 @@ class ActiveQuery extends \yii\base\Component
} }
/** /**
* Sets the SELECT part of the query.
* @param string|array $columns the columns to be selected.
* Columns can be specified in either a string (e.g. "id, name") or an array (e.g. array('id', 'name')).
* Columns can contain table prefixes (e.g. "tbl_user.id") and/or column aliases (e.g. "tbl_user.id AS user_id").
* The method will automatically quote the column names unless a column contains some parenthesis
* (which means the column contains a DB expression).
* @return Query the query object itself
*/
public function select($columns)
{
if (!is_array($columns)) {
$columns = preg_split('/\s*,\s*/', trim($columns), -1, PREG_SPLIT_NO_EMPTY);
}
$this->select = $columns;
return $this;
}
/**
* Sets the ORDER BY part of the query. * Sets the ORDER BY part of the query.
* @param string|array $columns the columns (and the directions) to be ordered by. * @param string|array $columns the columns (and the directions) to be ordered by.
* Columns can be specified in either a string (e.g. "id ASC, name DESC") or an array * Columns can be specified in either a string (e.g. "id ASC, name DESC") or an array
......
...@@ -64,39 +64,43 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord ...@@ -64,39 +64,43 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord
*/ */
public static function updateAll($attributes, $condition = null, $params = array()) public static function updateAll($attributes, $condition = null, $params = array())
{ {
// TODO add support for further options as described in http://www.elasticsearch.org/guide/reference/api/bulk/
if (empty($attributes)) { if (empty($attributes)) {
return 0; return 0;
} }
$db = static::getDb(); if (count($condition) != 1 || !isset($condition[reset(static::primaryKey())])) {
$n=0; throw new NotSupportedException('UpdateAll is only supported by primary key in elasticsearch.');
foreach(static::fetchPks($condition) as $pk) { }
$newPk = $pk; if (isset($attributes[reset(static::primaryKey())])) {
$pk = static::buildKey($pk); throw new NotSupportedException('Updating the primary key is currently not supported by elasticsearch.');
$key = static::tableName() . ':a:' . $pk; }
// save attributes $query = '';
$args = array($key); foreach((array) reset($condition) as $pk) {
foreach($attributes as $attribute => $value) { if (is_array($pk)) {
if (isset($newPk[$attribute])) { $pk = reset($pk);
$newPk[$attribute] = $value; }
} $action = Json::encode(array(
$args[] = $attribute; "update" => array(
$args[] = $value; "_id" => $pk,
} "_type" => static::indexType(),
$newPk = static::buildKey($newPk); "_index" => static::indexName(),
$newKey = static::tableName() . ':a:' . $newPk; ),
// rename index if pk changed ));
if ($newPk != $pk) { $data = Json::encode(array(
$db->executeCommand('MULTI'); "doc" => $attributes
$db->executeCommand('HMSET', $args); ));
$db->executeCommand('LINSERT', array(static::tableName(), 'AFTER', $pk, $newPk)); $query .= $action . "\n" . $data . "\n";
$db->executeCommand('LREM', array(static::tableName(), 0, $pk)); // TODO implement pk change
$db->executeCommand('RENAME', array($key, $newKey));
$db->executeCommand('EXEC');
} else {
$db->executeCommand('HMSET', $args);
} }
$url = '/' . static::indexName() . '/' . static::indexType() . '/_bulk';
$response = static::getDb()->http()->post($url, array(), $query)->send();
$body = Json::decode($response->getBody(true));
$n=0;
foreach($body['items'] as $item) {
if ($item['update']['ok']) {
$n++; $n++;
} }
}
return $n; return $n;
} }
...@@ -117,19 +121,7 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord ...@@ -117,19 +121,7 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord
*/ */
public static function updateAllCounters($counters, $condition = null, $params = array()) public static function updateAllCounters($counters, $condition = null, $params = array())
{ {
if (empty($counters)) { throw new NotSupportedException('Update Counters is not supported by elasticsearch.');
return 0;
}
$db = static::getDb();
$n=0;
foreach(static::fetchPks($condition) as $pk) {
$key = static::tableName() . ':a:' . static::buildKey($pk);
foreach($counters as $attribute => $value) {
$db->executeCommand('HINCRBY', array($key, $attribute, $value));
}
$n++;
}
return $n;
} }
/** /**
...@@ -149,23 +141,36 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord ...@@ -149,23 +141,36 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord
*/ */
public static function deleteAll($condition = null, $params = array()) public static function deleteAll($condition = null, $params = array())
{ {
$db = static::getDb(); // TODO use delete By Query feature
$attributeKeys = array(); // http://www.elasticsearch.org/guide/reference/api/delete-by-query/
$pks = static::fetchPks($condition); if (count($condition) != 1 || !isset($condition[reset(static::primaryKey())])) {
$db->executeCommand('MULTI'); throw new NotSupportedException('DeleteAll is only supported by primary key in elasticsearch.');
foreach($pks as $pk) { }
$pk = static::buildKey($pk); $query = '';
$db->executeCommand('LREM', array(static::tableName(), 0, $pk)); foreach((array) reset($condition) as $pk) {
$attributeKeys[] = static::tableName() . ':a:' . $pk; if (is_array($pk)) {
} $pk = reset($pk);
if (empty($attributeKeys)) { }
$db->executeCommand('EXEC'); $query .= Json::encode(array(
return 0; "delete" => array(
"_id" => $pk,
"_type" => static::indexType(),
"_index" => static::indexName(),
),
)) . "\n";
}
$url = '/' . static::indexName() . '/' . static::indexType() . '/_bulk';
$response = static::getDb()->http()->post($url, array(), $query)->send();
$body = Json::decode($response->getBody(true));
$n=0;
foreach($body['items'] as $item) {
if ($item['delete']['ok']) {
$n++;
} }
$db->executeCommand('DEL', $attributeKeys);
$result = $db->executeCommand('EXEC');
return end($result);
} }
return $n;
}
/** /**
* Creates an [[ActiveQuery]] instance. * Creates an [[ActiveQuery]] instance.
* This method is called by [[find()]], [[findBySql()]] and [[count()]] to start a SELECT query. * This method is called by [[find()]], [[findBySql()]] and [[count()]] to start a SELECT query.
...@@ -189,16 +194,6 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord ...@@ -189,16 +194,6 @@ abstract class ActiveRecord extends \yii\db\ActiveRecord
return static::getTableSchema()->name; return static::getTableSchema()->name;
} }
/**
* This method is ment to be overridden in redis ActiveRecord subclasses to return a [[RecordSchema]] instance.
* @return RecordSchema
* @throws \yii\base\InvalidConfigException
*/
public static function getRecordSchema()
{
throw new InvalidConfigException(__CLASS__.'::getRecordSchema() needs to be overridden in subclasses and return a RecordSchema.');
}
public static function primaryKey() public static function primaryKey()
{ {
return array('id'); return array('id');
......
...@@ -160,27 +160,18 @@ class Connection extends Component ...@@ -160,27 +160,18 @@ class Connection extends Component
// TODO HTTP request to localhost:9200/ // TODO HTTP request to localhost:9200/
} }
public function http() public function getQueryBuilder()
{ {
return new \Guzzle\Http\Client('http://localhost:9200/'); return new QueryBuilder($this);
}
public function get($url)
{
$c = $this->initCurl($url);
$result = curl_exec($c);
curl_close($c);
} }
private function initCurl($url) /**
* @return \Guzzle\Http\Client
*/
public function http()
{ {
$c = curl_init('http://localhost:9200/' . $url); $guzzle = new \Guzzle\Http\Client('http://localhost:9200/');
$fp = fopen("example_homepage.txt", "w"); //$guzzle->setDefaultOption()
return $guzzle;
curl_setopt($c, CURLOPT_FOLLOWLOCATION, false);
curl_setopt($c, CURLOPT_FILE, $fp);
curl_setopt($c, CURLOPT_HEADER, 0);
} }
} }
\ No newline at end of file
<?php
/**
* Created by JetBrains PhpStorm.
* User: cebe
* Date: 30.09.13
* Time: 11:39
* To change this template use File | Settings | File Templates.
*/
namespace yii\elasticsearch;
use yii\base\Component;
class Query extends Component
{
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment