source: repository/lib/Metabrik/Client/Elasticsearch.pm

tip
Last change on this file was 931:07eaa8bc26b2, checked in by GomoR <gomor@…>, 3 weeks ago
  • bugfix: client::elasticsearch: verify node is defined when checking error message
  • bugfix: client::kafka: ignore warning on redefined routine
  • update: server::elasticsearch: more configuration settings in generate_conf Command
  • update: server::kafka: more configuration settings in generate_conf Command
File size: 71.9 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         nodes => [ qw(node_list) ],
20         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
21         date => [ qw(date) ],
22         index => [ qw(index) ],
23         type => [ qw(type) ],
24         from => [ qw(number) ],
25         size => [ qw(count) ],
26         max => [ qw(count) ],
27         max_flush_count => [ qw(count) ],
28         max_flush_size => [ qw(count) ],
29         rtimeout => [ qw(seconds) ],
30         sniff_rtimeout => [ qw(seconds) ],
31         try => [ qw(count) ],
32         use_bulk_autoflush => [ qw(0|1) ],
33         use_indexing_optimizations => [ qw(0|1) ],
34         csv_encoded_fields => [ qw(fields) ],
35         csv_object_fields => [ qw(fields) ],
36         _es => [ qw(INTERNAL) ],
37         _bulk => [ qw(INTERNAL) ],
38         _scroll => [ qw(INTERNAL) ],
39      },
40      attributes_default => {
41         nodes => [ qw(http://localhost:9200) ],
42         cxn_pool => 'Sniff',
43         from => 0,
44         size => 10,
45         max => 0,
46         index => '*',
47         type => '*',
48         rtimeout => 60,
49         sniff_rtimeout => 3,
50         try => 3,
51         max_flush_count => 1_000,
52         max_flush_size => 1_000_000,
53         use_bulk_autoflush => 1,
54         use_indexing_optimizations => 0,
55      },
56      commands => {
57         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
58         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
59         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
60         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL) ],
61         close_scroll => [ ],
62         total_scroll => [ ],
63         next_scroll => [ ],
64         index_document => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
65         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
66         bulk_flush => [ ],
67         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
68         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
69         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
70         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
71         delete_index => [ qw(index|indices_list) ],
72         update_alias => [ qw(new_index alias) ],
73         delete_document => [ qw(index type id) ],
74         delete_by_query => [ qw($query_hash index type) ],
75         show_indices => [ qw(string_filter|OPTIONAL) ],
76         show_nodes => [ ],
77         show_health => [ ],
78         show_recovery => [ ],
79         list_indices => [ ],
80         get_indices => [ ],
81         get_index => [ qw(index|indices_list) ],
82         list_index_types => [ qw(index) ],
83         list_index_fields => [ qw(index) ],
84         list_indices_version => [ qw(index|indices_list) ],
85         open_index => [ qw(index|indices_list) ],
86         close_index => [ qw(index|indices_list) ],
87         get_aliases => [ qw(index) ],
88         put_alias => [ qw(index alias) ],
89         delete_alias => [ qw(index alias) ],
90         get_mappings => [ qw(index type|OPTIONAL) ],
91         create_index => [ qw(index) ],
92         create_index_with_mappings => [ qw(index mappings) ],
93         info => [ qw(nodes_list|OPTIONAL) ],
94         version => [ qw(nodes_list|OPTIONAL) ],
95         get_templates => [ ],
96         list_templates => [ ],
97         get_template => [ qw(name) ],
98         put_template => [ qw(name template) ],
99         put_template_from_json_file => [ qw(file) ],
100         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
101         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
102         set_index_number_of_replicas => [ qw(index|indices_list number) ],
103         set_index_refresh_interval => [ qw(index|indices_list number) ],
104         get_index_number_of_replicas => [ qw(index|indices) ],
105         get_index_refresh_interval => [ qw(index|indices_list) ],
106         get_index_number_of_shards => [ qw(index|indices_list) ],
107         delete_template => [ qw(name) ],
108         is_index_exists => [ qw(index) ],
109         is_type_exists => [ qw(index type) ],
110         is_document_exists => [ qw(index type document) ],
111         parse_error_string => [ qw(string) ],
112         refresh_index => [ qw(index) ],
113         export_as_csv => [ qw(index size|OPTIONAL) ],
114         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL size|OPTIONAL) ],
115         get_stats_process => [ ],
116         get_process => [ ],
117         get_cluster_state => [ ],
118         get_cluster_health => [ ],
119         get_cluster_settings => [ ],
120         put_cluster_settings => [ qw(settings) ],
121         count_green_indices => [ ],
122         count_yellow_indices => [ ],
123         count_red_indices => [ ],
124         list_green_indices => [ ],
125         list_yellow_indices => [ ],
126         list_red_indices => [ ],
127         count_indices => [ ],
128         list_indices_status => [ ],
129         count_shards => [ ],
130         count_size => [ ],
131         count_total_size => [ ],
132         count_count => [ ],
133         list_datatypes => [ ],
134         get_hits_total => [ ],
135         disable_shard_allocation => [ ],
136         enable_shard_allocation => [ ],
137         flush_synced => [ ],
138         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
139         create_shared_fs_snapshot_repository => [ qw(location
140            repository_name|OPTIONAL) ],
141         get_snapshot_repositories => [ ],
142         get_snapshot_status => [ ],
143         delete_snapshot_repository => [ qw(repository_name) ],
144         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
145            body|OPTIONAL) ],
146         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
147            repository_name|OPTIONAL) ],
148         is_snapshot_finished => [ ],
149         get_snapshot_state => [ ],
150         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
151         delete_snapshot => [ qw(snapshot_name repository_name) ],
152         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
153         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
154      },
155      require_modules => {
156         'Metabrik::String::Json' => [ ],
157         'Metabrik::File::Csv' => [ ],
158         'Metabrik::File::Json' => [ ],
159         'Metabrik::File::Dump' => [ ],
160         'Metabrik::Format::Number' => [ ],
161         'Search::Elasticsearch' => [ ],
162      },
163   };
164}
165
166sub brik_preinit {
167   my $self = shift;
168
169   eval("use Search::Elasticsearch;");
170   if ($Search::Elasticsearch::VERSION < 5) {
171      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
172         "with: run perl::module install Search::Elasticsearch");
173   }
174
175   return $self->SUPER::brik_preinit;
176}
177
178sub open {
179   my $self = shift;
180   my ($nodes, $cxn_pool) = @_;
181
182   $nodes ||= $self->nodes;
183   $cxn_pool ||= $self->cxn_pool;
184   $self->brik_help_run_undef_arg('open', $nodes) or return;
185   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
186   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
187   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
188
189   for my $node (@$nodes) {
190      if ($node !~ m{https?://}) {
191         return $self->log->error("open: invalid node[$node], must start with http(s)");
192      }
193   }
194
195   my $timeout = $self->rtimeout;
196
197   my $nodes_str = join('|', @$nodes);
198   $self->log->verbose("open: using nodes [$nodes_str]");
199
200   #
201   # Timeout description here:
202   #
203   # Search::Elasticsearch::Role::Cxn
204   #
205
206   my $es = Search::Elasticsearch->new(
207      nodes => $nodes,
208      cxn_pool => $cxn_pool,
209      timeout => $timeout,
210      max_retries => $self->try,
211      retry_on_timeout => 1,
212      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
213      request_timeout => 60,  # seconds, default 30
214      ping_timeout => 5,  # seconds, default 2
215      dead_timeout => 120,  # seconds, detault 60
216      max_dead_timeout => 3600,  # seconds, default 3600
217      sniff_request_timeout => 15, # seconds, default 2
218   );
219   if (! defined($es)) {
220      return $self->log->error("open: failed");
221   }
222
223   $self->_es($es);
224
225   return $nodes;
226}
227
228#
229# Search::Elasticsearch::Client::5_0::Bulk
230#
231sub open_bulk_mode {
232   my $self = shift;
233   my ($index, $type) = @_;
234
235   $index ||= $self->index;
236   $type ||= $self->type;
237   my $es = $self->_es;
238   $self->brik_help_run_undef_arg('open', $es) or return;
239   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
240   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
241
242   my %args = (
243      index => $index,
244      type => $type,
245   );
246
247   if ($self->use_bulk_autoflush) {
248      my $max_count = $self->max_flush_count || 1_000;
249      my $max_size = $self->max_flush_size || 1_000_000;
250
251      $args{max_count} = $max_count;
252      $args{max_size} = $max_size;
253      $args{max_time} = 0;
254
255      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
256         "max_flush_size [$max_size]");
257   }
258   else {
259      $args{max_count} = 0;
260      $args{max_size} = 0;
261      $args{max_time} = 0;
262      $args{on_error} = undef;
263      #$args{on_success} = sub {
264         #my ($action, $response, $i) = @_;
265      #};
266
267      $self->log->info("open_bulk_mode: opening without automatic flushing");
268   }
269
270   my $bulk = $es->bulk_helper(%args);
271   if (! defined($bulk)) {
272      return $self->log->error("open_bulk_mode: failed");
273   }
274
275   $self->_bulk($bulk);
276
277   return $self->nodes;
278}
279
280sub open_scroll_scan_mode {
281   my $self = shift;
282   my ($index, $size) = @_;
283
284   my $version = $self->version or return;
285   if ($version ge "5.0.0") {
286      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
287         "$version, try open_scroll Command instead");
288   }
289
290   $index ||= $self->index;
291   $size ||= $self->size;
292   my $es = $self->_es;
293   $self->brik_help_run_undef_arg('open', $es) or return;
294   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
295   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
296
297   my $scroll = $es->scroll_helper(
298      index => $index,
299      search_type => 'scan',
300      size => $size,
301   );
302   if (! defined($scroll)) {
303      return $self->log->error("open_scroll_scan_mode: failed");
304   }
305
306   $self->_scroll($scroll);
307
308   return $self->nodes;
309}
310
311#
312# Search::Elasticsearch::Client::5_0::Scroll
313#
314sub open_scroll {
315   my $self = shift;
316   my ($index, $size) = @_;
317
318   my $version = $self->version or return;
319   if ($version lt "5.0.0") {
320      return $self->log->error("open_scroll: Command not supported for ES version ".
321         "$version, try open_scroll_scan_mode Command instead");
322   }
323
324   $index ||= $self->index;
325   $size ||= $self->size;
326   my $es = $self->_es;
327   $self->brik_help_run_undef_arg('open', $es) or return;
328   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
329   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
330
331   my $timeout = $self->rtimeout;
332
333   #
334   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
335   #
336   my $scroll = $es->scroll_helper(
337      scroll => "${timeout}s",
338      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
339                          # it in query string.
340      index => $index,
341      size => $size,
342      body => {
343         sort => [ qw(_doc) ],
344      },
345   );
346   if (! defined($scroll)) {
347      return $self->log->error("open_scroll: failed");
348   }
349
350   $self->_scroll($scroll);
351
352   $self->log->info("open_scroll: opened with size [$size] and timeout [${timeout}s]");
353
354   return $self->nodes;
355}
356
357#
358# Search::Elasticsearch::Client::5_0::Scroll
359#
360sub close_scroll {
361   my $self = shift;
362
363   my $scroll = $self->_scroll;
364   if (! defined($scroll)) {
365      return 1;
366   }
367
368   $scroll->finish;
369   $self->_scroll(undef);
370
371   return 1;
372}
373
374sub total_scroll {
375   my $self = shift;
376
377   my $scroll = $self->_scroll;
378   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
379
380   my $total;
381   eval {
382      $total = $scroll->total;
383   };
384   if ($@) {
385      chomp($@);
386      return $self->log->error("total_scroll: failed with: [$@]");
387   }
388
389   return $total;
390}
391
392sub next_scroll {
393   my $self = shift;
394
395   my $scroll = $self->_scroll;
396   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
397
398   my $next;
399   eval {
400      $next = $scroll->next;
401   };
402   if ($@) {
403      chomp($@);
404      return $self->log->error("next_scroll: failed with: [$@]");
405   }
406
407   return $next;
408}
409
410sub index_document {
411   my $self = shift;
412   my ($doc, $index, $type, $id) = @_;
413
414   $index ||= $self->index;
415   $type ||= $self->type;
416   my $es = $self->_es;
417   $self->brik_help_run_undef_arg('open', $es) or return;
418   $self->brik_help_run_undef_arg('index_document', $doc) or return;
419   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
420   $self->brik_help_set_undef_arg('index', $index) or return;
421   $self->brik_help_set_undef_arg('type', $type) or return;
422
423   my %args = (
424      index => $index,
425      type => $type,
426      body => $doc,
427   );
428   if (defined($id)) {
429      $args{id} = $id;
430   }
431
432   my $r;
433   eval {
434      $r = $es->index(%args);
435   };
436   if ($@) {
437      chomp($@);
438      return $self->log->error("index_document: index failed for index [$index]: [$@]");
439   }
440
441   return $r;
442}
443
444#
445# Search::Elasticsearch::Client::5_0::Bulk
446#
447sub index_bulk {
448   my $self = shift;
449   my ($doc, $index, $type, $id) = @_;
450
451   my $bulk = $self->_bulk;
452   $index ||= $self->index;
453   $type ||= $self->type;
454   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
455   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
456   $self->brik_help_set_undef_arg('index', $index) or return;
457   $self->brik_help_set_undef_arg('type', $type) or return;
458
459   my %args = (
460      source => $doc,
461   );
462   if (defined($id)) {
463      $args{id} = $id;
464   }
465
466   my $r;
467   eval {
468      #$r = $bulk->index(\%args);
469      $r = $bulk->add_action(index => \%args);
470   };
471   if ($@) {
472      chomp($@);
473      my $p = $self->parse_error_string($@);
474      if (defined($p) && exists($p->{class})) {
475         my $class = $p->{class};
476         my $code = $p->{code};
477         my $node = $p->{node};
478         return $self->log->error("index_bulk: failed for index [$index] with error ".
479            "[$class] code [$code] for node [$node]");
480      }
481      else {
482         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
483      }
484   }
485
486   return $r;
487}
488
489sub bulk_flush {
490   my $self = shift;
491
492   my $bulk = $self->_bulk;
493   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
494
495   my $try = $self->try;
496
497RETRY:
498
499   my $r;
500   eval {
501      $r = $bulk->flush;
502   };
503   if ($@) {
504      if (--$try == 0) {
505         chomp($@);
506         my $p = $self->parse_error_string($@);
507         if (defined($p) && exists($p->{class})) {
508            my $class = $p->{class};
509            my $code = $p->{code};
510            my $node = $p->{node};
511            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
512               "[$class] code [$code] for node [$node]");
513         }
514         else {
515            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
516         }
517      }
518      sleep 60;
519      goto RETRY;
520   }
521
522   return $r;
523}
524
525#
526# Search::Elasticsearch::Client::2_0::Direct
527# Search::Elasticsearch::Client::5_0::Direct
528#
529sub count {
530   my $self = shift;
531   my ($index, $type) = @_;
532
533   $index ||= $self->index;
534   $type ||= $self->type;
535   my $es = $self->_es;
536   $self->brik_help_run_undef_arg('open', $es) or return;
537
538   my %args = ();
539   if (defined($index) && $index ne '*') {
540      $args{index} = $index;
541   }
542   if (defined($type) && $type ne '*') {
543      $args{type} = $type;
544   }
545
546   #$args{body} = {
547      #query => {
548         #match => { title => 'Elasticsearch clients' },
549      #},
550   #}
551
552   my $r;
553   my $version = $self->version or return;
554   if ($version ge "5.0.0") {
555      eval {
556         $r = $es->count(%args);
557      };
558   }
559   else {
560      eval {
561         $r = $es->search(
562            index => $index,
563            type => $type,
564            search_type => 'count',
565            body => {
566               query => {
567                  match_all => {},
568               },
569            },
570         );
571      };
572   }
573   if ($@) {
574      chomp($@);
575      return $self->log->error("count: count failed for index [$index]: [$@]");
576   }
577
578   if ($version ge "5.0.0") {
579      if (exists($r->{count})) {
580         return $r->{count};
581      }
582   }
583   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
584      return $r->{hits}{total};
585   }
586
587   return $self->log->error("count: nothing found");
588}
589
590#
591# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
592# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
593#
594# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
595#
596sub query {
597   my $self = shift;
598   my ($query, $index, $type, $hash) = @_;
599
600   $index ||= $self->index;
601   $type ||= $self->type;
602   my $es = $self->_es;
603   $self->brik_help_run_undef_arg('open', $es) or return;
604   $self->brik_help_run_undef_arg('query', $query) or return;
605   $self->brik_help_set_undef_arg('index', $index) or return;
606   $self->brik_help_set_undef_arg('type', $type) or return;
607   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
608
609   my $timeout = $self->rtimeout;
610
611   my %args = (
612      index => $index,
613      body => $query,
614   );
615
616   if (defined($hash)) {
617      %args = ( %args, %$hash );
618   }
619
620   if ($type ne '*') {
621      $args{type} = $type;
622   }
623
624   my $r;
625   eval {
626      $r = $es->search(%args);
627   };
628   if ($@) {
629      chomp($@);
630      return $self->log->error("query: failed for index [$index]: [$@]");
631   }
632
633   return $r;
634}
635
636sub get_from_id {
637   my $self = shift;
638   my ($id, $index, $type) = @_;
639
640   $index ||= $self->index;
641   $type ||= $self->type;
642   my $es = $self->_es;
643   $self->brik_help_run_undef_arg('open', $es) or return;
644   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
645   $self->brik_help_set_undef_arg('index', $index) or return;
646   $self->brik_help_set_undef_arg('type', $type) or return;
647
648   my $r;
649   eval {
650      $r = $es->get(
651         index => $index,
652         type => $type,
653         id => $id,
654      );
655   };
656   if ($@) {
657      chomp($@);
658      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
659   }
660
661   return $r;
662}
663
664#
665# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
666#
667sub www_search {
668   my $self = shift;
669   my ($query, $index, $type) = @_;
670
671   $index ||= $self->index;
672   $type ||= $self->type;
673   $self->brik_help_run_undef_arg('www_search', $query) or return;
674   $self->brik_help_set_undef_arg('index', $index) or return;
675   $self->brik_help_set_undef_arg('type', $type) or return;
676
677   my $from = $self->from;
678   my $size = $self->size;
679
680   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
681
682   my $nodes = $self->nodes;
683   for my $node (@$nodes) {
684      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
685      my $url = "$node/$index";
686      if ($type ne '*') {
687         $url .= "/$type";
688      }
689      $url .= "/_search/?from=$from&size=$size&q=".$query;
690
691      my $get = $self->SUPER::get($url) or next;
692      my $body = $get->{content};
693
694      my $decoded = $sj->decode($body) or next;
695
696      return $decoded;
697   }
698
699   return;
700}
701
702#
703# Search::Elasticsearch::Client::2_0::Direct::Indices
704#
705sub delete_index {
706   my $self = shift;
707   my ($index) = @_;
708
709   my $es = $self->_es;
710   $self->brik_help_run_undef_arg('open', $es) or return;
711   $self->brik_help_run_undef_arg('delete_index', $index) or return;
712   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
713
714   my %args = (
715      index => $index,
716   );
717
718   my $r;
719   eval {
720      $r = $es->indices->delete(%args);
721   };
722   if ($@) {
723      chomp($@);
724      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
725   }
726
727   return $r;
728}
729
730#
731# Search::Elasticsearch::Client::2_0::Direct::Indices
732#
733sub delete_document {
734   my $self = shift;
735   my ($index, $type, $id) = @_;
736
737   my $es = $self->_es;
738   $self->brik_help_run_undef_arg('open', $es) or return;
739   $self->brik_help_run_undef_arg('delete_document', $index) or return;
740   $self->brik_help_run_undef_arg('delete_document', $type) or return;
741   $self->brik_help_run_undef_arg('delete_document', $id) or return;
742
743   my %args = (
744      index => $index,
745      type => $type,
746      id => $id,
747   );
748
749   my $r;
750   eval {
751      $r = $es->delete(%args);
752   };
753   if ($@) {
754      chomp($@);
755      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
756   }
757
758   return $r;
759}
760
761#
762# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
763#
764# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
765#
766sub delete_by_query {
767   my $self = shift;
768   my ($query, $index, $type) = @_;
769
770   my $es = $self->_es;
771   $self->brik_help_run_undef_arg('open', $es) or return;
772   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
773   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
774   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
775   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
776
777   my $timeout = $self->rtimeout;
778
779   my %args = (
780      index => $index,
781      type => $type,
782      body => $query,
783   );
784
785   my $r;
786   eval {
787      $r = $es->delete_by_query(%args);
788   };
789   if ($@) {
790      chomp($@);
791      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
792   }
793
794   # This may fail, we ignore it.
795   $self->refresh_index($index);
796
797   return $r;
798}
799
800#
801# Search::Elasticsearch::Client::2_0::Direct::Cat
802#
803sub show_indices {
804   my $self = shift;
805   my ($string) = @_;
806
807   my $es = $self->_es;
808   $self->brik_help_run_undef_arg('open', $es) or return;
809
810   my $r;
811   eval {
812      $r = $es->cat->indices;
813   };
814   if ($@) {
815      chomp($@);
816      return $self->log->error("show_indices: failed: [$@]");
817   }
818
819   my @lines = split(/\n/, $r);
820
821   if (@lines == 0) {
822      $self->log->warning("show_indices: nothing returned, no index?");
823   }
824
825   my @filtered = ();
826   if (defined($string)) {
827      for (@lines) {
828         if (m{$string}) {
829            push @filtered, $_;
830         }
831      }
832      @lines = @filtered;
833   }
834
835   return \@lines;
836}
837
838#
839# Search::Elasticsearch::Client::2_0::Direct::Cat
840#
841sub show_nodes {
842   my $self = shift;
843
844   my $es = $self->_es;
845   $self->brik_help_run_undef_arg('open', $es) or return;
846
847   my $r;
848   eval {
849      $r = $es->cat->nodes;
850   };
851   if ($@) {
852      chomp($@);
853      return $self->log->error("show_nodes: failed: [$@]");
854   }
855
856   my @lines = split(/\n/, $r);
857
858   if (@lines == 0) {
859      $self->log->warning("show_nodes: nothing returned, no nodes?");
860   }
861
862   return \@lines;
863}
864
865#
866# Search::Elasticsearch::Client::2_0::Direct::Cat
867#
868sub show_health {
869   my $self = shift;
870
871   my $es = $self->_es;
872   $self->brik_help_run_undef_arg('open', $es) or return;
873
874   my $r;
875   eval {
876      $r = $es->cat->health;
877   };
878   if ($@) {
879      chomp($@);
880      return $self->log->error("show_health: failed: [$@]");
881   }
882
883   my @lines = split(/\n/, $r);
884
885   if (@lines == 0) {
886      $self->log->warning("show_health: nothing returned, no recovery?");
887   }
888
889   return \@lines;
890}
891
892#
893# Search::Elasticsearch::Client::2_0::Direct::Cat
894#
895sub show_recovery {
896   my $self = shift;
897
898   my $es = $self->_es;
899   $self->brik_help_run_undef_arg('open', $es) or return;
900
901   my $r;
902   eval {
903      $r = $es->cat->recovery;
904   };
905   if ($@) {
906      chomp($@);
907      return $self->log->error("show_recovery: failed: [$@]");
908   }
909
910   my @lines = split(/\n/, $r);
911
912   if (@lines == 0) {
913      $self->log->warning("show_recovery: nothing returned, no index?");
914   }
915
916   return \@lines;
917}
918
919sub list_indices {
920   my $self = shift;
921
922   my $get = $self->get_indices or return;
923
924   my @indices = ();
925   for (@$get) {
926      push @indices, $_->{index};
927   }
928
929   return [ sort { $a cmp $b } @indices ];
930}
931
932sub get_indices {
933   my $self = shift;
934
935   my $lines = $self->show_indices or return;
936   if (@$lines == 0) {
937      $self->log->warning("get_indices: no index found");
938      return [];
939   }
940
941   #
942   # Format depends on ElasticSearch version. We try to detect the format.
943   #
944   # 5.0.0:
945   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
946   #
947   my @indices = ();
948   for (@$lines) {
949      my @t = split(/\s+/);
950      if (@t == 10) {  # Version 5.0.0
951         my $color = $t[0];
952         my $state = $t[1];
953         my $index = $t[2];
954         my $id = $t[3];
955         my $shards = $t[4];
956         my $replicas = $t[5];
957         my $count = $t[6];
958         my $count2 = $t[7];
959         my $total_size = $t[8];
960         my $size = $t[9];
961         push @indices, {
962            color => $color,
963            state => $state,
964            index => $index,
965            id => $id,
966            shards => $shards,
967            replicas => $replicas,
968            count => $count,
969            total_size => $total_size,
970            size => $size,
971         };
972      }
973      elsif (@t == 9) {
974         my $index = $t[2];
975         push @indices, {
976            index => $index,
977         };
978      }
979      elsif (@t == 8) {
980         my $index = $t[1];
981         push @indices, {
982            index => $index,
983         };
984      }
985   }
986
987   return \@indices;
988}
989
990#
991# Search::Elasticsearch::Client::5_0::Direct::Indices
992#
993sub get_index {
994   my $self = shift;
995   my ($index) = @_;
996 
997   my $es = $self->_es;
998   $self->brik_help_run_undef_arg('open', $es) or return;
999   $self->brik_help_run_undef_arg('get_index', $index) or return;
1000   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1001
1002   my %args = (
1003      index => $index,
1004   );
1005
1006   my $r;
1007   eval {
1008      $r = $es->indices->get(%args);
1009   };
1010   if ($@) {
1011      chomp($@);
1012      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1013   }
1014
1015   return $r;
1016}
1017
1018sub list_index_types {
1019   my $self = shift;
1020   my ($index) = @_;
1021
1022   my $es = $self->_es;
1023   $self->brik_help_run_undef_arg('open', $es) or return;
1024   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1025   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1026
1027   my $r = $self->get_mappings($index) or return;
1028   if (keys %$r > 1) {
1029      return $self->log->error("list_index_types: multiple indices found, choose one");
1030   }
1031
1032   my @types = ();
1033   for my $this_index (keys %$r) {
1034      my $mappings = $r->{$this_index}{mappings};
1035      push @types, keys %$mappings;
1036   }
1037
1038   my %uniq = map { $_ => 1 } @types;
1039
1040   return [ sort { $a cmp $b } keys %uniq ];
1041}
1042
1043#
1044# By default, if you provide only one index and no type,
1045# all types will be merged (including _default_)
1046# If you specify one type (other than _default_), _default_ will be merged to it.
1047#
1048sub list_index_fields {
1049   my $self = shift;
1050   my ($index, $type) = @_;
1051
1052   my $es = $self->_es;
1053   $self->brik_help_run_undef_arg('open', $es) or return;
1054   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1055   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1056
1057   my $r;
1058   if (defined($type)) {
1059      $r = $self->get_mappings($index, $type) or return;
1060      if (keys %$r > 1) {
1061         return $self->log->error("list_index_fields: multiple indices found, ".
1062            "choose one");
1063      }
1064      my $r2 = $self->get_mappings($index, '_default_') or return;
1065      # Merge
1066      for my $this_index (keys %$r2) {
1067         my $default = $r2->{$this_index}{mappings}{'_default_'};
1068         $r->{$this_index}{mappings}{_default_} = $default;
1069      }
1070   }
1071   else {
1072      $r = $self->get_mappings($index) or return;
1073      if (keys %$r > 1) {
1074         return $self->log->error("list_index_fields: multiple indices found, ".
1075            "choose one");
1076      }
1077   }
1078
1079   my @fields = ();
1080   for my $this_index (keys %$r) {
1081      my $mappings = $r->{$this_index}{mappings};
1082      for my $this_type (keys %$mappings) {
1083         my $properties = $mappings->{$this_type}{properties};
1084         push @fields, keys %$properties;
1085      }
1086   }
1087
1088   my %uniq = map { $_ => 1 } @fields;
1089
1090   return [ sort { $a cmp $b } keys %uniq ];
1091}
1092
1093sub list_indices_version {
1094   my $self = shift;
1095   my ($index) = @_;
1096
1097   my $es = $self->_es;
1098   $self->brik_help_run_undef_arg('open', $es) or return;
1099   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1100   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1101      or return;
1102
1103   my $r = $self->get_index($index) or return;
1104
1105   my @list = ();
1106   for my $this (keys %$r) {
1107      my $name = $this;
1108      my $version = $r->{$this}{settings}{index}{version}{created};
1109      push @list, {
1110         index => $name,
1111         version => $version,
1112      };
1113   }
1114
1115   return \@list;
1116}
1117
1118sub open_index {
1119   my $self = shift;
1120   my ($index) = @_;
1121
1122   my $es = $self->_es;
1123   $self->brik_help_run_undef_arg('open', $es) or return;
1124   $self->brik_help_run_undef_arg('open_index', $index) or return;
1125   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1126
1127   my $r;
1128   eval {
1129      $r = $es->indices->open(
1130         index => $index,
1131      );
1132   };
1133   if ($@) {
1134      chomp($@);
1135      return $self->log->error("open_index: failed: [$@]");
1136   }
1137
1138   return $r;
1139}
1140
1141sub close_index {
1142   my $self = shift;
1143   my ($index) = @_;
1144
1145   my $es = $self->_es;
1146   $self->brik_help_run_undef_arg('open', $es) or return;
1147   $self->brik_help_run_undef_arg('close_index', $index) or return;
1148   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1149
1150   my $r;
1151   eval {
1152      $r = $es->indices->close(
1153         index => $index,
1154      );
1155   };
1156   if ($@) {
1157      chomp($@);
1158      return $self->log->error("close_index: failed: [$@]");
1159   }
1160
1161   return $r;
1162}
1163
1164#
1165# Search::Elasticsearch::Client::5_0::Direct::Indices
1166#
1167sub get_aliases {
1168   my $self = shift;
1169   my ($index) = @_;
1170
1171   $index ||= $self->index;
1172   my $es = $self->_es;
1173   $self->brik_help_run_undef_arg('open', $es) or return;
1174
1175   my %args = (
1176      index => $index,
1177   );
1178
1179   my $r;
1180   eval {
1181      $r = $es->indices->get(%args);
1182   };
1183   if ($@) {
1184      chomp($@);
1185      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1186   }
1187
1188   my %aliases = ();
1189   for my $this (keys %$r) {
1190      $aliases{$this} = $r->{$this}{aliases};
1191   }
1192
1193   return \%aliases;
1194}
1195
1196#
1197# Search::Elasticsearch::Client::5_0::Direct::Indices
1198#
1199sub put_alias {
1200   my $self = shift;
1201   my ($index, $alias) = @_;
1202
1203   my $es = $self->_es;
1204   $self->brik_help_run_undef_arg('open', $es) or return;
1205   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1206   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1207
1208   my %args = (
1209      index => $index,
1210      name => $alias,
1211   );
1212
1213   my $r;
1214   eval {
1215      $r = $es->indices->put_alias(%args);
1216   };
1217   if ($@) {
1218      chomp($@);
1219      return $self->log->error("put_alias: put_alias failed: [$@]");
1220   }
1221
1222   return $r;
1223}
1224
1225#
1226# Search::Elasticsearch::Client::5_0::Direct::Indices
1227#
1228sub delete_alias {
1229   my $self = shift;
1230   my ($index, $alias) = @_;
1231
1232   my $es = $self->_es;
1233   $self->brik_help_run_undef_arg('open', $es) or return;
1234   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1235   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1236
1237   my %args = (
1238      index => $index,
1239      name => $alias,
1240   );
1241
1242   my $r;
1243   eval {
1244      $r = $es->indices->delete_alias(%args);
1245   };
1246   if ($@) {
1247      chomp($@);
1248      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1249   }
1250
1251   return $r;
1252}
1253
1254sub update_alias {
1255   my $self = shift;
1256   my ($new_index, $alias) = @_;
1257
1258   my $es = $self->_es;
1259   $self->brik_help_run_undef_arg('open', $es) or return;
1260   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1261   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1262
1263   # Search for previous index with that alias, if any.
1264   my $prev_index;
1265   my $aliases = $self->get_aliases or return;
1266   while (my ($k, $v) = each %$aliases) {
1267      for my $this (keys %$v) {
1268         if ($this eq $alias) {
1269            $prev_index = $k;
1270            last;
1271         }
1272      }
1273      last if $prev_index;
1274   }
1275
1276   # Delete previous alias if it exists.
1277   if (defined($prev_index)) {
1278      $self->delete_alias($prev_index, $alias) or return;
1279   }
1280
1281   return $self->put_alias($new_index, $alias);
1282}
1283
1284#
1285# Search::Elasticsearch::Client::2_0::Direct::Indices
1286#
1287sub get_mappings {
1288   my $self = shift;
1289   my ($index, $type) = @_;
1290
1291   my $es = $self->_es;
1292   $self->brik_help_run_undef_arg('open', $es) or return;
1293   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1294   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1295
1296   my %args = (
1297      index => $index,
1298      type => $type,
1299   );
1300
1301   my $r;
1302   eval {
1303      $r = $es->indices->get_mapping(%args);
1304   };
1305   if ($@) {
1306      chomp($@);
1307      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1308         "[$@]");
1309   }
1310
1311   return $r;
1312}
1313
1314#
1315# Search::Elasticsearch::Client::2_0::Direct::Indices
1316#
1317sub create_index {
1318   my $self = shift;
1319   my ($index, $shards_count) = @_;
1320
1321   my $es = $self->_es;
1322   $self->brik_help_run_undef_arg('open', $es) or return;
1323   $self->brik_help_run_undef_arg('create_index', $index) or return;
1324         
1325   my $r;
1326   eval {
1327      $r = $es->indices->create(
1328         index => $index,
1329      );
1330   };
1331   if ($@) {
1332      chomp($@);
1333      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1334   }
1335   
1336   return $r;
1337}
1338
1339#
1340# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1341#
1342sub create_index_with_mappings {
1343   my $self = shift;
1344   my ($index, $mappings) = @_;
1345
1346   my $es = $self->_es;
1347   $self->brik_help_run_undef_arg('open', $es) or return;
1348   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1349   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1350   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1351
1352   my $r;
1353   eval {
1354      $r = $es->indices->create(
1355         index => $index,
1356         body => {
1357            mappings => $mappings,
1358         },
1359      );
1360   };
1361   if ($@) {
1362      chomp($@);
1363      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1364   }
1365
1366   return $r;
1367}
1368
1369# GET http://localhost:9200/
1370sub info {
1371   my $self = shift;
1372   my ($nodes) = @_;
1373
1374   $nodes ||= $self->nodes;
1375   $self->brik_help_run_undef_arg('info', $nodes) or return;
1376   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1377   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1378
1379   my $first = $nodes->[0];
1380
1381   $self->get($first) or return;
1382
1383   return $self->content;
1384}
1385
1386sub version {
1387   my $self = shift;
1388   my ($nodes) = @_;
1389
1390   $nodes ||= $self->nodes;
1391   $self->brik_help_run_undef_arg('version', $nodes) or return;
1392   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1393   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1394
1395   my $first = $nodes->[0];
1396
1397   $self->get($first) or return;
1398   my $content = $self->content or return;
1399
1400   return $content->{version}{number};
1401}
1402
1403#
1404# Search::Elasticsearch::Client::2_0::Direct::Indices
1405#
1406sub get_templates {
1407   my $self = shift;
1408
1409   my $es = $self->_es;
1410   $self->brik_help_run_undef_arg('open', $es) or return;
1411
1412   my $r;
1413   eval {
1414      $r = $es->indices->get_template;
1415   };
1416   if ($@) {
1417      chomp($@);
1418      return $self->log->error("get_templates: failed: [$@]");
1419   }
1420
1421   return $r;
1422}
1423
1424sub list_templates {
1425   my $self = shift;
1426
1427   my $content = $self->get_templates or return;
1428
1429   return [ sort { $a cmp $b } keys %$content ];
1430}
1431
1432#
1433# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1434#
1435sub get_template {
1436   my $self = shift;
1437   my ($template) = @_;
1438
1439   my $es = $self->_es;
1440   $self->brik_help_run_undef_arg('open', $es) or return;
1441   $self->brik_help_run_undef_arg('get_template', $template) or return;
1442
1443   my $r;
1444   eval {
1445      $r = $es->indices->get_template(
1446         name => $template,
1447      );
1448   };
1449   if ($@) {
1450      chomp($@);
1451      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1452   }
1453
1454   return $r;
1455}
1456
1457#
1458# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1459#
1460sub put_template {
1461   my $self = shift;
1462   my ($name, $template) = @_;
1463
1464   my $es = $self->_es;
1465   $self->brik_help_run_undef_arg('open', $es) or return;
1466   $self->brik_help_run_undef_arg('put_template', $name) or return;
1467   $self->brik_help_run_undef_arg('put_template', $template) or return;
1468   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1469
1470   my $r;
1471   eval {
1472      $r = $es->indices->put_template(
1473         name => $name,
1474         body => $template,
1475      );
1476   };
1477   if ($@) {
1478      chomp($@);
1479      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1480   }
1481
1482   return $r;
1483}
1484
1485sub put_template_from_json_file {
1486   my $self = shift;
1487   my ($json_file) = @_;
1488
1489   my $es = $self->_es;
1490   $self->brik_help_run_undef_arg('open', $es) or return;
1491   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1492   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1493
1494   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1495   my $data = $fj->read($json_file) or return;
1496
1497   if (! exists($data->{template})) {
1498      return $self->log->error("put_template_from_json_file: no template name found");
1499   }
1500
1501   my $name = $data->{template};
1502
1503   return $self->put_template($name, $data);
1504}
1505
1506#
1507# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1508# Search::Elasticsearch::Client::2_0::Direct::Indices
1509#
1510sub get_settings {
1511   my $self = shift;
1512   my ($indices, $names) = @_;
1513
1514   my $es = $self->_es;
1515   $self->brik_help_run_undef_arg('open', $es) or return;
1516
1517   my %args = ();
1518   if (defined($indices)) {
1519      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1520      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1521         or return;
1522      $args{index} = $indices;
1523   }
1524   if (defined($names)) {
1525      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1526      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1527         or return;
1528      $args{name} = $names;
1529   }
1530
1531   my $r;
1532   eval {
1533      $r = $es->indices->get_settings(%args);
1534   };
1535   if ($@) {
1536      chomp($@);
1537      return $self->log->error("get_settings: failed: [$@]");
1538   }
1539
1540   return $r;
1541}
1542
1543#
1544# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1545# Search::Elasticsearch::Client::2_0::Direct::Indices
1546#
1547# Example:
1548#
1549# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1550#
1551# XXX: should be renamed to put_index_settings
1552#
1553sub put_settings {
1554   my $self = shift;
1555   my ($settings, $indices) = @_;
1556
1557   my $es = $self->_es;
1558   $self->brik_help_run_undef_arg('open', $es) or return;
1559   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1560   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1561
1562   my %args = (
1563      body => $settings,
1564   );
1565   if (defined($indices)) {
1566      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1567      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1568         or return;
1569      $args{index} = $indices;
1570   }
1571
1572   my $r;
1573   eval {
1574      $r = $es->indices->put_settings(%args);
1575   };
1576   if ($@) {
1577      chomp($@);
1578      return $self->log->error("put_settings: failed: [$@]");
1579   }
1580
1581   return $r;
1582}
1583
1584sub set_index_number_of_replicas {
1585   my $self = shift;
1586   my ($indices, $number) = @_;
1587
1588   my $es = $self->_es;
1589   $self->brik_help_run_undef_arg('open', $es) or return;
1590   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1591   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1592      or return;
1593
1594   my $settings = { number_of_replicas => $number };
1595
1596   return $self->put_settings($settings, $indices);
1597}
1598
1599sub set_index_refresh_interval {
1600   my $self = shift;
1601   my ($indices, $number) = @_;
1602
1603   my $es = $self->_es;
1604   $self->brik_help_run_undef_arg('open', $es) or return;
1605   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1606   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1607      or return;
1608
1609   # If there is a meaningful value not postfixed with a unity,
1610   # we default to add a `s' for a number of seconds.
1611   if ($number =~ /^\d+$/ && $number > 0) {
1612      $number .= 's';
1613   }
1614
1615   my $settings = { refresh_interval => $number };
1616
1617   return $self->put_settings($settings, $indices);
1618}
1619
1620sub get_index_number_of_replicas {
1621   my $self = shift;
1622   my ($indices) = @_;
1623
1624   my $es = $self->_es;
1625   $self->brik_help_run_undef_arg('open', $es) or return;
1626   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1627   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1628      or return;
1629
1630   my $settings = $self->get_settings($indices);
1631
1632   my %indices = ();
1633   for (keys %$settings) {
1634      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1635   }
1636
1637   return \%indices;
1638}
1639
1640sub get_index_refresh_interval {
1641   my $self = shift;
1642   my ($indices, $number) = @_;
1643
1644   my $es = $self->_es;
1645   $self->brik_help_run_undef_arg('open', $es) or return;
1646   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1647   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1648      or return;
1649
1650   my $settings = $self->get_settings($indices);
1651
1652   my %indices = ();
1653   for (keys %$settings) {
1654      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1655   }
1656
1657   return \%indices;
1658}
1659
1660sub get_index_number_of_shards {
1661   my $self = shift;
1662   my ($indices, $number) = @_;
1663
1664   my $es = $self->_es;
1665   $self->brik_help_run_undef_arg('open', $es) or return;
1666   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1667   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1668      or return;
1669
1670   my $settings = $self->get_settings($indices);
1671
1672   my %indices = ();
1673   for (keys %$settings) {
1674      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1675   }
1676
1677   return \%indices;
1678}
1679
1680#
1681# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1682#
1683sub delete_template {
1684   my $self = shift;
1685   my ($name) = @_;
1686
1687   my $es = $self->_es;
1688   $self->brik_help_run_undef_arg('open', $es) or return;
1689   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1690
1691   my $r;
1692   eval {
1693      $r = $es->indices->delete_template(
1694         name => $name,
1695      );
1696   };
1697   if ($@) {
1698      chomp($@);
1699      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1700   }
1701
1702   return $r;
1703}
1704
1705#
1706# Return a boolean to state for index existence
1707#
1708sub is_index_exists {
1709   my $self = shift;
1710   my ($index) = @_;
1711
1712   my $es = $self->_es;
1713   $self->brik_help_run_undef_arg('open', $es) or return;
1714   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1715
1716   my $r;
1717   eval {
1718      $r = $es->indices->exists(
1719         index => $index,
1720      );
1721   };
1722   if ($@) {
1723      chomp($@);
1724      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1725   }
1726
1727   return $r ? 1 : 0;
1728}
1729
1730#
1731# Return a boolean to state for index with type existence
1732#
1733sub is_type_exists {
1734   my $self = shift;
1735   my ($index, $type) = @_;
1736
1737   my $es = $self->_es;
1738   $self->brik_help_run_undef_arg('open', $es) or return;
1739   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1740   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1741
1742   my $r;
1743   eval {
1744      $r = $es->indices->exists_type(
1745         index => $index,
1746         type => $type,
1747      );
1748   };
1749   if ($@) {
1750      chomp($@);
1751      return $self->log->error("is_type_exists: failed for index [$index] and ".
1752         "type [$type]: [$@]");
1753   }
1754
1755   return $r ? 1 : 0;
1756}
1757
1758#
1759# Return a boolean to state for document existence
1760#
1761sub is_document_exists {
1762   my $self = shift;
1763   my ($index, $type, $document) = @_;
1764
1765   my $es = $self->_es;
1766   $self->brik_help_run_undef_arg('open', $es) or return;
1767   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
1768   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
1769   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
1770   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
1771
1772   my $r;
1773   eval {
1774      $r = $es->exists(
1775         index => $index,
1776         type => $type,
1777         %$document,
1778      );
1779   };
1780   if ($@) {
1781      chomp($@);
1782      return $self->log->error("is_document_exists: failed for index [$index] and ".
1783         "type [$type]: [$@]");
1784   }
1785
1786   return $r ? 1 : 0;
1787}
1788
1789sub parse_error_string {
1790   my $self = shift;
1791   my ($string) = @_;
1792
1793   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
1794
1795   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
1796
1797   my ($class, $node, $code, $message, $dump) = $string =~
1798      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
1799
1800   if (defined($dump) && length($dump)) {
1801      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
1802      $dump = $sd->decode($dump);
1803   }
1804
1805   # Sanity check
1806   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
1807   &&  defined($dump) && ref($dump) eq 'HASH') {
1808      return {
1809         class => $class,
1810         node => $node,
1811         code => $code,
1812         message => $message,
1813         dump => $dump,
1814      };
1815   }
1816
1817   # Were not able to decode, we return as-is.
1818   return {
1819      message => $string,
1820   };
1821}
1822
1823#
1824# Refresh an index to receive latest additions
1825#
1826# Search::Elasticsearch::Client::5_0::Direct::Indices
1827# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
1828#
1829sub refresh_index {
1830   my $self = shift;
1831   my ($index) = @_;
1832
1833   my $es = $self->_es;
1834   $self->brik_help_run_undef_arg('open', $es) or return;
1835   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
1836
1837   my $try = $self->try;
1838
1839RETRY:
1840
1841   my $r;
1842   eval {
1843      $r = $es->indices->refresh(
1844         index => $index,
1845      );
1846   };
1847   if ($@) {
1848      if (--$try == 0) {
1849         chomp($@);
1850         my $p = $self->parse_error_string($@);
1851         if (defined($p) && exists($p->{class})) {
1852            my $class = $p->{class};
1853            my $code = $p->{code};
1854            my $node = $p->{node};
1855            return $self->log->error("refresh_index: failed for index [$index] ".
1856               "after [$try] tries with error [$class] code [$code] for node [$node]");
1857         }
1858         else {
1859            return $self->log->error("refresh_index: failed for index [$index] ".
1860               "after [$try]: [$@]");
1861         }
1862      }
1863      sleep 60;
1864      goto RETRY;
1865   }
1866
1867   return $r;
1868}
1869
1870sub export_as_csv {
1871   my $self = shift;
1872   my ($index, $size) = @_;
1873
1874   $size ||= 10_000;
1875   my $es = $self->_es;
1876   $self->brik_help_run_undef_arg('open', $es) or return;
1877   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
1878   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
1879
1880   my $max = $self->max;
1881
1882   my $scroll;
1883   my $version = $self->version or return;
1884   if ($version lt "5.0.0") {
1885      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
1886   }
1887   else {
1888      $scroll = $self->open_scroll($index, $size) or return;
1889   }
1890
1891   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
1892
1893   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
1894   $fc->separator(',');
1895   $fc->escape('\\');
1896   $fc->append(1);
1897   $fc->first_line_is_header(0);
1898   $fc->write_header(1);
1899   $fc->use_quoting(1);
1900   $fc->encoded_fields($self->csv_encoded_fields);
1901   $fc->object_fields($self->csv_object_fields);
1902
1903   my $total = $self->total_scroll;
1904   $self->log->info("export_as_csv: total [$total] for index [$index]");
1905
1906   my $h = {};
1907   my %types = ();
1908   my $read = 0;
1909   my $skipped = 0;
1910   my $exported = 0;
1911   my $start = time();
1912   my $done = 'output.exported';
1913   my $start_time = time();
1914   while (my $this = $self->next_scroll) {
1915      $read++;
1916      my $id = $this->{_id};
1917      my $doc = $this->{_source};
1918      my $type = $this->{_type};
1919      if (! exists($types{$type})) {
1920         my $fields = $self->list_index_fields($index, $type) or return;
1921         #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
1922         $types{$type}{header} = [ '_id', @$fields ];
1923         $types{$type}{output} = "$index:$type.csv";
1924         $done = $types{$type}{output_exported} = "$index:$type.csv.exported";
1925
1926         # Verify it has not been exported yet
1927         if (-f $types{$type}{output_exported}) {
1928            return $self->log->error("export_as_csv: export already done for index ".
1929               "[$index] with type [$type] and file [$index:$type.csv]");
1930         }
1931
1932         $self->log->info("export_as_csv: exporting to file [$index:$type.csv] ".
1933            "for new type [$type], using chunk size of [$size]");
1934      }
1935
1936      $h->{_id} = $id;
1937
1938      for my $k (keys %$doc) {
1939         $h->{$k} = $doc->{$k};
1940      }
1941
1942      $fc->header($types{$type}{header});
1943      my $r = $fc->write([ $h ], $types{$type}{output});
1944      if (!defined($r)) {
1945         $self->log->warning("export_as_csv: unable to process entry, skipping");
1946         $skipped++;
1947         next;
1948      }
1949
1950      # Log a status sometimes.
1951      if (! (++$exported % 100_000)) {
1952         my $now = time();
1953         $self->log->info("export_as_csv: fetched [$exported/$total] elements in ".
1954            ($now - $start)." second(s) from index [$index]");
1955         $start = time();
1956      }
1957
1958      # Limit export to specified maximum
1959      if ($max > 0 && $exported >= $max) {
1960         $self->log->info("export_as_csv: max export reached [$exported] for index ".
1961            "[$index], stopping");
1962         last;
1963      }
1964   }
1965
1966   $self->close_scroll;
1967
1968   my $stop_time = time();
1969   my $duration = $stop_time - $start_time;
1970   my $eps = $exported;
1971   if ($duration > 0) {
1972      $eps = $exported / $duration;
1973   }
1974
1975   my $result = {
1976      read => $read,
1977      exported => $exported,
1978      skipped => $read - $exported,
1979      total_count => $total,
1980      complete => ($exported == $total) ? 1 : 0,
1981      duration => $duration,
1982      eps => $eps, 
1983   };
1984
1985   # Say the file has been processed, and put resulting stats.
1986   $fd->write($result, $done) or return;
1987
1988   return $result;
1989}
1990
1991#
1992# Optimization instructions:
1993# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
1994#
1995sub import_from_csv {
1996   my $self = shift;
1997   my ($input_csv, $index, $type) = @_;
1998
1999   my $es = $self->_es;
2000   $self->brik_help_run_undef_arg('open', $es) or return;
2001   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
2002   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
2003
2004   # If index and/or types are not defined, we try to get them from input filename
2005   if (! defined($index) || ! defined($type)) {
2006      # Example: index-DATE:type.csv
2007      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2008         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2009         $index ||= $this_index;
2010         $type ||= $this_type;
2011      }
2012   }
2013
2014   # Verify it has not been indexed yet
2015   my $done = "$input_csv.imported";
2016   if (-f $done) {
2017      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2018      return 0;
2019   }
2020
2021   # And default to Attributes if guess failed.
2022   $index ||= $self->index;
2023   $type ||= $self->type;
2024   $self->brik_help_set_undef_arg('index', $index) or return;
2025   $self->brik_help_set_undef_arg('type', $type) or return;
2026
2027   if ($index eq '*') {
2028      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2029   }
2030   if ($type eq '*') {
2031      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2032   }
2033
2034   $self->log->debug("input [$input_csv]");
2035   $self->log->debug("index [$index]");
2036   $self->log->debug("type [$type]");
2037
2038   my $count_before = 0;
2039   if ($self->is_index_exists($index)) {
2040      $count_before = $self->count($index, $type);
2041      if (! defined($count_before)) {
2042         return;
2043      }
2044      $self->log->info("import_from_csv: current index [$index] count is ".
2045         "[$count_before]");
2046   }
2047
2048   my $max = $self->max;
2049
2050   $self->open_bulk_mode($index, $type) or return;
2051
2052   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2053      "with type [$type]");
2054
2055   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2056
2057   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2058   $fc->separator(',');
2059   $fc->escape('\\');
2060   $fc->first_line_is_header(1);
2061   $fc->encoded_fields($self->csv_encoded_fields);
2062   $fc->object_fields($self->csv_object_fields);
2063
2064   my $refresh_interval;
2065   my $number_of_replicas;
2066   my $start = time();
2067   my $speed_settings = {};
2068   my $imported = 0;
2069   my $first = 1;
2070   my $read = 0;
2071   my $skipped_chunks = 0;
2072   my $start_time = time();
2073   while (my $this = $fc->read_next($input_csv)) {
2074      $read++;
2075
2076      my $h = {};
2077      my $id = $this->{_id};
2078      delete $this->{_id};
2079      for my $k (keys %$this) {
2080         my $value = $this->{$k};
2081         # We keep only fields when they have a value.
2082         # No need to index data that is empty.
2083         if (defined($value) && length($value)) {
2084            $h->{$k} = $value;
2085         }
2086      }
2087
2088      my $r = $self->index_bulk($h, $index, $type, $id);
2089      if (! defined($r)) {
2090         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2091            "at read [$read], skipping chunk");
2092         $skipped_chunks++;
2093         next;
2094      }
2095
2096      # Gather index settings, and set values for speed.
2097      # We don't do it earlier, cause we need index to be created,
2098      # and it should have been done from index_bulk Command.
2099      if ($first && $self->is_index_exists($index)) {
2100         # Save current values so we can restore them at the end of Command.
2101         # We ignore errors here, this is non-blocking for indexing.
2102         $refresh_interval = $self->get_index_refresh_interval($index);
2103         $refresh_interval = $refresh_interval->{$index};
2104         $number_of_replicas = $self->get_index_number_of_replicas($index);
2105         $number_of_replicas = $number_of_replicas->{$index};
2106         if ($self->use_indexing_optimizations) {
2107            $self->set_index_number_of_replicas($index, 0);
2108         }
2109         $self->set_index_refresh_interval($index, -1);
2110         $first = 0;
2111      }
2112
2113      # Log a status sometimes.
2114      if (! (++$imported % 100_000)) {
2115         my $now = time();
2116         $self->log->info("import_from_csv: imported [$imported] entries in ".
2117            ($now - $start)." second(s) to index [$index]");
2118         $start = time();
2119      }
2120
2121      # Limit import to specified maximum
2122      if ($max > 0 && $imported >= $max) {
2123         $self->log->info("import_from_csv: max import reached [$imported] for ".
2124            "index [$index], stopping");
2125         last;
2126      }
2127   }
2128
2129   $self->bulk_flush;
2130
2131   my $stop_time = time();
2132   my $duration = $stop_time - $start_time;
2133   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2134
2135   $self->refresh_index($index);
2136
2137   my $count_current = $self->count($index, $type) or return;
2138   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2139
2140   my $skipped = 0;
2141   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2142   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2143      $imported = $read;
2144   }
2145   else {
2146      $skipped = $read - ($count_current - $count_before);
2147   }
2148
2149   my $result = {
2150      read => $read,
2151      imported => $imported,
2152      skipped => $skipped,
2153      previous_count => $count_before,
2154      current_count => $count_current,
2155      complete => $complete,
2156      duration => $duration,
2157      eps => $eps,
2158   };
2159
2160   # Say the file has been processed, and put resulting stats.
2161   $fd->write($result, $done) or return;
2162
2163   # Restore previous settings, if any
2164   if (defined($refresh_interval)) {
2165      $self->set_index_refresh_interval($index, $refresh_interval);
2166   }
2167   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2168      $self->set_index_number_of_replicas($index, $number_of_replicas);
2169   }
2170
2171   return $result;
2172}
2173
2174#
2175# http://localhost:9200/_nodes/stats/process?pretty
2176#
2177# Search::Elasticsearch::Client::2_0::Direct::Nodes
2178#
2179sub get_stats_process {
2180   my $self = shift;
2181
2182   my $es = $self->_es;
2183   $self->brik_help_run_undef_arg('open', $es) or return;
2184
2185   my $r;
2186   eval {
2187      $r = $es->nodes->stats(
2188         metric => [ qw(process) ],
2189      );
2190   };
2191   if ($@) {
2192      chomp($@);
2193      return $self->log->error("get_stats_process: failed: [$@]");
2194   }
2195
2196   return $r;
2197}
2198
2199#
2200# curl http://localhost:9200/_nodes/process?pretty
2201#
2202# Search::Elasticsearch::Client::2_0::Direct::Nodes
2203#
2204sub get_process {
2205   my $self = shift;
2206
2207   my $es = $self->_es;
2208   $self->brik_help_run_undef_arg('open', $es) or return;
2209
2210   my $r;
2211   eval {
2212      $r = $es->nodes->info(
2213         metric => [ qw(process) ],
2214      );
2215   };
2216   if ($@) {
2217      chomp($@);
2218      return $self->log->error("get_process: failed: [$@]");
2219   }
2220
2221   return $r;
2222}
2223
2224#
2225# Search::Elasticsearch::Client::2_0::Direct::Cluster
2226#
2227sub get_cluster_state {
2228   my $self = shift;
2229
2230   my $es = $self->_es;
2231   $self->brik_help_run_undef_arg('open', $es) or return;
2232
2233   my $r;
2234   eval {
2235      $r = $es->cluster->state;
2236   };
2237   if ($@) {
2238      chomp($@);
2239      return $self->log->error("get_cluster_state: failed: [$@]");
2240   }
2241
2242   return $r;
2243}
2244
2245#
2246# Search::Elasticsearch::Client::2_0::Direct::Cluster
2247#
2248sub get_cluster_health {
2249   my $self = shift;
2250
2251   my $es = $self->_es;
2252   $self->brik_help_run_undef_arg('open', $es) or return;
2253
2254   my $r;
2255   eval {
2256      $r = $es->cluster->health;
2257   };
2258   if ($@) {
2259      chomp($@);
2260      return $self->log->error("get_cluster_health: failed: [$@]");
2261   }
2262
2263   return $r;
2264}
2265
2266#
2267# Search::Elasticsearch::Client::2_0::Direct::Cluster
2268#
2269sub get_cluster_settings {
2270   my $self = shift;
2271
2272   my $es = $self->_es;
2273   $self->brik_help_run_undef_arg('open', $es) or return;
2274
2275   my $r;
2276   eval {
2277      $r = $es->cluster->get_settings;
2278   };
2279   if ($@) {
2280      chomp($@);
2281      return $self->log->error("get_cluster_settings: failed: [$@]");
2282   }
2283
2284   return $r;
2285}
2286
2287#
2288# Search::Elasticsearch::Client::2_0::Direct::Cluster
2289#
2290sub put_cluster_settings {
2291   my $self = shift;
2292   my ($settings) = @_;
2293
2294   my $es = $self->_es;
2295   $self->brik_help_run_undef_arg('open', $es) or return;
2296   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2297   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2298
2299   my %args = (
2300      body => $settings,
2301   );
2302
2303   my $r;
2304   eval {
2305      $r = $es->cluster->put_settings(%args);
2306   };
2307   if ($@) {
2308      chomp($@);
2309      return $self->log->error("put_cluster_settings: failed: [$@]");
2310   }
2311
2312   return $r;
2313}
2314
2315sub count_green_indices {
2316   my $self = shift;
2317
2318   my $get = $self->show_indices or return;
2319
2320   my $count = 0;
2321   for (@$get) {
2322      if (/^\s*green\s+/) {
2323         $count++;
2324      }
2325   }
2326
2327   return $count;
2328}
2329
2330sub count_yellow_indices {
2331   my $self = shift;
2332
2333   my $get = $self->show_indices or return;
2334
2335   my $count = 0;
2336   for (@$get) {
2337      if (/^\s*yellow\s+/) {
2338         $count++;
2339      }
2340   }
2341
2342   return $count;
2343}
2344
2345sub count_red_indices {
2346   my $self = shift;
2347
2348   my $get = $self->show_indices or return;
2349
2350   my $count = 0;
2351   for (@$get) {
2352      if (/^\s*red\s+/) {
2353         $count++;
2354      }
2355   }
2356
2357   return $count;
2358}
2359
2360sub count_indices {
2361   my $self = shift;
2362
2363   my $get = $self->show_indices or return;
2364
2365   return scalar @$get;
2366}
2367
2368sub list_indices_status {
2369   my $self = shift;
2370
2371   my $get = $self->show_indices or return;
2372
2373   my $count_red = 0;
2374   my $count_yellow = 0;
2375   my $count_green = 0;
2376   for (@$get) {
2377      if (/^\s*red\s+/) {
2378         $count_red++;
2379      }
2380      elsif (/^\s*yellow\s+/) {
2381         $count_yellow++;
2382      }
2383      elsif (/^\s*green\s+/) {
2384         $count_green++;
2385      }
2386   }
2387
2388   return {
2389      red => $count_red,
2390      yellow => $count_yellow,
2391      green => $count_green,
2392   };
2393}
2394
2395sub count_shards {
2396   my $self = shift;
2397
2398   my $indices = $self->get_indices or return;
2399
2400   my $count = 0;
2401   for (@$indices) {
2402      $count += $_->{shards};
2403   }
2404
2405   return $count;
2406}
2407
2408sub count_size {
2409   my $self = shift;
2410
2411   my $indices = $self->get_indices or return;
2412
2413   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2414   $fn->kibi_suffix("kb");
2415   $fn->mebi_suffix("mb");
2416   $fn->gibi_suffix("gb");
2417   $fn->kilo_suffix("KB");
2418   $fn->mega_suffix("MB");
2419   $fn->giga_suffix("GB");
2420
2421   my $size = 0;
2422   for (@$indices) {
2423      $size += $fn->to_number($_->{size});
2424   }
2425
2426   return $fn->from_number($size);
2427}
2428
2429sub count_total_size {
2430   my $self = shift;
2431
2432   my $indices = $self->get_indices or return;
2433
2434   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2435   $fn->kibi_suffix("kb");
2436   $fn->mebi_suffix("mb");
2437   $fn->gibi_suffix("gb");
2438   $fn->kilo_suffix("KB");
2439   $fn->mega_suffix("MB");
2440   $fn->giga_suffix("GB");
2441
2442   my $size = 0;
2443   for (@$indices) {
2444      $size += $fn->to_number($_->{total_size});
2445   }
2446
2447   return $fn->from_number($size);
2448}
2449
2450sub count_count {
2451   my $self = shift;
2452
2453   my $indices = $self->get_indices or return;
2454
2455   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2456   $fn->kilo_suffix('k');
2457   $fn->mega_suffix('m');
2458   $fn->giga_suffix('M');
2459
2460   my $count = 0;
2461   for (@$indices) {
2462      $count += $_->{count};
2463   }
2464
2465   return $fn->from_number($count);
2466}
2467
2468sub list_green_indices {
2469   my $self = shift;
2470
2471   my $get = $self->get_indices or return;
2472
2473   my @indices = ();
2474   for (@$get) {
2475      if ($_->{color} eq 'green') {
2476         push @indices, $_->{index};
2477      }
2478   }
2479
2480   return \@indices;
2481}
2482
2483sub list_yellow_indices {
2484   my $self = shift;
2485
2486   my $get = $self->get_indices or return;
2487
2488   my @indices = ();
2489   for (@$get) {
2490      if ($_->{color} eq 'yellow') {
2491         push @indices, $_->{index};
2492      }
2493   }
2494
2495   return \@indices;
2496}
2497
2498sub list_red_indices {
2499   my $self = shift;
2500
2501   my $get = $self->get_indices or return;
2502
2503   my @indices = ();
2504   for (@$get) {
2505      if ($_->{color} eq 'red') {
2506         push @indices, $_->{index};
2507      }
2508   }
2509
2510   return \@indices;
2511}
2512
2513#
2514# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2515#
2516sub list_datatypes {
2517   my $self = shift;
2518
2519   return {
2520      core => [ qw(string long integer short byte double float data boolean binary) ],
2521   };
2522}
2523
2524#
2525# Return total hits for last www_search
2526#
2527sub get_hits_total {
2528   my $self = shift;
2529
2530   # Retrieve data stored in the $RUN Variable from Context
2531   my $run = $self->context->do('$RUN');
2532   if (ref($run) eq 'HASH') {
2533      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2534         return $run->{hits}{total};
2535      }
2536   }
2537
2538   return $self->log->error("get_hits_total: last Command not compatible");
2539}
2540
2541sub disable_shard_allocation {
2542   my $self = shift;
2543
2544   my $settings = {
2545      persistent => {
2546         'cluster.routing.allocation.enable' => 'none',
2547      }
2548   };
2549
2550   return $self->put_cluster_settings($settings);
2551}
2552
2553sub enable_shard_allocation {
2554   my $self = shift;
2555
2556   my $settings = {
2557      persistent => { 
2558         'cluster.routing.allocation.enable' => 'all',
2559      }
2560   };
2561
2562   return $self->put_cluster_settings($settings);
2563}
2564
2565sub flush_synced {
2566   my $self = shift;
2567
2568   my $es = $self->_es;
2569   $self->brik_help_run_undef_arg('open', $es) or return;
2570
2571   my $r;
2572   eval {
2573      $r = $es->indices->flush_synced;
2574   };
2575   if ($@) {
2576      chomp($@);
2577      return $self->log->error("flush_synced: failed: [$@]");
2578   }
2579
2580   return $r;
2581}
2582
2583#
2584# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2585#
2586# run client::elasticsearch create_snapshot_repository myrepo
2587#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2588#
2589# You have to set path.repo in elasticsearch.yml like:
2590# path.repo: ["/home/gomor/es-backups"]
2591#
2592# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2593#
2594sub create_snapshot_repository {
2595   my $self = shift;
2596   my ($body, $repository_name) = @_;
2597
2598   my $es = $self->_es;
2599   $self->brik_help_run_undef_arg('open', $es) or return;
2600   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2601
2602   $repository_name ||= 'repository';
2603
2604   my %args = (
2605      repository => $repository_name,
2606      body => $body,
2607   );
2608
2609   my $r;
2610   eval {
2611      $r = $es->snapshot->create_repository(%args);
2612   };
2613   if ($@) {
2614      chomp($@);
2615      return $self->log->error("create_snapshot_repository: failed: [$@]");
2616   }
2617
2618   return $r;
2619}
2620
2621sub create_shared_fs_snapshot_repository {
2622   my $self = shift;
2623   my ($location, $repository_name) = @_;
2624
2625   $repository_name ||= 'repository';
2626   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2627
2628   if ($location !~ m{^/}) {
2629      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2630         "a full directory path, this one is invalid [$location]");
2631   }
2632
2633   my $body = {
2634      type => 'fs',
2635      settings => {
2636         compress => 'true',
2637         location => $location,
2638      },
2639   };
2640
2641   return $self->create_snapshot_repository($body, $repository_name);
2642}
2643
2644#
2645# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2646#
2647sub get_snapshot_repositories {
2648   my $self = shift;
2649
2650   my $es = $self->_es;
2651   $self->brik_help_run_undef_arg('open', $es) or return;
2652
2653   my $r;
2654   eval {
2655      $r = $es->snapshot->get_repository;
2656   };
2657   if ($@) {
2658      chomp($@);
2659      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2660   }
2661
2662   return $r;
2663}
2664
2665#
2666# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2667#
2668sub get_snapshot_status {
2669   my $self = shift;
2670
2671   my $es = $self->_es;
2672   $self->brik_help_run_undef_arg('open', $es) or return;
2673
2674   my $r;
2675   eval {
2676      $r = $es->snapshot->status;
2677   };
2678   if ($@) {
2679      chomp($@);
2680      return $self->log->error("get_snapshot_status: failed: [$@]");
2681   }
2682
2683   return $r;
2684}
2685
2686#
2687# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2688#
2689sub create_snapshot {
2690   my $self = shift;
2691   my ($snapshot_name, $repository_name, $body) = @_;
2692
2693   my $es = $self->_es;
2694   $self->brik_help_run_undef_arg('open', $es) or return;
2695
2696   $snapshot_name ||= 'snapshot';
2697   $repository_name ||= 'repository';
2698
2699   my %args = (
2700      repository => $repository_name,
2701      snapshot => $snapshot_name,
2702   );
2703   if (defined($body)) {
2704      $args{body} = $body;
2705   }
2706
2707   my $r;
2708   eval {
2709      $r = $es->snapshot->create(%args);
2710   };
2711   if ($@) {
2712      chomp($@);
2713      return $self->log->error("create_snapshot: failed: [$@]");
2714   }
2715
2716   return $r;
2717}
2718
2719sub create_snapshot_for_indices {
2720   my $self = shift;
2721   my ($indices, $snapshot_name, $repository_name) = @_;
2722
2723   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
2724
2725   $snapshot_name ||= 'snapshot';
2726   $repository_name ||= 'repository';
2727
2728   my $body = {
2729      indices => $indices,
2730   };
2731
2732   return $self->create_snapshot($snapshot_name, $repository_name, $body);
2733}
2734
2735sub is_snapshot_finished {
2736   my $self = shift;
2737
2738   my $status = $self->get_snapshot_status or return;
2739
2740   if (@{$status->{snapshots}} == 0) {
2741      return 1;
2742   }
2743
2744   return 0;
2745}
2746
2747sub get_snapshot_state {
2748   my $self = shift;
2749
2750   if ($self->is_snapshot_finished) {
2751      return $self->log->info("get_snapshot_state: is already finished");
2752   }
2753
2754   my $status = $self->get_snapshot_status or return;
2755
2756   my @indices_done = ();
2757   my @indices_not_done = ();
2758
2759   my $list = $status->{snapshots};
2760   for my $snapshot (@$list) {
2761      my $indices = $snapshot->{indices};
2762      for my $index (@$indices) {
2763         my $done = $index->{shards_stats}{done};
2764         if ($done) {
2765            push @indices_done, $index;
2766         }
2767         else {
2768            push @indices_not_done, $index;
2769         }
2770      }
2771   }
2772
2773   return { done => \@indices_done, not_done => \@indices_not_done };
2774}
2775
2776sub verify_snapshot_repository {
2777}
2778
2779sub delete_snapshot_repository {
2780   my $self = shift;
2781   my ($repository_name) = @_;
2782
2783   my $es = $self->_es;
2784   $self->brik_help_run_undef_arg('open', $es) or return;
2785   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
2786
2787   my $r;
2788   eval {
2789      $r = $es->snapshot->delete_repository(
2790         repository => $repository_name,
2791      );
2792   };
2793   if ($@) {
2794      chomp($@);
2795      return $self->log->error("delete_snapshot_repository: failed: [$@]");
2796   }
2797
2798   return $r;
2799}
2800
2801sub get_snapshot {
2802   my $self = shift;
2803   my ($snapshot_name, $repository_name) = @_;
2804
2805   my $es = $self->_es;
2806   $self->brik_help_run_undef_arg('open', $es) or return;
2807
2808   $snapshot_name ||= 'snapshot';
2809   $repository_name ||= 'repository';
2810
2811   my $r;
2812   eval {
2813      $r = $es->snapshot->get(
2814         repository => $repository_name,
2815         snapshot => $snapshot_name,
2816      );
2817   };
2818   if ($@) {
2819      chomp($@);
2820      return $self->log->error("get_snapshot: failed: [$@]");
2821   }
2822
2823   return $r;
2824}
2825
2826#
2827# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2828#
2829sub delete_snapshot {
2830   my $self = shift;
2831   my ($snapshot_name, $repository_name) = @_;
2832
2833   my $es = $self->_es;
2834   $self->brik_help_run_undef_arg('open', $es) or return;
2835   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
2836   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
2837
2838   my $timeout = $self->rtimeout;
2839
2840   my $r;
2841   eval {
2842      $r = $es->snapshot->delete(
2843         repository => $repository_name,
2844         snapshot => $snapshot_name,
2845         master_timeout => "${timeout}s",
2846      );
2847   };
2848   if ($@) {
2849      chomp($@);
2850      return $self->log->error("delete_snapshot: failed: [$@]");
2851   }
2852
2853   return $r;
2854}
2855
2856#
2857# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2858#
2859sub restore_snapshot {
2860   my $self = shift;
2861   my ($snapshot_name, $repository_name, $body) = @_;
2862
2863   my $es = $self->_es;
2864   $snapshot_name ||= 'snapshot';
2865   $repository_name ||= 'repository';
2866   $self->brik_help_run_undef_arg('open', $es) or return;
2867   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
2868   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
2869
2870   my %args = (
2871      repository => $repository_name,
2872      snapshot => $snapshot_name,
2873   );
2874   if (defined($body)) {
2875      $args{body} = $body;
2876   }
2877
2878   my $r;
2879   eval {
2880      $r = $es->snapshot->restore(%args);
2881   };
2882   if ($@) {
2883      chomp($@);
2884      return $self->log->error("restore_snapshot: failed: [$@]");
2885   }
2886
2887   return $r;
2888}
2889
2890sub restore_snapshot_for_indices {
2891   my $self = shift;
2892   my ($indices, $snapshot_name, $repository_name) = @_;
2893
2894   $snapshot_name ||= 'snapshot';
2895   $repository_name ||= 'repository';
2896   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
2897   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
2898   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
2899
2900   my $body = {
2901      indices => $indices,
2902   };
2903
2904   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
2905}
2906
29071;
2908
2909__END__
2910
2911=head1 NAME
2912
2913Metabrik::Client::Elasticsearch - client::elasticsearch Brik
2914
2915=head1 SYNOPSIS
2916
2917   host:~> my $q = { term => { ip => "192.168.57.19" } }
2918   host:~> run client::elasticsearch open
2919   host:~> run client::elasticsearch query $q data-*
2920
2921=head1 DESCRIPTION
2922
2923Template to write a new Metabrik Brik.
2924
2925=head1 COPYRIGHT AND LICENSE
2926
2927Copyright (c) 2014-2017, Patrice E<lt>GomoRE<gt> Auffret
2928
2929You may distribute this module under the terms of The BSD 3-Clause License.
2930See LICENSE file in the source distribution archive.
2931
2932=head1 AUTHOR
2933
2934Patrice E<lt>GomoRE<gt> Auffret
2935
2936=cut
Note: See TracBrowser for help on using the repository browser.