<?php
namespace ShSo\Lacassa;
use Cassandra;
use ShSo\Lacassa\Query\Builder as QueryBuilder;
use ShSo\Lacassa\Query\Grammar as QueryGrammar;
use ShSo\Lacassa\Schema\Builder as SchemaBuilder;
use ShSo\Lacassa\Schema\Grammar as SchemaGrammar;
use ShSo\Lacassa\Query\Processor as QueryProcessor;
use Illuminate\Database\Connection as BaseConnection;
class Connection extends BaseConnection
{
/**
* The Cassandra connection handler.
*
* @var \Cassandra\DefaultSession
*/
protected $connection;
/**
* Create a new database connection instance.
*
* @param array $config
*/
public function __construct(array $config)
{
$this->config = $config;
$this->db = $config['keyspace'];
$this->connection = $this->createConnection($config);
$this->useDefaultPostProcessor();
$this->useDefaultSchemaGrammar();
$this->useDefaultQueryGrammar();
}
/**
* Begin a fluent query against a database table.
*
* @param string $table
*
* @return \ShSo\Lacassa\Query\Builder
*/
public function table($table)
{
return $this->getDefaultQueryBuilder()->from($table);
}
/**
* @return \ShSo\Lacassa\Schema\Builder
*/
public function getSchemaBuilder()
{
return new SchemaBuilder($this);
}
/**
* Returns the connection grammer.
*
* @return \ShSo\Lacassa\Schema\Grammar
*/
public function getSchemaGrammar()
{
return new SchemaGrammar();
}
/**
* return Cassandra object.
*
* @return \Cassandra\Session
*/
public function getConnection()
{
return $this->connection ?? null;
}
/**
* Create a new Cassandra connection.
*
* @param array $config
*
* @return \Cassandra\Session
*/
protected function createConnection(array $config)
{
$builder = Cassandra::cluster()
->withContactPoints($config['host'] ?? '127.0.0.1')
->withPort(intval($config['port'] ?? '7000'));
if (array_key_exists('page_size', $config) && !empty($config['page_size'])) {
$builder->withDefaultPageSize(intval($config['page_size'] ?? '5000'));
}
if (array_key_exists('consistency', $config) && in_array(strtoupper($config['consistency']), [
'ANY', 'ONE', 'TWO', 'THREE', 'QOURUM', 'ALL', 'SERIAL',
'LOCAL_QUORUM', 'EACH_QOURUM', 'LOCAL_SERIAL', 'LOCAL_ONE',
])) {
$consistency = constant('\Cassandra::CONSISTENCY_'.strtoupper($config['consistency']));
$builder->withDefaultConsistency($consistency);
}
if (array_key_exists('timeout', $config) && !empty($config['timeout'])) {
$builder->withDefaultTimeout(intval($config['timeout']));
}
if (array_key_exists('connect_timeout', $config) && !empty($config['connect_timeout'])) {
$builder->withConnectTimeout(floatval($config['connect_timeout']));
}
if (array_key_exists('request_timeout', $config) && !empty($config['request_timeout'])) {
$builder->withRequestTimeout(floatval($config['request_timeout']));
}
if (array_key_exists('username', $config) && array_key_exists('password', $config)) {
$builder->withCredentials($config['username'], $config['password']);
}
return $builder->build()->connect($config['keyspace']);
}
/**
* @return void
*/
public function disconnect()
{
$this->connection->close();
unset($this->connection);
}
/**
* @return string
*/
public function getDriverName()
{
return 'cassandra';
}
/**
* @return \ShSo\Lacassa\Query\Processor
*/
protected function getDefaultPostProcessor()
{
return new QueryProcessor();
}
/**
* @return \ShSo\Lacassa\Query\Builder
*/
protected function getDefaultQueryBuilder()
{
return new QueryBuilder($this, $this->getPostProcessor());
}
/**
* @return \ShSo\Lacassa\Query\Grammar
*/
protected function getDefaultQueryGrammar()
{
return new QueryGrammar();
}
/**
* @return \ShSo\Lacassa\Schema\Grammar
*/
protected function getDefaultSchemaGrammar()
{
return new SchemaGrammar();
}
public function affectingStatement($query, $bindings = [])
{
return $this->statement($query, $bindings);
}
public function statement($query, $bindings = [])
{
return $this->run($query, $bindings, function ($query, $bindings) {
if ($this->pretending()) {
return true;
}
$statement = $this->connection->prepare($query);
$this->recordsHaveBeenModified();
return $this->connection->execute($statement, ['arguments' => $bindings]);
});
}
/**
* Reconnect to the database if a PDO connection is missing.
*
* @return void
*/
protected function reconnectIfMissingConnection()
{
if (is_null($this->connection)) {
$this->connection = $this->createConnection($this->config);
}
}
/**
* Dynamically pass methods to the connection.
*
* @param string $method
* @param array $parameters
*
* @return mixed
*/
public function __call($method, $parameters)
{
return call_user_func_array([$this->connection, $method], $parameters);
}
}