Commit 43333d9c by Qiang Xue

Implemented master balancing.

parent 91e0c5ae
...@@ -324,7 +324,7 @@ class Schema extends Object ...@@ -324,7 +324,7 @@ class Schema extends Object
public function quoteValue($str) public function quoteValue($str)
{ {
if (is_string($str)) { if (is_string($str)) {
return $this->db->getReadPdo()->quote($str); return $this->db->getSlavePdo()->quote($str);
} else { } else {
return $str; return $str;
} }
......
...@@ -161,9 +161,9 @@ class Command extends \yii\base\Component ...@@ -161,9 +161,9 @@ class Command extends \yii\base\Component
$sql = $this->getSql(); $sql = $this->getSql();
if ($forRead || $forRead === null && $this->db->getSchema()->isReadQuery($sql)) { if ($forRead || $forRead === null && $this->db->getSchema()->isReadQuery($sql)) {
$pdo = $this->db->getReadPdo(); $pdo = $this->db->getSlavePdo();
} else { } else {
$pdo = $this->db->getWritePdo(); $pdo = $this->db->getMasterPdo();
} }
try { try {
......
...@@ -282,30 +282,70 @@ class Connection extends Component ...@@ -282,30 +282,70 @@ class Connection extends Component
*/ */
public $enableSavepoint = true; public $enableSavepoint = true;
/** /**
* @var Cache|string the cache object or the ID of the cache application component that is used to store
* the health status of the DB servers specified in [[masters]] and [[slaves]].
* This is used only when read/write splitting is enabled or [[masters]] is not empty.
*/
public $serverStatusCache = 'cache';
/**
* @var integer the retry interval in seconds for dead servers listed in [[masters]] and [[slaves]].
* This is used together with [[serverStatusCache]].
*/
public $serverRetryInterval = 600;
/**
* @var boolean whether to enable read/write splitting by using [[slaves]] to read data. * @var boolean whether to enable read/write splitting by using [[slaves]] to read data.
* Note that if [[slaves]] is empty, read/write splitting will NOT be enabled no matter what value this property takes.
*/ */
public $enableSlave = true; public $enableSlave = true;
/** /**
* @var array list of slave connection configurations. When [[enableSlave]] is true, one of these slave * @var array list of slave connection configurations. Each configuration is used to create a slave DB connection.
* configurations will be used to create a DB connection to perform read queries. * When [[enableSlave]] is true, one of these configurations will be chosen and used to create a DB connection
* for performing read queries only.
* @see enableSlave * @see enableSlave
* @see slaveConfig
*/ */
public $slaves = []; public $slaves = [];
/** /**
* @var integer the timeout in seconds for determining whether a slave is dead or not * @var array the configuration that should be merged with every slave configuration listed in [[slaves]].
* @see enableSlave * For example,
*
* ```php
* [
* 'username' => 'slave',
* 'password' => 'slave',
* 'attributes' => [
* // use a smaller connection timeout
* PDO::ATTR_TIMEOUT => 10,
* ],
* ]
* ```
*/ */
public $slaveTimeout = 10; public $slaveConfig = [];
/** /**
* @var Cache|string the cache object or the ID of the cache application component that is used to store * @var array list of master connection configurations. Each configuration is used to create a master DB connection.
* the health status of the slaves. * When [[open()]] is called, one of these configurations will be chosen and used to create a DB connection
* @see enableSlave * which will be used by this object.
* Note that when this property is not empty, the connection setting (e.g. "dsn", "username") of this object will
* be ignored.
* @see masterConfig
*/ */
public $slaveCache = 'cache'; public $masters = [];
/** /**
* @var integer the retry interval in seconds for dead slaves. * @var array the configuration that should be merged with every master configuration listed in [[masters]].
* For example,
*
* ```php
* [
* 'username' => 'master',
* 'password' => 'master',
* 'attributes' => [
* // use a smaller connection timeout
* PDO::ATTR_TIMEOUT => 10,
* ],
* ]
* ```
*/ */
public $slaveRetryInterval = 600; public $masterConfig = [];
/** /**
* @var Transaction the currently active transaction * @var Transaction the currently active transaction
...@@ -320,7 +360,7 @@ class Connection extends Component ...@@ -320,7 +360,7 @@ class Connection extends Component
*/ */
private $_driverName; private $_driverName;
/** /**
* @var Connection the currently active slave * @var Connection the currently active slave connection
*/ */
private $_slave = false; private $_slave = false;
...@@ -367,7 +407,20 @@ class Connection extends Component ...@@ -367,7 +407,20 @@ class Connection extends Component
*/ */
public function open() public function open()
{ {
if ($this->pdo === null) { if ($this->pdo !== null) {
return;
}
if (!empty($this->masters)) {
$db = $this->openFromPool($this->masters, $this->masterConfig);
if ($db !== null) {
$this->pdo = $db->pdo;
return;
} else {
throw new InvalidConfigException('None of the master DB servers is available.');
}
}
if (empty($this->dsn)) { if (empty($this->dsn)) {
throw new InvalidConfigException('Connection::dsn cannot be empty.'); throw new InvalidConfigException('Connection::dsn cannot be empty.');
} }
...@@ -380,8 +433,7 @@ class Connection extends Component ...@@ -380,8 +433,7 @@ class Connection extends Component
Yii::endProfile($token, __METHOD__); Yii::endProfile($token, __METHOD__);
} catch (\PDOException $e) { } catch (\PDOException $e) {
Yii::endProfile($token, __METHOD__); Yii::endProfile($token, __METHOD__);
throw new Exception($e->getMessage(), $e->errorInfo, (int) $e->getCode(), $e); throw new Exception($e->getMessage(), $e->errorInfo, (int)$e->getCode(), $e);
}
} }
} }
...@@ -643,7 +695,7 @@ class Connection extends Component ...@@ -643,7 +695,7 @@ class Connection extends Component
if (($pos = strpos($this->dsn, ':')) !== false) { if (($pos = strpos($this->dsn, ':')) !== false) {
$this->_driverName = strtolower(substr($this->dsn, 0, $pos)); $this->_driverName = strtolower(substr($this->dsn, 0, $pos));
} else { } else {
$this->_driverName = strtolower($this->getReadPdo()->getAttribute(PDO::ATTR_DRIVER_NAME)); $this->_driverName = strtolower($this->getSlavePdo()->getAttribute(PDO::ATTR_DRIVER_NAME));
} }
} }
return $this->_driverName; return $this->_driverName;
...@@ -662,12 +714,17 @@ class Connection extends Component ...@@ -662,12 +714,17 @@ class Connection extends Component
* Returns the PDO instance for read queries. * Returns the PDO instance for read queries.
* When [[enableSlave]] is true, one of the slaves will be used for read queries, and its PDO instance * When [[enableSlave]] is true, one of the slaves will be used for read queries, and its PDO instance
* will be returned by this method. If no slave is available, the [[writePdo]] will be returned. * will be returned by this method. If no slave is available, the [[writePdo]] will be returned.
* @return PDO the PDO instance for read queries. * @param boolean $fallbackToMaster whether to return a master PDO in case none of the slave connections is available.
* @return PDO the PDO instance for read queries. Null is returned if no server is available.
*/ */
public function getReadPdo() public function getSlavePdo($fallbackToMaster = true)
{ {
$db = $this->getSlave(); $db = $this->getSlave(false);
return $db ? $db->pdo : $this->getWritePdo(); if ($db === null) {
return $fallbackToMaster ? $this->getMasterPdo() : null;
} else {
return $db->pdo;
}
} }
/** /**
...@@ -675,35 +732,36 @@ class Connection extends Component ...@@ -675,35 +732,36 @@ class Connection extends Component
* This method will open the master DB connection and then return [[pdo]]. * This method will open the master DB connection and then return [[pdo]].
* @return PDO the PDO instance for write queries. * @return PDO the PDO instance for write queries.
*/ */
public function getWritePdo() public function getMasterPdo()
{ {
$this->open(); $this->open();
return $this->pdo; return $this->pdo;
} }
/** /**
* Returns the currently active slave. * Returns the currently active slave connection.
* If this method is called the first time, it will try to open a slave connection when [[enableSlave]] is true. * If this method is called the first time, it will try to open a slave connection when [[enableSlave]] is true.
* @param boolean $fallbackToMaster whether to return a master connection in case none of the slave connections is available.
* @return Connection the currently active slave. Null is returned if there is slave available. * @return Connection the currently active slave. Null is returned if there is slave available.
*/ */
public function getSlave() public function getSlave($fallbackToMaster = true)
{ {
if (!$this->enableSlave) { if (!$this->enableSlave) {
return null; return $fallbackToMaster ? $this : null;
} }
if ($this->_slave !== false) { if ($this->_slave === false) {
return $this->_slave; $this->_slave = $this->openFromPool($this->slaves, $this->slaveConfig);
} else {
return $this->_slave = $this->openSlave($this->slaves);
} }
return $this->_slave === null && $fallbackToMaster ? $this : $this->_slave;
} }
/** /**
* Executes the provided callback by using the master connection. * Executes the provided callback by using the master connection.
* *
* This method is provided so that you can temporarily force using the master connection to perform * This method is provided so that you can temporarily force using the master connection to perform
* DB operations. For example, * DB operations even if they are read queries. For example,
* *
* ```php * ```php
* $result = $db->useMaster(function ($db) { * $result = $db->useMaster(function ($db) {
...@@ -712,7 +770,7 @@ class Connection extends Component ...@@ -712,7 +770,7 @@ class Connection extends Component
* ``` * ```
* *
* @param callable $callback a PHP callable to be executed by this method. Its signature is * @param callable $callback a PHP callable to be executed by this method. Its signature is
* `function ($db)`. Its return value will be returned by this method. * `function (Connection $db)`. Its return value will be returned by this method.
* @return mixed the return value of the callback * @return mixed the return value of the callback
*/ */
public function useMaster(callable $callback) public function useMaster(callable $callback)
...@@ -725,49 +783,48 @@ class Connection extends Component ...@@ -725,49 +783,48 @@ class Connection extends Component
} }
/** /**
* Selects a slave and opens the connection. * Opens the connection to a server in the pool.
* @param array $slaves the list of candidate slave configurations * @param array $pool the list of connection configurations in the server pool
* @return Connection the opened slave connection, or null if no slave is available * @param array $sharedConfig the configuration common to those given in `$pool`.
* @throws InvalidConfigException if a slave configuration does not have "dsn" setting * @return Connection the opened DB connection, or null if no server is available
* @throws InvalidConfigException if a configuration does not specify "dsn"
*/ */
protected function openSlave($slaves) protected function openFromPool(array $pool, array $sharedConfig)
{ {
if (empty($slaves)) { if (empty($pool)) {
return null; return null;
} }
shuffle($slaves); if (!isset($sharedConfig['class'])) {
$sharedConfig['class'] = get_class($this);
}
$cache = is_string($this->serverStatusCache) ? Yii::$app->get($this->serverStatusCache, false) : $this->serverStatusCache;
$cache = is_string($this->slaveCache) ? Yii::$app->get($this->slaveCache, false) : $this->slaveCache; shuffle($pool);
foreach ($slaves as $config) { foreach ($pool as $config) {
$config = array_merge($sharedConfig, $config);
if (empty($config['dsn'])) { if (empty($config['dsn'])) {
throw new InvalidConfigException('One of the slave connections has an empty "dsn".'); throw new InvalidConfigException('The "dsn" option must be specified.');
} }
$key = [__METHOD__, $config['dsn']]; $key = [__METHOD__, $config['dsn']];
if ($cache instanceof Cache && $cache->get($key)) { if ($cache instanceof Cache && $cache->get($key)) {
// should not try this dead slave now // should not try this dead server now
continue; continue;
} }
if (!isset($config['class'])) { /* @var $db Connection */
$config['class'] = get_class($this); $db = Yii::createObject($config);
}
/* @var $slave Connection */
$slave = Yii::createObject($config);
if (!isset($slave->attributes[PDO::ATTR_TIMEOUT])) {
$slave->attributes[PDO::ATTR_TIMEOUT] = $this->slaveTimeout;
}
try { try {
$slave->open(); $db->open();
return $slave; return $db;
} catch (\Exception $e) { } catch (\Exception $e) {
Yii::warning("Slave ({$config['dsn']}) not available: " . $e->getMessage(), __METHOD__); Yii::warning("Connection ({$config['dsn']}) failed: " . $e->getMessage(), __METHOD__);
if ($cache instanceof Cache) { if ($cache instanceof Cache) {
$cache->set($key, 1, $this->slaveRetryInterval); $cache->set($key, 1, $this->serverRetryInterval);
} }
} }
} }
......
...@@ -369,7 +369,7 @@ abstract class Schema extends Object ...@@ -369,7 +369,7 @@ abstract class Schema extends Object
return $str; return $str;
} }
if (($value = $this->db->getReadPdo()->quote($str)) !== false) { if (($value = $this->db->getSlavePdo()->quote($str)) !== false) {
return $value; return $value;
} else { } else {
// the driver doesn't support quote (e.g. oci) // the driver doesn't support quote (e.g. oci)
......
...@@ -116,7 +116,7 @@ class Schema extends \yii\db\Schema ...@@ -116,7 +116,7 @@ class Schema extends \yii\db\Schema
return $str; return $str;
} }
$pdo = $this->db->getReadPdo(); $pdo = $this->db->getSlavePdo();
// workaround for broken PDO::quote() implementation in CUBRID 9.1.0 http://jira.cubrid.org/browse/APIS-658 // workaround for broken PDO::quote() implementation in CUBRID 9.1.0 http://jira.cubrid.org/browse/APIS-658
$version = $pdo->getAttribute(\PDO::ATTR_CLIENT_VERSION); $version = $pdo->getAttribute(\PDO::ATTR_CLIENT_VERSION);
...@@ -143,7 +143,7 @@ class Schema extends \yii\db\Schema ...@@ -143,7 +143,7 @@ class Schema extends \yii\db\Schema
*/ */
protected function loadTableSchema($name) protected function loadTableSchema($name)
{ {
$pdo = $this->db->getReadPdo(); $pdo = $this->db->getSlavePdo();
$tableInfo = $pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE, $name); $tableInfo = $pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE, $name);
...@@ -266,7 +266,7 @@ class Schema extends \yii\db\Schema ...@@ -266,7 +266,7 @@ class Schema extends \yii\db\Schema
*/ */
protected function findTableNames($schema = '') protected function findTableNames($schema = '')
{ {
$pdo = $this->db->getReadPdo(); $pdo = $this->db->getSlavePdo();
$tables =$pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE); $tables =$pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE);
$tableNames = []; $tableNames = [];
foreach ($tables as $table) { foreach ($tables as $table) {
......
...@@ -239,7 +239,7 @@ class QueryBuilder extends \yii\db\QueryBuilder ...@@ -239,7 +239,7 @@ class QueryBuilder extends \yii\db\QueryBuilder
*/ */
protected function isOldMssql() protected function isOldMssql()
{ {
$pdo = $this->db->getReadPdo(); $pdo = $this->db->getSlavePdo();
$version = preg_split("/\./", $pdo->getAttribute(\PDO::ATTR_SERVER_VERSION)); $version = preg_split("/\./", $pdo->getAttribute(\PDO::ATTR_SERVER_VERSION));
return $version[0] < 11; return $version[0] < 11;
} }
......
...@@ -137,7 +137,7 @@ class QueryBuilder extends \yii\db\QueryBuilder ...@@ -137,7 +137,7 @@ class QueryBuilder extends \yii\db\QueryBuilder
} }
// enable to have ability to alter several tables // enable to have ability to alter several tables
$this->db->getWritePdo()->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true); $this->db->getMasterPdo()->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true);
return $command; return $command;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment