source: repository/lib/Metabrik/Client/Elasticsearch.pm

tip
Last change on this file was 909:a1a68453d514, checked in by GomoR <gomor@…>, 8 days ago
  • new: file::csv: support for object encoded fields (base64 encoding/decoding) through object_fields Attribute
  • update: client::elasticsearch: use object_fields logic to handle base64 encoding/decoding in import/export
File size: 70.7 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         nodes => [ qw(node_list) ],
20         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
21         date => [ qw(date) ],
22         index => [ qw(index) ],
23         type => [ qw(type) ],
24         from => [ qw(number) ],
25         size => [ qw(count) ],
26         max => [ qw(count) ],
27         max_flush_count => [ qw(count) ],
28         max_flush_size => [ qw(count) ],
29         rtimeout => [ qw(seconds) ],
30         sniff_rtimeout => [ qw(seconds) ],
31         try => [ qw(count) ],
32         use_bulk_autoflush => [ qw(0|1) ],
33         use_indexing_optimizations => [ qw(0|1) ],
34         csv_encoded_fields => [ qw(fields) ],
35         csv_object_fields => [ qw(fields) ],
36         _es => [ qw(INTERNAL) ],
37         _bulk => [ qw(INTERNAL) ],
38         _scroll => [ qw(INTERNAL) ],
39      },
40      attributes_default => {
41         nodes => [ qw(http://localhost:9200) ],
42         cxn_pool => 'Sniff',
43         from => 0,
44         size => 10,
45         max => 0,
46         index => '*',
47         type => '*',
48         rtimeout => 60,
49         sniff_rtimeout => 3,
50         try => 3,
51         max_flush_count => 1_000,
52         max_flush_size => 1_000_000,
53         use_bulk_autoflush => 1,
54         use_indexing_optimizations => 0,
55      },
56      commands => {
57         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
58         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
59         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
60         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL) ],
61         close_scroll => [ ],
62         total_scroll => [ ],
63         next_scroll => [ ],
64         index_document => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
65         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
66         bulk_flush => [ ],
67         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL) ],
68         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
69         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
70         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
71         delete_index => [ qw(index|indices_list) ],
72         update_alias => [ qw(new_index alias) ],
73         delete_document => [ qw(index type id) ],
74         show_indices => [ qw(string_filter|OPTIONAL) ],
75         show_nodes => [ ],
76         show_health => [ ],
77         show_recovery => [ ],
78         list_indices => [ ],
79         get_indices => [ ],
80         get_index => [ qw(index|indices_list) ],
81         list_index_types => [ qw(index) ],
82         list_index_fields => [ qw(index) ],
83         list_indices_version => [ qw(index|indices_list) ],
84         open_index => [ qw(index|indices_list) ],
85         close_index => [ qw(index|indices_list) ],
86         get_aliases => [ qw(index) ],
87         put_alias => [ qw(index alias) ],
88         delete_alias => [ qw(index alias) ],
89         get_mappings => [ qw(index type|OPTIONAL) ],
90         create_index => [ qw(index) ],
91         create_index_with_mappings => [ qw(index mappings) ],
92         info => [ qw(nodes_list|OPTIONAL) ],
93         version => [ qw(nodes_list|OPTIONAL) ],
94         get_templates => [ ],
95         list_templates => [ ],
96         get_template => [ qw(name) ],
97         put_template => [ qw(name template) ],
98         put_template_from_json_file => [ qw(file) ],
99         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
100         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
101         set_index_number_of_replicas => [ qw(index|indices_list number) ],
102         set_index_refresh_interval => [ qw(index|indices_list number) ],
103         get_index_number_of_replicas => [ qw(index|indices) ],
104         get_index_refresh_interval => [ qw(index|indices_list) ],
105         get_index_number_of_shards => [ qw(index|indices_list) ],
106         delete_template => [ qw(name) ],
107         is_index_exists => [ qw(index) ],
108         is_type_exists => [ qw(index type) ],
109         is_document_exists => [ qw(index type document) ],
110         parse_error_string => [ qw(string) ],
111         refresh_index => [ qw(index) ],
112         export_as_csv => [ qw(index size|OPTIONAL) ],
113         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL size|OPTIONAL) ],
114         get_stats_process => [ ],
115         get_process => [ ],
116         get_cluster_state => [ ],
117         get_cluster_health => [ ],
118         get_cluster_settings => [ ],
119         put_cluster_settings => [ qw(settings) ],
120         count_green_indices => [ ],
121         count_yellow_indices => [ ],
122         count_red_indices => [ ],
123         list_green_indices => [ ],
124         list_yellow_indices => [ ],
125         list_red_indices => [ ],
126         count_indices => [ ],
127         list_indices_status => [ ],
128         count_shards => [ ],
129         count_size => [ ],
130         count_total_size => [ ],
131         count_count => [ ],
132         list_datatypes => [ ],
133         get_hits_total => [ ],
134         disable_shard_allocation => [ ],
135         enable_shard_allocation => [ ],
136         flush_synced => [ ],
137         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
138         create_shared_fs_snapshot_repository => [ qw(location
139            repository_name|OPTIONAL) ],
140         get_snapshot_repositories => [ ],
141         get_snapshot_status => [ ],
142         delete_snapshot_repository => [ qw(repository_name) ],
143         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
144            body|OPTIONAL) ],
145         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
146            repository_name|OPTIONAL) ],
147         is_snapshot_finished => [ ],
148         get_snapshot_state => [ ],
149         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
150         delete_snapshot => [ qw(snapshot_name repository_name) ],
151         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
152         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
153      },
154      require_modules => {
155         'Metabrik::String::Json' => [ ],
156         'Metabrik::File::Csv' => [ ],
157         'Metabrik::File::Json' => [ ],
158         'Metabrik::File::Dump' => [ ],
159         'Metabrik::Format::Number' => [ ],
160         'Search::Elasticsearch' => [ ],
161      },
162   };
163}
164
165sub brik_preinit {
166   my $self = shift;
167
168   eval("use Search::Elasticsearch;");
169   if ($Search::Elasticsearch::VERSION < 5) {
170      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
171         "with: run perl::module install Search::Elasticsearch");
172   }
173
174   return $self->SUPER::brik_preinit;
175}
176
177sub open {
178   my $self = shift;
179   my ($nodes, $cxn_pool) = @_;
180
181   $nodes ||= $self->nodes;
182   $cxn_pool ||= $self->cxn_pool;
183   $self->brik_help_run_undef_arg('open', $nodes) or return;
184   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
185   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
186   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
187
188   for my $node (@$nodes) {
189      if ($node !~ m{https?://}) {
190         return $self->log->error("open: invalid node[$node], must start with http(s)");
191      }
192   }
193
194   my $timeout = $self->rtimeout;
195
196   my $nodes_str = join('|', @$nodes);
197   $self->log->verbose("open: using nodes [$nodes_str]");
198
199   #
200   # Timeout description here:
201   #
202   # Search::Elasticsearch::Role::Cxn
203   #
204
205   my $es = Search::Elasticsearch->new(
206      nodes => $nodes,
207      cxn_pool => $cxn_pool,
208      timeout => $timeout,
209      max_retries => $self->try,
210      retry_on_timeout => 1,
211      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
212      request_timeout => 60,  # seconds, default 30
213      ping_timeout => 5,  # seconds, default 2
214      dead_timeout => 120,  # seconds, detault 60
215      max_dead_timeout => 3600,  # seconds, default 3600
216      sniff_request_timeout => 15, # seconds, default 2
217   );
218   if (! defined($es)) {
219      return $self->log->error("open: failed");
220   }
221
222   $self->_es($es);
223
224   return $nodes;
225}
226
227#
228# Search::Elasticsearch::Client::5_0::Bulk
229#
230sub open_bulk_mode {
231   my $self = shift;
232   my ($index, $type) = @_;
233
234   $index ||= $self->index;
235   $type ||= $self->type;
236   my $es = $self->_es;
237   $self->brik_help_run_undef_arg('open', $es) or return;
238   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
239   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
240
241   my %args = (
242      index => $index,
243      type => $type,
244   );
245
246   if ($self->use_bulk_autoflush) {
247      my $max_count = $self->max_flush_count || 1_000;
248      my $max_size = $self->max_flush_size || 1_000_000;
249
250      $args{max_count} = $max_count;
251      $args{max_size} = $max_size;
252      $args{max_time} = 0;
253
254      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
255         "max_flush_size [$max_size]");
256   }
257   else {
258      $args{max_count} = 0;
259      $args{max_size} = 0;
260      $args{max_time} = 0;
261      $args{on_error} = undef;
262      #$args{on_success} = sub {
263         #my ($action, $response, $i) = @_;
264      #};
265
266      $self->log->info("open_bulk_mode: opening without automatic flushing");
267   }
268
269   my $bulk = $es->bulk_helper(%args);
270   if (! defined($bulk)) {
271      return $self->log->error("open_bulk_mode: failed");
272   }
273
274   $self->_bulk($bulk);
275
276   return $self->nodes;
277}
278
279sub open_scroll_scan_mode {
280   my $self = shift;
281   my ($index, $size) = @_;
282
283   my $version = $self->version or return;
284   if ($version ge "5.0.0") {
285      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
286         "$version, try open_scroll Command instead");
287   }
288
289   $index ||= $self->index;
290   $size ||= $self->size;
291   my $es = $self->_es;
292   $self->brik_help_run_undef_arg('open', $es) or return;
293   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
294   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
295
296   my $scroll = $es->scroll_helper(
297      index => $index,
298      search_type => 'scan',
299      size => $size,
300   );
301   if (! defined($scroll)) {
302      return $self->log->error("open_scroll_scan_mode: failed");
303   }
304
305   $self->_scroll($scroll);
306
307   return $self->nodes;
308}
309
310#
311# Search::Elasticsearch::Client::5_0::Scroll
312#
313sub open_scroll {
314   my $self = shift;
315   my ($index, $size) = @_;
316
317   my $version = $self->version or return;
318   if ($version lt "5.0.0") {
319      return $self->log->error("open_scroll: Command not supported for ES version ".
320         "$version, try open_scroll_scan_mode Command instead");
321   }
322
323   $index ||= $self->index;
324   $size ||= $self->size;
325   my $es = $self->_es;
326   $self->brik_help_run_undef_arg('open', $es) or return;
327   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
328   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
329
330   my $timeout = $self->rtimeout;
331
332   #
333   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
334   #
335   my $scroll = $es->scroll_helper(
336      scroll => "${timeout}s",
337      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
338                          # it in query string.
339      index => $index,
340      size => $size,
341      body => {
342         sort => [ qw(_doc) ],
343      },
344   );
345   if (! defined($scroll)) {
346      return $self->log->error("open_scroll: failed");
347   }
348
349   $self->_scroll($scroll);
350
351   $self->log->info("open_scroll: opened with size [$size] and timeout [${timeout}s]");
352
353   return $self->nodes;
354}
355
356#
357# Search::Elasticsearch::Client::5_0::Scroll
358#
359sub close_scroll {
360   my $self = shift;
361
362   my $scroll = $self->_scroll;
363   if (! defined($scroll)) {
364      return 1;
365   }
366
367   $scroll->finish;
368   $self->_scroll(undef);
369
370   return 1;
371}
372
373sub total_scroll {
374   my $self = shift;
375
376   my $scroll = $self->_scroll;
377   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
378
379   my $total;
380   eval {
381      $total = $scroll->total;
382   };
383   if ($@) {
384      chomp($@);
385      return $self->log->error("total_scroll: failed with: [$@]");
386   }
387
388   return $total;
389}
390
391sub next_scroll {
392   my $self = shift;
393
394   my $scroll = $self->_scroll;
395   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
396
397   my $next;
398   eval {
399      $next = $scroll->next;
400   };
401   if ($@) {
402      chomp($@);
403      return $self->log->error("next_scroll: failed with: [$@]");
404   }
405
406   return $next;
407}
408
409sub index_document {
410   my $self = shift;
411   my ($doc, $index, $type, $id) = @_;
412
413   $index ||= $self->index;
414   $type ||= $self->type;
415   my $es = $self->_es;
416   $self->brik_help_run_undef_arg('open', $es) or return;
417   $self->brik_help_run_undef_arg('index_document', $doc) or return;
418   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
419   $self->brik_help_set_undef_arg('index', $index) or return;
420   $self->brik_help_set_undef_arg('type', $type) or return;
421
422   my %args = (
423      index => $index,
424      type => $type,
425      body => $doc,
426   );
427   if (defined($id)) {
428      $args{id} = $id;
429   }
430
431   my $r;
432   eval {
433      $r = $es->index(%args);
434   };
435   if ($@) {
436      chomp($@);
437      return $self->log->error("index_document: index failed for index [$index]: [$@]");
438   }
439
440   return $r;
441}
442
443#
444# Search::Elasticsearch::Client::5_0::Bulk
445#
446sub index_bulk {
447   my $self = shift;
448   my ($doc, $index, $type, $id) = @_;
449
450   my $bulk = $self->_bulk;
451   $index ||= $self->index;
452   $type ||= $self->type;
453   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
454   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
455   $self->brik_help_set_undef_arg('index', $index) or return;
456   $self->brik_help_set_undef_arg('type', $type) or return;
457
458   my %args = (
459      source => $doc,
460   );
461   if (defined($id)) {
462      $args{id} = $id;
463   }
464
465   my $r;
466   eval {
467      #$r = $bulk->index(\%args);
468      $r = $bulk->add_action(index => \%args);
469   };
470   if ($@) {
471      chomp($@);
472      my $p = $self->parse_error_string($@);
473      if (defined($p) && exists($p->{class})) {
474         my $class = $p->{class};
475         my $code = $p->{code};
476         my $node = $p->{node};
477         return $self->log->error("index_bulk: failed for index [$index] with error ".
478            "[$class] code [$code] for node [$node]");
479      }
480      else {
481         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
482      }
483   }
484
485   return $r;
486}
487
488sub bulk_flush {
489   my $self = shift;
490
491   my $bulk = $self->_bulk;
492   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
493
494   my $try = $self->try;
495
496RETRY:
497
498   my $r;
499   eval {
500      $r = $bulk->flush;
501   };
502   if ($@) {
503      if (--$try == 0) {
504         chomp($@);
505         my $p = $self->parse_error_string($@);
506         if (defined($p) && exists($p->{class})) {
507            my $class = $p->{class};
508            my $code = $p->{code};
509            my $node = $p->{node};
510            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
511               "[$class] code [$code] for node [$node]");
512         }
513         else {
514            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
515         }
516      }
517      sleep 60;
518      goto RETRY;
519   }
520
521   return $r;
522}
523
524#
525# Search::Elasticsearch::Client::2_0::Direct
526# Search::Elasticsearch::Client::5_0::Direct
527#
528sub count {
529   my $self = shift;
530   my ($index, $type) = @_;
531
532   $index ||= $self->index;
533   $type ||= $self->type;
534   my $es = $self->_es;
535   $self->brik_help_run_undef_arg('open', $es) or return;
536
537   my %args = ();
538   if (defined($index) && $index ne '*') {
539      $args{index} = $index;
540   }
541   if (defined($type) && $type ne '*') {
542      $args{type} = $type;
543   }
544
545   #$args{body} = {
546      #query => {
547         #match => { title => 'Elasticsearch clients' },
548      #},
549   #}
550
551   my $r;
552   my $version = $self->version or return;
553   if ($version ge "5.0.0") {
554      eval {
555         $r = $es->count(%args);
556      };
557   }
558   else {
559      eval {
560         $r = $es->search(
561            index => $index,
562            type => $type,
563            search_type => 'count',
564            body => {
565               query => {
566                  match_all => {},
567               },
568            },
569         );
570      };
571   }
572   if ($@) {
573      chomp($@);
574      return $self->log->error("count: count failed for index [$index]: [$@]");
575   }
576
577   if ($version ge "5.0.0") {
578      if (exists($r->{count})) {
579         return $r->{count};
580      }
581   }
582   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
583      return $r->{hits}{total};
584   }
585
586   return $self->log->error("count: nothing found");
587}
588
589#
590# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
591# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
592#
593# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
594#
595sub query {
596   my $self = shift;
597   my ($query, $index, $type) = @_;
598
599   $index ||= $self->index;
600   $type ||= $self->type;
601   my $es = $self->_es;
602   $self->brik_help_run_undef_arg('open', $es) or return;
603   $self->brik_help_run_undef_arg('query', $query) or return;
604   $self->brik_help_set_undef_arg('index', $index) or return;
605   $self->brik_help_set_undef_arg('type', $type) or return;
606   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
607
608   my $timeout = $self->rtimeout;
609
610   my %args = (
611      index => $index,
612      body => $query,
613   );
614
615   if ($type ne '*') {
616      $args{type} = $type;
617   }
618
619   my $r;
620   eval {
621      $r = $es->search(%args);
622   };
623   if ($@) {
624      chomp($@);
625      return $self->log->error("query: failed for index [$index]: [$@]");
626   }
627
628   return $r;
629}
630
631sub get_from_id {
632   my $self = shift;
633   my ($id, $index, $type) = @_;
634
635   $index ||= $self->index;
636   $type ||= $self->type;
637   my $es = $self->_es;
638   $self->brik_help_run_undef_arg('open', $es) or return;
639   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
640   $self->brik_help_set_undef_arg('index', $index) or return;
641   $self->brik_help_set_undef_arg('type', $type) or return;
642
643   my $r;
644   eval {
645      $r = $es->get(
646         index => $index,
647         type => $type,
648         id => $id,
649      );
650   };
651   if ($@) {
652      chomp($@);
653      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
654   }
655
656   return $r;
657}
658
659#
660# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
661#
662sub www_search {
663   my $self = shift;
664   my ($query, $index, $type) = @_;
665
666   $index ||= $self->index;
667   $type ||= $self->type;
668   $self->brik_help_run_undef_arg('www_search', $query) or return;
669   $self->brik_help_set_undef_arg('index', $index) or return;
670   $self->brik_help_set_undef_arg('type', $type) or return;
671
672   my $from = $self->from;
673   my $size = $self->size;
674
675   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
676
677   my $nodes = $self->nodes;
678   for my $node (@$nodes) {
679      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
680      my $url = "$node/$index";
681      if ($type ne '*') {
682         $url .= "/$type";
683      }
684      $url .= "/_search/?from=$from&size=$size&q=".$query;
685
686      my $get = $self->SUPER::get($url) or next;
687      my $body = $get->{content};
688
689      my $decoded = $sj->decode($body) or next;
690
691      return $decoded;
692   }
693
694   return;
695}
696
697#
698# Search::Elasticsearch::Client::2_0::Direct::Indices
699#
700sub delete_index {
701   my $self = shift;
702   my ($index) = @_;
703
704   my $es = $self->_es;
705   $self->brik_help_run_undef_arg('open', $es) or return;
706   $self->brik_help_run_undef_arg('delete_index', $index) or return;
707   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
708
709   my %args = (
710      index => $index,
711   );
712
713   my $r;
714   eval {
715      $r = $es->indices->delete(%args);
716   };
717   if ($@) {
718      chomp($@);
719      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
720   }
721
722   return $r;
723}
724
725#
726# Search::Elasticsearch::Client::2_0::Direct::Indices
727#
728sub delete_document {
729   my $self = shift;
730   my ($index, $type, $id) = @_;
731
732   my $es = $self->_es;
733   $self->brik_help_run_undef_arg('open', $es) or return;
734   $self->brik_help_run_undef_arg('delete_document', $index) or return;
735   $self->brik_help_run_undef_arg('delete_document', $type) or return;
736   $self->brik_help_run_undef_arg('delete_document', $id) or return;
737
738   my %args = (
739      index => $index,
740      type => $type,
741      id => $id,
742   );
743
744   my $r;
745   eval {
746      $r = $es->delete(%args);
747   };
748   if ($@) {
749      chomp($@);
750      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
751   }
752
753   return $r;
754}
755
756#
757# Search::Elasticsearch::Client::2_0::Direct::Cat
758#
759sub show_indices {
760   my $self = shift;
761   my ($string) = @_;
762
763   my $es = $self->_es;
764   $self->brik_help_run_undef_arg('open', $es) or return;
765
766   my $r;
767   eval {
768      $r = $es->cat->indices;
769   };
770   if ($@) {
771      chomp($@);
772      return $self->log->error("show_indices: failed: [$@]");
773   }
774
775   my @lines = split(/\n/, $r);
776
777   if (@lines == 0) {
778      $self->log->warning("show_indices: nothing returned, no index?");
779   }
780
781   my @filtered = ();
782   if (defined($string)) {
783      for (@lines) {
784         if (m{$string}) {
785            push @filtered, $_;
786         }
787      }
788      @lines = @filtered;
789   }
790
791   return \@lines;
792}
793
794#
795# Search::Elasticsearch::Client::2_0::Direct::Cat
796#
797sub show_nodes {
798   my $self = shift;
799
800   my $es = $self->_es;
801   $self->brik_help_run_undef_arg('open', $es) or return;
802
803   my $r;
804   eval {
805      $r = $es->cat->nodes;
806   };
807   if ($@) {
808      chomp($@);
809      return $self->log->error("show_nodes: failed: [$@]");
810   }
811
812   my @lines = split(/\n/, $r);
813
814   if (@lines == 0) {
815      $self->log->warning("show_nodes: nothing returned, no nodes?");
816   }
817
818   return \@lines;
819}
820
821#
822# Search::Elasticsearch::Client::2_0::Direct::Cat
823#
824sub show_health {
825   my $self = shift;
826
827   my $es = $self->_es;
828   $self->brik_help_run_undef_arg('open', $es) or return;
829
830   my $r;
831   eval {
832      $r = $es->cat->health;
833   };
834   if ($@) {
835      chomp($@);
836      return $self->log->error("show_health: failed: [$@]");
837   }
838
839   my @lines = split(/\n/, $r);
840
841   if (@lines == 0) {
842      $self->log->warning("show_health: nothing returned, no recovery?");
843   }
844
845   return \@lines;
846}
847
848#
849# Search::Elasticsearch::Client::2_0::Direct::Cat
850#
851sub show_recovery {
852   my $self = shift;
853
854   my $es = $self->_es;
855   $self->brik_help_run_undef_arg('open', $es) or return;
856
857   my $r;
858   eval {
859      $r = $es->cat->recovery;
860   };
861   if ($@) {
862      chomp($@);
863      return $self->log->error("show_recovery: failed: [$@]");
864   }
865
866   my @lines = split(/\n/, $r);
867
868   if (@lines == 0) {
869      $self->log->warning("show_recovery: nothing returned, no index?");
870   }
871
872   return \@lines;
873}
874
875sub list_indices {
876   my $self = shift;
877
878   my $get = $self->get_indices or return;
879
880   my @indices = ();
881   for (@$get) {
882      push @indices, $_->{index};
883   }
884
885   return [ sort { $a cmp $b } @indices ];
886}
887
888sub get_indices {
889   my $self = shift;
890
891   my $lines = $self->show_indices or return;
892   if (@$lines == 0) {
893      $self->log->warning("get_indices: no index found");
894      return [];
895   }
896
897   #
898   # Format depends on ElasticSearch version. We try to detect the format.
899   #
900   # 5.0.0:
901   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
902   #
903   my @indices = ();
904   for (@$lines) {
905      my @t = split(/\s+/);
906      if (@t == 10) {  # Version 5.0.0
907         my $color = $t[0];
908         my $state = $t[1];
909         my $index = $t[2];
910         my $id = $t[3];
911         my $shards = $t[4];
912         my $replicas = $t[5];
913         my $count = $t[6];
914         my $count2 = $t[7];
915         my $total_size = $t[8];
916         my $size = $t[9];
917         push @indices, {
918            color => $color,
919            state => $state,
920            index => $index,
921            id => $id,
922            shards => $shards,
923            replicas => $replicas,
924            count => $count,
925            total_size => $total_size,
926            size => $size,
927         };
928      }
929      elsif (@t == 9) {
930         my $index = $t[2];
931         push @indices, {
932            index => $index,
933         };
934      }
935      elsif (@t == 8) {
936         my $index = $t[1];
937         push @indices, {
938            index => $index,
939         };
940      }
941   }
942
943   return \@indices;
944}
945
946#
947# Search::Elasticsearch::Client::5_0::Direct::Indices
948#
949sub get_index {
950   my $self = shift;
951   my ($index) = @_;
952 
953   my $es = $self->_es;
954   $self->brik_help_run_undef_arg('open', $es) or return;
955   $self->brik_help_run_undef_arg('get_index', $index) or return;
956   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
957
958   my %args = (
959      index => $index,
960   );
961
962   my $r;
963   eval {
964      $r = $es->indices->get(%args);
965   };
966   if ($@) {
967      chomp($@);
968      return $self->log->error("get_index: get failed for index [$index]: [$@]");
969   }
970
971   return $r;
972}
973
974sub list_index_types {
975   my $self = shift;
976   my ($index) = @_;
977
978   my $es = $self->_es;
979   $self->brik_help_run_undef_arg('open', $es) or return;
980   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
981   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
982
983   my $r = $self->get_mappings($index) or return;
984   if (keys %$r > 1) {
985      return $self->log->error("list_index_types: multiple indices found, choose one");
986   }
987
988   my @types = ();
989   for my $this_index (keys %$r) {
990      my $mappings = $r->{$this_index}{mappings};
991      push @types, keys %$mappings;
992   }
993
994   my %uniq = map { $_ => 1 } @types;
995
996   return [ sort { $a cmp $b } keys %uniq ];
997}
998
999#
1000# By default, if you provide only one index and no type,
1001# all types will be merged (including _default_)
1002# If you specify one type (other than _default_), _default_ will be merged to it.
1003#
1004sub list_index_fields {
1005   my $self = shift;
1006   my ($index, $type) = @_;
1007
1008   my $es = $self->_es;
1009   $self->brik_help_run_undef_arg('open', $es) or return;
1010   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1011   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1012
1013   my $r;
1014   if (defined($type)) {
1015      $r = $self->get_mappings($index, $type) or return;
1016      if (keys %$r > 1) {
1017         return $self->log->error("list_index_fields: multiple indices found, ".
1018            "choose one");
1019      }
1020      my $r2 = $self->get_mappings($index, '_default_') or return;
1021      # Merge
1022      for my $this_index (keys %$r2) {
1023         my $default = $r2->{$this_index}{mappings}{'_default_'};
1024         $r->{$this_index}{mappings}{_default_} = $default;
1025      }
1026   }
1027   else {
1028      $r = $self->get_mappings($index) or return;
1029      if (keys %$r > 1) {
1030         return $self->log->error("list_index_fields: multiple indices found, ".
1031            "choose one");
1032      }
1033   }
1034
1035   my @fields = ();
1036   for my $this_index (keys %$r) {
1037      my $mappings = $r->{$this_index}{mappings};
1038      for my $this_type (keys %$mappings) {
1039         my $properties = $mappings->{$this_type}{properties};
1040         push @fields, keys %$properties;
1041      }
1042   }
1043
1044   my %uniq = map { $_ => 1 } @fields;
1045
1046   return [ sort { $a cmp $b } keys %uniq ];
1047}
1048
1049sub list_indices_version {
1050   my $self = shift;
1051   my ($index) = @_;
1052
1053   my $es = $self->_es;
1054   $self->brik_help_run_undef_arg('open', $es) or return;
1055   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1056   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1057      or return;
1058
1059   my $r = $self->get_index($index) or return;
1060
1061   my @list = ();
1062   for my $this (keys %$r) {
1063      my $name = $this;
1064      my $version = $r->{$this}{settings}{index}{version}{created};
1065      push @list, {
1066         index => $name,
1067         version => $version,
1068      };
1069   }
1070
1071   return \@list;
1072}
1073
1074sub open_index {
1075   my $self = shift;
1076   my ($index) = @_;
1077
1078   my $es = $self->_es;
1079   $self->brik_help_run_undef_arg('open', $es) or return;
1080   $self->brik_help_run_undef_arg('open_index', $index) or return;
1081   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1082
1083   my $r;
1084   eval {
1085      $r = $es->indices->open(
1086         index => $index,
1087      );
1088   };
1089   if ($@) {
1090      chomp($@);
1091      return $self->log->error("open_index: failed: [$@]");
1092   }
1093
1094   return $r;
1095}
1096
1097sub close_index {
1098   my $self = shift;
1099   my ($index) = @_;
1100
1101   my $es = $self->_es;
1102   $self->brik_help_run_undef_arg('open', $es) or return;
1103   $self->brik_help_run_undef_arg('close_index', $index) or return;
1104   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1105
1106   my $r;
1107   eval {
1108      $r = $es->indices->close(
1109         index => $index,
1110      );
1111   };
1112   if ($@) {
1113      chomp($@);
1114      return $self->log->error("close_index: failed: [$@]");
1115   }
1116
1117   return $r;
1118}
1119
1120#
1121# Search::Elasticsearch::Client::5_0::Direct::Indices
1122#
1123sub get_aliases {
1124   my $self = shift;
1125   my ($index) = @_;
1126
1127   $index ||= $self->index;
1128   my $es = $self->_es;
1129   $self->brik_help_run_undef_arg('open', $es) or return;
1130
1131   my %args = (
1132      index => $index,
1133   );
1134
1135   my $r;
1136   eval {
1137      $r = $es->indices->get(%args);
1138   };
1139   if ($@) {
1140      chomp($@);
1141      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1142   }
1143
1144   my %aliases = ();
1145   for my $this (keys %$r) {
1146      $aliases{$this} = $r->{$this}{aliases};
1147   }
1148
1149   return \%aliases;
1150}
1151
1152#
1153# Search::Elasticsearch::Client::5_0::Direct::Indices
1154#
1155sub put_alias {
1156   my $self = shift;
1157   my ($index, $alias) = @_;
1158
1159   my $es = $self->_es;
1160   $self->brik_help_run_undef_arg('open', $es) or return;
1161   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1162   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1163
1164   my %args = (
1165      index => $index,
1166      name => $alias,
1167   );
1168
1169   my $r;
1170   eval {
1171      $r = $es->indices->put_alias(%args);
1172   };
1173   if ($@) {
1174      chomp($@);
1175      return $self->log->error("put_alias: put_alias failed: [$@]");
1176   }
1177
1178   return $r;
1179}
1180
1181#
1182# Search::Elasticsearch::Client::5_0::Direct::Indices
1183#
1184sub delete_alias {
1185   my $self = shift;
1186   my ($index, $alias) = @_;
1187
1188   my $es = $self->_es;
1189   $self->brik_help_run_undef_arg('open', $es) or return;
1190   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1191   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1192
1193   my %args = (
1194      index => $index,
1195      name => $alias,
1196   );
1197
1198   my $r;
1199   eval {
1200      $r = $es->indices->delete_alias(%args);
1201   };
1202   if ($@) {
1203      chomp($@);
1204      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1205   }
1206
1207   return $r;
1208}
1209
1210sub update_alias {
1211   my $self = shift;
1212   my ($new_index, $alias) = @_;
1213
1214   my $es = $self->_es;
1215   $self->brik_help_run_undef_arg('open', $es) or return;
1216   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1217   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1218
1219   # Search for previous index with that alias, if any.
1220   my $prev_index;
1221   my $aliases = $self->get_aliases or return;
1222   while (my ($k, $v) = each %$aliases) {
1223      for my $this (keys %$v) {
1224         if ($this eq $alias) {
1225            $prev_index = $k;
1226            last;
1227         }
1228      }
1229      last if $prev_index;
1230   }
1231
1232   # Delete previous alias if it exists.
1233   if (defined($prev_index)) {
1234      $self->delete_alias($prev_index, $alias) or return;
1235   }
1236
1237   return $self->put_alias($new_index, $alias);
1238}
1239
1240#
1241# Search::Elasticsearch::Client::2_0::Direct::Indices
1242#
1243sub get_mappings {
1244   my $self = shift;
1245   my ($index, $type) = @_;
1246
1247   my $es = $self->_es;
1248   $self->brik_help_run_undef_arg('open', $es) or return;
1249   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1250   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1251
1252   my %args = (
1253      index => $index,
1254      type => $type,
1255   );
1256
1257   my $r;
1258   eval {
1259      $r = $es->indices->get_mapping(%args);
1260   };
1261   if ($@) {
1262      chomp($@);
1263      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1264         "[$@]");
1265   }
1266
1267   return $r;
1268}
1269
1270#
1271# Search::Elasticsearch::Client::2_0::Direct::Indices
1272#
1273sub create_index {
1274   my $self = shift;
1275   my ($index, $shards_count) = @_;
1276
1277   my $es = $self->_es;
1278   $self->brik_help_run_undef_arg('open', $es) or return;
1279   $self->brik_help_run_undef_arg('create_index', $index) or return;
1280         
1281   my $r;
1282   eval {
1283      $r = $es->indices->create(
1284         index => $index,
1285      );
1286   };
1287   if ($@) {
1288      chomp($@);
1289      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1290   }
1291   
1292   return $r;
1293}
1294
1295#
1296# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1297#
1298sub create_index_with_mappings {
1299   my $self = shift;
1300   my ($index, $mappings) = @_;
1301
1302   my $es = $self->_es;
1303   $self->brik_help_run_undef_arg('open', $es) or return;
1304   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1305   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1306   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1307
1308   my $r;
1309   eval {
1310      $r = $es->indices->create(
1311         index => $index,
1312         body => {
1313            mappings => $mappings,
1314         },
1315      );
1316   };
1317   if ($@) {
1318      chomp($@);
1319      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1320   }
1321
1322   return $r;
1323}
1324
1325# GET http://localhost:9200/
1326sub info {
1327   my $self = shift;
1328   my ($nodes) = @_;
1329
1330   $nodes ||= $self->nodes;
1331   $self->brik_help_run_undef_arg('info', $nodes) or return;
1332   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1333   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1334
1335   my $first = $nodes->[0];
1336
1337   $self->get($first) or return;
1338
1339   return $self->content;
1340}
1341
1342sub version {
1343   my $self = shift;
1344   my ($nodes) = @_;
1345
1346   $nodes ||= $self->nodes;
1347   $self->brik_help_run_undef_arg('version', $nodes) or return;
1348   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1349   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1350
1351   my $first = $nodes->[0];
1352
1353   $self->get($first) or return;
1354   my $content = $self->content or return;
1355
1356   return $content->{version}{number};
1357}
1358
1359#
1360# Search::Elasticsearch::Client::2_0::Direct::Indices
1361#
1362sub get_templates {
1363   my $self = shift;
1364
1365   my $es = $self->_es;
1366   $self->brik_help_run_undef_arg('open', $es) or return;
1367
1368   my $r;
1369   eval {
1370      $r = $es->indices->get_template;
1371   };
1372   if ($@) {
1373      chomp($@);
1374      return $self->log->error("get_templates: failed: [$@]");
1375   }
1376
1377   return $r;
1378}
1379
1380sub list_templates {
1381   my $self = shift;
1382
1383   my $content = $self->get_templates or return;
1384
1385   return [ sort { $a cmp $b } keys %$content ];
1386}
1387
1388#
1389# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1390#
1391sub get_template {
1392   my $self = shift;
1393   my ($template) = @_;
1394
1395   my $es = $self->_es;
1396   $self->brik_help_run_undef_arg('open', $es) or return;
1397   $self->brik_help_run_undef_arg('get_template', $template) or return;
1398
1399   my $r;
1400   eval {
1401      $r = $es->indices->get_template(
1402         name => $template,
1403      );
1404   };
1405   if ($@) {
1406      chomp($@);
1407      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1408   }
1409
1410   return $r;
1411}
1412
1413#
1414# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1415#
1416sub put_template {
1417   my $self = shift;
1418   my ($name, $template) = @_;
1419
1420   my $es = $self->_es;
1421   $self->brik_help_run_undef_arg('open', $es) or return;
1422   $self->brik_help_run_undef_arg('put_template', $name) or return;
1423   $self->brik_help_run_undef_arg('put_template', $template) or return;
1424   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1425
1426   my $r;
1427   eval {
1428      $r = $es->indices->put_template(
1429         name => $name,
1430         body => $template,
1431      );
1432   };
1433   if ($@) {
1434      chomp($@);
1435      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1436   }
1437
1438   return $r;
1439}
1440
1441sub put_template_from_json_file {
1442   my $self = shift;
1443   my ($json_file) = @_;
1444
1445   my $es = $self->_es;
1446   $self->brik_help_run_undef_arg('open', $es) or return;
1447   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1448   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1449
1450   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1451   my $data = $fj->read($json_file) or return;
1452
1453   if (! exists($data->{template})) {
1454      return $self->log->error("put_template_from_json_file: no template name found");
1455   }
1456
1457   my $name = $data->{template};
1458
1459   return $self->put_template($name, $data);
1460}
1461
1462#
1463# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1464# Search::Elasticsearch::Client::2_0::Direct::Indices
1465#
1466sub get_settings {
1467   my $self = shift;
1468   my ($indices, $names) = @_;
1469
1470   my $es = $self->_es;
1471   $self->brik_help_run_undef_arg('open', $es) or return;
1472
1473   my %args = ();
1474   if (defined($indices)) {
1475      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1476      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1477         or return;
1478      $args{index} = $indices;
1479   }
1480   if (defined($names)) {
1481      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1482      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1483         or return;
1484      $args{name} = $names;
1485   }
1486
1487   my $r;
1488   eval {
1489      $r = $es->indices->get_settings(%args);
1490   };
1491   if ($@) {
1492      chomp($@);
1493      return $self->log->error("get_settings: failed: [$@]");
1494   }
1495
1496   return $r;
1497}
1498
1499#
1500# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1501# Search::Elasticsearch::Client::2_0::Direct::Indices
1502#
1503# Example:
1504#
1505# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1506#
1507# XXX: should be renamed to put_index_settings
1508#
1509sub put_settings {
1510   my $self = shift;
1511   my ($settings, $indices) = @_;
1512
1513   my $es = $self->_es;
1514   $self->brik_help_run_undef_arg('open', $es) or return;
1515   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1516   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1517
1518   my %args = (
1519      body => $settings,
1520   );
1521   if (defined($indices)) {
1522      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1523      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1524         or return;
1525      $args{index} = $indices;
1526   }
1527
1528   my $r;
1529   eval {
1530      $r = $es->indices->put_settings(%args);
1531   };
1532   if ($@) {
1533      chomp($@);
1534      return $self->log->error("put_settings: failed: [$@]");
1535   }
1536
1537   return $r;
1538}
1539
1540sub set_index_number_of_replicas {
1541   my $self = shift;
1542   my ($indices, $number) = @_;
1543
1544   my $es = $self->_es;
1545   $self->brik_help_run_undef_arg('open', $es) or return;
1546   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1547   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1548      or return;
1549
1550   my $settings = { number_of_replicas => $number };
1551
1552   return $self->put_settings($settings, $indices);
1553}
1554
1555sub set_index_refresh_interval {
1556   my $self = shift;
1557   my ($indices, $number) = @_;
1558
1559   my $es = $self->_es;
1560   $self->brik_help_run_undef_arg('open', $es) or return;
1561   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1562   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1563      or return;
1564
1565   # If there is a meaningful value not postfixed with a unity,
1566   # we default to add a `s' for a number of seconds.
1567   if ($number =~ /^\d+$/ && $number > 0) {
1568      $number .= 's';
1569   }
1570
1571   my $settings = { refresh_interval => $number };
1572
1573   return $self->put_settings($settings, $indices);
1574}
1575
1576sub get_index_number_of_replicas {
1577   my $self = shift;
1578   my ($indices) = @_;
1579
1580   my $es = $self->_es;
1581   $self->brik_help_run_undef_arg('open', $es) or return;
1582   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1583   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1584      or return;
1585
1586   my $settings = $self->get_settings($indices);
1587
1588   my %indices = ();
1589   for (keys %$settings) {
1590      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1591   }
1592
1593   return \%indices;
1594}
1595
1596sub get_index_refresh_interval {
1597   my $self = shift;
1598   my ($indices, $number) = @_;
1599
1600   my $es = $self->_es;
1601   $self->brik_help_run_undef_arg('open', $es) or return;
1602   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1603   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1604      or return;
1605
1606   my $settings = $self->get_settings($indices);
1607
1608   my %indices = ();
1609   for (keys %$settings) {
1610      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1611   }
1612
1613   return \%indices;
1614}
1615
1616sub get_index_number_of_shards {
1617   my $self = shift;
1618   my ($indices, $number) = @_;
1619
1620   my $es = $self->_es;
1621   $self->brik_help_run_undef_arg('open', $es) or return;
1622   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1623   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1624      or return;
1625
1626   my $settings = $self->get_settings($indices);
1627
1628   my %indices = ();
1629   for (keys %$settings) {
1630      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1631   }
1632
1633   return \%indices;
1634}
1635
1636#
1637# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1638#
1639sub delete_template {
1640   my $self = shift;
1641   my ($name) = @_;
1642
1643   my $es = $self->_es;
1644   $self->brik_help_run_undef_arg('open', $es) or return;
1645   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1646
1647   my $r;
1648   eval {
1649      $r = $es->indices->delete_template(
1650         name => $name,
1651      );
1652   };
1653   if ($@) {
1654      chomp($@);
1655      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1656   }
1657
1658   return $r;
1659}
1660
1661#
1662# Return a boolean to state for index existence
1663#
1664sub is_index_exists {
1665   my $self = shift;
1666   my ($index) = @_;
1667
1668   my $es = $self->_es;
1669   $self->brik_help_run_undef_arg('open', $es) or return;
1670   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1671
1672   my $r;
1673   eval {
1674      $r = $es->indices->exists(
1675         index => $index,
1676      );
1677   };
1678   if ($@) {
1679      chomp($@);
1680      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1681   }
1682
1683   return $r ? 1 : 0;
1684}
1685
1686#
1687# Return a boolean to state for index with type existence
1688#
1689sub is_type_exists {
1690   my $self = shift;
1691   my ($index, $type) = @_;
1692
1693   my $es = $self->_es;
1694   $self->brik_help_run_undef_arg('open', $es) or return;
1695   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1696   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1697
1698   my $r;
1699   eval {
1700      $r = $es->indices->exists_type(
1701         index => $index,
1702         type => $type,
1703      );
1704   };
1705   if ($@) {
1706      chomp($@);
1707      return $self->log->error("is_type_exists: failed for index [$index] and ".
1708         "type [$type]: [$@]");
1709   }
1710
1711   return $r ? 1 : 0;
1712}
1713
1714#
1715# Return a boolean to state for document existence
1716#
1717sub is_document_exists {
1718   my $self = shift;
1719   my ($index, $type, $document) = @_;
1720
1721   my $es = $self->_es;
1722   $self->brik_help_run_undef_arg('open', $es) or return;
1723   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
1724   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
1725   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
1726   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
1727
1728   my $r;
1729   eval {
1730      $r = $es->exists(
1731         index => $index,
1732         type => $type,
1733         %$document,
1734      );
1735   };
1736   if ($@) {
1737      chomp($@);
1738      return $self->log->error("is_document_exists: failed for index [$index] and ".
1739         "type [$type]: [$@]");
1740   }
1741
1742   return $r ? 1 : 0;
1743}
1744
1745sub parse_error_string {
1746   my $self = shift;
1747   my ($string) = @_;
1748
1749   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
1750
1751   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
1752
1753   my ($class, $node, $code, $message, $dump) = $string =~
1754      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
1755
1756   if (defined($dump) && length($dump)) {
1757      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
1758      $dump = $sd->decode($dump);
1759   }
1760
1761   # Sanity check
1762   if ($node =~ m{^http} && $code =~ m{^\d+$} && defined($dump) && ref($dump) eq 'HASH') {
1763      return {
1764         class => $class,
1765         node => $node,
1766         code => $code,
1767         message => $message,
1768         dump => $dump,
1769      };
1770   }
1771
1772   # Were not able to decode, we return as-is.
1773   return {
1774      message => $string,
1775   };
1776}
1777
1778#
1779# Refresh an index to receive latest additions
1780#
1781# Search::Elasticsearch::Client::5_0::Direct::Indices
1782# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
1783#
1784sub refresh_index {
1785   my $self = shift;
1786   my ($index) = @_;
1787
1788   my $es = $self->_es;
1789   $self->brik_help_run_undef_arg('open', $es) or return;
1790   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
1791
1792   my $try = $self->try;
1793
1794RETRY:
1795
1796   my $r;
1797   eval {
1798      $r = $es->indices->refresh(
1799         index => $index,
1800      );
1801   };
1802   if ($@) {
1803      if (--$try == 0) {
1804         chomp($@);
1805         my $p = $self->parse_error_string($@);
1806         if (defined($p) && exists($p->{class})) {
1807            my $class = $p->{class};
1808            my $code = $p->{code};
1809            my $node = $p->{node};
1810            return $self->log->error("refresh_index: failed for index [$index] ".
1811               "after [$try] tries with error [$class] code [$code] for node [$node]");
1812         }
1813         else {
1814            return $self->log->error("refresh_index: failed for index [$index] ".
1815               "after [$try]: [$@]");
1816         }
1817      }
1818      sleep 60;
1819      goto RETRY;
1820   }
1821
1822   return $r;
1823}
1824
1825sub export_as_csv {
1826   my $self = shift;
1827   my ($index, $size) = @_;
1828
1829   $size ||= 10_000;
1830   my $es = $self->_es;
1831   $self->brik_help_run_undef_arg('open', $es) or return;
1832   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
1833   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
1834
1835   my $max = $self->max;
1836
1837   my $scroll;
1838   my $version = $self->version or return;
1839   if ($version lt "5.0.0") {
1840      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
1841   }
1842   else {
1843      $scroll = $self->open_scroll($index, $size) or return;
1844   }
1845
1846   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
1847
1848   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
1849   $fc->separator(',');
1850   $fc->escape('\\');
1851   $fc->append(1);
1852   $fc->first_line_is_header(0);
1853   $fc->write_header(1);
1854   $fc->use_quoting(1);
1855   $fc->encoded_fields($self->csv_encoded_fields);
1856   $fc->object_fields($self->csv_object_fields);
1857
1858   my $total = $self->total_scroll;
1859   $self->log->info("export_as_csv: total [$total] for index [$index]");
1860
1861   my $h = {};
1862   my %types = ();
1863   my $read = 0;
1864   my $skipped = 0;
1865   my $exported = 0;
1866   my $start = time();
1867   my $done = 'output.exported';
1868   my $start_time = time();
1869   while (my $this = $self->next_scroll) {
1870      $read++;
1871      my $id = $this->{_id};
1872      my $doc = $this->{_source};
1873      my $type = $this->{_type};
1874      if (! exists($types{$type})) {
1875         my $fields = $self->list_index_fields($index, $type) or return;
1876         #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
1877         $types{$type}{header} = [ '_id', @$fields ];
1878         $types{$type}{output} = "$index:$type.csv";
1879         $done = $types{$type}{output_exported} = "$index:$type.csv.exported";
1880
1881         # Verify it has not been exported yet
1882         if (-f $types{$type}{output_exported}) {
1883            return $self->log->error("export_as_csv: export already done for index ".
1884               "[$index] with type [$type] and file [$index:$type.csv]");
1885         }
1886
1887         $self->log->info("export_as_csv: exporting to file [$index:$type.csv] ".
1888            "for new type [$type], using chunk size of [$size]");
1889      }
1890
1891      $h->{_id} = $id;
1892
1893      for my $k (keys %$doc) {
1894         $h->{$k} = $doc->{$k};
1895      }
1896
1897      $fc->header($types{$type}{header});
1898      my $r = $fc->write([ $h ], $types{$type}{output});
1899      if (!defined($r)) {
1900         $self->log->warning("export_as_csv: unable to process entry, skipping");
1901         $skipped++;
1902         next;
1903      }
1904
1905      # Log a status sometimes.
1906      if (! (++$exported % 100_000)) {
1907         my $now = time();
1908         $self->log->info("export_as_csv: fetched [$exported/$total] elements in ".
1909            ($now - $start)." second(s) from index [$index]");
1910         $start = time();
1911      }
1912
1913      # Limit export to specified maximum
1914      if ($max > 0 && $exported >= $max) {
1915         $self->log->info("export_as_csv: max export reached [$exported] for index ".
1916            "[$index], stopping");
1917         last;
1918      }
1919   }
1920
1921   $self->close_scroll;
1922
1923   my $stop_time = time();
1924   my $duration = $stop_time - $start_time;
1925   my $eps = $exported;
1926   if ($duration > 0) {
1927      $eps = $exported / $duration;
1928   }
1929
1930   my $result = {
1931      read => $read,
1932      exported => $exported,
1933      skipped => $read - $exported,
1934      total_count => $total,
1935      complete => ($exported == $total) ? 1 : 0,
1936      duration => $duration,
1937      eps => $eps, 
1938   };
1939
1940   # Say the file has been processed, and put resulting stats.
1941   $fd->write($result, $done) or return;
1942
1943   return $result;
1944}
1945
1946#
1947# Optimization instructions:
1948# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
1949#
1950sub import_from_csv {
1951   my $self = shift;
1952   my ($input_csv, $index, $type) = @_;
1953
1954   my $es = $self->_es;
1955   $self->brik_help_run_undef_arg('open', $es) or return;
1956   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
1957   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
1958
1959   # If index and/or types are not defined, we try to get them from input filename
1960   if (! defined($index) || ! defined($type)) {
1961      # Example: index-DATE:type.csv
1962      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
1963         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
1964         $index ||= $this_index;
1965         $type ||= $this_type;
1966      }
1967   }
1968
1969   # Verify it has not been indexed yet
1970   my $done = "$input_csv.imported";
1971   if (-f $done) {
1972      $self->log->info("import_from_csv: import already done for file [$input_csv]");
1973      return 0;
1974   }
1975
1976   # And default to Attributes if guess failed.
1977   $index ||= $self->index;
1978   $type ||= $self->type;
1979   $self->brik_help_set_undef_arg('index', $index) or return;
1980   $self->brik_help_set_undef_arg('type', $type) or return;
1981
1982   if ($index eq '*') {
1983      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
1984   }
1985   if ($type eq '*') {
1986      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
1987   }
1988
1989   $self->log->debug("input [$input_csv]");
1990   $self->log->debug("index [$index]");
1991   $self->log->debug("type [$type]");
1992
1993   my $count_before = 0;
1994   if ($self->is_index_exists($index)) {
1995      $count_before = $self->count($index, $type);
1996      if (! defined($count_before)) {
1997         return;
1998      }
1999      $self->log->info("import_from_csv: current index [$index] count is ".
2000         "[$count_before]");
2001   }
2002
2003   my $max = $self->max;
2004
2005   $self->open_bulk_mode($index, $type) or return;
2006
2007   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2008      "with type [$type]");
2009
2010   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2011
2012   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2013   $fc->separator(',');
2014   $fc->escape('\\');
2015   $fc->first_line_is_header(1);
2016   $fc->encoded_fields($self->csv_encoded_fields);
2017   $fc->object_fields($self->csv_object_fields);
2018
2019   my $refresh_interval;
2020   my $number_of_replicas;
2021   my $start = time();
2022   my $speed_settings = {};
2023   my $imported = 0;
2024   my $first = 1;
2025   my $read = 0;
2026   my $skipped_chunks = 0;
2027   my $start_time = time();
2028   while (my $this = $fc->read_next($input_csv)) {
2029      $read++;
2030
2031      my $h = {};
2032      my $id = $this->{_id};
2033      delete $this->{_id};
2034      for my $k (keys %$this) {
2035         my $value = $this->{$k};
2036         # We keep only fields when they have a value.
2037         # No need to index data that is empty.
2038         if (defined($value) && length($value)) {
2039            $h->{$k} = $value;
2040         }
2041      }
2042
2043      my $r = $self->index_bulk($h, $index, $type, $id);
2044      if (! defined($r)) {
2045         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2046            "at read [$read], skipping chunk");
2047         $skipped_chunks++;
2048         next;
2049      }
2050
2051      # Gather index settings, and set values for speed.
2052      # We don't do it earlier, cause we need index to be created,
2053      # and it should have been done from index_bulk Command.
2054      if ($first && $self->is_index_exists($index)) {
2055         # Save current values so we can restore them at the end of Command.
2056         # We ignore errors here, this is non-blocking for indexing.
2057         $refresh_interval = $self->get_index_refresh_interval($index);
2058         $refresh_interval = $refresh_interval->{$index};
2059         $number_of_replicas = $self->get_index_number_of_replicas($index);
2060         $number_of_replicas = $number_of_replicas->{$index};
2061         if ($self->use_indexing_optimizations) {
2062            $self->set_index_number_of_replicas($index, 0);
2063         }
2064         $self->set_index_refresh_interval($index, -1);
2065         $first = 0;
2066      }
2067
2068      # Log a status sometimes.
2069      if (! (++$imported % 100_000)) {
2070         my $now = time();
2071         $self->log->info("import_from_csv: imported [$imported] entries in ".
2072            ($now - $start)." second(s) to index [$index]");
2073         $start = time();
2074      }
2075
2076      # Limit import to specified maximum
2077      if ($max > 0 && $imported >= $max) {
2078         $self->log->info("import_from_csv: max import reached [$imported] for ".
2079            "index [$index], stopping");
2080         last;
2081      }
2082   }
2083
2084   $self->bulk_flush;
2085
2086   my $stop_time = time();
2087   my $duration = $stop_time - $start_time;
2088   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2089
2090   $self->refresh_index($index);
2091
2092   my $count_current = $self->count($index, $type) or return;
2093   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2094
2095   my $skipped = 0;
2096   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2097   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2098      $imported = $read;
2099   }
2100   else {
2101      $skipped = $read - ($count_current - $count_before);
2102   }
2103
2104   my $result = {
2105      read => $read,
2106      imported => $imported,
2107      skipped => $skipped,
2108      previous_count => $count_before,
2109      current_count => $count_current,
2110      complete => $complete,
2111      duration => $duration,
2112      eps => $eps,
2113   };
2114
2115   # Say the file has been processed, and put resulting stats.
2116   $fd->write($result, $done) or return;
2117
2118   # Restore previous settings, if any
2119   if (defined($refresh_interval)) {
2120      $self->set_index_refresh_interval($index, $refresh_interval);
2121   }
2122   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2123      $self->set_index_number_of_replicas($index, $number_of_replicas);
2124   }
2125
2126   return $result;
2127}
2128
2129#
2130# http://localhost:9200/_nodes/stats/process?pretty
2131#
2132# Search::Elasticsearch::Client::2_0::Direct::Nodes
2133#
2134sub get_stats_process {
2135   my $self = shift;
2136
2137   my $es = $self->_es;
2138   $self->brik_help_run_undef_arg('open', $es) or return;
2139
2140   my $r;
2141   eval {
2142      $r = $es->nodes->stats(
2143         metric => [ qw(process) ],
2144      );
2145   };
2146   if ($@) {
2147      chomp($@);
2148      return $self->log->error("get_stats_process: failed: [$@]");
2149   }
2150
2151   return $r;
2152}
2153
2154#
2155# curl http://localhost:9200/_nodes/process?pretty
2156#
2157# Search::Elasticsearch::Client::2_0::Direct::Nodes
2158#
2159sub get_process {
2160   my $self = shift;
2161
2162   my $es = $self->_es;
2163   $self->brik_help_run_undef_arg('open', $es) or return;
2164
2165   my $r;
2166   eval {
2167      $r = $es->nodes->info(
2168         metric => [ qw(process) ],
2169      );
2170   };
2171   if ($@) {
2172      chomp($@);
2173      return $self->log->error("get_process: failed: [$@]");
2174   }
2175
2176   return $r;
2177}
2178
2179#
2180# Search::Elasticsearch::Client::2_0::Direct::Cluster
2181#
2182sub get_cluster_state {
2183   my $self = shift;
2184
2185   my $es = $self->_es;
2186   $self->brik_help_run_undef_arg('open', $es) or return;
2187
2188   my $r;
2189   eval {
2190      $r = $es->cluster->state;
2191   };
2192   if ($@) {
2193      chomp($@);
2194      return $self->log->error("get_cluster_state: failed: [$@]");
2195   }
2196
2197   return $r;
2198}
2199
2200#
2201# Search::Elasticsearch::Client::2_0::Direct::Cluster
2202#
2203sub get_cluster_health {
2204   my $self = shift;
2205
2206   my $es = $self->_es;
2207   $self->brik_help_run_undef_arg('open', $es) or return;
2208
2209   my $r;
2210   eval {
2211      $r = $es->cluster->health;
2212   };
2213   if ($@) {
2214      chomp($@);
2215      return $self->log->error("get_cluster_health: failed: [$@]");
2216   }
2217
2218   return $r;
2219}
2220
2221#
2222# Search::Elasticsearch::Client::2_0::Direct::Cluster
2223#
2224sub get_cluster_settings {
2225   my $self = shift;
2226
2227   my $es = $self->_es;
2228   $self->brik_help_run_undef_arg('open', $es) or return;
2229
2230   my $r;
2231   eval {
2232      $r = $es->cluster->get_settings;
2233   };
2234   if ($@) {
2235      chomp($@);
2236      return $self->log->error("get_cluster_settings: failed: [$@]");
2237   }
2238
2239   return $r;
2240}
2241
2242#
2243# Search::Elasticsearch::Client::2_0::Direct::Cluster
2244#
2245sub put_cluster_settings {
2246   my $self = shift;
2247   my ($settings) = @_;
2248
2249   my $es = $self->_es;
2250   $self->brik_help_run_undef_arg('open', $es) or return;
2251   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2252   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2253
2254   my %args = (
2255      body => $settings,
2256   );
2257
2258   my $r;
2259   eval {
2260      $r = $es->cluster->put_settings(%args);
2261   };
2262   if ($@) {
2263      chomp($@);
2264      return $self->log->error("put_cluster_settings: failed: [$@]");
2265   }
2266
2267   return $r;
2268}
2269
2270sub count_green_indices {
2271   my $self = shift;
2272
2273   my $get = $self->show_indices or return;
2274
2275   my $count = 0;
2276   for (@$get) {
2277      if (/^\s*green\s+/) {
2278         $count++;
2279      }
2280   }
2281
2282   return $count;
2283}
2284
2285sub count_yellow_indices {
2286   my $self = shift;
2287
2288   my $get = $self->show_indices or return;
2289
2290   my $count = 0;
2291   for (@$get) {
2292      if (/^\s*yellow\s+/) {
2293         $count++;
2294      }
2295   }
2296
2297   return $count;
2298}
2299
2300sub count_red_indices {
2301   my $self = shift;
2302
2303   my $get = $self->show_indices or return;
2304
2305   my $count = 0;
2306   for (@$get) {
2307      if (/^\s*red\s+/) {
2308         $count++;
2309      }
2310   }
2311
2312   return $count;
2313}
2314
2315sub count_indices {
2316   my $self = shift;
2317
2318   my $get = $self->show_indices or return;
2319
2320   return scalar @$get;
2321}
2322
2323sub list_indices_status {
2324   my $self = shift;
2325
2326   my $get = $self->show_indices or return;
2327
2328   my $count_red = 0;
2329   my $count_yellow = 0;
2330   my $count_green = 0;
2331   for (@$get) {
2332      if (/^\s*red\s+/) {
2333         $count_red++;
2334      }
2335      elsif (/^\s*yellow\s+/) {
2336         $count_yellow++;
2337      }
2338      elsif (/^\s*green\s+/) {
2339         $count_green++;
2340      }
2341   }
2342
2343   return {
2344      red => $count_red,
2345      yellow => $count_yellow,
2346      green => $count_green,
2347   };
2348}
2349
2350sub count_shards {
2351   my $self = shift;
2352
2353   my $indices = $self->get_indices or return;
2354
2355   my $count = 0;
2356   for (@$indices) {
2357      $count += $_->{shards};
2358   }
2359
2360   return $count;
2361}
2362
2363sub count_size {
2364   my $self = shift;
2365
2366   my $indices = $self->get_indices or return;
2367
2368   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2369   $fn->kibi_suffix("kb");
2370   $fn->mebi_suffix("mb");
2371   $fn->gibi_suffix("gb");
2372   $fn->kilo_suffix("KB");
2373   $fn->mega_suffix("MB");
2374   $fn->giga_suffix("GB");
2375
2376   my $size = 0;
2377   for (@$indices) {
2378      $size += $fn->to_number($_->{size});
2379   }
2380
2381   return $fn->from_number($size);
2382}
2383
2384sub count_total_size {
2385   my $self = shift;
2386
2387   my $indices = $self->get_indices or return;
2388
2389   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2390   $fn->kibi_suffix("kb");
2391   $fn->mebi_suffix("mb");
2392   $fn->gibi_suffix("gb");
2393   $fn->kilo_suffix("KB");
2394   $fn->mega_suffix("MB");
2395   $fn->giga_suffix("GB");
2396
2397   my $size = 0;
2398   for (@$indices) {
2399      $size += $fn->to_number($_->{total_size});
2400   }
2401
2402   return $fn->from_number($size);
2403}
2404
2405sub count_count {
2406   my $self = shift;
2407
2408   my $indices = $self->get_indices or return;
2409
2410   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2411   $fn->kilo_suffix('k');
2412   $fn->mega_suffix('m');
2413   $fn->giga_suffix('M');
2414
2415   my $count = 0;
2416   for (@$indices) {
2417      $count += $_->{count};
2418   }
2419
2420   return $fn->from_number($count);
2421}
2422
2423sub list_green_indices {
2424   my $self = shift;
2425
2426   my $get = $self->get_indices or return;
2427
2428   my @indices = ();
2429   for (@$get) {
2430      if ($_->{color} eq 'green') {
2431         push @indices, $_->{index};
2432      }
2433   }
2434
2435   return \@indices;
2436}
2437
2438sub list_yellow_indices {
2439   my $self = shift;
2440
2441   my $get = $self->get_indices or return;
2442
2443   my @indices = ();
2444   for (@$get) {
2445      if ($_->{color} eq 'yellow') {
2446         push @indices, $_->{index};
2447      }
2448   }
2449
2450   return \@indices;
2451}
2452
2453sub list_red_indices {
2454   my $self = shift;
2455
2456   my $get = $self->get_indices or return;
2457
2458   my @indices = ();
2459   for (@$get) {
2460      if ($_->{color} eq 'red') {
2461         push @indices, $_->{index};
2462      }
2463   }
2464
2465   return \@indices;
2466}
2467
2468#
2469# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2470#
2471sub list_datatypes {
2472   my $self = shift;
2473
2474   return {
2475      core => [ qw(string long integer short byte double float data boolean binary) ],
2476   };
2477}
2478
2479#
2480# Return total hits for last www_search
2481#
2482sub get_hits_total {
2483   my $self = shift;
2484
2485   # Retrieve data stored in the $RUN Variable from Context
2486   my $run = $self->context->do('$RUN');
2487   if (ref($run) eq 'HASH') {
2488      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2489         return $run->{hits}{total};
2490      }
2491   }
2492
2493   return $self->log->error("get_hits_total: last Command not compatible");
2494}
2495
2496sub disable_shard_allocation {
2497   my $self = shift;
2498
2499   my $settings = {
2500      persistent => {
2501         'cluster.routing.allocation.enable' => 'none',
2502      }
2503   };
2504
2505   return $self->put_cluster_settings($settings);
2506}
2507
2508sub enable_shard_allocation {
2509   my $self = shift;
2510
2511   my $settings = {
2512      persistent => { 
2513         'cluster.routing.allocation.enable' => 'all',
2514      }
2515   };
2516
2517   return $self->put_cluster_settings($settings);
2518}
2519
2520sub flush_synced {
2521   my $self = shift;
2522
2523   my $es = $self->_es;
2524   $self->brik_help_run_undef_arg('open', $es) or return;
2525
2526   my $r;
2527   eval {
2528      $r = $es->indices->flush_synced;
2529   };
2530   if ($@) {
2531      chomp($@);
2532      return $self->log->error("flush_synced: failed: [$@]");
2533   }
2534
2535   return $r;
2536}
2537
2538#
2539# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2540#
2541# run client::elasticsearch create_snapshot_repository myrepo
2542#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2543#
2544# You have to set path.repo in elasticsearch.yml like:
2545# path.repo: ["/home/gomor/es-backups"]
2546#
2547# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2548#
2549sub create_snapshot_repository {
2550   my $self = shift;
2551   my ($body, $repository_name) = @_;
2552
2553   my $es = $self->_es;
2554   $self->brik_help_run_undef_arg('open', $es) or return;
2555   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2556
2557   $repository_name ||= 'repository';
2558
2559   my %args = (
2560      repository => $repository_name,
2561      body => $body,
2562   );
2563
2564   my $r;
2565   eval {
2566      $r = $es->snapshot->create_repository(%args);
2567   };
2568   if ($@) {
2569      chomp($@);
2570      return $self->log->error("create_snapshot_repository: failed: [$@]");
2571   }
2572
2573   return $r;
2574}
2575
2576sub create_shared_fs_snapshot_repository {
2577   my $self = shift;
2578   my ($location, $repository_name) = @_;
2579
2580   $repository_name ||= 'repository';
2581   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2582
2583   if ($location !~ m{^/}) {
2584      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2585         "a full directory path, this one is invalid [$location]");
2586   }
2587
2588   my $body = {
2589      type => 'fs',
2590      settings => {
2591         compress => 'true',
2592         location => $location,
2593      },
2594   };
2595
2596   return $self->create_snapshot_repository($body, $repository_name);
2597}
2598
2599#
2600# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2601#
2602sub get_snapshot_repositories {
2603   my $self = shift;
2604
2605   my $es = $self->_es;
2606   $self->brik_help_run_undef_arg('open', $es) or return;
2607
2608   my $r;
2609   eval {
2610      $r = $es->snapshot->get_repository;
2611   };
2612   if ($@) {
2613      chomp($@);
2614      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2615   }
2616
2617   return $r;
2618}
2619
2620#
2621# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2622#
2623sub get_snapshot_status {
2624   my $self = shift;
2625
2626   my $es = $self->_es;
2627   $self->brik_help_run_undef_arg('open', $es) or return;
2628
2629   my $r;
2630   eval {
2631      $r = $es->snapshot->status;
2632   };
2633   if ($@) {
2634      chomp($@);
2635      return $self->log->error("get_snapshot_status: failed: [$@]");
2636   }
2637
2638   return $r;
2639}
2640
2641#
2642# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2643#
2644sub create_snapshot {
2645   my $self = shift;
2646   my ($snapshot_name, $repository_name, $body) = @_;
2647
2648   my $es = $self->_es;
2649   $self->brik_help_run_undef_arg('open', $es) or return;
2650
2651   $snapshot_name ||= 'snapshot';
2652   $repository_name ||= 'repository';
2653
2654   my %args = (
2655      repository => $repository_name,
2656      snapshot => $snapshot_name,
2657   );
2658   if (defined($body)) {
2659      $args{body} = $body;
2660   }
2661
2662   my $r;
2663   eval {
2664      $r = $es->snapshot->create(%args);
2665   };
2666   if ($@) {
2667      chomp($@);
2668      return $self->log->error("create_snapshot: failed: [$@]");
2669   }
2670
2671   return $r;
2672}
2673
2674sub create_snapshot_for_indices {
2675   my $self = shift;
2676   my ($indices, $snapshot_name, $repository_name) = @_;
2677
2678   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
2679
2680   $snapshot_name ||= 'snapshot';
2681   $repository_name ||= 'repository';
2682
2683   my $body = {
2684      indices => $indices,
2685   };
2686
2687   return $self->create_snapshot($snapshot_name, $repository_name, $body);
2688}
2689
2690sub is_snapshot_finished {
2691   my $self = shift;
2692
2693   my $status = $self->get_snapshot_status or return;
2694
2695   if (@{$status->{snapshots}} == 0) {
2696      return 1;
2697   }
2698
2699   return 0;
2700}
2701
2702sub get_snapshot_state {
2703   my $self = shift;
2704
2705   if ($self->is_snapshot_finished) {
2706      return $self->log->info("get_snapshot_state: is already finished");
2707   }
2708
2709   my $status = $self->get_snapshot_status or return;
2710
2711   my @indices_done = ();
2712   my @indices_not_done = ();
2713
2714   my $list = $status->{snapshots};
2715   for my $snapshot (@$list) {
2716      my $indices = $snapshot->{indices};
2717      for my $index (@$indices) {
2718         my $done = $index->{shards_stats}{done};
2719         if ($done) {
2720            push @indices_done, $index;
2721         }
2722         else {
2723            push @indices_not_done, $index;
2724         }
2725      }
2726   }
2727
2728   return { done => \@indices_done, not_done => \@indices_not_done };
2729}
2730
2731sub verify_snapshot_repository {
2732}
2733
2734sub delete_snapshot_repository {
2735   my $self = shift;
2736   my ($repository_name) = @_;
2737
2738   my $es = $self->_es;
2739   $self->brik_help_run_undef_arg('open', $es) or return;
2740   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
2741
2742   my $r;
2743   eval {
2744      $r = $es->snapshot->delete_repository(
2745         repository => $repository_name,
2746      );
2747   };
2748   if ($@) {
2749      chomp($@);
2750      return $self->log->error("delete_snapshot_repository: failed: [$@]");
2751   }
2752
2753   return $r;
2754}
2755
2756sub get_snapshot {
2757   my $self = shift;
2758   my ($snapshot_name, $repository_name) = @_;
2759
2760   my $es = $self->_es;
2761   $self->brik_help_run_undef_arg('open', $es) or return;
2762
2763   $snapshot_name ||= 'snapshot';
2764   $repository_name ||= 'repository';
2765
2766   my $r;
2767   eval {
2768      $r = $es->snapshot->get(
2769         repository => $repository_name,
2770         snapshot => $snapshot_name,
2771      );
2772   };
2773   if ($@) {
2774      chomp($@);
2775      return $self->log->error("get_snapshot: failed: [$@]");
2776   }
2777
2778   return $r;
2779}
2780
2781#
2782# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2783#
2784sub delete_snapshot {
2785   my $self = shift;
2786   my ($snapshot_name, $repository_name) = @_;
2787
2788   my $es = $self->_es;
2789   $self->brik_help_run_undef_arg('open', $es) or return;
2790   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
2791   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
2792
2793   my $timeout = $self->rtimeout;
2794
2795   my $r;
2796   eval {
2797      $r = $es->snapshot->delete(
2798         repository => $repository_name,
2799         snapshot => $snapshot_name,
2800         master_timeout => "${timeout}s",
2801      );
2802   };
2803   if ($@) {
2804      chomp($@);
2805      return $self->log->error("delete_snapshot: failed: [$@]");
2806   }
2807
2808   return $r;
2809}
2810
2811#
2812# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2813#
2814sub restore_snapshot {
2815   my $self = shift;
2816   my ($snapshot_name, $repository_name, $body) = @_;
2817
2818   my $es = $self->_es;
2819   $snapshot_name ||= 'snapshot';
2820   $repository_name ||= 'repository';
2821   $self->brik_help_run_undef_arg('open', $es) or return;
2822   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
2823   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
2824
2825   my %args = (
2826      repository => $repository_name,
2827      snapshot => $snapshot_name,
2828   );
2829   if (defined($body)) {
2830      $args{body} = $body;
2831   }
2832
2833   my $r;
2834   eval {
2835      $r = $es->snapshot->restore(%args);
2836   };
2837   if ($@) {
2838      chomp($@);
2839      return $self->log->error("restore_snapshot: failed: [$@]");
2840   }
2841
2842   return $r;
2843}
2844
2845sub restore_snapshot_for_indices {
2846   my $self = shift;
2847   my ($indices, $snapshot_name, $repository_name) = @_;
2848
2849   $snapshot_name ||= 'snapshot';
2850   $repository_name ||= 'repository';
2851   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
2852   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
2853   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
2854
2855   my $body = {
2856      indices => $indices,
2857   };
2858
2859   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
2860}
2861
28621;
2863
2864__END__
2865
2866=head1 NAME
2867
2868Metabrik::Client::Elasticsearch - client::elasticsearch Brik
2869
2870=head1 SYNOPSIS
2871
2872   host:~> my $q = { term => { ip => "192.168.57.19" } }
2873   host:~> run client::elasticsearch open
2874   host:~> run client::elasticsearch query $q data-*
2875
2876=head1 DESCRIPTION
2877
2878Template to write a new Metabrik Brik.
2879
2880=head1 COPYRIGHT AND LICENSE
2881
2882Copyright (c) 2014-2017, Patrice E<lt>GomoRE<gt> Auffret
2883
2884You may distribute this module under the terms of The BSD 3-Clause License.
2885See LICENSE file in the source distribution archive.
2886
2887=head1 AUTHOR
2888
2889Patrice E<lt>GomoRE<gt> Auffret
2890
2891=cut
Note: See TracBrowser for help on using the repository browser.