vendor/ruflin/elastica/src/Client.php line 538

Open in your IDE?
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\ResponseSet;
  5. use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
  6. use Elastica\Exception\ClientException;
  7. use Elastica\Exception\ConnectionException;
  8. use Elastica\Exception\InvalidException;
  9. use Elastica\Exception\ResponseException;
  10. use Elastica\Script\AbstractScript;
  11. use Elasticsearch\Endpoints\AbstractEndpoint;
  12. use Elasticsearch\Endpoints\ClosePointInTime;
  13. use Elasticsearch\Endpoints\Indices\ForceMerge;
  14. use Elasticsearch\Endpoints\Indices\Refresh;
  15. use Elasticsearch\Endpoints\Update;
  16. use Psr\Log\LoggerInterface;
  17. use Psr\Log\NullLogger;
  /**
   * Client to connect to the Elasticsearch server.
   *
   * Holds the client configuration, builds the connection pool from it and
   * exposes helpers for bulk operations, single-document updates and raw
   * REST requests.
   *
   * @author Nicolas Ruflin <spam@ruflin.com>
   */
  class Client
  {
      /**
       * @var ClientConfiguration
       */
      protected $_config;

      /**
       * Callback handed to the connection pool when it is created.
       *
       * @var callable
       */
      protected $_callback;

      /**
       * @var Connection\ConnectionPool
       */
      protected $_connectionPool;

      /**
       * Request object created by the most recent call to request().
       *
       * @var Request|null
       */
      protected $_lastRequest;

      /**
       * Response of the most recent request; null while a request is in flight or after it failed.
       *
       * @var Response|null
       */
      protected $_lastResponse;

      /**
       * @var LoggerInterface
       */
      protected $_logger;

      /**
       * Server version, cached after the first successful getVersion() call.
       *
       * @var string
       */
      protected $_version;

      /**
       * Creates a new Elastica client.
       *
       * @param array|string         $config   OPTIONAL Additional config or DSN of options
       * @param callable|null        $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down)
       * @param LoggerInterface|null $logger   OPTIONAL Logger to use; a NullLogger is used when omitted
       *
       * @throws InvalidException if $config is neither an array nor a string
       */
      public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
      {
          if (\is_string($config)) {
              $configuration = ClientConfiguration::fromDsn($config);
          } elseif (\is_array($config)) {
              $configuration = ClientConfiguration::fromArray($config);
          } else {
              throw new InvalidException('Config parameter must be an array or a string.');
          }

          $this->_config = $configuration;
          $this->_callback = $callback;
          $this->_logger = $logger ?? new NullLogger();

          $this->_initConnections();
      }

      /**
       * Get current version.
       *
       * The version is read from the response of a request to "/" and cached
       * on the instance, so later calls do not hit the server again.
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function getVersion(): string
      {
          if ($this->_version) {
              return $this->_version;
          }

          $data = $this->request('/')->getData();

          return $this->_version = $data['version']['number'];
      }

      /**
       * Sets specific config values (updates and keeps default values).
       *
       * @param array $config Map of config key => value
       */
      public function setConfig(array $config): self
      {
          foreach ($config as $key => $value) {
              $this->_config->set($key, $value);
          }

          return $this;
      }

      /**
       * Returns a specific config key or the whole config array if not set.
       *
       * @throws InvalidException if the given key is not found in the configuration
       *
       * @return array|bool|string
       */
      public function getConfig(string $key = '')
      {
          return $this->_config->get($key);
      }

      /**
       * Sets / overwrites a specific config value.
       *
       * @param mixed $value Value
       */
      public function setConfigValue(string $key, $value): self
      {
          return $this->setConfig([$key => $value]);
      }

      /**
       * Looks up a (possibly nested) config value without throwing.
       *
       * @param array|string $keys    config key or path of config keys
       * @param mixed        $default default value will be returned if key was not found (or its value is null)
       *
       * @return mixed
       */
      public function getConfigValue($keys, $default = null)
      {
          $value = $this->_config->getAll();

          // Walk down the config one path segment at a time.
          foreach ((array) $keys as $key) {
              if (!isset($value[$key])) {
                  return $default;
              }

              $value = $value[$key];
          }

          return $value;
      }

      /**
       * Returns the index for the given connection.
       */
      public function getIndex(string $name): Index
      {
          return new Index($this, $name);
      }

      /**
       * Adds a HTTP Header.
       */
      public function addHeader(string $header, string $value): self
      {
          $headers = $this->_config->has('headers') ? $this->_config->get('headers') : [];
          $headers[$header] = $value;
          $this->_config->set('headers', $headers);

          return $this;
      }

      /**
       * Remove a HTTP Header.
       *
       * Does nothing when no headers are configured.
       */
      public function removeHeader(string $header): self
      {
          if ($this->_config->has('headers')) {
              $headers = $this->_config->get('headers');
              unset($headers[$header]);
              $this->_config->set('headers', $headers);
          }

          return $this;
      }

      /**
       * Uses _bulk to update documents on the server.
       *
       * Array of \Elastica\Document as input. Index has to be set inside the
       * document, because for bulk settings documents, documents can belong to
       * any index
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
       *
       * @param array|Document[] $docs          Array of Elastica\Document
       * @param array            $requestParams Request params set on the bulk request (key => value)
       *
       * @throws InvalidException      If docs is empty
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       * @throws BulkResponseException
       */
      public function updateDocuments(array $docs, array $requestParams = []): ResponseSet
      {
          if (!$docs) {
              throw new InvalidException('Array has to consist of at least one element');
          }

          $bulk = new Bulk($this);
          $bulk->addDocuments($docs, Action::OP_TYPE_UPDATE);

          foreach ($requestParams as $key => $value) {
              $bulk->setRequestParam($key, $value);
          }

          return $bulk->send();
      }

      /**
       * Uses _bulk to send documents to the server.
       *
       * Array of \Elastica\Document as input. Index has to be set inside the
       * document, because for bulk settings documents, documents can belong to
       * any index
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
       *
       * @param array|Document[] $docs          Array of Elastica\Document
       * @param array            $requestParams Request params set on the bulk request (key => value)
       *
       * @throws InvalidException      If docs is empty
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       * @throws BulkResponseException
       */
      public function addDocuments(array $docs, array $requestParams = []): ResponseSet
      {
          if (!$docs) {
              throw new InvalidException('Array has to consist of at least one element');
          }

          $bulk = new Bulk($this);
          $bulk->addDocuments($docs);

          foreach ($requestParams as $key => $value) {
              $bulk->setRequestParam($key, $value);
          }

          return $bulk->send();
      }

      /**
       * Update document, using update script. Requires elasticsearch >= 0.19.0.
       *
       * When $data is a Document, a fixed set of its options (routing, refresh,
       * timeout, ...) is merged into $options; explicitly passed $options win.
       *
       * @param int|string                    $id      document id
       * @param AbstractScript|array|Document $data    raw data for request body
       * @param string                        $index   index to update
       * @param array                         $options array of query params to use for query. For possible options check es api
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function updateDocument($id, $data, $index, array $options = []): Response
      {
          $endpoint = new Update();
          $endpoint->setId($id);
          $endpoint->setIndex($index);

          if ($data instanceof AbstractScript) {
              $requestData = $data->toArray();
          } elseif ($data instanceof Document) {
              $requestData = ['doc' => $data->getData()];

              if ($data->getDocAsUpsert()) {
                  $requestData['doc_as_upsert'] = true;
              }

              $docOptions = $data->getOptions(
                  [
                      'consistency',
                      'parent',
                      'percolate',
                      'refresh',
                      'replication',
                      'retry_on_conflict',
                      'routing',
                      'timeout',
                  ]
              );
              // Array union: keys already present in $options are kept.
              $options += $docOptions;
          } else {
              $requestData = $data;
          }

          // If an upsert document exists
          if ($data instanceof AbstractScript || $data instanceof Document) {
              if ($data->hasUpsert()) {
                  $requestData['upsert'] = $data->getUpsert()->getData();
              }
          }

          $endpoint->setBody($requestData);
          $endpoint->setParams($options);

          $response = $this->requestEndpoint($endpoint);

          // Feed the response data back into the document when auto-populate is
          // enabled on the document itself or in the client config.
          if ($response->isOk()
              && $data instanceof Document
              && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false))
          ) {
              $data->setVersionParams($response->getData());
          }

          return $response;
      }

      /**
       * Bulk deletes documents.
       *
       * @param array|Document[] $docs
       * @param array            $requestParams Request params set on the bulk request (key => value)
       *
       * @throws InvalidException      If docs is empty
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       * @throws BulkResponseException
       */
      public function deleteDocuments(array $docs, array $requestParams = []): ResponseSet
      {
          if (!$docs) {
              throw new InvalidException('Array has to consist of at least one element');
          }

          $bulk = new Bulk($this);
          $bulk->addDocuments($docs, Action::OP_TYPE_DELETE);

          foreach ($requestParams as $key => $value) {
              $bulk->setRequestParam($key, $value);
          }

          return $bulk->send();
      }

      /**
       * Returns the status object for all indices.
       *
       * @return Status
       */
      public function getStatus()
      {
          return new Status($this);
      }

      /**
       * Returns the current cluster.
       *
       * @return Cluster
       */
      public function getCluster()
      {
          return new Cluster($this);
      }

      /**
       * Establishes the client connections.
       *
       * Rebuilds the connection pool from the current configuration.
       */
      public function connect()
      {
          $this->_initConnections();
      }

      /**
       * Adds a connection to the pool.
       *
       * @return $this
       */
      public function addConnection(Connection $connection)
      {
          $this->_connectionPool->addConnection($connection);

          return $this;
      }

      /**
       * Determines whether a valid connection is available for use.
       *
       * @return bool
       */
      public function hasConnection()
      {
          return $this->_connectionPool->hasConnection();
      }

      /**
       * Returns the connection the pool selects for the next request.
       *
       * @throws ClientException
       *
       * @return Connection
       */
      public function getConnection()
      {
          return $this->_connectionPool->getConnection();
      }

      /**
       * @return Connection[]
       */
      public function getConnections()
      {
          return $this->_connectionPool->getConnections();
      }

      /**
       * @return \Elastica\Connection\Strategy\StrategyInterface
       */
      public function getConnectionStrategy()
      {
          return $this->_connectionPool->getStrategy();
      }

      /**
       * Replaces the connections held by the pool.
       *
       * @param array|Connection[] $connections
       *
       * @return $this
       */
      public function setConnections(array $connections)
      {
          $this->_connectionPool->setConnections($connections);

          return $this;
      }

      /**
       * Deletes documents with the given ids from the index.
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
       *
       * @param array        $ids     Document ids
       * @param Index|string $index   Index name
       * @param bool|string  $routing Optional routing key for all ids; false (the default) means no routing
       *
       * @throws InvalidException      If ids is empty
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       * @throws BulkResponseException
       */
      public function deleteIds(array $ids, $index, $routing = false): ResponseSet
      {
          if (!$ids) {
              throw new InvalidException('Array has to consist of at least one id');
          }

          $bulk = new Bulk($this);
          $bulk->setIndex($index);

          // empty() alone would discard the routing key "0" (or 0), which is a
          // legitimate value, so those are accepted explicitly.
          $hasRouting = !empty($routing) || 0 === $routing || '0' === $routing;

          foreach ($ids as $id) {
              $action = new Action(Action::OP_TYPE_DELETE);
              $action->setId($id);

              if ($hasRouting) {
                  $action->setRouting($routing);
              }

              $bulk->addAction($action);
          }

          return $bulk->send();
      }

      /**
       * Bulk operation.
       *
       * Every entry in the params array has to be exactly one array
       * of the bulk operation. An example param array would be:
       *
       * array(
       *     array('index' => array('_index' => 'test', '_id' => '1')),
       *     array('field1' => 'value1'),
       *     array('delete' => array('_index' => 'test', '_id' => '2')),
       *     array('create' => array('_index' => 'test', '_id' => '3')),
       *     array('field1' => 'value3'),
       *     array('update' => array('_id' => '1', '_index' => 'test')),
       *     array('doc' => array('field2' => 'value2')),
       * );
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
       *
       * @throws ResponseException
       * @throws InvalidException      If params is empty
       * @throws ClientException
       * @throws ConnectionException
       * @throws BulkResponseException
       */
      public function bulk(array $params): ResponseSet
      {
          if (!$params) {
              throw new InvalidException('Array has to consist of at least one param');
          }

          $bulk = new Bulk($this);
          $bulk->addRawData($params);

          return $bulk->send();
      }

      /**
       * Makes calls to the elasticsearch server based on this index.
       *
       * It's possible to make any REST query directly over this method.
       *
       * On a ConnectionException the failure is reported to the connection
       * pool and logged; the request is then retried with identical arguments
       * as long as the pool still reports a usable connection.
       *
       * @param string       $path        Path to call
       * @param string       $method      Rest method to use (GET, POST, DELETE, PUT)
       * @param array|string $data        OPTIONAL Arguments as array or pre-encoded string
       * @param array        $query       OPTIONAL Query params
       * @param string       $contentType Content-Type sent with this request
       *
       * @throws ClientException
       * @throws ConnectionException if the request failed and no connection is left to retry on
       * @throws ResponseException
       */
      public function request(string $path, string $method = Request::GET, $data = [], array $query = [], string $contentType = Request::DEFAULT_CONTENT_TYPE): Response
      {
          $connection = $this->getConnection();
          $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType);
          $this->_lastResponse = null;

          try {
              $response = $this->_lastResponse = $request->send();
          } catch (ConnectionException $e) {
              $this->_connectionPool->onFail($connection, $e, $this);

              $canRetry = $this->hasConnection();

              $this->_logger->error('Elastica Request Failure', [
                  'exception' => $e,
                  'request' => $e->getRequest()->toArray(),
                  'retry' => $canRetry,
              ]);

              // In case there is no valid connection left, throw exception which caused the disabling of the connection.
              if (!$canRetry) {
                  throw $e;
              }

              // Forward $contentType too, otherwise the retry would silently
              // fall back to the default Content-Type.
              return $this->request($path, $method, $data, $query, $contentType);
          }

          $this->_logger->debug('Elastica Request', [
              'request' => $request->toArray(),
              'response' => $response->getData(),
              'responseStatus' => $response->getStatus(),
          ]);

          return $response;
      }

      /**
       * Makes calls to the elasticsearch server with usage official client Endpoint.
       *
       * The endpoint's URI (without leading slash), method, body and params are
       * passed on to request(); a null body is sent as an empty array.
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function requestEndpoint(AbstractEndpoint $endpoint): Response
      {
          return $this->request(
              \ltrim($endpoint->getURI(), '/'),
              $endpoint->getMethod(),
              $endpoint->getBody() ?? [],
              $endpoint->getParams()
          );
      }

      /**
       * Force merges all search indices.
       *
       * @param array $args OPTIONAL Optional arguments
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function forcemergeAll($args = []): Response
      {
          $endpoint = new ForceMerge();
          $endpoint->setParams($args);

          return $this->requestEndpoint($endpoint);
      }

      /**
       * Closes the given PointInTime.
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/point-in-time-api.html#close-point-in-time-api
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function closePointInTime(string $pointInTimeId): Response
      {
          $endpoint = new ClosePointInTime();
          $endpoint->setBody(['id' => $pointInTimeId]);

          return $this->requestEndpoint($endpoint);
      }

      /**
       * Refreshes all search indices.
       *
       * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
       *
       * @throws ClientException
       * @throws ConnectionException
       * @throws ResponseException
       */
      public function refreshAll(): Response
      {
          return $this->requestEndpoint(new Refresh());
      }

      /**
       * Returns the request created by the most recent request() call, if any.
       */
      public function getLastRequest(): ?Request
      {
          return $this->_lastRequest;
      }

      /**
       * Returns the response of the most recent request() call, or null if it did not complete.
       */
      public function getLastResponse(): ?Response
      {
          return $this->_lastResponse;
      }

      /**
       * Replace the existing logger.
       *
       * @return $this
       */
      public function setLogger(LoggerInterface $logger)
      {
          $this->_logger = $logger;

          return $this;
      }

      /**
       * Inits the client connections.
       *
       * Connections are created from the "connections" and "servers" config
       * entries; when neither yields one, a single connection is created from
       * the top-level config. A missing "connectionStrategy" is filled in from
       * the "roundRobin" flag before the pool is built.
       */
      protected function _initConnections(): void
      {
          $connections = [];

          foreach ($this->getConfig('connections') as $connection) {
              $connections[] = Connection::create($this->_prepareConnectionParams($connection));
          }

          if ($this->_config->has('servers')) {
              $servers = $this->_config->get('servers');
              foreach ($servers as $server) {
                  $connections[] = Connection::create($this->_prepareConnectionParams($server));
              }
          }

          // If no connections set, create default connection
          if (!$connections) {
              $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig()));
          }

          if (!$this->_config->has('connectionStrategy')) {
              if (true === $this->getConfig('roundRobin')) {
                  $this->setConfigValue('connectionStrategy', 'RoundRobin');
              } else {
                  $this->setConfigValue('connectionStrategy', 'Simple');
              }
          }

          $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy'));

          $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback);
      }

      /**
       * Creates a Connection params array from a Client or server config array.
       *
       * The keys "bigintConversion", "curl", "headers" and "url" are nested
       * under the "config" entry; every other key is copied to the top level.
       */
      protected function _prepareConnectionParams(array $config): array
      {
          $params = [];
          $params['config'] = [];

          foreach ($config as $key => $value) {
              // Strict comparison: with loose comparison an integer key such as 0
              // matches any non-numeric string on PHP 7 and would be misfiled.
              if (\in_array($key, ['bigintConversion', 'curl', 'headers', 'url'], true)) {
                  $params['config'][$key] = $value;
              } else {
                  $params[$key] = $value;
              }
          }

          return $params;
      }
  }