Commit ea7bfa19 by Qiang Xue

Merge pull request #4274 from yiisoft/feature-multiconnection

Support for separating Read/Write Database
parents cdb2b44f 919ab232
......@@ -274,8 +274,8 @@ We wrap the execution of all queries in a try-catch-block to be able to handle e
We call [[yii\db\Transaction::commit()|commit()]] on success to commit the transaction and
[[yii\db\Transaction::rollBack()|rollBack()]] in case of an error. This will revert the effect of all queries
that have been executed inside of the transaction.
`throw $e` is used to re-throw the exception in case we can not handle the error ourselfs and deligate it
to some other code or the yii errorhandler.
`throw $e` is used to re-throw the exception in case we can not handle the error ourselves and delegate it
to some other code or the yii error handler.
It is also possible to nest multiple transactions, if needed:
......@@ -331,13 +331,173 @@ At the time of this writing affected DBMS are MSSQL and SQLite.
> Note: SQLite only supports two isolation levels, so you can only use `READ UNCOMMITTED` and `SERIALIZABLE`.
Usage of other levels will result in an exception to be thrown.
> Note: PostgreSQL does not allow settin the isolation level before the transaction starts so you can not
> Note: PostgreSQL does not allow setting the isolation level before the transaction starts so you can not
specify the isolation level directly when starting the transaction.
You have to call [[yii\db\Transaction::setIsolationLevel()]] in this case after the transaction has started.
[isolation levels]: http://en.wikipedia.org/wiki/Isolation_%28database_systems%29#Isolation_levels
Replication and Read-Write Splitting
------------------------------------
Many DBMS support [database replication](http://en.wikipedia.org/wiki/Replication_(computing)#Database_replication)
to get better database availability and faster server response time. With database replication, data are replicated
from the so-called *master servers* to *slave servers*. All writes and updates must take place on the master servers,
while reads may take place on the slave servers.
To take advantage of database replication and achieve read-write splitting, you can configure a [[yii\db\Connection]]
component like the following:
```php
[
'class' => 'yii\db\Connection',
// configuration for the master
'dsn' => 'dsn for master server',
'username' => 'master',
'password' => '',
// common configuration for slaves
'slaveConfig' => [
'username' => 'slave',
'password' => '',
'attributes' => [
// use a smaller connection timeout
PDO::ATTR_TIMEOUT => 10,
],
],
// list of slave configurations
'slaves' => [
['dsn' => 'dsn for slave server 1'],
['dsn' => 'dsn for slave server 2'],
['dsn' => 'dsn for slave server 3'],
['dsn' => 'dsn for slave server 4'],
],
]
```
The above configuration specifies a setup with a single master and multiple slaves. One of the slaves will
be connected and used to perform read queries, while the master will be used to perform write queries.
Such read-write splitting is accomplished automatically with this configuration. For example,
```php
// create a Connection instance using the above configuration
$db = Yii::createObject($config);
// query against one of the slaves
$rows = $db->createCommand('SELECT * FROM user LIMIT 10')->queryAll();
// query against the master
$db->createCommand("UPDATE user SET username='demo' WHERE id=1")->execute();
```
> Info: Queries performed by calling [[yii\db\Command::execute()]] are considered as write queries, while
all other queries done through one of the "query" method of [[yii\db\Command]] are read queries.
You can get the currently active slave connection via `$db->slave`.
The `Connection` component supports load balancing and failover about slaves.
When performing a read query for the first time, the `Connection` component will randomly pick a slave and
try connecting to it. If the slave is found "dead", it will try another one. If none of the slaves is available,
it will connect to the master. By configuring a [[yii\db\Connection::serverStatusCache|server status cache]],
a "dead" server can be remembered so that it will not be tried again during a
[[yii\db\Connection::serverRetryInterval|certain period of time]].
> Info: In the above configuration, a connection timeout of 10 seconds is specified for every slave.
This means if a slave cannot be reached in 10 seconds, it is considered as "dead". You can adjust this parameter
based on your actual environment.
You can also configure multiple masters with multiple slaves. For example,
```php
[
'class' => 'yii\db\Connection',
// common configuration for masters
'masterConfig' => [
'username' => 'master',
'password' => '',
'attributes' => [
// use a smaller connection timeout
PDO::ATTR_TIMEOUT => 10,
],
],
// list of master configurations
'masters' => [
['dsn' => 'dsn for master server 1'],
['dsn' => 'dsn for master server 2'],
],
// common configuration for slaves
'slaveConfig' => [
'username' => 'slave',
'password' => '',
'attributes' => [
// use a smaller connection timeout
PDO::ATTR_TIMEOUT => 10,
],
],
// list of slave configurations
'slaves' => [
['dsn' => 'dsn for slave server 1'],
['dsn' => 'dsn for slave server 2'],
['dsn' => 'dsn for slave server 3'],
['dsn' => 'dsn for slave server 4'],
],
]
```
The above configuration specifies two masters and four slaves. The `Connection` component also supports
load balancing and failover about masters, like that about slaves. A difference is that in case none of
the masters is available, an exception will be thrown.
> Note: When you use the [[yii\db\Connection::masters|masters]] property to configure one or multiple
masters, all other properties for specifying a database connection (e.g. `dsn`, `username`, `password`)
with the `Connection` object itself will be ignored.
By default, transactions use the master connection. And within a transaction, all DB operations will use
the master connection. For example,
```php
// the transaction is started on the master connection
$transaction = $db->beginTransaction();
try {
// both queries are performed against the master
$rows = $db->createCommand('SELECT * FROM user LIMIT 10')->queryAll();
$db->createCommand("UPDATE user SET username='demo' WHERE id=1")->execute();
$transaction->commit();
} catch(\Exception $e) {
$transaction->rollBack();
throw $e;
}
```
If you want to start a transaction with the slave connection, you should explicitly do so, like the following:
```php
$transaction = $db->slave->beginTransaction();
```
Sometimes, you may want to force using the master connection to perform a read query. This can be achieved
with the `useMaster()` method:
```php
$rows = $db->useMaster(function ($db) {
return $db->createCommand('SELECT * FROM user LIMIT 10')->queryAll();
});
```
You may also directly set `$db->enableSlaves` to be false to direct all queries to the master connection.
Working with database schema
----------------------------
......
......@@ -191,7 +191,7 @@ class Command extends \yii\db\Command
* @param string $index the name of the index from which to take the text processing settings
* @param string $text the text to break down to keywords.
* @param boolean $fetchStatistic whether to return document and hit occurrence statistics
* @return string the SQL statement for call keywords.
* @return static the command object itself
*/
public function callKeywords($index, $text, $fetchStatistic = false)
{
......
......@@ -109,7 +109,6 @@ class Connection extends \yii\db\Connection
*/
public function createCommand($sql = null, $params = [])
{
$this->open();
$command = new Command([
'db' => $this,
'sql' => $sql,
......
......@@ -323,12 +323,11 @@ class Schema extends Object
*/
public function quoteValue($str)
{
if (!is_string($str)) {
if (is_string($str)) {
return $this->db->getSlavePdo()->quote($str);
} else {
return $str;
}
$this->db->open();
return $this->db->pdo->quote($str);
}
/**
......@@ -519,4 +518,15 @@ class Schema extends Object
throw new Exception($message, $errorInfo, (int) $e->getCode(), $e);
}
}
/**
* Returns a value indicating whether a SQL statement is for read purpose.
* @param string $sql the SQL statement
* @return boolean whether a SQL statement is for read purpose.
*/
public function isReadQuery($sql)
{
$pattern = '/^\s*(SELECT|SHOW|DESCRIBE)\b/i';
return preg_match($pattern, $sql) > 0;
}
}
......@@ -68,11 +68,15 @@ class Command extends \yii\base\Component
public $fetchMode = \PDO::FETCH_ASSOC;
/**
* @var array the parameters (name => value) that are bound to the current PDO statement.
* This property is maintained by methods such as [[bindValue()]].
* Do not modify it directly.
* This property is maintained by methods such as [[bindValue()]]. It is mainly provided for logging purpose
* and is used to generate [[rawSql]]. Do not modify it directly.
*/
public $params = [];
/**
* @var array pending parameters to be bound to the current PDO statement.
*/
private $_pendingParams = [];
/**
* @var string the SQL statement that this command represents
*/
private $_sql;
......@@ -97,6 +101,7 @@ class Command extends \yii\base\Component
if ($sql !== $this->_sql) {
$this->cancel();
$this->_sql = $this->db->quoteSql($sql);
$this->_pendingParams = [];
$this->params = [];
}
......@@ -143,19 +148,34 @@ class Command extends \yii\base\Component
* this may improve performance.
* For SQL statement with binding parameters, this method is invoked
* automatically.
* @param boolean $forRead whether this method is called for a read query. If null, it means
* the SQL statement should be used to determine whether it is for read or write.
* @throws Exception if there is any DB error
*/
public function prepare()
public function prepare($forRead = null)
{
if ($this->pdoStatement == null) {
$sql = $this->getSql();
try {
$this->pdoStatement = $this->db->pdo->prepare($sql);
} catch (\Exception $e) {
$message = $e->getMessage() . "\nFailed to prepare SQL: $sql";
$errorInfo = $e instanceof \PDOException ? $e->errorInfo : null;
throw new Exception($message, $errorInfo, (int) $e->getCode(), $e);
}
if ($this->pdoStatement) {
return;
}
$sql = $this->getSql();
if ($this->db->getTransaction()) {
// master is in a transaction. use the same connection.
$forRead = false;
}
if ($forRead || $forRead === null && $this->db->getSchema()->isReadQuery($sql)) {
$pdo = $this->db->getSlavePdo();
} else {
$pdo = $this->db->getMasterPdo();
}
try {
$this->pdoStatement = $pdo->prepare($sql);
} catch (\Exception $e) {
$message = $e->getMessage() . "\nFailed to prepare SQL: $sql";
$errorInfo = $e instanceof \PDOException ? $e->errorInfo : null;
throw new Exception($message, $errorInfo, (int) $e->getCode(), $e);
}
}
......@@ -184,6 +204,9 @@ class Command extends \yii\base\Component
public function bindParam($name, &$value, $dataType = null, $length = null, $driverOptions = null)
{
$this->prepare();
$this->bindPendingParams();
if ($dataType === null) {
$dataType = $this->db->getSchema()->getPdoType($value);
}
......@@ -200,6 +223,18 @@ class Command extends \yii\base\Component
}
/**
* Binds pending parameters that were registered via [[bindValue()]] and [[bindValues()]].
* Note that this method requires an active [[pdoStatement]].
*/
protected function bindPendingParams()
{
foreach ($this->_pendingParams as $name => $value) {
$this->pdoStatement->bindValue($name, $value[0], $value[1]);
}
$this->_pendingParams = [];
}
/**
* Binds a value to a parameter.
* @param string|integer $name Parameter identifier. For a prepared statement
* using named placeholders, this will be a parameter name of
......@@ -212,11 +247,10 @@ class Command extends \yii\base\Component
*/
public function bindValue($name, $value, $dataType = null)
{
$this->prepare();
if ($dataType === null) {
$dataType = $this->db->getSchema()->getPdoType($value);
}
$this->pdoStatement->bindValue($name, $value, $dataType);
$this->_pendingParams[$name] = [$value, $dataType];
$this->params[$name] = $value;
return $this;
......@@ -235,16 +269,17 @@ class Command extends \yii\base\Component
*/
public function bindValues($values)
{
if (!empty($values)) {
$this->prepare();
foreach ($values as $name => $value) {
if (is_array($value)) {
$type = $value[1];
$value = $value[0];
} else {
$type = $this->db->getSchema()->getPdoType($value);
}
$this->pdoStatement->bindValue($name, $value, $type);
if (empty($values)) {
return $this;
}
foreach ($values as $name => $value) {
if (is_array($value)) {
$this->_pendingParams[$name] = $value;
$this->params[$name] = $value[0];
} else {
$type = $this->db->getSchema()->getPdoType($value);
$this->_pendingParams[$name] = [$value, $type];
$this->params[$name] = $value;
}
}
......@@ -271,11 +306,13 @@ class Command extends \yii\base\Component
return 0;
}
$this->prepare(false);
$this->bindPendingParams();
$token = $rawSql;
try {
Yii::beginProfile($token, __METHOD__);
$this->prepare();
$this->pdoStatement->execute();
$n = $this->pdoStatement->rowCount();
......@@ -390,11 +427,13 @@ class Command extends \yii\base\Component
}
}
$this->prepare(true);
$this->bindPendingParams();
$token = $rawSql;
try {
Yii::beginProfile($token, 'yii\db\Command::query');
$this->prepare();
$this->pdoStatement->execute();
if ($method === '') {
......
......@@ -369,11 +369,10 @@ abstract class Schema extends Object
return $str;
}
$this->db->open();
if (($value = $this->db->pdo->quote($str)) !== false) {
if (($value = $this->db->getSlavePdo()->quote($str)) !== false) {
return $value;
} else { // the driver doesn't support quote (e.g. oci)
} else {
// the driver doesn't support quote (e.g. oci)
return "'" . addcslashes(str_replace("'", "''", $str), "\000\n\r\\\032") . "'";
}
}
......@@ -520,4 +519,15 @@ abstract class Schema extends Object
throw new $exceptionClass($message, $errorInfo, (int) $e->getCode(), $e);
}
}
/**
* Returns a value indicating whether a SQL statement is for read purpose.
* @param string $sql the SQL statement
* @return boolean whether a SQL statement is for read purpose.
*/
public function isReadQuery($sql)
{
$pattern = '/^\s*(SELECT|SHOW|DESCRIBE)\b/i';
return preg_match($pattern, $sql) > 0;
}
}
......@@ -116,13 +116,14 @@ class Schema extends \yii\db\Schema
return $str;
}
$this->db->open();
$pdo = $this->db->getSlavePdo();
// workaround for broken PDO::quote() implementation in CUBRID 9.1.0 http://jira.cubrid.org/browse/APIS-658
$version = $this->db->pdo->getAttribute(\PDO::ATTR_CLIENT_VERSION);
$version = $pdo->getAttribute(\PDO::ATTR_CLIENT_VERSION);
if (version_compare($version, '8.4.4.0002', '<') || $version[0] == '9' && version_compare($version, '9.2.0.0002', '<=')) {
return "'" . addcslashes(str_replace("'", "''", $str), "\000\n\r\\\032") . "'";
} else {
return $this->db->pdo->quote($str);
return $pdo->quote($str);
}
}
......@@ -142,8 +143,9 @@ class Schema extends \yii\db\Schema
*/
protected function loadTableSchema($name)
{
$this->db->open();
$tableInfo = $this->db->pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE, $name);
$pdo = $this->db->getSlavePdo();
$tableInfo = $pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE, $name);
if (!isset($tableInfo[0]['NAME'])) {
return null;
......@@ -160,7 +162,7 @@ class Schema extends \yii\db\Schema
$table->columns[$column->name] = $column;
}
$primaryKeys = $this->db->pdo->cubrid_schema(\PDO::CUBRID_SCH_PRIMARY_KEY, $table->name);
$primaryKeys = $pdo->cubrid_schema(\PDO::CUBRID_SCH_PRIMARY_KEY, $table->name);
foreach ($primaryKeys as $key) {
$column = $table->columns[$key['ATTR_NAME']];
$column->isPrimaryKey = true;
......@@ -170,7 +172,7 @@ class Schema extends \yii\db\Schema
}
}
$foreignKeys = $this->db->pdo->cubrid_schema(\PDO::CUBRID_SCH_IMPORTED_KEYS, $table->name);
$foreignKeys = $pdo->cubrid_schema(\PDO::CUBRID_SCH_IMPORTED_KEYS, $table->name);
foreach ($foreignKeys as $key) {
if (isset($table->foreignKeys[$key['FK_NAME']])) {
$table->foreignKeys[$key['FK_NAME']][$key['FKCOLUMN_NAME']] = $key['PKCOLUMN_NAME'];
......@@ -264,8 +266,8 @@ class Schema extends \yii\db\Schema
*/
protected function findTableNames($schema = '')
{
$this->db->open();
$tables = $this->db->pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE);
$pdo = $this->db->getSlavePdo();
$tables =$pdo->cubrid_schema(\PDO::CUBRID_SCH_TABLE);
$tableNames = [];
foreach ($tables as $table) {
// do not list system tables
......
......@@ -239,8 +239,8 @@ class QueryBuilder extends \yii\db\QueryBuilder
*/
protected function isOldMssql()
{
$this->db->open();
$version = preg_split("/\./", $this->db->pdo->getAttribute(\PDO::ATTR_SERVER_VERSION));
$pdo = $this->db->getSlavePdo();
$version = preg_split("/\./", $pdo->getAttribute(\PDO::ATTR_SERVER_VERSION));
return $version[0] < 11;
}
}
......@@ -8,6 +8,7 @@
namespace yii\db\oci;
use yii\base\InvalidParamException;
use yii\db\Connection;
/**
* QueryBuilder is the query builder for Oracle databases.
......@@ -132,8 +133,10 @@ EOD;
if ($value !== null) {
$value = (int) $value;
} else {
$value = (int) $this->db->createCommand("SELECT MAX(\"{$tableSchema->primaryKey}\") FROM \"{$tableSchema->name}\"")->queryScalar();
$value++;
// use master connection to get the biggest PK value
$value = $this->db->useMaster(function (Connection $db) use ($tableSchema) {
return $db->createCommand("SELECT MAX(\"{$tableSchema->primaryKey}\") FROM \"{$tableSchema->name}\"")->queryScalar();
}) + 1;
}
return "DROP SEQUENCE \"{$tableSchema->name}_SEQ\";"
......
......@@ -8,6 +8,7 @@
namespace yii\db\oci;
use yii\base\InvalidCallException;
use yii\db\Connection;
use yii\db\TableSchema;
use yii\db\ColumnSchema;
......@@ -195,7 +196,10 @@ EOD;
public function getLastInsertID($sequenceName = '')
{
if ($this->db->isActive) {
return $this->db->createCommand("SELECT {$sequenceName}.CURRVAL FROM DUAL")->queryScalar();
// get the last insert id from the master connection
return $this->db->useMaster(function (Connection $db) use ($sequenceName) {
return $db->createCommand("SELECT {$sequenceName}.CURRVAL FROM DUAL")->queryScalar();
});
} else {
throw new InvalidCallException('DB Connection is not active.');
}
......
......@@ -136,8 +136,8 @@ class QueryBuilder extends \yii\db\QueryBuilder
$command .= "ALTER TABLE $tableName $enable TRIGGER ALL; ";
}
#enable to have ability to alter several tables
$this->db->pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true);
// enable to have ability to alter several tables
$this->db->getMasterPdo()->setAttribute(\PDO::ATTR_EMULATE_PREPARES, true);
return $command;
}
......
......@@ -7,6 +7,7 @@
namespace yii\db\sqlite;
use yii\db\Connection;
use yii\db\Exception;
use yii\base\InvalidParamException;
use yii\base\NotSupportedException;
......@@ -120,7 +121,9 @@ class QueryBuilder extends \yii\db\QueryBuilder
if ($value === null) {
$key = reset($table->primaryKey);
$tableName = $db->quoteTableName($tableName);
$value = $db->createCommand("SELECT MAX('$key') FROM $tableName")->queryScalar();
$value = $this->db->useMaster(function (Connection $db) use ($key, $tableName) {
return $db->createCommand("SELECT MAX('$key') FROM $tableName")->queryScalar();
});
} else {
$value = (int) $value - 1;
}
......
......@@ -44,26 +44,35 @@ abstract class DatabaseTestCase extends TestCase
if (!$reset && $this->db) {
return $this->db;
}
$db = new \yii\db\Connection;
$db->dsn = $this->database['dsn'];
if (isset($this->database['username'])) {
$db->username = $this->database['username'];
$db->password = $this->database['password'];
$config = $this->database;
if (isset($config['fixture'])) {
$fixture = $config['fixture'];
unset($config['fixture']);
} else {
$fixture = null;
}
if (isset($this->database['attributes'])) {
$db->attributes = $this->database['attributes'];
return $this->db = $this->prepareDatabase($config, $fixture, $open);
}
public function prepareDatabase($config, $fixture, $open = true)
{
if (!isset($config['class'])) {
$config['class'] = 'yii\db\Connection';
}
if ($open) {
$db->open();
$lines = explode(';', file_get_contents($this->database['fixture']));
/* @var $db \yii\db\Connection */
$db = \Yii::createObject($config);
if (!$open) {
return $db;
}
$db->open();
if ($fixture !== null) {
$lines = explode(';', file_get_contents($fixture));
foreach ($lines as $line) {
if (trim($line) !== '') {
$db->pdo->exec($line);
}
}
}
$this->db = $db;
return $db;
}
}
<?php
namespace yiiunit\framework\db\sqlite;
use yii\db\Connection;
use yii\db\Transaction;
use yiiunit\framework\db\ConnectionTest;
use yiiunit\data\ar\ActiveRecord;
use yiiunit\data\ar\Customer;
/**
* @group db
......@@ -57,4 +60,86 @@ class SqliteConnectionTest extends ConnectionTest
$transaction = $connection->beginTransaction(Transaction::SERIALIZABLE);
$transaction->rollBack();
}
public function testMasterSlave()
{
$counts = [[0, 2], [1, 2], [2, 2]];
foreach ($counts as $count) {
list($masterCount, $slaveCount) = $count;
$db = $this->prepareMasterSlave($masterCount, $slaveCount);
$this->assertTrue($db->getSlave() instanceof Connection);
$this->assertTrue($db->getSlave()->isActive);
$this->assertFalse($db->isActive);
// test SELECT uses slave
$this->assertEquals(2, $db->createCommand('SELECT COUNT(*) FROM profile')->queryScalar());
$this->assertFalse($db->isActive);
// test UPDATE uses master
$db->createCommand("UPDATE profile SET description='test' WHERE id=1")->execute();
$this->assertTrue($db->isActive);
$this->assertNotEquals('test', $db->createCommand("SELECT description FROM profile WHERE id=1")->queryScalar());
$result = $db->useMaster(function (Connection $db) {
return $db->createCommand("SELECT description FROM profile WHERE id=1")->queryScalar();
});
$this->assertEquals('test', $result);
// test ActiveRecord read/write split
ActiveRecord::$db = $db = $this->prepareMasterSlave($masterCount, $slaveCount);
$this->assertFalse($db->isActive);
$customer = Customer::findOne(1);
$this->assertTrue($customer instanceof Customer);
$this->assertEquals('user1', $customer->name);
$this->assertFalse($db->isActive);
$customer->name = 'test';
$customer->save();
$this->assertTrue($db->isActive);
$customer = Customer::findOne(1);
$this->assertTrue($customer instanceof Customer);
$this->assertEquals('user1', $customer->name);
$result = $db->useMaster(function () {
return Customer::findOne(1)->name;
});
$this->assertEquals('test', $result);
}
}
/**
* @param integer $masterCount
* @param integer $slaveCount
* @return Connection
*/
protected function prepareMasterSlave($masterCount, $slaveCount)
{
$databases = self::getParam('databases');
$fixture = $databases[$this->driverName]['fixture'];
$basePath = \Yii::getAlias('@yiiunit/runtime');
$config = [
'class' => 'yii\db\Connection',
'dsn' => 'sqlite:memory:',
];
$this->prepareDatabase($config, $fixture)->close();
for ($i = 0; $i < $masterCount; ++$i) {
$master = ['dsn' => "sqlite:$basePath/yii2test_master{$i}.sq3"];
$db = $this->prepareDatabase($master, $fixture);
$db->close();
$config['masters'][] = $master;
}
for ($i = 0; $i < $slaveCount; ++$i) {
$slave = ['dsn' => "sqlite:$basePath/yii2test_slave{$i}.sq3"];
$db = $this->prepareDatabase($slave, $fixture);
$db->close();
$config['slaves'][] = $slave;
}
return \Yii::createObject($config);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment