<?php

namespace ShSo\Lacassa;

use Cassandra;
use ShSo\Lacassa\Query\Builder as QueryBuilder;
use ShSo\Lacassa\Query\Grammar as QueryGrammar;
use ShSo\Lacassa\Schema\Builder as SchemaBuilder;
use ShSo\Lacassa\Schema\Grammar as SchemaGrammar;
use ShSo\Lacassa\Query\Processor as QueryProcessor;
use Illuminate\Database\Connection as BaseConnection;

class Connection extends BaseConnection
{
    /**
     * The Cassandra connection handler.
     *
     * @var \Cassandra\DefaultSession
     */
    protected $connection;

    /**
     * Create a new database connection instance.
     *
     * @param array $config
     */
    public function __construct(array $config)
    {
        $this->config = $config;
        $this->db = $config['keyspace'];
        $this->connection = $this->createConnection($config);

        $this->useDefaultPostProcessor();
        $this->useDefaultSchemaGrammar();
        $this->useDefaultQueryGrammar();
    }

    /**
     * Begin a fluent query against a database table.
     *
     * @param string $table
     *
     * @return \ShSo\Lacassa\Query\Builder
     */
    public function table($table)
    {
        return $this->getDefaultQueryBuilder()->from($table);
    }

    /**
     * @return \ShSo\Lacassa\Schema\Builder
     */
    public function getSchemaBuilder()
    {
        return new SchemaBuilder($this);
    }

    /**
     * Returns the connection grammer.
     *
     * @return \ShSo\Lacassa\Schema\Grammar
     */
    public function getSchemaGrammar()
    {
        return new SchemaGrammar();
    }

    /**
     * return Cassandra object.
     *
     * @return \Cassandra\Session
     */
    public function getConnection()
    {
        return $this->connection ?? null;
    }

    /**
     * Create a new Cassandra connection.
     *
     * @param array $config
     *
     * @return \Cassandra\Session
     */
    protected function createConnection(array $config)
    {
        $builder = Cassandra::cluster()
            ->withContactPoints($config['host'] ?? '127.0.0.1')
            ->withPort(intval($config['port'] ?? '7000'));
        if (array_key_exists('page_size', $config) && !empty($config['page_size'])) {
            $builder->withDefaultPageSize(intval($config['page_size'] ?? '5000'));
        }
        if (array_key_exists('consistency', $config) && in_array(strtoupper($config['consistency']), [
                'ANY', 'ONE', 'TWO', 'THREE', 'QOURUM', 'ALL', 'SERIAL',
                'LOCAL_QUORUM', 'EACH_QOURUM', 'LOCAL_SERIAL', 'LOCAL_ONE',
            ])) {
            $consistency = constant('\Cassandra::CONSISTENCY_'.strtoupper($config['consistency']));
            $builder->withDefaultConsistency($consistency);
        }
        if (array_key_exists('timeout', $config) && !empty($config['timeout'])) {
            $builder->withDefaultTimeout(intval($config['timeout']));
        }
        if (array_key_exists('connect_timeout', $config) && !empty($config['connect_timeout'])) {
            $builder->withConnectTimeout(floatval($config['connect_timeout']));
        }
        if (array_key_exists('request_timeout', $config) && !empty($config['request_timeout'])) {
            $builder->withRequestTimeout(floatval($config['request_timeout']));
        }
        if (array_key_exists('username', $config) && array_key_exists('password', $config)) {
            $builder->withCredentials($config['username'], $config['password']);
        }

        return $builder->build()->connect($config['keyspace']);
    }

    /**
     * @return void
     */
    public function disconnect()
    {
        $this->connection->close();
        unset($this->connection);
    }

    /**
     * @return string
     */
    public function getDriverName()
    {
        return 'cassandra';
    }

    /**
     * @return \ShSo\Lacassa\Query\Processor
     */
    protected function getDefaultPostProcessor()
    {
        return new QueryProcessor();
    }

    /**
     * @return \ShSo\Lacassa\Query\Builder
     */
    protected function getDefaultQueryBuilder()
    {
        return new QueryBuilder($this, $this->getPostProcessor());
    }

    /**
     * @return \ShSo\Lacassa\Query\Grammar
     */
    protected function getDefaultQueryGrammar()
    {
        return new QueryGrammar();
    }

    /**
     * @return \ShSo\Lacassa\Schema\Grammar
     */
    protected function getDefaultSchemaGrammar()
    {
        return new SchemaGrammar();
    }

    public function affectingStatement($query, $bindings = [])
    {
        return $this->statement($query, $bindings);
    }

    public function statement($query, $bindings = [])
    {
        return $this->run($query, $bindings, function ($query, $bindings) {
            if ($this->pretending()) {
                return true;
            }

            $statement = $this->connection->prepare($query);
            $this->recordsHaveBeenModified();

            return $this->connection->execute($statement, ['arguments' => $bindings]);
        });
    }

    /**
     * Reconnect to the database if a PDO connection is missing.
     *
     * @return void
     */
    protected function reconnectIfMissingConnection()
    {
        if (is_null($this->connection)) {
            $this->connection = $this->createConnection($this->config);
        }
    }

    /**
     * Dynamically pass methods to the connection.
     *
     * @param string $method
     * @param array $parameters
     *
     * @return mixed
     */
    public function __call($method, $parameters)
    {
        return call_user_func_array([$this->connection, $method], $parameters);
    }
}