source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 969:50c217684c90, checked in by GomoR <gomor@…>, 5 weeks ago
  • new: api::onyphe: new API Commands
  • bugfix: client::elasticsearch: catch bulk_helper and index_bulk errors
  • update: client::elasticsearch: switch some info messages to verbose
  • update: client::kafka: randomize broker on create_connection Command
  • bugfix: crypto::x509: added to check when PubKeyAlg returns undef
  • update: file::write: on setting unbuffered Attribute
  • bugfix: system::virtualbox: restart_vboxnet is really named reset_vboxnet
File size: 81.2 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         datadir => [ qw(datadir) ],
20         nodes => [ qw(node_list) ],
21         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22         date => [ qw(date) ],
23         index => [ qw(index) ],
24         type => [ qw(type) ],
25         from => [ qw(number) ],
26         size => [ qw(count) ],
27         max => [ qw(count) ],
28         max_flush_count => [ qw(count) ],
29         max_flush_size => [ qw(count) ],
30         rtimeout => [ qw(seconds) ],
31         sniff_rtimeout => [ qw(seconds) ],
32         try => [ qw(count) ],
33         use_bulk_autoflush => [ qw(0|1) ],
34         use_indexing_optimizations => [ qw(0|1) ],
35         csv_header => [ qw(fields) ],
36         csv_encoded_fields => [ qw(fields) ],
37         csv_object_fields => [ qw(fields) ],
38         _es => [ qw(INTERNAL) ],
39         _bulk => [ qw(INTERNAL) ],
40         _scroll => [ qw(INTERNAL) ],
41      },
42      attributes_default => {
43         nodes => [ qw(http://localhost:9200) ],
44         cxn_pool => 'Sniff',
45         from => 0,
46         size => 10,
47         max => 0,
48         index => '*',
49         type => '*',
50         rtimeout => 60,
51         sniff_rtimeout => 3,
52         try => 3,
53         max_flush_count => 1_000,
54         max_flush_size => 1_000_000,
55         use_bulk_autoflush => 1,
56         use_indexing_optimizations => 0,
57      },
58      commands => {
59         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
60         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
61         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
62         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
63         close_scroll => [ ],
64         total_scroll => [ ],
65         next_scroll => [ qw(count|OPTIONAL) ],
66         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
67         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
68         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
69         update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
70         bulk_flush => [ qw(index|OPTIONAL) ],
71         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
72         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
73         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
74         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
75         delete_index => [ qw(index|indices_list) ],
76         update_alias => [ qw(new_index alias) ],
77         delete_document => [ qw(index type id) ],
78         delete_by_query => [ qw($query_hash index type) ],
79         show_indices => [ qw(string_filter|OPTIONAL) ],
80         show_nodes => [ ],
81         show_health => [ ],
82         show_recovery => [ ],
83         show_allocation => [ ],
84         list_indices => [ qw(regex|OPTIONAL) ],
85         get_indices => [ ],
86         get_index => [ qw(index|indices_list) ],
87         list_index_types => [ qw(index) ],
88         list_index_fields => [ qw(index) ],
89         list_indices_version => [ qw(index|indices_list) ],
90         open_index => [ qw(index|indices_list) ],
91         close_index => [ qw(index|indices_list) ],
92         get_aliases => [ qw(index) ],
93         put_alias => [ qw(index alias) ],
94         delete_alias => [ qw(index alias) ],
95         is_mapping_exists => [ qw(index mapping) ],
96         get_mappings => [ qw(index type|OPTIONAL) ],
97         create_index => [ qw(index) ],
98         create_index_with_mappings => [ qw(index mappings) ],
99         info => [ qw(nodes_list|OPTIONAL) ],
100         version => [ qw(nodes_list|OPTIONAL) ],
101         get_templates => [ ],
102         list_templates => [ ],
103         get_template => [ qw(name) ],
104         put_template => [ qw(name template) ],
105         put_template_from_json_file => [ qw(file) ],
106         update_template_from_json_file => [ qw(file) ],
107         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
108         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
109         set_index_number_of_replicas => [ qw(index|indices_list number) ],
110         set_index_refresh_interval => [ qw(index|indices_list number) ],
111         get_index_number_of_replicas => [ qw(index|indices) ],
112         get_index_refresh_interval => [ qw(index|indices_list) ],
113         get_index_number_of_shards => [ qw(index|indices_list) ],
114         delete_template => [ qw(name) ],
115         is_index_exists => [ qw(index) ],
116         is_type_exists => [ qw(index type) ],
117         is_document_exists => [ qw(index type document) ],
118         parse_error_string => [ qw(string) ],
119         refresh_index => [ qw(index) ],
120         export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
121         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
122         get_stats_process => [ ],
123         get_process => [ ],
124         get_cluster_state => [ ],
125         get_cluster_health => [ ],
126         get_cluster_settings => [ ],
127         put_cluster_settings => [ qw(settings) ],
128         count_green_indices => [ ],
129         count_yellow_indices => [ ],
130         count_red_indices => [ ],
131         list_green_indices => [ ],
132         list_yellow_indices => [ ],
133         list_red_indices => [ ],
134         count_indices => [ ],
135         list_indices_status => [ ],
136         count_shards => [ ],
137         count_size => [ ],
138         count_total_size => [ ],
139         count_count => [ ],
140         list_datatypes => [ ],
141         get_hits_total => [ ],
142         disable_shard_allocation => [ ],
143         enable_shard_allocation => [ ],
144         flush_synced => [ ],
145         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
146         create_shared_fs_snapshot_repository => [ qw(location
147            repository_name|OPTIONAL) ],
148         get_snapshot_repositories => [ ],
149         get_snapshot_status => [ ],
150         delete_snapshot_repository => [ qw(repository_name) ],
151         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
152            body|OPTIONAL) ],
153         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
154            repository_name|OPTIONAL) ],
155         is_snapshot_finished => [ ],
156         get_snapshot_state => [ ],
157         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
158         delete_snapshot => [ qw(snapshot_name repository_name) ],
159         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
160         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
161      },
162      require_modules => {
163         'Metabrik::String::Json' => [ ],
164         'Metabrik::File::Csv' => [ ],
165         'Metabrik::File::Json' => [ ],
166         'Metabrik::File::Dump' => [ ],
167         'Metabrik::Format::Number' => [ ],
168         'Search::Elasticsearch' => [ ],
169      },
170   };
171}
172
173sub brik_preinit {
174   my $self = shift;
175
176   eval("use Search::Elasticsearch;");
177   if ($Search::Elasticsearch::VERSION < 5) {
178      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
179         "with: run perl::module install Search::Elasticsearch");
180   }
181
182   return $self->SUPER::brik_preinit;
183}
184
185sub open {
186   my $self = shift;
187   my ($nodes, $cxn_pool) = @_;
188
189   $nodes ||= $self->nodes;
190   $cxn_pool ||= $self->cxn_pool;
191   $self->brik_help_run_undef_arg('open', $nodes) or return;
192   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
193   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
194   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
195
196   for my $node (@$nodes) {
197      if ($node !~ m{https?://}) {
198         return $self->log->error("open: invalid node[$node], must start with http(s)");
199      }
200   }
201
202   my $timeout = $self->rtimeout;
203
204   my $nodes_str = join('|', @$nodes);
205   $self->log->debug("open: using nodes [$nodes_str]");
206
207   #
208   # Timeout description here:
209   #
210   # Search::Elasticsearch::Role::Cxn
211   #
212
213   my $es = Search::Elasticsearch->new(
214      nodes => $nodes,
215      cxn_pool => $cxn_pool,
216      timeout => $timeout,
217      max_retries => $self->try,
218      retry_on_timeout => 1,
219      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
220      request_timeout => 60,  # seconds, default 30
221      ping_timeout => 5,  # seconds, default 2
222      dead_timeout => 120,  # seconds, detault 60
223      max_dead_timeout => 3600,  # seconds, default 3600
224      sniff_request_timeout => 15, # seconds, default 2
225      #trace_to => 'Stderr',  # For debug purposes
226   );
227   if (! defined($es)) {
228      return $self->log->error("open: failed");
229   }
230
231   $self->_es($es);
232
233   return $nodes;
234}
235
236#
237# Search::Elasticsearch::Client::5_0::Bulk
238#
239sub open_bulk_mode {
240   my $self = shift;
241   my ($index, $type) = @_;
242
243   $index ||= $self->index;
244   $type ||= $self->type;
245   my $es = $self->_es;
246   $self->brik_help_run_undef_arg('open', $es) or return;
247   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
248   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
249
250   my %args = (
251      index => $index,
252      type => $type,
253      on_error => sub {
254         #my ($action, $response, $i) = @_;
255
256         #print Data::Dumper::Dumper($action)."\n";
257         #print Data::Dumper::Dumper($response)."\n";
258         #print Data::Dumper::Dumper($i)."\n";
259         print Data::Dumper::Dumper(\@_)."\n";
260      },
261   );
262
263   if ($self->use_bulk_autoflush) {
264      my $max_count = $self->max_flush_count || 1_000;
265      my $max_size = $self->max_flush_size || 1_000_000;
266
267      $args{max_count} = $max_count;
268      $args{max_size} = $max_size;
269      $args{max_time} = 0;
270
271      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
272         "max_flush_size [$max_size]");
273   }
274   else {
275      $args{max_count} = 0;
276      $args{max_size} = 0;
277      $args{max_time} = 0;
278      $args{on_error} = undef;
279      #$args{on_success} = sub {
280         #my ($action, $response, $i) = @_;
281      #};
282
283      $self->log->info("open_bulk_mode: opening without automatic flushing");
284   }
285
286   my $bulk;
287   eval {
288      $bulk = $es->bulk_helper(%args);
289   };
290   if ($@) {
291      chomp($@);
292      return $self->log->error("open_bulk_mode: failed: [$@]");
293   }
294
295   $self->_bulk($bulk);
296
297   return $self->nodes;
298}
299
300sub open_scroll_scan_mode {
301   my $self = shift;
302   my ($index, $size) = @_;
303
304   my $version = $self->version or return;
305   if ($version ge "5.0.0") {
306      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
307         "$version, try open_scroll Command instead");
308   }
309
310   $index ||= $self->index;
311   $size ||= $self->size;
312   my $es = $self->_es;
313   $self->brik_help_run_undef_arg('open', $es) or return;
314   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
315   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
316
317   my $scroll;
318   eval {
319      $scroll = $es->scroll_helper(
320         index => $index,
321         search_type => 'scan',
322         size => $size,
323      );
324   };
325   if ($@) {
326      chomp($@);
327      return $self->log->error("open_scroll_scan_mode: failed: $@");
328   }
329
330   $self->_scroll($scroll);
331
332   return $self->nodes;
333}
334
335#
336# Search::Elasticsearch::Client::5_0::Scroll
337#
338sub open_scroll {
339   my $self = shift;
340   my ($index, $size, $type, $query) = @_;
341
342   my $version = $self->version or return;
343   if ($version lt "5.0.0") {
344      return $self->log->error("open_scroll: Command not supported for ES version ".
345         "$version, try open_scroll_scan_mode Command instead");
346   }
347
348   $query ||= { query => { match_all => {} } };
349   $index ||= $self->index;
350   $type ||= $self->type;
351   $size ||= $self->size;
352   my $es = $self->_es;
353   $self->brik_help_run_undef_arg('open', $es) or return;
354   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
355   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
356
357   my $timeout = $self->rtimeout;
358
359   my %args = (
360      scroll => "${timeout}s",
361      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
362                          # it in query string.
363      index => $index,
364      size => $size,
365      body => $query,
366   );
367   if ($type ne '*') {
368      $args{type} = $type;
369   }
370
371   #
372   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
373   #
374   my $scroll;
375   eval {
376      $scroll = $es->scroll_helper(%args);
377   };
378   if ($@) {
379      chomp($@);
380      return $self->log->error("open_scroll: failed: $@");
381   }
382
383   $self->_scroll($scroll);
384
385   $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
386
387   return $self->nodes;
388}
389
390#
391# Search::Elasticsearch::Client::5_0::Scroll
392#
393sub close_scroll {
394   my $self = shift;
395
396   my $scroll = $self->_scroll;
397   if (! defined($scroll)) {
398      return 1;
399   }
400
401   $scroll->finish;
402   $self->_scroll(undef);
403
404   return 1;
405}
406
407sub total_scroll {
408   my $self = shift;
409
410   my $scroll = $self->_scroll;
411   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
412
413   my $total;
414   eval {
415      $total = $scroll->total;
416   };
417   if ($@) {
418      chomp($@);
419      return $self->log->error("total_scroll: failed with: [$@]");
420   }
421
422   return $total;
423}
424
425sub next_scroll {
426   my $self = shift;
427   my ($count) = @_;
428
429   $count ||= 1;
430
431   my $scroll = $self->_scroll;
432   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
433
434   my $next;
435   eval {
436      if ($count > 1) {
437         my @docs = $scroll->next($count);
438         if (@docs > 0) {
439            $next = \@docs;
440         }
441      }
442      else {
443         $next = $scroll->next;
444      }
445   };
446   if ($@) {
447      chomp($@);
448      return $self->log->error("next_scroll: failed with: [$@]");
449   }
450
451   return $next;
452}
453
454#
455# Search::Elasticsearch::Client::5_0::Direct
456#
457sub index_document {
458   my $self = shift;
459   my ($doc, $index, $type, $hash, $id) = @_;
460
461   $index ||= $self->index;
462   $type ||= $self->type;
463   my $es = $self->_es;
464   $self->brik_help_run_undef_arg('open', $es) or return;
465   $self->brik_help_run_undef_arg('index_document', $doc) or return;
466   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
467   $self->brik_help_set_undef_arg('index', $index) or return;
468   $self->brik_help_set_undef_arg('type', $type) or return;
469
470   my %args = (
471      index => $index,
472      type => $type,
473      body => $doc,
474   );
475   if (defined($id)) {
476      $args{id} = $id;
477   }
478
479   if (defined($hash)) {
480      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH') or return;
481      %args = ( %args, %$hash );
482   }
483
484   my $r;
485   eval {
486      $r = $es->index(%args);
487   };
488   if ($@) {
489      chomp($@);
490      return $self->log->error("index_document: index failed for index [$index]: [$@]");
491   }
492
493   return $r;
494}
495
496#
497# Search::Elasticsearch::Client::5_0::Direct
498#
499sub update_document {
500   my $self = shift;
501   my ($doc, $id, $index, $type, $hash) = @_;
502
503   $index ||= $self->index;
504   $type ||= $self->type;
505   my $es = $self->_es;
506   $self->brik_help_run_undef_arg('open', $es) or return;
507   $self->brik_help_run_undef_arg('update_document', $doc) or return;
508   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
509   $self->brik_help_run_undef_arg('update_document', $id) or return;
510   $self->brik_help_set_undef_arg('index', $index) or return;
511   $self->brik_help_set_undef_arg('type', $type) or return;
512
513   my %args = (
514      id => $id,
515      index => $index,
516      type => $type,
517      body => { doc => $doc },
518   );
519
520   if (defined($hash)) {
521      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH') or return;
522      %args = ( %args, %$hash );
523   }
524
525   my $r;
526   eval {
527      $r = $es->update(%args);
528   };
529   if ($@) {
530      chomp($@);
531      return $self->log->error("update_document: index failed for index [$index]: [$@]");
532   }
533
534   return $r;
535}
536
537#
538# Search::Elasticsearch::Client::5_0::Bulk
539#
540sub index_bulk {
541   my $self = shift;
542   my ($doc, $index, $type, $hash, $id) = @_;
543
544   my $bulk = $self->_bulk;
545   $index ||= $self->index;
546   $type ||= $self->type;
547   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
548   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
549   $self->brik_help_set_undef_arg('index', $index) or return;
550   $self->brik_help_set_undef_arg('type', $type) or return;
551
552   my %args = (
553      source => $doc,
554   );
555   if (defined($id)) {
556      $args{id} = $id;
557   }
558
559   if (defined($hash)) {
560      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
561      %args = ( %args, %$hash );
562   }
563
564   my $r;
565   eval {
566      $r = $bulk->add_action(index => \%args);
567   };
568   if ($@) {
569      chomp($@);
570      my $p = $self->parse_error_string($@);
571      if (defined($p) && exists($p->{class})) {
572         my $class = $p->{class};
573         my $code = $p->{code};
574         my $node = $p->{node};
575         return $self->log->error("index_bulk: failed for index [$index] with error ".
576            "[$class] code [$code] for node [$node]");
577      }
578      else {
579         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
580      }
581   }
582
583   return $r;
584}
585
586sub update_document_bulk {
587   my $self = shift;
588   my ($doc, $index, $type, $hash, $id) = @_;
589
590   my $bulk = $self->_bulk;
591   $index ||= $self->index;
592   $type ||= $self->type;
593   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
594   $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
595   $self->brik_help_set_undef_arg('index', $index) or return;
596   $self->brik_help_set_undef_arg('type', $type) or return;
597
598   my %args = (
599      index => $index,
600      type => $type,
601      doc => $doc,
602   );
603   if (defined($id)) {
604      $args{id} = $id;
605   }
606
607   if (defined($hash)) {
608      $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH') or return;
609      %args = ( %args, %$hash );
610   }
611
612   my $r;
613   eval {
614      $r = $bulk->update(\%args);
615   };
616   if ($@) {
617      chomp($@);
618      my $p = $self->parse_error_string($@);
619      if (defined($p) && exists($p->{class})) {
620         my $class = $p->{class};
621         my $code = $p->{code};
622         my $node = $p->{node};
623         return $self->log->error("update_document_bulk: failed for index [$index] ".
624            "with error [$class] code [$code] for node [$node]");
625      }
626      else {
627         return $self->log->error("update_document_bulk: index failed for ".
628            "index [$index]: [$@]");
629      }
630   }
631
632   return $r;
633}
634
635#
636# We may have to call refresh_index after a bulk_flush, so we give an additional
637# optional Argument for given index.
638#
639sub bulk_flush {
640   my $self = shift;
641   my ($index) = @_;
642
643   my $bulk = $self->_bulk;
644   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
645
646   my $try = $self->try;
647
648RETRY:
649
650   my $r;
651   eval {
652      $r = $bulk->flush;
653   };
654   if ($@) {
655      chomp($@);
656      if (--$try == 0) {
657         my $p = $self->parse_error_string($@);
658         if (defined($p) && exists($p->{class})) {
659            my $class = $p->{class};
660            my $code = $p->{code};
661            my $node = $p->{node};
662            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
663               "[$class] code [$code] for node [$node]");
664         }
665         else {
666            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
667         }
668      }
669      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
670               "[$@]");
671      sleep 10;
672      goto RETRY;
673   }
674
675   if (defined($index)) {
676      $self->refresh_index($index);
677   }
678
679   return $r;
680}
681
682#
683# Search::Elasticsearch::Client::2_0::Direct
684# Search::Elasticsearch::Client::5_0::Direct
685#
686sub count {
687   my $self = shift;
688   my ($index, $type) = @_;
689
690   $index ||= $self->index;
691   $type ||= $self->type;
692   my $es = $self->_es;
693   $self->brik_help_run_undef_arg('open', $es) or return;
694
695   my %args = ();
696   if (defined($index) && $index ne '*') {
697      $args{index} = $index;
698   }
699   if (defined($type) && $type ne '*') {
700      $args{type} = $type;
701   }
702
703   #$args{body} = {
704      #query => {
705         #match => { title => 'Elasticsearch clients' },
706      #},
707   #}
708
709   my $r;
710   my $version = $self->version or return;
711   if ($version ge "5.0.0") {
712      eval {
713         $r = $es->count(%args);
714      };
715   }
716   else {
717      eval {
718         $r = $es->search(
719            index => $index,
720            type => $type,
721            search_type => 'count',
722            body => {
723               query => {
724                  match_all => {},
725               },
726            },
727         );
728      };
729   }
730   if ($@) {
731      chomp($@);
732      return $self->log->error("count: count failed for index [$index]: [$@]");
733   }
734
735   if ($version ge "5.0.0") {
736      if (exists($r->{count})) {
737         return $r->{count};
738      }
739   }
740   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
741      return $r->{hits}{total};
742   }
743
744   return $self->log->error("count: nothing found");
745}
746
747#
748# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
749# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
750#
751# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
752#
753sub query {
754   my $self = shift;
755   my ($query, $index, $type, $hash) = @_;
756
757   $index ||= $self->index;
758   $type ||= $self->type;
759   my $es = $self->_es;
760   $self->brik_help_run_undef_arg('open', $es) or return;
761   $self->brik_help_run_undef_arg('query', $query) or return;
762   $self->brik_help_set_undef_arg('index', $index) or return;
763   $self->brik_help_set_undef_arg('type', $type) or return;
764   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
765
766   my $timeout = $self->rtimeout;
767
768   my %args = (
769      index => $index,
770      body => $query,
771   );
772
773   if (defined($hash)) {
774      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
775      %args = ( %args, %$hash );
776   }
777
778   if ($type ne '*') {
779      $args{type} = $type;
780   }
781
782   my $r;
783   eval {
784      $r = $es->search(%args);
785   };
786   if ($@) {
787      chomp($@);
788      return $self->log->error("query: failed for index [$index]: [$@]");
789   }
790
791   return $r;
792}
793
794sub get_from_id {
795   my $self = shift;
796   my ($id, $index, $type) = @_;
797
798   $index ||= $self->index;
799   $type ||= $self->type;
800   my $es = $self->_es;
801   $self->brik_help_run_undef_arg('open', $es) or return;
802   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
803   $self->brik_help_set_undef_arg('index', $index) or return;
804   $self->brik_help_set_undef_arg('type', $type) or return;
805
806   my $r;
807   eval {
808      $r = $es->get(
809         index => $index,
810         type => $type,
811         id => $id,
812      );
813   };
814   if ($@) {
815      chomp($@);
816      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
817   }
818
819   return $r;
820}
821
822#
823# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
824#
825sub www_search {
826   my $self = shift;
827   my ($query, $index, $type) = @_;
828
829   $index ||= $self->index;
830   $type ||= $self->type;
831   $self->brik_help_run_undef_arg('www_search', $query) or return;
832   $self->brik_help_set_undef_arg('index', $index) or return;
833   $self->brik_help_set_undef_arg('type', $type) or return;
834
835   my $from = $self->from;
836   my $size = $self->size;
837
838   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
839
840   my $nodes = $self->nodes;
841   for my $node (@$nodes) {
842      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
843      my $url = "$node/$index";
844      if ($type ne '*') {
845         $url .= "/$type";
846      }
847      $url .= "/_search/?from=$from&size=$size&q=".$query;
848
849      my $get = $self->SUPER::get($url) or next;
850      my $body = $get->{content};
851
852      my $decoded = $sj->decode($body) or next;
853
854      return $decoded;
855   }
856
857   return;
858}
859
860#
861# Search::Elasticsearch::Client::2_0::Direct::Indices
862#
863sub delete_index {
864   my $self = shift;
865   my ($index) = @_;
866
867   my $es = $self->_es;
868   $self->brik_help_run_undef_arg('open', $es) or return;
869   $self->brik_help_run_undef_arg('delete_index', $index) or return;
870   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
871
872   my %args = (
873      index => $index,
874   );
875
876   my $r;
877   eval {
878      $r = $es->indices->delete(%args);
879   };
880   if ($@) {
881      chomp($@);
882      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
883   }
884
885   return $r;
886}
887
888#
889# Search::Elasticsearch::Client::2_0::Direct::Indices
890#
891sub delete_document {
892   my $self = shift;
893   my ($index, $type, $id, $hash) = @_;
894
895   my $es = $self->_es;
896   $self->brik_help_run_undef_arg('open', $es) or return;
897   $self->brik_help_run_undef_arg('delete_document', $index) or return;
898   $self->brik_help_run_undef_arg('delete_document', $type) or return;
899   $self->brik_help_run_undef_arg('delete_document', $id) or return;
900
901   my %args = (
902      index => $index,
903      type => $type,
904      id => $id,
905   );
906
907   if (defined($hash)) {
908      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH') or return;
909      %args = ( %args, %$hash );
910   }
911
912   my $r;
913   eval {
914      $r = $es->delete(%args);
915   };
916   if ($@) {
917      chomp($@);
918      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
919   }
920
921   return $r;
922}
923
924#
925# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
926#
927# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
928#
929sub delete_by_query {
930   my $self = shift;
931   my ($query, $index, $type) = @_;
932
933   my $es = $self->_es;
934   $self->brik_help_run_undef_arg('open', $es) or return;
935   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
936   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
937   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
938   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
939
940   my $timeout = $self->rtimeout;
941
942   my %args = (
943      index => $index,
944      type => $type,
945      body => $query,
946   );
947
948   my $r;
949   eval {
950      $r = $es->delete_by_query(%args);
951   };
952   if ($@) {
953      chomp($@);
954      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
955   }
956
957   # This may fail, we ignore it.
958   $self->refresh_index($index);
959
960   return $r;
961}
962
963#
964# Search::Elasticsearch::Client::2_0::Direct::Cat
965#
966# https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
967#
968sub show_indices {
969   my $self = shift;
970   my ($string) = @_;
971
972   my $es = $self->_es;
973   $self->brik_help_run_undef_arg('open', $es) or return;
974
975   my $r;
976   eval {
977      $r = $es->cat->indices;
978   };
979   if ($@) {
980      chomp($@);
981      return $self->log->error("show_indices: failed: [$@]");
982   }
983
984   my @lines = split(/\n/, $r);
985
986   if (@lines == 0) {
987      $self->log->warning("show_indices: nothing returned, no index?");
988   }
989
990   my @filtered = ();
991   if (defined($string)) {
992      for (@lines) {
993         if (m{$string}) {
994            push @filtered, $_;
995         }
996      }
997      @lines = @filtered;
998   }
999
1000   return \@lines;
1001}
1002
1003#
1004# Search::Elasticsearch::Client::2_0::Direct::Cat
1005#
1006sub show_nodes {
1007   my $self = shift;
1008
1009   my $es = $self->_es;
1010   $self->brik_help_run_undef_arg('open', $es) or return;
1011
1012   my $r;
1013   eval {
1014      $r = $es->cat->nodes;
1015   };
1016   if ($@) {
1017      chomp($@);
1018      return $self->log->error("show_nodes: failed: [$@]");
1019   }
1020
1021   my @lines = split(/\n/, $r);
1022
1023   if (@lines == 0) {
1024      $self->log->warning("show_nodes: nothing returned, no nodes?");
1025   }
1026
1027   return \@lines;
1028}
1029
1030#
1031# Search::Elasticsearch::Client::2_0::Direct::Cat
1032#
1033sub show_health {
1034   my $self = shift;
1035
1036   my $es = $self->_es;
1037   $self->brik_help_run_undef_arg('open', $es) or return;
1038
1039   my $r;
1040   eval {
1041      $r = $es->cat->health;
1042   };
1043   if ($@) {
1044      chomp($@);
1045      return $self->log->error("show_health: failed: [$@]");
1046   }
1047
1048   my @lines = split(/\n/, $r);
1049
1050   if (@lines == 0) {
1051      $self->log->warning("show_health: nothing returned, no recovery?");
1052   }
1053
1054   return \@lines;
1055}
1056
1057#
1058# Search::Elasticsearch::Client::2_0::Direct::Cat
1059#
1060sub show_recovery {
1061   my $self = shift;
1062
1063   my $es = $self->_es;
1064   $self->brik_help_run_undef_arg('open', $es) or return;
1065
1066   my $r;
1067   eval {
1068      $r = $es->cat->recovery;
1069   };
1070   if ($@) {
1071      chomp($@);
1072      return $self->log->error("show_recovery: failed: [$@]");
1073   }
1074
1075   my @lines = split(/\n/, $r);
1076
1077   if (@lines == 0) {
1078      $self->log->warning("show_recovery: nothing returned, no index?");
1079   }
1080
1081   return \@lines;
1082}
1083
1084#
1085# curl -s 'localhost:9200/_cat/allocation?v'
1086#
1087sub show_allocation {
1088   my $self = shift;
1089
1090   my $es = $self->_es;
1091   $self->brik_help_run_undef_arg('open', $es) or return;
1092
1093   my $r;
1094   eval {
1095      $r = $es->cat->allocation;
1096   };
1097   if ($@) {
1098      chomp($@);
1099      return $self->log->error("show_allocation: failed: [$@]");
1100   }
1101
1102   my @lines = split(/\n/, $r);
1103
1104   if (@lines == 0) {
1105      $self->log->warning("show_allocation: nothing returned, no index?");
1106   }
1107
1108   return \@lines;
1109}
1110
1111sub list_indices {
1112   my $self = shift;
1113   my ($regex) = @_;
1114
1115   my $get = $self->get_indices or return;
1116
1117   my @indices = ();
1118   for (@$get) {
1119      if (defined($regex)) {
1120         if ($_->{index} =~ m{$regex}) {
1121            push @indices, $_->{index};
1122         }
1123      }
1124      else {
1125         push @indices, $_->{index};
1126      }
1127   }
1128
1129   return [ sort { $a cmp $b } @indices ];
1130}
1131
1132sub get_indices {
1133   my $self = shift;
1134
1135   my $lines = $self->show_indices or return;
1136   if (@$lines == 0) {
1137      $self->log->warning("get_indices: no index found");
1138      return [];
1139   }
1140
1141   #
1142   # Format depends on ElasticSearch version. We try to detect the format.
1143   #
1144   # 5.0.0:
1145   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
1146   #
1147   my @indices = ();
1148   for (@$lines) {
1149      my @t = split(/\s+/);
1150      if (@t == 10) {  # Version 5.0.0
1151         my $color = $t[0];
1152         my $state = $t[1];
1153         my $index = $t[2];
1154         my $id = $t[3];
1155         my $shards = $t[4];
1156         my $replicas = $t[5];
1157         my $count = $t[6];
1158         my $count2 = $t[7];
1159         my $total_size = $t[8];
1160         my $size = $t[9];
1161         push @indices, {
1162            color => $color,
1163            state => $state,
1164            index => $index,
1165            id => $id,
1166            shards => $shards,
1167            replicas => $replicas,
1168            count => $count,
1169            total_size => $total_size,
1170            size => $size,
1171         };
1172      }
1173      elsif (@t == 9) {
1174         my $index = $t[2];
1175         push @indices, {
1176            index => $index,
1177         };
1178      }
1179      elsif (@t == 8) {
1180         my $index = $t[1];
1181         push @indices, {
1182            index => $index,
1183         };
1184      }
1185   }
1186
1187   return \@indices;
1188}
1189
1190#
1191# Search::Elasticsearch::Client::5_0::Direct::Indices
1192#
1193sub get_index {
1194   my $self = shift;
1195   my ($index) = @_;
1196 
1197   my $es = $self->_es;
1198   $self->brik_help_run_undef_arg('open', $es) or return;
1199   $self->brik_help_run_undef_arg('get_index', $index) or return;
1200   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1201
1202   my %args = (
1203      index => $index,
1204   );
1205
1206   my $r;
1207   eval {
1208      $r = $es->indices->get(%args);
1209   };
1210   if ($@) {
1211      chomp($@);
1212      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1213   }
1214
1215   return $r;
1216}
1217
1218sub list_index_types {
1219   my $self = shift;
1220   my ($index) = @_;
1221
1222   my $es = $self->_es;
1223   $self->brik_help_run_undef_arg('open', $es) or return;
1224   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1225   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1226
1227   my $r = $self->get_mappings($index) or return;
1228   if (keys %$r > 1) {
1229      return $self->log->error("list_index_types: multiple indices found, choose one");
1230   }
1231
1232   my @types = ();
1233   for my $this_index (keys %$r) {
1234      my $mappings = $r->{$this_index}{mappings};
1235      push @types, keys %$mappings;
1236   }
1237
1238   my %uniq = map { $_ => 1 } @types;
1239
1240   return [ sort { $a cmp $b } keys %uniq ];
1241}
1242
1243#
1244# By default, if you provide only one index and no type,
1245# all types will be merged (including _default_)
1246# If you specify one type (other than _default_), _default_ will be merged to it.
1247#
1248sub list_index_fields {
1249   my $self = shift;
1250   my ($index, $type) = @_;
1251
1252   my $es = $self->_es;
1253   $self->brik_help_run_undef_arg('open', $es) or return;
1254   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1255   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1256
1257   my $r;
1258   if (defined($type)) {
1259      $r = $self->get_mappings($index, $type) or return;
1260      if (keys %$r > 1) {
1261         return $self->log->error("list_index_fields: multiple indices found, ".
1262            "choose one");
1263      }
1264      # _default_ mapping may not exists.
1265      if ($self->is_mapping_exists($index, '_default_')) {
1266         my $r2 = $self->get_mappings($index, '_default_');
1267         # Merge
1268         for my $this_index (keys %$r2) {
1269            my $default = $r2->{$this_index}{mappings}{'_default_'};
1270            $r->{$this_index}{mappings}{_default_} = $default;
1271         }
1272      }
1273   }
1274   else {
1275      $r = $self->get_mappings($index) or return;
1276      if (keys %$r > 1) {
1277         return $self->log->error("list_index_fields: multiple indices found, ".
1278            "choose one");
1279      }
1280   }
1281
1282   my @fields = ();
1283   for my $this_index (keys %$r) {
1284      my $mappings = $r->{$this_index}{mappings};
1285      for my $this_type (keys %$mappings) {
1286         my $properties = $mappings->{$this_type}{properties};
1287         push @fields, keys %$properties;
1288      }
1289   }
1290
1291   my %uniq = map { $_ => 1 } @fields;
1292
1293   return [ sort { $a cmp $b } keys %uniq ];
1294}
1295
1296sub list_indices_version {
1297   my $self = shift;
1298   my ($index) = @_;
1299
1300   my $es = $self->_es;
1301   $self->brik_help_run_undef_arg('open', $es) or return;
1302   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1303   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1304      or return;
1305
1306   my $r = $self->get_index($index) or return;
1307
1308   my @list = ();
1309   for my $this (keys %$r) {
1310      my $name = $this;
1311      my $version = $r->{$this}{settings}{index}{version}{created};
1312      push @list, {
1313         index => $name,
1314         version => $version,
1315      };
1316   }
1317
1318   return \@list;
1319}
1320
1321sub open_index {
1322   my $self = shift;
1323   my ($index) = @_;
1324
1325   my $es = $self->_es;
1326   $self->brik_help_run_undef_arg('open', $es) or return;
1327   $self->brik_help_run_undef_arg('open_index', $index) or return;
1328   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1329
1330   my $r;
1331   eval {
1332      $r = $es->indices->open(
1333         index => $index,
1334      );
1335   };
1336   if ($@) {
1337      chomp($@);
1338      return $self->log->error("open_index: failed: [$@]");
1339   }
1340
1341   return $r;
1342}
1343
1344sub close_index {
1345   my $self = shift;
1346   my ($index) = @_;
1347
1348   my $es = $self->_es;
1349   $self->brik_help_run_undef_arg('open', $es) or return;
1350   $self->brik_help_run_undef_arg('close_index', $index) or return;
1351   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1352
1353   my $r;
1354   eval {
1355      $r = $es->indices->close(
1356         index => $index,
1357      );
1358   };
1359   if ($@) {
1360      chomp($@);
1361      return $self->log->error("close_index: failed: [$@]");
1362   }
1363
1364   return $r;
1365}
1366
1367#
1368# Search::Elasticsearch::Client::5_0::Direct::Indices
1369#
1370sub get_aliases {
1371   my $self = shift;
1372   my ($index) = @_;
1373
1374   $index ||= $self->index;
1375   my $es = $self->_es;
1376   $self->brik_help_run_undef_arg('open', $es) or return;
1377
1378   my %args = (
1379      index => $index,
1380   );
1381
1382   my $r;
1383   eval {
1384      $r = $es->indices->get(%args);
1385   };
1386   if ($@) {
1387      chomp($@);
1388      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1389   }
1390
1391   my %aliases = ();
1392   for my $this (keys %$r) {
1393      $aliases{$this} = $r->{$this}{aliases};
1394   }
1395
1396   return \%aliases;
1397}
1398
1399#
1400# Search::Elasticsearch::Client::5_0::Direct::Indices
1401#
1402sub put_alias {
1403   my $self = shift;
1404   my ($index, $alias) = @_;
1405
1406   my $es = $self->_es;
1407   $self->brik_help_run_undef_arg('open', $es) or return;
1408   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1409   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1410
1411   my %args = (
1412      index => $index,
1413      name => $alias,
1414   );
1415
1416   my $r;
1417   eval {
1418      $r = $es->indices->put_alias(%args);
1419   };
1420   if ($@) {
1421      chomp($@);
1422      return $self->log->error("put_alias: put_alias failed: [$@]");
1423   }
1424
1425   return $r;
1426}
1427
1428#
1429# Search::Elasticsearch::Client::5_0::Direct::Indices
1430#
1431sub delete_alias {
1432   my $self = shift;
1433   my ($index, $alias) = @_;
1434
1435   my $es = $self->_es;
1436   $self->brik_help_run_undef_arg('open', $es) or return;
1437   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1438   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1439
1440   my %args = (
1441      index => $index,
1442      name => $alias,
1443   );
1444
1445   my $r;
1446   eval {
1447      $r = $es->indices->delete_alias(%args);
1448   };
1449   if ($@) {
1450      chomp($@);
1451      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1452   }
1453
1454   return $r;
1455}
1456
1457sub update_alias {
1458   my $self = shift;
1459   my ($new_index, $alias) = @_;
1460
1461   my $es = $self->_es;
1462   $self->brik_help_run_undef_arg('open', $es) or return;
1463   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1464   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1465
1466   # Search for previous index with that alias, if any.
1467   my $prev_index;
1468   my $aliases = $self->get_aliases or return;
1469   while (my ($k, $v) = each %$aliases) {
1470      for my $this (keys %$v) {
1471         if ($this eq $alias) {
1472            $prev_index = $k;
1473            last;
1474         }
1475      }
1476      last if $prev_index;
1477   }
1478
1479   # Delete previous alias if it exists.
1480   if (defined($prev_index)) {
1481      $self->delete_alias($prev_index, $alias) or return;
1482   }
1483
1484   return $self->put_alias($new_index, $alias);
1485}
1486
1487sub is_mapping_exists {
1488   my $self = shift;
1489   my ($index, $mapping) = @_;
1490
1491   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1492   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1493
1494   if (! $self->is_index_exists($index)) {
1495      return 0;
1496   }
1497
1498   my $all = $self->get_mappings($index) or return;
1499   for my $this_index (keys %$all) {
1500      my $mappings = $all->{$this_index}{mappings};
1501      for my $this_mapping (keys %$mappings) {
1502         if ($this_mapping eq $mapping) {
1503            return 1;
1504         }
1505      }
1506   }
1507
1508   return 0;
1509}
1510
1511#
1512# Search::Elasticsearch::Client::2_0::Direct::Indices
1513#
1514sub get_mappings {
1515   my $self = shift;
1516   my ($index, $type) = @_;
1517
1518   my $es = $self->_es;
1519   $self->brik_help_run_undef_arg('open', $es) or return;
1520   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1521   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1522
1523   my %args = (
1524      index => $index,
1525      type => $type,
1526   );
1527
1528   my $r;
1529   eval {
1530      $r = $es->indices->get_mapping(%args);
1531   };
1532   if ($@) {
1533      chomp($@);
1534      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1535         "[$@]");
1536   }
1537
1538   return $r;
1539}
1540
1541#
1542# Search::Elasticsearch::Client::2_0::Direct::Indices
1543#
1544sub create_index {
1545   my $self = shift;
1546   my ($index, $shards_count) = @_;
1547
1548   my $es = $self->_es;
1549   $self->brik_help_run_undef_arg('open', $es) or return;
1550   $self->brik_help_run_undef_arg('create_index', $index) or return;
1551         
1552   my $r;
1553   eval {
1554      $r = $es->indices->create(
1555         index => $index,
1556      );
1557   };
1558   if ($@) {
1559      chomp($@);
1560      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1561   }
1562   
1563   return $r;
1564}
1565
1566#
1567# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1568#
1569sub create_index_with_mappings {
1570   my $self = shift;
1571   my ($index, $mappings) = @_;
1572
1573   my $es = $self->_es;
1574   $self->brik_help_run_undef_arg('open', $es) or return;
1575   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1576   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1577   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1578
1579   my $r;
1580   eval {
1581      $r = $es->indices->create(
1582         index => $index,
1583         body => {
1584            mappings => $mappings,
1585         },
1586      );
1587   };
1588   if ($@) {
1589      chomp($@);
1590      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1591   }
1592
1593   return $r;
1594}
1595
1596# GET http://localhost:9200/
1597sub info {
1598   my $self = shift;
1599   my ($nodes) = @_;
1600
1601   $nodes ||= $self->nodes;
1602   $self->brik_help_run_undef_arg('info', $nodes) or return;
1603   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1604   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1605
1606   my $first = $nodes->[0];
1607
1608   $self->get($first) or return;
1609
1610   return $self->content;
1611}
1612
1613sub version {
1614   my $self = shift;
1615   my ($nodes) = @_;
1616
1617   $nodes ||= $self->nodes;
1618   $self->brik_help_run_undef_arg('version', $nodes) or return;
1619   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1620   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1621
1622   my $first = $nodes->[0];
1623
1624   $self->get($first) or return;
1625   my $content = $self->content or return;
1626
1627   return $content->{version}{number};
1628}
1629
1630#
1631# Search::Elasticsearch::Client::2_0::Direct::Indices
1632#
1633sub get_templates {
1634   my $self = shift;
1635
1636   my $es = $self->_es;
1637   $self->brik_help_run_undef_arg('open', $es) or return;
1638
1639   my $r;
1640   eval {
1641      $r = $es->indices->get_template;
1642   };
1643   if ($@) {
1644      chomp($@);
1645      return $self->log->error("get_templates: failed: [$@]");
1646   }
1647
1648   return $r;
1649}
1650
1651sub list_templates {
1652   my $self = shift;
1653
1654   my $content = $self->get_templates or return;
1655
1656   return [ sort { $a cmp $b } keys %$content ];
1657}
1658
1659#
1660# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1661#
1662sub get_template {
1663   my $self = shift;
1664   my ($template) = @_;
1665
1666   my $es = $self->_es;
1667   $self->brik_help_run_undef_arg('open', $es) or return;
1668   $self->brik_help_run_undef_arg('get_template', $template) or return;
1669
1670   my $r;
1671   eval {
1672      $r = $es->indices->get_template(
1673         name => $template,
1674      );
1675   };
1676   if ($@) {
1677      chomp($@);
1678      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1679   }
1680
1681   return $r;
1682}
1683
1684#
1685# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1686#
1687sub put_template {
1688   my $self = shift;
1689   my ($name, $template) = @_;
1690
1691   my $es = $self->_es;
1692   $self->brik_help_run_undef_arg('open', $es) or return;
1693   $self->brik_help_run_undef_arg('put_template', $name) or return;
1694   $self->brik_help_run_undef_arg('put_template', $template) or return;
1695   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1696
1697   my $r;
1698   eval {
1699      $r = $es->indices->put_template(
1700         name => $name,
1701         body => $template,
1702      );
1703   };
1704   if ($@) {
1705      chomp($@);
1706      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1707   }
1708
1709   return $r;
1710}
1711
1712sub put_template_from_json_file {
1713   my $self = shift;
1714   my ($json_file) = @_;
1715
1716   my $es = $self->_es;
1717   $self->brik_help_run_undef_arg('open', $es) or return;
1718   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1719   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1720
1721   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1722   my $data = $fj->read($json_file) or return;
1723
1724   if (! exists($data->{template})) {
1725      return $self->log->error("put_template_from_json_file: no template name found");
1726   }
1727
1728   my $name = $data->{template};
1729
1730   return $self->put_template($name, $data);
1731}
1732
1733sub update_template_from_json_file {
1734   my $self = shift;
1735   my ($json_file) = @_;
1736
1737   my $es = $self->_es;
1738   $self->brik_help_run_undef_arg('open', $es) or return;
1739   $self->brik_help_run_undef_arg('update_template_from_json_file', $json_file) or return;
1740   $self->brik_help_run_file_not_found('update_template_from_json_file', $json_file) or return;
1741
1742   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1743   my $data = $fj->read($json_file) or return;
1744
1745   if (! exists($data->{template})) {
1746      return $self->log->error("put_template_from_json_file: no template name found");
1747   }
1748
1749   my $name = $data->{template};
1750
1751   $self->delete_template($name);  # We ignore errors, template may not exist.
1752
1753   return $self->put_template($name, $data);
1754}
1755
1756#
1757# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1758# Search::Elasticsearch::Client::2_0::Direct::Indices
1759#
1760sub get_settings {
1761   my $self = shift;
1762   my ($indices, $names) = @_;
1763
1764   my $es = $self->_es;
1765   $self->brik_help_run_undef_arg('open', $es) or return;
1766
1767   my %args = ();
1768   if (defined($indices)) {
1769      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1770      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1771         or return;
1772      $args{index} = $indices;
1773   }
1774   if (defined($names)) {
1775      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1776      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1777         or return;
1778      $args{name} = $names;
1779   }
1780
1781   my $r;
1782   eval {
1783      $r = $es->indices->get_settings(%args);
1784   };
1785   if ($@) {
1786      chomp($@);
1787      return $self->log->error("get_settings: failed: [$@]");
1788   }
1789
1790   return $r;
1791}
1792
1793#
1794# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1795# Search::Elasticsearch::Client::2_0::Direct::Indices
1796#
1797# Example:
1798#
1799# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1800#
1801# XXX: should be renamed to put_index_settings
1802#
1803sub put_settings {
1804   my $self = shift;
1805   my ($settings, $indices) = @_;
1806
1807   my $es = $self->_es;
1808   $self->brik_help_run_undef_arg('open', $es) or return;
1809   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1810   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1811
1812   my %args = (
1813      body => $settings,
1814   );
1815   if (defined($indices)) {
1816      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1817      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1818         or return;
1819      $args{index} = $indices;
1820   }
1821
1822   my $r;
1823   eval {
1824      $r = $es->indices->put_settings(%args);
1825   };
1826   if ($@) {
1827      chomp($@);
1828      return $self->log->error("put_settings: failed: [$@]");
1829   }
1830
1831   return $r;
1832}
1833
1834sub set_index_number_of_replicas {
1835   my $self = shift;
1836   my ($indices, $number) = @_;
1837
1838   my $es = $self->_es;
1839   $self->brik_help_run_undef_arg('open', $es) or return;
1840   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1841   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1842      or return;
1843
1844   my $settings = { number_of_replicas => $number };
1845
1846   return $self->put_settings($settings, $indices);
1847}
1848
1849sub set_index_refresh_interval {
1850   my $self = shift;
1851   my ($indices, $number) = @_;
1852
1853   my $es = $self->_es;
1854   $self->brik_help_run_undef_arg('open', $es) or return;
1855   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1856   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1857      or return;
1858
1859   # If there is a meaningful value not postfixed with a unity,
1860   # we default to add a `s' for a number of seconds.
1861   if ($number =~ /^\d+$/ && $number > 0) {
1862      $number .= 's';
1863   }
1864
1865   my $settings = { refresh_interval => $number };
1866
1867   return $self->put_settings($settings, $indices);
1868}
1869
1870sub get_index_number_of_replicas {
1871   my $self = shift;
1872   my ($indices) = @_;
1873
1874   my $es = $self->_es;
1875   $self->brik_help_run_undef_arg('open', $es) or return;
1876   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1877   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1878      or return;
1879
1880   my $settings = $self->get_settings($indices);
1881
1882   my %indices = ();
1883   for (keys %$settings) {
1884      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1885   }
1886
1887   return \%indices;
1888}
1889
1890sub get_index_refresh_interval {
1891   my $self = shift;
1892   my ($indices, $number) = @_;
1893
1894   my $es = $self->_es;
1895   $self->brik_help_run_undef_arg('open', $es) or return;
1896   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1897   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1898      or return;
1899
1900   my $settings = $self->get_settings($indices);
1901
1902   my %indices = ();
1903   for (keys %$settings) {
1904      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1905   }
1906
1907   return \%indices;
1908}
1909
1910sub get_index_number_of_shards {
1911   my $self = shift;
1912   my ($indices, $number) = @_;
1913
1914   my $es = $self->_es;
1915   $self->brik_help_run_undef_arg('open', $es) or return;
1916   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1917   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1918      or return;
1919
1920   my $settings = $self->get_settings($indices);
1921
1922   my %indices = ();
1923   for (keys %$settings) {
1924      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1925   }
1926
1927   return \%indices;
1928}
1929
1930#
1931# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1932#
1933sub delete_template {
1934   my $self = shift;
1935   my ($name) = @_;
1936
1937   my $es = $self->_es;
1938   $self->brik_help_run_undef_arg('open', $es) or return;
1939   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1940
1941   my $r;
1942   eval {
1943      $r = $es->indices->delete_template(
1944         name => $name,
1945      );
1946   };
1947   if ($@) {
1948      chomp($@);
1949      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1950   }
1951
1952   return $r;
1953}
1954
1955#
1956# Return a boolean to state for index existence
1957#
1958sub is_index_exists {
1959   my $self = shift;
1960   my ($index) = @_;
1961
1962   my $es = $self->_es;
1963   $self->brik_help_run_undef_arg('open', $es) or return;
1964   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1965
1966   my $r;
1967   eval {
1968      $r = $es->indices->exists(
1969         index => $index,
1970      );
1971   };
1972   if ($@) {
1973      chomp($@);
1974      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1975   }
1976
1977   return $r ? 1 : 0;
1978}
1979
1980#
1981# Return a boolean to state for index with type existence
1982#
1983sub is_type_exists {
1984   my $self = shift;
1985   my ($index, $type) = @_;
1986
1987   my $es = $self->_es;
1988   $self->brik_help_run_undef_arg('open', $es) or return;
1989   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1990   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1991
1992   my $r;
1993   eval {
1994      $r = $es->indices->exists_type(
1995         index => $index,
1996         type => $type,
1997      );
1998   };
1999   if ($@) {
2000      chomp($@);
2001      return $self->log->error("is_type_exists: failed for index [$index] and ".
2002         "type [$type]: [$@]");
2003   }
2004
2005   return $r ? 1 : 0;
2006}
2007
2008#
2009# Return a boolean to state for document existence
2010#
2011sub is_document_exists {
2012   my $self = shift;
2013   my ($index, $type, $document) = @_;
2014
2015   my $es = $self->_es;
2016   $self->brik_help_run_undef_arg('open', $es) or return;
2017   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2018   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2019   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2020   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2021
2022   my $r;
2023   eval {
2024      $r = $es->exists(
2025         index => $index,
2026         type => $type,
2027         %$document,
2028      );
2029   };
2030   if ($@) {
2031      chomp($@);
2032      return $self->log->error("is_document_exists: failed for index [$index] and ".
2033         "type [$type]: [$@]");
2034   }
2035
2036   return $r ? 1 : 0;
2037}
2038
2039sub parse_error_string {
2040   my $self = shift;
2041   my ($string) = @_;
2042
2043   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2044
2045   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2046
2047   my ($class, $node, $code, $message, $dump) = $string =~
2048      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2049
2050   if (defined($dump) && length($dump)) {
2051      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2052      $dump = $sd->decode($dump);
2053   }
2054
2055   # Sanity check
2056   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
2057   &&  defined($dump) && ref($dump) eq 'HASH') {
2058      return {
2059         class => $class,
2060         node => $node,
2061         code => $code,
2062         message => $message,
2063         dump => $dump,
2064      };
2065   }
2066
2067   # Were not able to decode, we return as-is.
2068   return {
2069      message => $string,
2070   };
2071}
2072
2073#
2074# Refresh an index to receive latest additions
2075#
2076# Search::Elasticsearch::Client::5_0::Direct::Indices
2077# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2078#
2079sub refresh_index {
2080   my $self = shift;
2081   my ($index) = @_;
2082
2083   my $es = $self->_es;
2084   $self->brik_help_run_undef_arg('open', $es) or return;
2085   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2086
2087   my $try = $self->try;
2088
2089RETRY:
2090
2091   my $r;
2092   eval {
2093      $r = $es->indices->refresh(
2094         index => $index,
2095      );
2096   };
2097   if ($@) {
2098      if (--$try == 0) {
2099         chomp($@);
2100         my $p = $self->parse_error_string($@);
2101         if (defined($p) && exists($p->{class})) {
2102            my $class = $p->{class};
2103            my $code = $p->{code};
2104            my $node = $p->{node};
2105            return $self->log->error("refresh_index: failed for index [$index] ".
2106               "after [$try] tries with error [$class] code [$code] for node [$node]");
2107         }
2108         else {
2109            return $self->log->error("refresh_index: failed for index [$index] ".
2110               "after [$try]: [$@]");
2111         }
2112      }
2113      sleep 60;
2114      goto RETRY;
2115   }
2116
2117   return $r;
2118}
2119
2120sub export_as_csv {
2121   my $self = shift;
2122   my ($index, $size, $cb) = @_;
2123
2124   $size ||= 10_000;
2125   my $es = $self->_es;
2126   $self->brik_help_run_undef_arg('open', $es) or return;
2127   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2128   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2129
2130   my $max = $self->max;
2131   my $datadir = $self->datadir;
2132
2133   $self->log->debug("export_as_csv: selecting scroll Command...");
2134
2135   my $scroll;
2136   my $version = $self->version or return;
2137   if ($version lt "5.0.0") {
2138      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2139   }
2140   else {
2141      $scroll = $self->open_scroll($index, $size) or return;
2142   }
2143
2144   $self->log->debug("export_as_csv: selecting scroll Command...OK.");
2145
2146   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2147
2148   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2149   $fc->separator(',');
2150   $fc->escape('\\');
2151   $fc->append(1);
2152   $fc->first_line_is_header(0);
2153   $fc->write_header(1);
2154   $fc->use_quoting(1);
2155   if (defined($self->csv_header)) {
2156      my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
2157      $fc->header($sorted);
2158   }
2159   if (defined($self->csv_encoded_fields)) {
2160      $fc->encoded_fields($self->csv_encoded_fields);
2161   }
2162   if (defined($self->csv_object_fields)) {
2163      $fc->object_fields($self->csv_object_fields);
2164   }
2165
2166   my $csv_header = $fc->header;
2167
2168   my $total = $self->total_scroll;
2169   $self->log->info("export_as_csv: total [$total] for index [$index]");
2170
2171   my %types = ();
2172   my $read = 0;
2173   my $skipped = 0;
2174   my $exported = 0;
2175   my $start = time();
2176   my $done = $datadir."/$index.exported";
2177   my $start_time = time();
2178   my %chunk = ();
2179   while (my $next = $self->next_scroll(10000)) {
2180      for my $this (@$next) {
2181         $read++;
2182
2183         if (defined($cb)) {
2184            $this = $cb->($this);
2185            if (! defined($this)) {
2186               $self->log->error("export_as_csv: callback failed for index [$index] ".
2187                  "at read [$read], skipping single entry");
2188               $skipped++;
2189               next;
2190            }
2191         }
2192
2193         my $id = $this->{_id};
2194         my $doc = $this->{_source};
2195         my $type = $this->{_type} || 'doc';  # Prepare for when types will be removed from ES
2196         if (! exists($types{$type})) {
2197            # If not given, we guess the CSV fields to use.
2198            if (! defined($csv_header)) {
2199               my $fields = $self->list_index_fields($index, $type) or return;
2200               #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
2201               $types{$type}{header} = [ '_id', @$fields ];
2202            }
2203            else {
2204               $types{$type}{header} = [ '_id', @$csv_header ];
2205            }
2206
2207            $types{$type}{output} = $datadir."/$index:$type.csv";
2208
2209            # Verify it has not been exported yet
2210            if (-f $done) {
2211               return $self->log->error("export_as_csv: export already done for index ".
2212                  "[$index]");
2213            }
2214
2215            $self->log->info("export_as_csv: exporting to file [".$types{$type}{output}.
2216               "] for type [$type], using chunk size of [$size]");
2217         }
2218
2219         my $h = { _id => $id };
2220
2221         for my $k (keys %$doc) {
2222            $h->{$k} = $doc->{$k};
2223         }
2224
2225         $fc->header($types{$type}{header});
2226
2227         push @{$chunk{$type}}, $h;
2228         if (@{$chunk{$type}} > 999) {
2229            my $r = $fc->write($chunk{$type}, $types{$type}{output});
2230            if (!defined($r)) {
2231               $self->log->warning("export_as_csv: unable to process entry, skipping");
2232               $skipped++;
2233               next;
2234            }
2235            $chunk{$type} = [];
2236         }
2237
2238         # Log a status sometimes.
2239         if (! (++$exported % 100_000)) {
2240            my $now = time();
2241            my $perc = sprintf("%.02f", $exported / $total * 100);
2242            $self->log->info("export_as_csv: fetched [$exported/$total] ($perc%) ".
2243               "elements in ".($now - $start)." second(s) from index [$index]");
2244            $start = time();
2245         }
2246
2247         # Limit export to specified maximum
2248         if ($max > 0 && $exported >= $max) {
2249            $self->log->info("export_as_csv: max export reached [$exported] for index ".
2250               "[$index], stopping");
2251            last;
2252         }
2253      }
2254   }
2255
2256   # Process remaining data waiting to be written and build output file list
2257   my %files = ();
2258   for my $type (keys %types) {
2259      if (@{$chunk{$type}} > 0) {
2260         $fc->write($chunk{$type}, $types{$type}{output});
2261         $files{$types{$type}{output}}++;
2262      }
2263   }
2264
2265   $self->close_scroll;
2266
2267   my $stop_time = time();
2268   my $duration = $stop_time - $start_time;
2269   my $eps = $exported;
2270   if ($duration > 0) {
2271      $eps = $exported / $duration;
2272   }
2273
2274   my $result = {
2275      read => $read,
2276      exported => $exported,
2277      skipped => $read - $exported,
2278      total_count => $total,
2279      complete => ($exported == $total) ? 1 : 0,
2280      duration => $duration,
2281      eps => $eps, 
2282      files => [ sort { $a cmp $b } keys %files ],
2283   };
2284
2285   # Say the file has been processed, and put resulting stats.
2286   $fd->write($result, $done) or return;
2287
2288   $self->log->info("export_as_csv: done.");
2289
2290   return $result;
2291}
2292
2293#
2294# Optimization instructions:
2295# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
2296#
2297sub import_from_csv {
2298   my $self = shift;
2299   my ($input_csv, $index, $type, $hash, $cb) = @_;
2300
2301   my $es = $self->_es;
2302   $self->brik_help_run_undef_arg('open', $es) or return;
2303   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
2304   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
2305
2306   # If index and/or types are not defined, we try to get them from input filename
2307   if (! defined($index) || ! defined($type)) {
2308      # Example: index-DATE:type.csv
2309      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2310         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2311         $index ||= $this_index;
2312         $type ||= $this_type;
2313      }
2314   }
2315
2316   # Verify it has not been indexed yet
2317   my $done = "$input_csv.imported";
2318   if (-f $done) {
2319      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2320      return 0;
2321   }
2322
2323   # And default to Attributes if guess failed.
2324   $index ||= $self->index;
2325   $type ||= $self->type;
2326   $self->brik_help_set_undef_arg('index', $index) or return;
2327   $self->brik_help_set_undef_arg('type', $type) or return;
2328
2329   if ($index eq '*') {
2330      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2331   }
2332   if ($type eq '*') {
2333      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2334   }
2335
2336   $self->log->debug("input [$input_csv]");
2337   $self->log->debug("index [$index]");
2338   $self->log->debug("type [$type]");
2339
2340   my $count_before = 0;
2341   if ($self->is_index_exists($index)) {
2342      $count_before = $self->count($index, $type);
2343      if (! defined($count_before)) {
2344         return;
2345      }
2346      $self->log->info("import_from_csv: current index [$index] count is ".
2347         "[$count_before]");
2348   }
2349
2350   my $max = $self->max;
2351
2352   $self->open_bulk_mode($index, $type) or return;
2353
2354   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2355      "with type [$type]");
2356
2357   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2358
2359   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2360   $fc->separator(',');
2361   $fc->escape('\\');
2362   $fc->first_line_is_header(1);
2363   $fc->encoded_fields($self->csv_encoded_fields);
2364   $fc->object_fields($self->csv_object_fields);
2365
2366   my $refresh_interval;
2367   my $number_of_replicas;
2368   my $start = time();
2369   my $speed_settings = {};
2370   my $imported = 0;
2371   my $first = 1;
2372   my $read = 0;
2373   my $skipped_chunks = 0;
2374   my $start_time = time();
2375   while (my $this = $fc->read_next($input_csv)) {
2376      $read++;
2377
2378      my $h = {};
2379      my $id = $this->{_id};
2380      delete $this->{_id};
2381      for my $k (keys %$this) {
2382         my $value = $this->{$k};
2383         # We keep only fields when they have a value.
2384         # No need to index data that is empty.
2385         if (defined($value) && length($value)) {
2386            $h->{$k} = $value;
2387         }
2388      }
2389
2390      if (defined($cb)) {
2391         $h = $cb->($h);
2392         if (! defined($h)) {
2393            $self->log->error("import_from_csv: callback failed for index [$index] ".
2394               "at read [$read], skipping single entry");
2395            $skipped_chunks++;
2396            next;
2397         }
2398      }
2399
2400      #$self->log->info(Data::Dumper::Dumper($h));
2401
2402      my $r;
2403      eval {
2404         $r = $self->index_bulk($h, $index, $type, $hash, $id);
2405      };
2406      if ($@) {
2407         chomp($@);
2408         $self->log->warning("import_from_csv: error [$@]");
2409      }
2410      if (! defined($r)) {
2411         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2412            "at read [$read], skipping chunk");
2413         $skipped_chunks++;
2414         next;
2415      }
2416
2417      # Gather index settings, and set values for speed.
2418      # We don't do it earlier, cause we need index to be created,
2419      # and it should have been done from index_bulk Command.
2420      if ($first && $self->is_index_exists($index)) {
2421         # Save current values so we can restore them at the end of Command.
2422         # We ignore errors here, this is non-blocking for indexing.
2423         $refresh_interval = $self->get_index_refresh_interval($index);
2424         $refresh_interval = $refresh_interval->{$index};
2425         $number_of_replicas = $self->get_index_number_of_replicas($index);
2426         $number_of_replicas = $number_of_replicas->{$index};
2427         if ($self->use_indexing_optimizations) {
2428            $self->set_index_number_of_replicas($index, 0);
2429         }
2430         $self->set_index_refresh_interval($index, -1);
2431         $first = 0;
2432      }
2433
2434      # Log a status sometimes.
2435      if (! (++$imported % 100_000)) {
2436         my $now = time();
2437         $self->log->info("import_from_csv: imported [$imported] entries in ".
2438            ($now - $start)." second(s) to index [$index]");
2439         $start = time();
2440      }
2441
2442      # Limit import to specified maximum
2443      if ($max > 0 && $imported >= $max) {
2444         $self->log->info("import_from_csv: max import reached [$imported] for ".
2445            "index [$index], stopping");
2446         last;
2447      }
2448   }
2449
2450   $self->bulk_flush;
2451
2452   my $stop_time = time();
2453   my $duration = $stop_time - $start_time;
2454   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2455
2456   $self->refresh_index($index);
2457
2458   my $count_current = $self->count($index, $type) or return;
2459   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2460
2461   my $skipped = 0;
2462   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2463   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2464      $imported = $read;
2465   }
2466   else {
2467      $skipped = $read - ($count_current - $count_before);
2468   }
2469
2470   my $result = {
2471      read => $read,
2472      imported => $imported,
2473      skipped => $skipped,
2474      previous_count => $count_before,
2475      current_count => $count_current,
2476      complete => $complete,
2477      duration => $duration,
2478      eps => $eps,
2479   };
2480
2481   # Say the file has been processed, and put resulting stats.
2482   $fd->write($result, $done) or return;
2483
2484   # Restore previous settings, if any
2485   if (defined($refresh_interval)) {
2486      $self->set_index_refresh_interval($index, $refresh_interval);
2487   }
2488   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2489      $self->set_index_number_of_replicas($index, $number_of_replicas);
2490   }
2491
2492   return $result;
2493}
2494
2495#
2496# http://localhost:9200/_nodes/stats/process?pretty
2497#
2498# Search::Elasticsearch::Client::2_0::Direct::Nodes
2499#
2500sub get_stats_process {
2501   my $self = shift;
2502
2503   my $es = $self->_es;
2504   $self->brik_help_run_undef_arg('open', $es) or return;
2505
2506   my $r;
2507   eval {
2508      $r = $es->nodes->stats(
2509         metric => [ qw(process) ],
2510      );
2511   };
2512   if ($@) {
2513      chomp($@);
2514      return $self->log->error("get_stats_process: failed: [$@]");
2515   }
2516
2517   return $r;
2518}
2519
2520#
2521# curl http://localhost:9200/_nodes/process?pretty
2522#
2523# Search::Elasticsearch::Client::2_0::Direct::Nodes
2524#
2525sub get_process {
2526   my $self = shift;
2527
2528   my $es = $self->_es;
2529   $self->brik_help_run_undef_arg('open', $es) or return;
2530
2531   my $r;
2532   eval {
2533      $r = $es->nodes->info(
2534         metric => [ qw(process) ],
2535      );
2536   };
2537   if ($@) {
2538      chomp($@);
2539      return $self->log->error("get_process: failed: [$@]");
2540   }
2541
2542   return $r;
2543}
2544
2545#
2546# Search::Elasticsearch::Client::2_0::Direct::Cluster
2547#
2548sub get_cluster_state {
2549   my $self = shift;
2550
2551   my $es = $self->_es;
2552   $self->brik_help_run_undef_arg('open', $es) or return;
2553
2554   my $r;
2555   eval {
2556      $r = $es->cluster->state;
2557   };
2558   if ($@) {
2559      chomp($@);
2560      return $self->log->error("get_cluster_state: failed: [$@]");
2561   }
2562
2563   return $r;
2564}
2565
2566#
2567# Search::Elasticsearch::Client::2_0::Direct::Cluster
2568#
2569sub get_cluster_health {
2570   my $self = shift;
2571
2572   my $es = $self->_es;
2573   $self->brik_help_run_undef_arg('open', $es) or return;
2574
2575   my $r;
2576   eval {
2577      $r = $es->cluster->health;
2578   };
2579   if ($@) {
2580      chomp($@);
2581      return $self->log->error("get_cluster_health: failed: [$@]");
2582   }
2583
2584   return $r;
2585}
2586
2587#
2588# Search::Elasticsearch::Client::2_0::Direct::Cluster
2589#
2590sub get_cluster_settings {
2591   my $self = shift;
2592
2593   my $es = $self->_es;
2594   $self->brik_help_run_undef_arg('open', $es) or return;
2595
2596   my $r;
2597   eval {
2598      $r = $es->cluster->get_settings;
2599   };
2600   if ($@) {
2601      chomp($@);
2602      return $self->log->error("get_cluster_settings: failed: [$@]");
2603   }
2604
2605   return $r;
2606}
2607
2608#
2609# Search::Elasticsearch::Client::2_0::Direct::Cluster
2610#
2611sub put_cluster_settings {
2612   my $self = shift;
2613   my ($settings) = @_;
2614
2615   my $es = $self->_es;
2616   $self->brik_help_run_undef_arg('open', $es) or return;
2617   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2618   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2619
2620   my %args = (
2621      body => $settings,
2622   );
2623
2624   my $r;
2625   eval {
2626      $r = $es->cluster->put_settings(%args);
2627   };
2628   if ($@) {
2629      chomp($@);
2630      return $self->log->error("put_cluster_settings: failed: [$@]");
2631   }
2632
2633   return $r;
2634}
2635
2636sub count_green_indices {
2637   my $self = shift;
2638
2639   my $get = $self->show_indices or return;
2640
2641   my $count = 0;
2642   for (@$get) {
2643      if (/^\s*green\s+/) {
2644         $count++;
2645      }
2646   }
2647
2648   return $count;
2649}
2650
2651sub count_yellow_indices {
2652   my $self = shift;
2653
2654   my $get = $self->show_indices or return;
2655
2656   my $count = 0;
2657   for (@$get) {
2658      if (/^\s*yellow\s+/) {
2659         $count++;
2660      }
2661   }
2662
2663   return $count;
2664}
2665
2666sub count_red_indices {
2667   my $self = shift;
2668
2669   my $get = $self->show_indices or return;
2670
2671   my $count = 0;
2672   for (@$get) {
2673      if (/^\s*red\s+/) {
2674         $count++;
2675      }
2676   }
2677
2678   return $count;
2679}
2680
2681sub count_indices {
2682   my $self = shift;
2683
2684   my $get = $self->show_indices or return;
2685
2686   return scalar @$get;
2687}
2688
2689sub list_indices_status {
2690   my $self = shift;
2691
2692   my $get = $self->show_indices or return;
2693
2694   my $count_red = 0;
2695   my $count_yellow = 0;
2696   my $count_green = 0;
2697   for (@$get) {
2698      if (/^\s*red\s+/) {
2699         $count_red++;
2700      }
2701      elsif (/^\s*yellow\s+/) {
2702         $count_yellow++;
2703      }
2704      elsif (/^\s*green\s+/) {
2705         $count_green++;
2706      }
2707   }
2708
2709   return {
2710      red => $count_red,
2711      yellow => $count_yellow,
2712      green => $count_green,
2713   };
2714}
2715
2716sub count_shards {
2717   my $self = shift;
2718
2719   my $indices = $self->get_indices or return;
2720
2721   my $count = 0;
2722   for (@$indices) {
2723      $count += $_->{shards};
2724   }
2725
2726   return $count;
2727}
2728
2729sub count_size {
2730   my $self = shift;
2731
2732   my $indices = $self->get_indices or return;
2733
2734   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2735   $fn->kibi_suffix("kb");
2736   $fn->mebi_suffix("mb");
2737   $fn->gibi_suffix("gb");
2738   $fn->kilo_suffix("KB");
2739   $fn->mega_suffix("MB");
2740   $fn->giga_suffix("GB");
2741
2742   my $size = 0;
2743   for (@$indices) {
2744      $size += $fn->to_number($_->{size});
2745   }
2746
2747   return $fn->from_number($size);
2748}
2749
2750sub count_total_size {
2751   my $self = shift;
2752
2753   my $indices = $self->get_indices or return;
2754
2755   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2756   $fn->kibi_suffix("kb");
2757   $fn->mebi_suffix("mb");
2758   $fn->gibi_suffix("gb");
2759   $fn->kilo_suffix("KB");
2760   $fn->mega_suffix("MB");
2761   $fn->giga_suffix("GB");
2762
2763   my $size = 0;
2764   for (@$indices) {
2765      $size += $fn->to_number($_->{total_size});
2766   }
2767
2768   return $fn->from_number($size);
2769}
2770
2771sub count_count {
2772   my $self = shift;
2773
2774   my $indices = $self->get_indices or return;
2775
2776   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2777   $fn->kilo_suffix('k');
2778   $fn->mega_suffix('m');
2779   $fn->giga_suffix('M');
2780
2781   my $count = 0;
2782   for (@$indices) {
2783      $count += $_->{count};
2784   }
2785
2786   return $fn->from_number($count);
2787}
2788
2789sub list_green_indices {
2790   my $self = shift;
2791
2792   my $get = $self->get_indices or return;
2793
2794   my @indices = ();
2795   for (@$get) {
2796      if ($_->{color} eq 'green') {
2797         push @indices, $_->{index};
2798      }
2799   }
2800
2801   return \@indices;
2802}
2803
2804sub list_yellow_indices {
2805   my $self = shift;
2806
2807   my $get = $self->get_indices or return;
2808
2809   my @indices = ();
2810   for (@$get) {
2811      if ($_->{color} eq 'yellow') {
2812         push @indices, $_->{index};
2813      }
2814   }
2815
2816   return \@indices;
2817}
2818
2819sub list_red_indices {
2820   my $self = shift;
2821
2822   my $get = $self->get_indices or return;
2823
2824   my @indices = ();
2825   for (@$get) {
2826      if ($_->{color} eq 'red') {
2827         push @indices, $_->{index};
2828      }
2829   }
2830
2831   return \@indices;
2832}
2833
2834#
2835# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2836#
2837sub list_datatypes {
2838   my $self = shift;
2839
2840   return {
2841      core => [ qw(string long integer short byte double float data boolean binary) ],
2842   };
2843}
2844
2845#
2846# Return total hits for last www_search
2847#
2848sub get_hits_total {
2849   my $self = shift;
2850   my ($run) = @_;
2851
2852   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
2853
2854   if (ref($run) eq 'HASH') {
2855      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2856         return $run->{hits}{total};
2857      }
2858   }
2859
2860   return $self->log->error("get_hits_total: last Command not compatible");
2861}
2862
2863sub disable_shard_allocation {
2864   my $self = shift;
2865
2866   my $settings = {
2867      persistent => {
2868         'cluster.routing.allocation.enable' => 'none',
2869      }
2870   };
2871
2872   return $self->put_cluster_settings($settings);
2873}
2874
2875sub enable_shard_allocation {
2876   my $self = shift;
2877
2878   my $settings = {
2879      persistent => { 
2880         'cluster.routing.allocation.enable' => 'all',
2881      }
2882   };
2883
2884   return $self->put_cluster_settings($settings);
2885}
2886
2887sub flush_synced {
2888   my $self = shift;
2889
2890   my $es = $self->_es;
2891   $self->brik_help_run_undef_arg('open', $es) or return;
2892
2893   my $r;
2894   eval {
2895      $r = $es->indices->flush_synced;
2896   };
2897   if ($@) {
2898      chomp($@);
2899      return $self->log->error("flush_synced: failed: [$@]");
2900   }
2901
2902   return $r;
2903}
2904
2905#
2906# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2907#
2908# run client::elasticsearch create_snapshot_repository myrepo
2909#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2910#
2911# You have to set path.repo in elasticsearch.yml like:
2912# path.repo: ["/home/gomor/es-backups"]
2913#
2914# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2915#
2916sub create_snapshot_repository {
2917   my $self = shift;
2918   my ($body, $repository_name) = @_;
2919
2920   my $es = $self->_es;
2921   $self->brik_help_run_undef_arg('open', $es) or return;
2922   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2923
2924   $repository_name ||= 'repository';
2925
2926   my %args = (
2927      repository => $repository_name,
2928      body => $body,
2929   );
2930
2931   my $r;
2932   eval {
2933      $r = $es->snapshot->create_repository(%args);
2934   };
2935   if ($@) {
2936      chomp($@);
2937      return $self->log->error("create_snapshot_repository: failed: [$@]");
2938   }
2939
2940   return $r;
2941}
2942
2943sub create_shared_fs_snapshot_repository {
2944   my $self = shift;
2945   my ($location, $repository_name) = @_;
2946
2947   $repository_name ||= 'repository';
2948   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2949
2950   if ($location !~ m{^/}) {
2951      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2952         "a full directory path, this one is invalid [$location]");
2953   }
2954
2955   my $body = {
2956      type => 'fs',
2957      settings => {
2958         compress => 'true',
2959         location => $location,
2960      },
2961   };
2962
2963   return $self->create_snapshot_repository($body, $repository_name);
2964}
2965
2966#
2967# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2968#
2969sub get_snapshot_repositories {
2970   my $self = shift;
2971
2972   my $es = $self->_es;
2973   $self->brik_help_run_undef_arg('open', $es) or return;
2974
2975   my $r;
2976   eval {
2977      $r = $es->snapshot->get_repository;
2978   };
2979   if ($@) {
2980      chomp($@);
2981      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2982   }
2983
2984   return $r;
2985}
2986
2987#
2988# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2989#
2990sub get_snapshot_status {
2991   my $self = shift;
2992
2993   my $es = $self->_es;
2994   $self->brik_help_run_undef_arg('open', $es) or return;
2995
2996   my $r;
2997   eval {
2998      $r = $es->snapshot->status;
2999   };
3000   if ($@) {
3001      chomp($@);
3002      return $self->log->error("get_snapshot_status: failed: [$@]");
3003   }
3004
3005   return $r;
3006}
3007
3008#
3009# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3010#
3011sub create_snapshot {
3012   my $self = shift;
3013   my ($snapshot_name, $repository_name, $body) = @_;
3014
3015   my $es = $self->_es;
3016   $self->brik_help_run_undef_arg('open', $es) or return;
3017
3018   $snapshot_name ||= 'snapshot';
3019   $repository_name ||= 'repository';
3020
3021   my %args = (
3022      repository => $repository_name,
3023      snapshot => $snapshot_name,
3024   );
3025   if (defined($body)) {
3026      $args{body} = $body;
3027   }
3028
3029   my $r;
3030   eval {
3031      $r = $es->snapshot->create(%args);
3032   };
3033   if ($@) {
3034      chomp($@);
3035      return $self->log->error("create_snapshot: failed: [$@]");
3036   }
3037
3038   return $r;
3039}
3040
3041sub create_snapshot_for_indices {
3042   my $self = shift;
3043   my ($indices, $snapshot_name, $repository_name) = @_;
3044
3045   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
3046
3047   $snapshot_name ||= 'snapshot';
3048   $repository_name ||= 'repository';
3049
3050   my $body = {
3051      indices => $indices,
3052   };
3053
3054   return $self->create_snapshot($snapshot_name, $repository_name, $body);
3055}
3056
3057sub is_snapshot_finished {
3058   my $self = shift;
3059
3060   my $status = $self->get_snapshot_status or return;
3061
3062   if (@{$status->{snapshots}} == 0) {
3063      return 1;
3064   }
3065
3066   return 0;
3067}
3068
3069sub get_snapshot_state {
3070   my $self = shift;
3071
3072   if ($self->is_snapshot_finished) {
3073      return $self->log->info("get_snapshot_state: is already finished");
3074   }
3075
3076   my $status = $self->get_snapshot_status or return;
3077
3078   my @indices_done = ();
3079   my @indices_not_done = ();
3080
3081   my $list = $status->{snapshots};
3082   for my $snapshot (@$list) {
3083      my $indices = $snapshot->{indices};
3084      for my $index (@$indices) {
3085         my $done = $index->{shards_stats}{done};
3086         if ($done) {
3087            push @indices_done, $index;
3088         }
3089         else {
3090            push @indices_not_done, $index;
3091         }
3092      }
3093   }
3094
3095   return { done => \@indices_done, not_done => \@indices_not_done };
3096}
3097
3098sub verify_snapshot_repository {
3099}
3100
3101sub delete_snapshot_repository {
3102   my $self = shift;
3103   my ($repository_name) = @_;
3104
3105   my $es = $self->_es;
3106   $self->brik_help_run_undef_arg('open', $es) or return;
3107   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
3108
3109   my $r;
3110   eval {
3111      $r = $es->snapshot->delete_repository(
3112         repository => $repository_name,
3113      );
3114   };
3115   if ($@) {
3116      chomp($@);
3117      return $self->log->error("delete_snapshot_repository: failed: [$@]");
3118   }
3119
3120   return $r;
3121}
3122
3123sub get_snapshot {
3124   my $self = shift;
3125   my ($snapshot_name, $repository_name) = @_;
3126
3127   my $es = $self->_es;
3128   $self->brik_help_run_undef_arg('open', $es) or return;
3129
3130   $snapshot_name ||= 'snapshot';
3131   $repository_name ||= 'repository';
3132
3133   my $r;
3134   eval {
3135      $r = $es->snapshot->get(
3136         repository => $repository_name,
3137         snapshot => $snapshot_name,
3138      );
3139   };
3140   if ($@) {
3141      chomp($@);
3142      return $self->log->error("get_snapshot: failed: [$@]");
3143   }
3144
3145   return $r;
3146}
3147
3148#
3149# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3150#
3151sub delete_snapshot {
3152   my $self = shift;
3153   my ($snapshot_name, $repository_name) = @_;
3154
3155   my $es = $self->_es;
3156   $self->brik_help_run_undef_arg('open', $es) or return;
3157   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
3158   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
3159
3160   my $timeout = $self->rtimeout;
3161
3162   my $r;
3163   eval {
3164      $r = $es->snapshot->delete(
3165         repository => $repository_name,
3166         snapshot => $snapshot_name,
3167         master_timeout => "${timeout}s",
3168      );
3169   };
3170   if ($@) {
3171      chomp($@);
3172      return $self->log->error("delete_snapshot: failed: [$@]");
3173   }
3174
3175   return $r;
3176}
3177
3178#
3179# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3180#
3181sub restore_snapshot {
3182   my $self = shift;
3183   my ($snapshot_name, $repository_name, $body) = @_;
3184
3185   my $es = $self->_es;
3186   $snapshot_name ||= 'snapshot';
3187   $repository_name ||= 'repository';
3188   $self->brik_help_run_undef_arg('open', $es) or return;
3189   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
3190   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
3191
3192   my %args = (
3193      repository => $repository_name,
3194      snapshot => $snapshot_name,
3195   );
3196   if (defined($body)) {
3197      $args{body} = $body;
3198   }
3199
3200   my $r;
3201   eval {
3202      $r = $es->snapshot->restore(%args);
3203   };
3204   if ($@) {
3205      chomp($@);
3206      return $self->log->error("restore_snapshot: failed: [$@]");
3207   }
3208
3209   return $r;
3210}
3211
3212sub restore_snapshot_for_indices {
3213   my $self = shift;
3214   my ($indices, $snapshot_name, $repository_name) = @_;
3215
3216   $snapshot_name ||= 'snapshot';
3217   $repository_name ||= 'repository';
3218   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
3219   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
3220   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
3221
3222   my $body = {
3223      indices => $indices,
3224   };
3225
3226   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
3227}
3228
3229# shard occupation
3230#
3231# curl -XGET "http://127.0.0.1:9200/_cat/shards?v
3232# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
3233#
3234# disk occuption:
3235# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
3236#
3237#
3238# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
3239#
3240
3241# Check memory lock
3242
3243# curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
3244# {
3245#  "nodes" : {
3246#    "3XXX" : {
3247#      "process" : {
3248#        "mlockall" : true
3249#      }
3250#    }
3251#  }
3252# }
3253
32541;
3255
3256__END__
3257
3258=head1 NAME
3259
3260Metabrik::Client::Elasticsearch - client::elasticsearch Brik
3261
3262=head1 SYNOPSIS
3263
3264   host:~> my $q = { term => { ip => "192.168.57.19" } }
3265   host:~> run client::elasticsearch open
3266   host:~> run client::elasticsearch query $q data-*
3267
3268=head1 DESCRIPTION
3269
3270Template to write a new Metabrik Brik.
3271
3272=head1 COPYRIGHT AND LICENSE
3273
3274Copyright (c) 2014-2018, Patrice E<lt>GomoRE<gt> Auffret
3275
3276You may distribute this module under the terms of The BSD 3-Clause License.
3277See LICENSE file in the source distribution archive.
3278
3279=head1 AUTHOR
3280
3281Patrice E<lt>GomoRE<gt> Auffret
3282
3283=cut
Note: See TracBrowser for help on using the repository browser.