src/Service/ElasticsearchLogstashHandlerImp.php line 13

Open in your IDE?
  1. <?php
  2. namespace App\Service ;
  3. use Monolog\Formatter\FormatterInterface;
  4. use Monolog\Formatter\LogstashFormatter;
  5. use Monolog\Handler\FormattableHandlerTrait;
  6. use Monolog\Handler\ProcessableHandlerTrait;
  7. use Monolog\Logger;
  8. use Symfony\Bridge\Monolog\Handler\ElasticsearchLogstashHandler as MonologHandler;
  9. use Symfony\Component\HttpClient\HttpClient;
  10. use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
  11. use Symfony\Contracts\HttpClient\HttpClientInterface;
  /**
   * Monolog handler that pushes log records to an Elasticsearch "_bulk" endpoint.
   *
   * It extends the Symfony bridge handler and re-implements the sending logic so
   * that every bulk request also carries the "partner" and "appkey" HTTP headers,
   * whose values are read from the PARTENER_ELASTICSEACH and APP_KEY_ELASTICSEACH
   * environment variables.
   *
   * Requests are started without waiting for completion; pending responses are
   * kept in an SplObjectStorage and are waited for when the handler is destroyed.
   */
  class ElasticsearchLogstashHandlerImp extends MonologHandler
  {
      use FormattableHandlerTrait;
      use ProcessableHandlerTrait;

      /** @var string Base URL of the Elasticsearch server (without the "/_bulk" suffix) */
      private $endpoint;

      /** @var string Name written into the "_index" field of each bulk action */
      private $index;

      /** @var HttpClientInterface HTTP client used to issue the bulk requests */
      private $client;

      /** @var \SplObjectStorage Responses that have been started but not yet completed */
      private $responses;

      /**
       * @param string                   $endpoint Base URL of the Elasticsearch server
       * @param string                   $index    Index the records are written to
       * @param HttpClientInterface|null $client   HTTP client; when null, one is created with a 1 second timeout
       * @param string|int               $level    The minimum logging level at which this handler will be triggered
       * @param bool                     $bubble   Whether handled records may continue to the next handler
       *
       * @throws \LogicException When the HTTP client contracts are not installed
       */
      public function __construct(
          string $endpoint = 'http://127.0.0.1:9200',
          string $index = 'monolog',
          ?HttpClientInterface $client = null,
          $level = Logger::INFO,
          bool $bubble = true
      ) {
          if (!interface_exists(HttpClientInterface::class)) {
              throw new \LogicException(sprintf('The "%s" handler needs an HTTP client. Try running "composer require symfony/http-client".', __CLASS__));
          }

          // Resolve the client once, before calling the parent, so the parent
          // constructor receives the same instance instead of null.
          $client = $client ?? HttpClient::create(['timeout' => 1]);

          parent::__construct($endpoint, $index, $client, $level, $bubble);

          // These properties are redeclared as private in this class, so they
          // have to be assigned here as well.
          $this->endpoint = $endpoint;
          $this->index = $index;
          $this->client = $client;
          $this->responses = new \SplObjectStorage();
      }

      /**
       * Sends a single record to Elasticsearch when this handler accepts it.
       *
       * @param array $record The log record
       *
       * @return bool false when the record is not handled or when bubbling is
       *              enabled; true when the record was handled and bubbling is disabled
       */
      public function handle(array $record): bool
      {
          if (!$this->isHandling($record)) {
              return false;
          }

          $this->sendToElasticsearch([$record]);

          return !$this->bubble;
      }

      /**
       * Sends all records accepted by this handler in one bulk request.
       *
       * @param array $records The log records; those rejected by isHandling() are dropped
       */
      public function handleBatch(array $records): void
      {
          $records = array_filter($records, [$this, 'isHandling']);

          // Nothing to send when every record was filtered out.
          if ($records) {
              $this->sendToElasticsearch($records);
          }
      }

      /**
       * Builds the Logstash formatter used when none has been set explicitly.
       */
      protected function getDefaultFormatter(): FormatterInterface
      {
          // Monolog 1.X
          if (\defined(LogstashFormatter::class.'::V1')) {
              return new LogstashFormatter('application', null, null, 'ctxt_', LogstashFormatter::V1);
          }

          // Monolog 2.X
          return new LogstashFormatter('application');
      }

      /**
       * Runs the processors over each record, serialises the records into a bulk
       * body (one action line followed by one formatted record per entry) and
       * starts the POST request to "<endpoint>/_bulk".
       *
       * @param array $records The records to send
       */
      private function sendToElasticsearch(array $records): void
      {
          $formatter = $this->getFormatter();

          // The action line is identical for every record, so encode it once.
          $action = json_encode([
              'index' => [
                  '_index' => $this->index,
                  '_type' => '_doc',
              ],
          ]);

          $body = '';
          foreach ($records as $record) {
              foreach ($this->processors as $processor) {
                  $record = $processor($record);
              }

              $body .= $action;
              $body .= "\n";
              $body .= $formatter->format($record);
              $body .= "\n";
          }

          $response = $this->client->request('POST', $this->endpoint.'/_bulk', [
              'body' => $body,
              'headers' => [
                  'Content-Type' => 'application/json',
                  // "?? null" keeps the value null when the variable is missing
                  // but avoids raising an undefined-key warning from inside the
                  // log handler.
                  'partner' => $_ENV['PARTENER_ELASTICSEACH'] ?? null,
                  'appkey' => $_ENV['APP_KEY_ELASTICSEACH'] ?? null,
              ],
          ]);

          // Track the response, then do a non-blocking pass over pending ones.
          $this->responses->attach($response);
          $this->wait(false);
      }

      /**
       * Waits in blocking mode for every pending response before the handler goes away.
       */
      public function __destruct()
      {
          $this->wait(true);
      }

      /**
       * Iterates over the pending responses and forgets those that completed or failed.
       *
       * @param bool $blocking When true, stream() is called with a null timeout;
       *                       when false, with a 0.0 timeout and timeout chunks are skipped
       */
      private function wait(bool $blocking): void
      {
          foreach ($this->client->stream($this->responses, $blocking ? null : 0.0) as $response => $chunk) {
              try {
                  if ($chunk->isTimeout() && !$blocking) {
                      continue;
                  }

                  // Only the first and the last chunk matter here.
                  if (!$chunk->isFirst() && !$chunk->isLast()) {
                      continue;
                  }

                  if ($chunk->isLast()) {
                      $this->responses->detach($response);
                  }
              } catch (ExceptionInterface $e) {
                  // Drop the failed response and report through error_log()
                  // instead of throwing from the log handler.
                  $this->responses->detach($response);
                  error_log(sprintf("Could not push logs to Elasticsearch:\n%s", (string) $e));
              }
          }
      }
  }