Commit 613758dd by Carsten Brandt

refactored elasticsearch COnnection and Command

parent 8792f21f
......@@ -13,13 +13,15 @@ nbproject
Thumbs.db
# composer vendor dir
/yii/vendor
/vendor
# composer itself is not needed
composer.phar
# composer.lock should not be committed as we always want the latest versions
/composer.lock
# Mac DS_Store Files
.DS_Store
# local phpunit config
/phpunit.xml
\ No newline at end of file
/phpunit.xml
......@@ -7,6 +7,7 @@ php:
services:
- redis-server
- memcached
- elasticsearch
before_script:
- composer self-update && composer --version
......
......@@ -84,7 +84,7 @@ class ActiveQuery extends Query implements ActiveQueryInterface
public function all($db = null)
{
$command = $this->createCommand($db);
$result = $command->queryAll();
$result = $command->search();
if (empty($result['hits'])) {
return [];
}
......@@ -154,7 +154,7 @@ class ActiveQuery extends Query implements ActiveQueryInterface
if ($field == ActiveRecord::PRIMARY_KEY_NAME) {
$command = $this->createCommand($db);
$command->queryParts['fields'] = [];
$rows = $command->queryAll()['hits'];
$rows = $command->search()['hits'];
$result = [];
foreach ($rows as $row) {
$result[] = $row['_id'];
......
......@@ -370,11 +370,10 @@ class ActiveRecord extends \yii\db\ActiveRecord
}
// TODO do this via command
$url = '/' . static::index() . '/' . static::type() . '/_bulk';
$response = static::getDb()->http()->post($url, null, $bulk)->send();
$body = Json::decode($response->getBody(true));
$url = [static::index(), static::type(), '_bulk'];
$response = static::getDb()->post($url, [], $bulk);
$n=0;
foreach($body['items'] as $item) {
foreach($response['items'] as $item) {
if ($item['update']['ok']) {
$n++;
}
......@@ -421,11 +420,10 @@ class ActiveRecord extends \yii\db\ActiveRecord
}
// TODO do this via command
$url = '/' . static::index() . '/' . static::type() . '/_bulk';
$response = static::getDb()->http()->post($url, null, $bulk)->send();
$body = Json::decode($response->getBody(true));
$url = [static::index(), static::type(), '_bulk'];
$response = static::getDb()->post($url, [], $bulk);
$n=0;
foreach($body['items'] as $item) {
foreach($response['items'] as $item) {
if ($item['delete']['found'] && $item['delete']['ok']) {
$n++;
}
......
......@@ -8,68 +8,56 @@
namespace yii\elasticsearch;
use Guzzle\Http\Exception\ClientErrorResponseException;
use Yii;
use yii\base\Component;
use yii\base\Exception;
use yii\base\InvalidConfigException;
use yii\helpers\Json;
/**
* elasticsearch Connection is used to connect to an elasticsearch cluster version 0.20 or higher
*
*
* @author Carsten Brandt <mail@cebe.cc>
* @since 2.0
*/
class Connection extends Component
abstract class Connection extends Component
{
/**
* @event Event an event that is triggered after a DB connection is established
*/
const EVENT_AFTER_OPEN = 'afterOpen';
// TODO add autodetection of cluster nodes
// http://localhost:9200/_cluster/nodes
public $nodes = array(
array(
'host' => 'localhost',
'port' => 9200,
)
);
// http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/_configuration.html#_example_configuring_http_basic_auth
public $auth = [];
// TODO use timeouts
/**
* @var float timeout to use for connection to redis. If not set the timeout set in php.ini will be used: ini_get("default_socket_timeout")
* @var bool whether to autodetect available cluster nodes on [[open()]]
*/
public $connectionTimeout = null;
public $autodetectCluster = true;
/**
* @var float timeout to use for redis socket when reading and writing data. If not set the php default value will be used.
* @var array cluster nodes
* This is populated with the result of a cluster nodes request when [[autodetectCluster]] is true.
* @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info
*/
public $dataTimeout = null;
public $nodes = [
['http_address' => 'inet[/127.0.0.1:9200]'],
];
/**
* @var array the active node. key of [[nodes]]. Will be randomly selected on [[open()]].
*/
public $activeNode;
// TODO http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/_configuration.html#_example_configuring_http_basic_auth
public $auth = [];
public function init()
{
if ($this->nodes === array()) {
throw new InvalidConfigException('elasticsearch needs at least one node.');
foreach($this->nodes as $node) {
if (!isset($node['http_address'])) {
throw new InvalidConfigException('Elasticsearch node needs at least a http_address configured.');
}
}
}
/**
* Creates a command for execution.
* @param string $query the SQL statement to be executed
* @return Command the DB command
*/
public function createCommand($config = [])
{
$this->open();
$config['db'] = $this;
$command = new Command($config);
return $command;
}
/**
* Closes the connection when this component is being serialized.
* @return array
*/
......@@ -85,7 +73,7 @@ class Connection extends Component
*/
public function getIsActive()
{
return false; // TODO implement
return $this->activeNode !== null;
}
/**
......@@ -95,48 +83,37 @@ class Connection extends Component
*/
public function open()
{
// TODO select one node to be the active one.
foreach($this->nodes as $key => $node) {
if (is_array($node)) {
$this->nodes[$key] = new Node($node);
}
if ($this->activeNode !== null) {
return;
}
/* if ($this->_socket === null) {
if (empty($this->dsn)) {
throw new InvalidConfigException('Connection.dsn cannot be empty.');
}
$dsn = explode('/', $this->dsn);
$host = $dsn[2];
if (strpos($host, ':')===false) {
$host .= ':6379';
if (empty($this->nodes)) {
throw new InvalidConfigException('elasticsearch needs at least one node to operate.');
}
if ($this->autodetectCluster) {
$node = reset($this->nodes);
$host = $node['http_address'];
if (strncmp($host, 'inet[/', 6) == 0) {
$host = substr($host, 6, -1);
}
$db = isset($dsn[3]) ? $dsn[3] : 0;
\Yii::trace('Opening DB connection: ' . $this->dsn, __CLASS__);
$this->_socket = @stream_socket_client(
$host,
$errorNumber,
$errorDescription,
$this->connectionTimeout ? $this->connectionTimeout : ini_get("default_socket_timeout")
);
if ($this->_socket) {
if ($this->dataTimeout !== null) {
stream_set_timeout($this->_socket, $timeout=(int)$this->dataTimeout, (int) (($this->dataTimeout - $timeout) * 1000000));
}
if ($this->password !== null) {
$this->executeCommand('AUTH', array($this->password));
}
$this->executeCommand('SELECT', array($db));
$this->initConnection();
} else {
\Yii::error("Failed to open DB connection ({$this->dsn}): " . $errorNumber . ' - ' . $errorDescription, __CLASS__);
$message = YII_DEBUG ? 'Failed to open DB connection: ' . $errorNumber . ' - ' . $errorDescription : 'Failed to open DB connection.';
throw new Exception($message, $errorDescription, (int)$errorNumber);
$response = $this->httpRequest('get', 'http://' . $host . '/_cluster/nodes');
$this->nodes = $response['nodes'];
if (empty($this->nodes)) {
throw new Exception('cluster autodetection did not find any active node.');
}
}*/
// TODO implement
}
$this->selectActiveNode();
Yii::trace('Opening connection to elasticsearch. Nodes in cluster: ' . count($this->nodes)
. ', active node: ' . $this->nodes[$this->activeNode]['http_address'], __CLASS__);
$this->initConnection();
}
/**
* select active node randomly
*/
public function selectActiveNode()
{
$keys = array_keys($this->nodes);
$this->activeNode = $keys[rand(0, count($keys) - 1)];
}
/**
......@@ -145,14 +122,9 @@ class Connection extends Component
*/
public function close()
{
// TODO implement
/* if ($this->_socket !== null) {
\Yii::trace('Closing DB connection: ' . $this->dsn, __CLASS__);
$this->executeCommand('QUIT');
stream_socket_shutdown($this->_socket, STREAM_SHUT_RDWR);
$this->_socket = null;
$this->_transaction = null;
}*/
Yii::trace('Closing connection to elasticsearch. Active node was: '
. $this->nodes[$this->activeNode]['http_address'], __CLASS__);
$this->activeNode = null;
}
/**
......@@ -174,9 +146,17 @@ class Connection extends Component
return 'elasticsearch';
}
public function getNodeInfo()
/**
* Creates a command for execution.
* @param array $config the configuration for the Command class
* @return Command the DB command
*/
public function createCommand($config = [])
{
// TODO HTTP request to localhost:9200/
$this->open();
$config['db'] = $this;
$command = new Command($config);
return $command;
}
public function getQueryBuilder()
......@@ -184,13 +164,58 @@ class Connection extends Component
return new QueryBuilder($this);
}
/**
* @return \Guzzle\Http\Client
*/
public function http()
public function get($url, $options = [], $body = null, $validCodes = [])
{
$this->open();
return $this->httpRequest('get', $this->createUrl($url, $options), $body);
}
public function head($url, $options = [], $body = null)
{
$this->open();
return $this->httpRequest('head', $this->createUrl($url, $options), $body);
}
public function post($url, $options = [], $body = null)
{
$this->open();
return $this->httpRequest('post', $this->createUrl($url, $options), $body);
}
public function put($url, $options = [], $body = null)
{
$this->open();
return $this->httpRequest('put', $this->createUrl($url, $options), $body);
}
public function delete($url, $options = [], $body = null)
{
$this->open();
return $this->httpRequest('delete', $this->createUrl($url, $options), $body);
}
private function createUrl($path, $options = [])
{
$url = implode('/', array_map(function($a) {
return urlencode(is_array($a) ? implode(',', $a) : $a);
}, $path));
if (!empty($options)) {
$url .= '?' . http_build_query($options);
}
return $url;
}
protected abstract function httpRequest($type, $url, $body = null);
public function getNodeInfo()
{
return $this->get([]);
}
public function getClusterState()
{
$guzzle = new \Guzzle\Http\Client('http://localhost:9200/');
//$guzzle->setDefaultOption()
return $guzzle;
return $this->get(['_cluster', 'state']);
}
}
\ No newline at end of file
<?php
/**
*
*
* @author Carsten Brandt <mail@cebe.cc>
*/
namespace yii\elasticsearch;
use Guzzle\Http\Exception\ClientErrorResponseException;
use yii\base\Exception;
use yii\helpers\Json;
class GuzzleConnection extends Connection
{
/**
* @var \Guzzle\Http\Client
*/
private $_http;
protected function httpRequest($type, $url, $body = null)
{
if ($this->_http === null) {
$this->_http = new \Guzzle\Http\Client('http://localhost:9200/');// TODO use active node
//$guzzle->setDefaultOption()
}
$requestOptions = [];
if ($type == 'head') {
$requestOptions['exceptions'] = false;
}
if ($type == 'get' && $body !== null) {
$type = 'post';
}
try{
$response = $this->_http->createRequest(
strtoupper($type)
, $url,
null,
$body,
$requestOptions
)->send();
} catch(ClientErrorResponseException $e) {
if ($e->getResponse()->getStatusCode() == 404) {
return false;
}
throw new Exception("elasticsearch error:\n\n"
. $body . "\n\n" . $e->getMessage()
. print_r(Json::decode($e->getResponse()->getBody(true)), true), 0, $e);
}
if ($type == 'head') {
return $response->getStatusCode() == 200;
}
return Json::decode($response->getBody(true));
}
}
\ No newline at end of file
......@@ -84,7 +84,7 @@ class Query extends Component implements QueryInterface
*/
public function all($db = null)
{
$result = $this->createCommand($db)->queryAll();
$result = $this->createCommand($db)->search();
// TODO publish facet results
$rows = $result['hits'];
if ($this->indexBy === null && $this->fields === null) {
......@@ -118,7 +118,7 @@ class Query extends Component implements QueryInterface
public function one($db = null)
{
$options['size'] = 1;
$result = $this->createCommand($db)->queryAll($options);
$result = $this->createCommand($db)->search($options);
// TODO publish facet results
if (empty($result['hits'])) {
return false;
......@@ -175,7 +175,7 @@ class Query extends Component implements QueryInterface
{
$command = $this->createCommand($db);
$command->queryParts['fields'] = [$field];
$rows = $command->queryAll()['hits'];
$rows = $command->search()['hits'];
$result = [];
foreach ($rows as $row) {
$result[] = isset($row['fields'][$field]) ? $row['fields'][$field] : null;
......@@ -196,7 +196,9 @@ class Query extends Component implements QueryInterface
// only when no facety are registerted.
// http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-count.html
$count = $this->createCommand($db)->queryCount()['total'];
$options = [];
$options['search_type'] = 'count';
$count = $this->createCommand($db)->search($options)['total'];
if ($this->limit === null && $this->offset === null) {
return $count;
} elseif ($this->offset !== null) {
......
......@@ -14,7 +14,7 @@ return [
'elasticsearch' => [
'class' => 'yii\elasticsearch\Connection',
'hosts' => [
['hostname' => 'localhost', 'port' => 9200],
['http_address' => '127.0.0.1:9200'],
// configure more hosts if you have a cluster
],
],
......
......@@ -24,4 +24,9 @@ class ActiveRecord extends \yii\elasticsearch\ActiveRecord
{
return self::$db;
}
public static function index()
{
return 'yiitest';
}
}
......@@ -43,10 +43,12 @@ class ActiveRecordTest extends ElasticSearchTestCase
/** @var Connection $db */
$db = ActiveRecord::$db = $this->getConnection();
// delete all indexes
$db->http()->delete('_all')->send();
// delete index
if ($db->createCommand()->indexExists('yiitest')) {
$db->createCommand()->deleteIndex('yiitest');
}
$db->http()->post('items', null, Json::encode([
$db->post(['yiitest'], [], Json::encode([
'mappings' => [
"item" => [
"_source" => [ "enabled" => true ],
......@@ -56,19 +58,7 @@ class ActiveRecordTest extends ElasticSearchTestCase
]
]
],
]))->send();
$db->http()->post('customers', null, Json::encode([
'mappings' => [
"item" => [
"_source" => [ "enabled" => true ],
"properties" => [
// this is for the boolean test
"status" => ["type" => "boolean"],
]
]
],
]))->send();
]));
$customer = new Customer();
$customer->id = 1;
......@@ -281,10 +271,10 @@ class ActiveRecordTest extends ElasticSearchTestCase
public function testBooleanAttribute()
{
$db = $this->getConnection();
$db->createCommand()->deleteIndex('customers');
$db->http()->post('customers', null, Json::encode([
$db->createCommand()->deleteIndex('yiitest');
$db->post(['yiitest'], [], Json::encode([
'mappings' => [
"item" => [
"customer" => [
"_source" => [ "enabled" => true ],
"properties" => [
// this is for the boolean test
......@@ -292,7 +282,7 @@ class ActiveRecordTest extends ElasticSearchTestCase
]
]
],
]))->send();
]));
$customerClass = $this->getCustomerClass();
$customer = new $customerClass();
......
......@@ -2,13 +2,28 @@
namespace yiiunit\extensions\elasticsearch;
use yii\redis\Connection;
use yii\elasticsearch\Connection;
use yii\elasticsearch\GuzzleConnection;
/**
* @group elasticsearch
*/
class ElasticSearchConnectionTest extends ElasticSearchTestCase
{
// TODO
public function testOpen()
{
$connection = new GuzzleConnection();
$connection->autodetectCluster;
$connection->nodes = [
['http_address' => 'inet[/127.0.0.1:9200]'],
];
$this->assertNull($connection->activeNode);
$connection->open();
$this->assertNotNull($connection->activeNode);
$this->assertArrayHasKey('name', reset($connection->nodes));
$this->assertArrayHasKey('hostname', reset($connection->nodes));
$this->assertArrayHasKey('version', reset($connection->nodes));
$this->assertArrayHasKey('http_address', reset($connection->nodes));
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ namespace yiiunit\extensions\elasticsearch;
use Yii;
use yii\elasticsearch\Connection;
use yii\elasticsearch\GuzzleConnection;
use yiiunit\TestCase;
Yii::setAlias('@yii/elasticsearch', __DIR__ . '/../../../../extensions/elasticsearch');
......@@ -42,7 +43,7 @@ class ElasticSearchTestCase extends TestCase
{
$databases = $this->getParam('databases');
$params = isset($databases['elasticsearch']) ? $databases['elasticsearch'] : array();
$db = new Connection;
$db = new GuzzleConnection();
if ($reset) {
$db->open();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment