source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 965:a1411d908af6, checked in by GomoR <gomor@…>, 7 weeks ago
  • bugfix: client::elasticsearch: catch open_scroll errors
File size: 80.7 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         datadir => [ qw(datadir) ],
20         nodes => [ qw(node_list) ],
21         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22         date => [ qw(date) ],
23         index => [ qw(index) ],
24         type => [ qw(type) ],
25         from => [ qw(number) ],
26         size => [ qw(count) ],
27         max => [ qw(count) ],
28         max_flush_count => [ qw(count) ],
29         max_flush_size => [ qw(count) ],
30         rtimeout => [ qw(seconds) ],
31         sniff_rtimeout => [ qw(seconds) ],
32         try => [ qw(count) ],
33         use_bulk_autoflush => [ qw(0|1) ],
34         use_indexing_optimizations => [ qw(0|1) ],
35         csv_header => [ qw(fields) ],
36         csv_encoded_fields => [ qw(fields) ],
37         csv_object_fields => [ qw(fields) ],
38         _es => [ qw(INTERNAL) ],
39         _bulk => [ qw(INTERNAL) ],
40         _scroll => [ qw(INTERNAL) ],
41      },
42      attributes_default => {
43         nodes => [ qw(http://localhost:9200) ],
44         cxn_pool => 'Sniff',
45         from => 0,
46         size => 10,
47         max => 0,
48         index => '*',
49         type => '*',
50         rtimeout => 60,
51         sniff_rtimeout => 3,
52         try => 3,
53         max_flush_count => 1_000,
54         max_flush_size => 1_000_000,
55         use_bulk_autoflush => 1,
56         use_indexing_optimizations => 0,
57      },
58      commands => {
59         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
60         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
61         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
62         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
63         close_scroll => [ ],
64         total_scroll => [ ],
65         next_scroll => [ qw(count|OPTIONAL) ],
66         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
67         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
68         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
69         update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
70         bulk_flush => [ qw(index|OPTIONAL) ],
71         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
72         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
73         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
74         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
75         delete_index => [ qw(index|indices_list) ],
76         update_alias => [ qw(new_index alias) ],
77         delete_document => [ qw(index type id) ],
78         delete_by_query => [ qw($query_hash index type) ],
79         show_indices => [ qw(string_filter|OPTIONAL) ],
80         show_nodes => [ ],
81         show_health => [ ],
82         show_recovery => [ ],
83         show_allocation => [ ],
84         list_indices => [ qw(regex|OPTIONAL) ],
85         get_indices => [ ],
86         get_index => [ qw(index|indices_list) ],
87         list_index_types => [ qw(index) ],
88         list_index_fields => [ qw(index) ],
89         list_indices_version => [ qw(index|indices_list) ],
90         open_index => [ qw(index|indices_list) ],
91         close_index => [ qw(index|indices_list) ],
92         get_aliases => [ qw(index) ],
93         put_alias => [ qw(index alias) ],
94         delete_alias => [ qw(index alias) ],
95         is_mapping_exists => [ qw(index mapping) ],
96         get_mappings => [ qw(index type|OPTIONAL) ],
97         create_index => [ qw(index) ],
98         create_index_with_mappings => [ qw(index mappings) ],
99         info => [ qw(nodes_list|OPTIONAL) ],
100         version => [ qw(nodes_list|OPTIONAL) ],
101         get_templates => [ ],
102         list_templates => [ ],
103         get_template => [ qw(name) ],
104         put_template => [ qw(name template) ],
105         put_template_from_json_file => [ qw(file) ],
106         update_template_from_json_file => [ qw(file) ],
107         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
108         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
109         set_index_number_of_replicas => [ qw(index|indices_list number) ],
110         set_index_refresh_interval => [ qw(index|indices_list number) ],
111         get_index_number_of_replicas => [ qw(index|indices) ],
112         get_index_refresh_interval => [ qw(index|indices_list) ],
113         get_index_number_of_shards => [ qw(index|indices_list) ],
114         delete_template => [ qw(name) ],
115         is_index_exists => [ qw(index) ],
116         is_type_exists => [ qw(index type) ],
117         is_document_exists => [ qw(index type document) ],
118         parse_error_string => [ qw(string) ],
119         refresh_index => [ qw(index) ],
120         export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
121         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
122         get_stats_process => [ ],
123         get_process => [ ],
124         get_cluster_state => [ ],
125         get_cluster_health => [ ],
126         get_cluster_settings => [ ],
127         put_cluster_settings => [ qw(settings) ],
128         count_green_indices => [ ],
129         count_yellow_indices => [ ],
130         count_red_indices => [ ],
131         list_green_indices => [ ],
132         list_yellow_indices => [ ],
133         list_red_indices => [ ],
134         count_indices => [ ],
135         list_indices_status => [ ],
136         count_shards => [ ],
137         count_size => [ ],
138         count_total_size => [ ],
139         count_count => [ ],
140         list_datatypes => [ ],
141         get_hits_total => [ ],
142         disable_shard_allocation => [ ],
143         enable_shard_allocation => [ ],
144         flush_synced => [ ],
145         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
146         create_shared_fs_snapshot_repository => [ qw(location
147            repository_name|OPTIONAL) ],
148         get_snapshot_repositories => [ ],
149         get_snapshot_status => [ ],
150         delete_snapshot_repository => [ qw(repository_name) ],
151         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
152            body|OPTIONAL) ],
153         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
154            repository_name|OPTIONAL) ],
155         is_snapshot_finished => [ ],
156         get_snapshot_state => [ ],
157         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
158         delete_snapshot => [ qw(snapshot_name repository_name) ],
159         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
160         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
161      },
162      require_modules => {
163         'Metabrik::String::Json' => [ ],
164         'Metabrik::File::Csv' => [ ],
165         'Metabrik::File::Json' => [ ],
166         'Metabrik::File::Dump' => [ ],
167         'Metabrik::Format::Number' => [ ],
168         'Search::Elasticsearch' => [ ],
169      },
170   };
171}
172
173sub brik_preinit {
174   my $self = shift;
175
176   eval("use Search::Elasticsearch;");
177   if ($Search::Elasticsearch::VERSION < 5) {
178      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
179         "with: run perl::module install Search::Elasticsearch");
180   }
181
182   return $self->SUPER::brik_preinit;
183}
184
185sub open {
186   my $self = shift;
187   my ($nodes, $cxn_pool) = @_;
188
189   $nodes ||= $self->nodes;
190   $cxn_pool ||= $self->cxn_pool;
191   $self->brik_help_run_undef_arg('open', $nodes) or return;
192   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
193   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
194   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
195
196   for my $node (@$nodes) {
197      if ($node !~ m{https?://}) {
198         return $self->log->error("open: invalid node[$node], must start with http(s)");
199      }
200   }
201
202   my $timeout = $self->rtimeout;
203
204   my $nodes_str = join('|', @$nodes);
205   $self->log->debug("open: using nodes [$nodes_str]");
206
207   #
208   # Timeout description here:
209   #
210   # Search::Elasticsearch::Role::Cxn
211   #
212
213   my $es = Search::Elasticsearch->new(
214      nodes => $nodes,
215      cxn_pool => $cxn_pool,
216      timeout => $timeout,
217      max_retries => $self->try,
218      retry_on_timeout => 1,
219      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
220      request_timeout => 60,  # seconds, default 30
221      ping_timeout => 5,  # seconds, default 2
222      dead_timeout => 120,  # seconds, detault 60
223      max_dead_timeout => 3600,  # seconds, default 3600
224      sniff_request_timeout => 15, # seconds, default 2
225      #trace_to => 'Stderr',  # For debug purposes
226   );
227   if (! defined($es)) {
228      return $self->log->error("open: failed");
229   }
230
231   $self->_es($es);
232
233   return $nodes;
234}
235
236#
237# Search::Elasticsearch::Client::5_0::Bulk
238#
239sub open_bulk_mode {
240   my $self = shift;
241   my ($index, $type) = @_;
242
243   $index ||= $self->index;
244   $type ||= $self->type;
245   my $es = $self->_es;
246   $self->brik_help_run_undef_arg('open', $es) or return;
247   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
248   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
249
250   my %args = (
251      index => $index,
252      type => $type,
253   );
254
255   if ($self->use_bulk_autoflush) {
256      my $max_count = $self->max_flush_count || 1_000;
257      my $max_size = $self->max_flush_size || 1_000_000;
258
259      $args{max_count} = $max_count;
260      $args{max_size} = $max_size;
261      $args{max_time} = 0;
262
263      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
264         "max_flush_size [$max_size]");
265   }
266   else {
267      $args{max_count} = 0;
268      $args{max_size} = 0;
269      $args{max_time} = 0;
270      $args{on_error} = undef;
271      #$args{on_success} = sub {
272         #my ($action, $response, $i) = @_;
273      #};
274
275      $self->log->info("open_bulk_mode: opening without automatic flushing");
276   }
277
278   my $bulk = $es->bulk_helper(%args);
279   if (! defined($bulk)) {
280      return $self->log->error("open_bulk_mode: failed");
281   }
282
283   $self->_bulk($bulk);
284
285   return $self->nodes;
286}
287
288sub open_scroll_scan_mode {
289   my $self = shift;
290   my ($index, $size) = @_;
291
292   my $version = $self->version or return;
293   if ($version ge "5.0.0") {
294      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
295         "$version, try open_scroll Command instead");
296   }
297
298   $index ||= $self->index;
299   $size ||= $self->size;
300   my $es = $self->_es;
301   $self->brik_help_run_undef_arg('open', $es) or return;
302   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
303   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
304
305   my $scroll;
306   eval {
307      $scroll = $es->scroll_helper(
308         index => $index,
309         search_type => 'scan',
310         size => $size,
311      );
312   };
313   if ($@) {
314      chomp($@);
315      return $self->log->error("open_scroll_scan_mode: failed: $@");
316   }
317
318   $self->_scroll($scroll);
319
320   return $self->nodes;
321}
322
323#
324# Search::Elasticsearch::Client::5_0::Scroll
325#
326sub open_scroll {
327   my $self = shift;
328   my ($index, $size, $type, $query) = @_;
329
330   my $version = $self->version or return;
331   if ($version lt "5.0.0") {
332      return $self->log->error("open_scroll: Command not supported for ES version ".
333         "$version, try open_scroll_scan_mode Command instead");
334   }
335
336   $query ||= { query => { match_all => {} } };
337   $index ||= $self->index;
338   $type ||= $self->type;
339   $size ||= $self->size;
340   my $es = $self->_es;
341   $self->brik_help_run_undef_arg('open', $es) or return;
342   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
343   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
344
345   my $timeout = $self->rtimeout;
346
347   my %args = (
348      scroll => "${timeout}s",
349      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
350                          # it in query string.
351      index => $index,
352      size => $size,
353      body => $query,
354   );
355   if ($type ne '*') {
356      $args{type} = $type;
357   }
358
359   #
360   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
361   #
362   my $scroll;
363   eval {
364      $scroll = $es->scroll_helper(%args);
365   };
366   if ($@) {
367      chomp($@);
368      return $self->log->error("open_scroll: failed: $@");
369   }
370
371   $self->_scroll($scroll);
372
373   $self->log->info("open_scroll: opened with size [$size] and timeout [${timeout}s]");
374
375   return $self->nodes;
376}
377
378#
379# Search::Elasticsearch::Client::5_0::Scroll
380#
381sub close_scroll {
382   my $self = shift;
383
384   my $scroll = $self->_scroll;
385   if (! defined($scroll)) {
386      return 1;
387   }
388
389   $scroll->finish;
390   $self->_scroll(undef);
391
392   return 1;
393}
394
395sub total_scroll {
396   my $self = shift;
397
398   my $scroll = $self->_scroll;
399   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
400
401   my $total;
402   eval {
403      $total = $scroll->total;
404   };
405   if ($@) {
406      chomp($@);
407      return $self->log->error("total_scroll: failed with: [$@]");
408   }
409
410   return $total;
411}
412
413sub next_scroll {
414   my $self = shift;
415   my ($count) = @_;
416
417   $count ||= 1;
418
419   my $scroll = $self->_scroll;
420   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
421
422   my $next;
423   eval {
424      if ($count > 1) {
425         my @docs = $scroll->next($count);
426         if (@docs > 0) {
427            $next = \@docs;
428         }
429      }
430      else {
431         $next = $scroll->next;
432      }
433   };
434   if ($@) {
435      chomp($@);
436      return $self->log->error("next_scroll: failed with: [$@]");
437   }
438
439   return $next;
440}
441
442#
443# Search::Elasticsearch::Client::5_0::Direct
444#
445sub index_document {
446   my $self = shift;
447   my ($doc, $index, $type, $hash, $id) = @_;
448
449   $index ||= $self->index;
450   $type ||= $self->type;
451   my $es = $self->_es;
452   $self->brik_help_run_undef_arg('open', $es) or return;
453   $self->brik_help_run_undef_arg('index_document', $doc) or return;
454   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
455   $self->brik_help_set_undef_arg('index', $index) or return;
456   $self->brik_help_set_undef_arg('type', $type) or return;
457
458   my %args = (
459      index => $index,
460      type => $type,
461      body => $doc,
462   );
463   if (defined($id)) {
464      $args{id} = $id;
465   }
466
467   if (defined($hash)) {
468      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH') or return;
469      %args = ( %args, %$hash );
470   }
471
472   my $r;
473   eval {
474      $r = $es->index(%args);
475   };
476   if ($@) {
477      chomp($@);
478      return $self->log->error("index_document: index failed for index [$index]: [$@]");
479   }
480
481   return $r;
482}
483
484#
485# Search::Elasticsearch::Client::5_0::Direct
486#
487sub update_document {
488   my $self = shift;
489   my ($doc, $id, $index, $type, $hash) = @_;
490
491   $index ||= $self->index;
492   $type ||= $self->type;
493   my $es = $self->_es;
494   $self->brik_help_run_undef_arg('open', $es) or return;
495   $self->brik_help_run_undef_arg('update_document', $doc) or return;
496   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
497   $self->brik_help_run_undef_arg('update_document', $id) or return;
498   $self->brik_help_set_undef_arg('index', $index) or return;
499   $self->brik_help_set_undef_arg('type', $type) or return;
500
501   my %args = (
502      id => $id,
503      index => $index,
504      type => $type,
505      body => { doc => $doc },
506   );
507
508   if (defined($hash)) {
509      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH') or return;
510      %args = ( %args, %$hash );
511   }
512
513   my $r;
514   eval {
515      $r = $es->update(%args);
516   };
517   if ($@) {
518      chomp($@);
519      return $self->log->error("update_document: index failed for index [$index]: [$@]");
520   }
521
522   return $r;
523}
524
525#
526# Search::Elasticsearch::Client::5_0::Bulk
527#
528sub index_bulk {
529   my $self = shift;
530   my ($doc, $index, $type, $hash, $id) = @_;
531
532   my $bulk = $self->_bulk;
533   $index ||= $self->index;
534   $type ||= $self->type;
535   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
536   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
537   $self->brik_help_set_undef_arg('index', $index) or return;
538   $self->brik_help_set_undef_arg('type', $type) or return;
539
540   my %args = (
541      source => $doc,
542   );
543   if (defined($id)) {
544      $args{id} = $id;
545   }
546
547   if (defined($hash)) {
548      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
549      %args = ( %args, %$hash );
550   }
551
552   my $r;
553   eval {
554      $r = $bulk->add_action(index => \%args);
555   };
556   if ($@) {
557      chomp($@);
558      my $p = $self->parse_error_string($@);
559      if (defined($p) && exists($p->{class})) {
560         my $class = $p->{class};
561         my $code = $p->{code};
562         my $node = $p->{node};
563         return $self->log->error("index_bulk: failed for index [$index] with error ".
564            "[$class] code [$code] for node [$node]");
565      }
566      else {
567         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
568      }
569   }
570
571   return $r;
572}
573
574sub update_document_bulk {
575   my $self = shift;
576   my ($doc, $index, $type, $hash, $id) = @_;
577
578   my $bulk = $self->_bulk;
579   $index ||= $self->index;
580   $type ||= $self->type;
581   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
582   $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
583   $self->brik_help_set_undef_arg('index', $index) or return;
584   $self->brik_help_set_undef_arg('type', $type) or return;
585
586   my %args = (
587      index => $index,
588      type => $type,
589      doc => $doc,
590   );
591   if (defined($id)) {
592      $args{id} = $id;
593   }
594
595   if (defined($hash)) {
596      $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH') or return;
597      %args = ( %args, %$hash );
598   }
599
600   my $r;
601   eval {
602      $r = $bulk->update(\%args);
603   };
604   if ($@) {
605      chomp($@);
606      my $p = $self->parse_error_string($@);
607      if (defined($p) && exists($p->{class})) {
608         my $class = $p->{class};
609         my $code = $p->{code};
610         my $node = $p->{node};
611         return $self->log->error("update_document_bulk: failed for index [$index] ".
612            "with error [$class] code [$code] for node [$node]");
613      }
614      else {
615         return $self->log->error("update_document_bulk: index failed for ".
616            "index [$index]: [$@]");
617      }
618   }
619
620   return $r;
621}
622
623#
624# We may have to call refresh_index after a bulk_flush, so we give an additional
625# optional Argument for given index.
626#
627sub bulk_flush {
628   my $self = shift;
629   my ($index) = @_;
630
631   my $bulk = $self->_bulk;
632   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
633
634   my $try = $self->try;
635
636RETRY:
637
638   my $r;
639   eval {
640      $r = $bulk->flush;
641   };
642   if ($@) {
643      chomp($@);
644      if (--$try == 0) {
645         my $p = $self->parse_error_string($@);
646         if (defined($p) && exists($p->{class})) {
647            my $class = $p->{class};
648            my $code = $p->{code};
649            my $node = $p->{node};
650            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
651               "[$class] code [$code] for node [$node]");
652         }
653         else {
654            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
655         }
656      }
657      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
658               "[$@]");
659      sleep 10;
660      goto RETRY;
661   }
662
663   if (defined($index)) {
664      $self->refresh_index($index);
665   }
666
667   return $r;
668}
669
670#
671# Search::Elasticsearch::Client::2_0::Direct
672# Search::Elasticsearch::Client::5_0::Direct
673#
674sub count {
675   my $self = shift;
676   my ($index, $type) = @_;
677
678   $index ||= $self->index;
679   $type ||= $self->type;
680   my $es = $self->_es;
681   $self->brik_help_run_undef_arg('open', $es) or return;
682
683   my %args = ();
684   if (defined($index) && $index ne '*') {
685      $args{index} = $index;
686   }
687   if (defined($type) && $type ne '*') {
688      $args{type} = $type;
689   }
690
691   #$args{body} = {
692      #query => {
693         #match => { title => 'Elasticsearch clients' },
694      #},
695   #}
696
697   my $r;
698   my $version = $self->version or return;
699   if ($version ge "5.0.0") {
700      eval {
701         $r = $es->count(%args);
702      };
703   }
704   else {
705      eval {
706         $r = $es->search(
707            index => $index,
708            type => $type,
709            search_type => 'count',
710            body => {
711               query => {
712                  match_all => {},
713               },
714            },
715         );
716      };
717   }
718   if ($@) {
719      chomp($@);
720      return $self->log->error("count: count failed for index [$index]: [$@]");
721   }
722
723   if ($version ge "5.0.0") {
724      if (exists($r->{count})) {
725         return $r->{count};
726      }
727   }
728   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
729      return $r->{hits}{total};
730   }
731
732   return $self->log->error("count: nothing found");
733}
734
735#
736# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
737# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
738#
739# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
740#
741sub query {
742   my $self = shift;
743   my ($query, $index, $type, $hash) = @_;
744
745   $index ||= $self->index;
746   $type ||= $self->type;
747   my $es = $self->_es;
748   $self->brik_help_run_undef_arg('open', $es) or return;
749   $self->brik_help_run_undef_arg('query', $query) or return;
750   $self->brik_help_set_undef_arg('index', $index) or return;
751   $self->brik_help_set_undef_arg('type', $type) or return;
752   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
753
754   my $timeout = $self->rtimeout;
755
756   my %args = (
757      index => $index,
758      body => $query,
759   );
760
761   if (defined($hash)) {
762      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
763      %args = ( %args, %$hash );
764   }
765
766   if ($type ne '*') {
767      $args{type} = $type;
768   }
769
770   my $r;
771   eval {
772      $r = $es->search(%args);
773   };
774   if ($@) {
775      chomp($@);
776      return $self->log->error("query: failed for index [$index]: [$@]");
777   }
778
779   return $r;
780}
781
782sub get_from_id {
783   my $self = shift;
784   my ($id, $index, $type) = @_;
785
786   $index ||= $self->index;
787   $type ||= $self->type;
788   my $es = $self->_es;
789   $self->brik_help_run_undef_arg('open', $es) or return;
790   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
791   $self->brik_help_set_undef_arg('index', $index) or return;
792   $self->brik_help_set_undef_arg('type', $type) or return;
793
794   my $r;
795   eval {
796      $r = $es->get(
797         index => $index,
798         type => $type,
799         id => $id,
800      );
801   };
802   if ($@) {
803      chomp($@);
804      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
805   }
806
807   return $r;
808}
809
810#
811# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
812#
813sub www_search {
814   my $self = shift;
815   my ($query, $index, $type) = @_;
816
817   $index ||= $self->index;
818   $type ||= $self->type;
819   $self->brik_help_run_undef_arg('www_search', $query) or return;
820   $self->brik_help_set_undef_arg('index', $index) or return;
821   $self->brik_help_set_undef_arg('type', $type) or return;
822
823   my $from = $self->from;
824   my $size = $self->size;
825
826   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
827
828   my $nodes = $self->nodes;
829   for my $node (@$nodes) {
830      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
831      my $url = "$node/$index";
832      if ($type ne '*') {
833         $url .= "/$type";
834      }
835      $url .= "/_search/?from=$from&size=$size&q=".$query;
836
837      my $get = $self->SUPER::get($url) or next;
838      my $body = $get->{content};
839
840      my $decoded = $sj->decode($body) or next;
841
842      return $decoded;
843   }
844
845   return;
846}
847
848#
849# Search::Elasticsearch::Client::2_0::Direct::Indices
850#
851sub delete_index {
852   my $self = shift;
853   my ($index) = @_;
854
855   my $es = $self->_es;
856   $self->brik_help_run_undef_arg('open', $es) or return;
857   $self->brik_help_run_undef_arg('delete_index', $index) or return;
858   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
859
860   my %args = (
861      index => $index,
862   );
863
864   my $r;
865   eval {
866      $r = $es->indices->delete(%args);
867   };
868   if ($@) {
869      chomp($@);
870      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
871   }
872
873   return $r;
874}
875
876#
877# Search::Elasticsearch::Client::2_0::Direct::Indices
878#
879sub delete_document {
880   my $self = shift;
881   my ($index, $type, $id, $hash) = @_;
882
883   my $es = $self->_es;
884   $self->brik_help_run_undef_arg('open', $es) or return;
885   $self->brik_help_run_undef_arg('delete_document', $index) or return;
886   $self->brik_help_run_undef_arg('delete_document', $type) or return;
887   $self->brik_help_run_undef_arg('delete_document', $id) or return;
888
889   my %args = (
890      index => $index,
891      type => $type,
892      id => $id,
893   );
894
895   if (defined($hash)) {
896      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH') or return;
897      %args = ( %args, %$hash );
898   }
899
900   my $r;
901   eval {
902      $r = $es->delete(%args);
903   };
904   if ($@) {
905      chomp($@);
906      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
907   }
908
909   return $r;
910}
911
912#
913# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
914#
915# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
916#
917sub delete_by_query {
918   my $self = shift;
919   my ($query, $index, $type) = @_;
920
921   my $es = $self->_es;
922   $self->brik_help_run_undef_arg('open', $es) or return;
923   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
924   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
925   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
926   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
927
928   my $timeout = $self->rtimeout;
929
930   my %args = (
931      index => $index,
932      type => $type,
933      body => $query,
934   );
935
936   my $r;
937   eval {
938      $r = $es->delete_by_query(%args);
939   };
940   if ($@) {
941      chomp($@);
942      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
943   }
944
945   # This may fail, we ignore it.
946   $self->refresh_index($index);
947
948   return $r;
949}
950
951#
952# Search::Elasticsearch::Client::2_0::Direct::Cat
953#
954sub show_indices {
955   my $self = shift;
956   my ($string) = @_;
957
958   my $es = $self->_es;
959   $self->brik_help_run_undef_arg('open', $es) or return;
960
961   my $r;
962   eval {
963      $r = $es->cat->indices;
964   };
965   if ($@) {
966      chomp($@);
967      return $self->log->error("show_indices: failed: [$@]");
968   }
969
970   my @lines = split(/\n/, $r);
971
972   if (@lines == 0) {
973      $self->log->warning("show_indices: nothing returned, no index?");
974   }
975
976   my @filtered = ();
977   if (defined($string)) {
978      for (@lines) {
979         if (m{$string}) {
980            push @filtered, $_;
981         }
982      }
983      @lines = @filtered;
984   }
985
986   return \@lines;
987}
988
989#
990# Search::Elasticsearch::Client::2_0::Direct::Cat
991#
992sub show_nodes {
993   my $self = shift;
994
995   my $es = $self->_es;
996   $self->brik_help_run_undef_arg('open', $es) or return;
997
998   my $r;
999   eval {
1000      $r = $es->cat->nodes;
1001   };
1002   if ($@) {
1003      chomp($@);
1004      return $self->log->error("show_nodes: failed: [$@]");
1005   }
1006
1007   my @lines = split(/\n/, $r);
1008
1009   if (@lines == 0) {
1010      $self->log->warning("show_nodes: nothing returned, no nodes?");
1011   }
1012
1013   return \@lines;
1014}
1015
1016#
1017# Search::Elasticsearch::Client::2_0::Direct::Cat
1018#
1019sub show_health {
1020   my $self = shift;
1021
1022   my $es = $self->_es;
1023   $self->brik_help_run_undef_arg('open', $es) or return;
1024
1025   my $r;
1026   eval {
1027      $r = $es->cat->health;
1028   };
1029   if ($@) {
1030      chomp($@);
1031      return $self->log->error("show_health: failed: [$@]");
1032   }
1033
1034   my @lines = split(/\n/, $r);
1035
1036   if (@lines == 0) {
1037      $self->log->warning("show_health: nothing returned, no recovery?");
1038   }
1039
1040   return \@lines;
1041}
1042
1043#
1044# Search::Elasticsearch::Client::2_0::Direct::Cat
1045#
1046sub show_recovery {
1047   my $self = shift;
1048
1049   my $es = $self->_es;
1050   $self->brik_help_run_undef_arg('open', $es) or return;
1051
1052   my $r;
1053   eval {
1054      $r = $es->cat->recovery;
1055   };
1056   if ($@) {
1057      chomp($@);
1058      return $self->log->error("show_recovery: failed: [$@]");
1059   }
1060
1061   my @lines = split(/\n/, $r);
1062
1063   if (@lines == 0) {
1064      $self->log->warning("show_recovery: nothing returned, no index?");
1065   }
1066
1067   return \@lines;
1068}
1069
1070#
1071# curl -s 'localhost:9200/_cat/allocation?v'
1072#
1073sub show_allocation {
1074   my $self = shift;
1075
1076   my $es = $self->_es;
1077   $self->brik_help_run_undef_arg('open', $es) or return;
1078
1079   my $r;
1080   eval {
1081      $r = $es->cat->allocation;
1082   };
1083   if ($@) {
1084      chomp($@);
1085      return $self->log->error("show_allocation: failed: [$@]");
1086   }
1087
1088   my @lines = split(/\n/, $r);
1089
1090   if (@lines == 0) {
1091      $self->log->warning("show_allocation: nothing returned, no index?");
1092   }
1093
1094   return \@lines;
1095}
1096
1097sub list_indices {
1098   my $self = shift;
1099   my ($regex) = @_;
1100
1101   my $get = $self->get_indices or return;
1102
1103   my @indices = ();
1104   for (@$get) {
1105      if (defined($regex)) {
1106         if ($_->{index} =~ m{$regex}) {
1107            push @indices, $_->{index};
1108         }
1109      }
1110      else {
1111         push @indices, $_->{index};
1112      }
1113   }
1114
1115   return [ sort { $a cmp $b } @indices ];
1116}
1117
1118sub get_indices {
1119   my $self = shift;
1120
1121   my $lines = $self->show_indices or return;
1122   if (@$lines == 0) {
1123      $self->log->warning("get_indices: no index found");
1124      return [];
1125   }
1126
1127   #
1128   # Format depends on ElasticSearch version. We try to detect the format.
1129   #
1130   # 5.0.0:
1131   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
1132   #
1133   my @indices = ();
1134   for (@$lines) {
1135      my @t = split(/\s+/);
1136      if (@t == 10) {  # Version 5.0.0
1137         my $color = $t[0];
1138         my $state = $t[1];
1139         my $index = $t[2];
1140         my $id = $t[3];
1141         my $shards = $t[4];
1142         my $replicas = $t[5];
1143         my $count = $t[6];
1144         my $count2 = $t[7];
1145         my $total_size = $t[8];
1146         my $size = $t[9];
1147         push @indices, {
1148            color => $color,
1149            state => $state,
1150            index => $index,
1151            id => $id,
1152            shards => $shards,
1153            replicas => $replicas,
1154            count => $count,
1155            total_size => $total_size,
1156            size => $size,
1157         };
1158      }
1159      elsif (@t == 9) {
1160         my $index = $t[2];
1161         push @indices, {
1162            index => $index,
1163         };
1164      }
1165      elsif (@t == 8) {
1166         my $index = $t[1];
1167         push @indices, {
1168            index => $index,
1169         };
1170      }
1171   }
1172
1173   return \@indices;
1174}
1175
1176#
1177# Search::Elasticsearch::Client::5_0::Direct::Indices
1178#
1179sub get_index {
1180   my $self = shift;
1181   my ($index) = @_;
1182 
1183   my $es = $self->_es;
1184   $self->brik_help_run_undef_arg('open', $es) or return;
1185   $self->brik_help_run_undef_arg('get_index', $index) or return;
1186   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1187
1188   my %args = (
1189      index => $index,
1190   );
1191
1192   my $r;
1193   eval {
1194      $r = $es->indices->get(%args);
1195   };
1196   if ($@) {
1197      chomp($@);
1198      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1199   }
1200
1201   return $r;
1202}
1203
1204sub list_index_types {
1205   my $self = shift;
1206   my ($index) = @_;
1207
1208   my $es = $self->_es;
1209   $self->brik_help_run_undef_arg('open', $es) or return;
1210   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1211   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1212
1213   my $r = $self->get_mappings($index) or return;
1214   if (keys %$r > 1) {
1215      return $self->log->error("list_index_types: multiple indices found, choose one");
1216   }
1217
1218   my @types = ();
1219   for my $this_index (keys %$r) {
1220      my $mappings = $r->{$this_index}{mappings};
1221      push @types, keys %$mappings;
1222   }
1223
1224   my %uniq = map { $_ => 1 } @types;
1225
1226   return [ sort { $a cmp $b } keys %uniq ];
1227}
1228
1229#
1230# By default, if you provide only one index and no type,
1231# all types will be merged (including _default_)
1232# If you specify one type (other than _default_), _default_ will be merged to it.
1233#
1234sub list_index_fields {
1235   my $self = shift;
1236   my ($index, $type) = @_;
1237
1238   my $es = $self->_es;
1239   $self->brik_help_run_undef_arg('open', $es) or return;
1240   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1241   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1242
1243   my $r;
1244   if (defined($type)) {
1245      $r = $self->get_mappings($index, $type) or return;
1246      if (keys %$r > 1) {
1247         return $self->log->error("list_index_fields: multiple indices found, ".
1248            "choose one");
1249      }
1250      # _default_ mapping may not exists.
1251      if ($self->is_mapping_exists($index, '_default_')) {
1252         my $r2 = $self->get_mappings($index, '_default_');
1253         # Merge
1254         for my $this_index (keys %$r2) {
1255            my $default = $r2->{$this_index}{mappings}{'_default_'};
1256            $r->{$this_index}{mappings}{_default_} = $default;
1257         }
1258      }
1259   }
1260   else {
1261      $r = $self->get_mappings($index) or return;
1262      if (keys %$r > 1) {
1263         return $self->log->error("list_index_fields: multiple indices found, ".
1264            "choose one");
1265      }
1266   }
1267
1268   my @fields = ();
1269   for my $this_index (keys %$r) {
1270      my $mappings = $r->{$this_index}{mappings};
1271      for my $this_type (keys %$mappings) {
1272         my $properties = $mappings->{$this_type}{properties};
1273         push @fields, keys %$properties;
1274      }
1275   }
1276
1277   my %uniq = map { $_ => 1 } @fields;
1278
1279   return [ sort { $a cmp $b } keys %uniq ];
1280}
1281
1282sub list_indices_version {
1283   my $self = shift;
1284   my ($index) = @_;
1285
1286   my $es = $self->_es;
1287   $self->brik_help_run_undef_arg('open', $es) or return;
1288   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1289   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1290      or return;
1291
1292   my $r = $self->get_index($index) or return;
1293
1294   my @list = ();
1295   for my $this (keys %$r) {
1296      my $name = $this;
1297      my $version = $r->{$this}{settings}{index}{version}{created};
1298      push @list, {
1299         index => $name,
1300         version => $version,
1301      };
1302   }
1303
1304   return \@list;
1305}
1306
1307sub open_index {
1308   my $self = shift;
1309   my ($index) = @_;
1310
1311   my $es = $self->_es;
1312   $self->brik_help_run_undef_arg('open', $es) or return;
1313   $self->brik_help_run_undef_arg('open_index', $index) or return;
1314   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1315
1316   my $r;
1317   eval {
1318      $r = $es->indices->open(
1319         index => $index,
1320      );
1321   };
1322   if ($@) {
1323      chomp($@);
1324      return $self->log->error("open_index: failed: [$@]");
1325   }
1326
1327   return $r;
1328}
1329
1330sub close_index {
1331   my $self = shift;
1332   my ($index) = @_;
1333
1334   my $es = $self->_es;
1335   $self->brik_help_run_undef_arg('open', $es) or return;
1336   $self->brik_help_run_undef_arg('close_index', $index) or return;
1337   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1338
1339   my $r;
1340   eval {
1341      $r = $es->indices->close(
1342         index => $index,
1343      );
1344   };
1345   if ($@) {
1346      chomp($@);
1347      return $self->log->error("close_index: failed: [$@]");
1348   }
1349
1350   return $r;
1351}
1352
1353#
1354# Search::Elasticsearch::Client::5_0::Direct::Indices
1355#
1356sub get_aliases {
1357   my $self = shift;
1358   my ($index) = @_;
1359
1360   $index ||= $self->index;
1361   my $es = $self->_es;
1362   $self->brik_help_run_undef_arg('open', $es) or return;
1363
1364   my %args = (
1365      index => $index,
1366   );
1367
1368   my $r;
1369   eval {
1370      $r = $es->indices->get(%args);
1371   };
1372   if ($@) {
1373      chomp($@);
1374      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1375   }
1376
1377   my %aliases = ();
1378   for my $this (keys %$r) {
1379      $aliases{$this} = $r->{$this}{aliases};
1380   }
1381
1382   return \%aliases;
1383}
1384
1385#
1386# Search::Elasticsearch::Client::5_0::Direct::Indices
1387#
1388sub put_alias {
1389   my $self = shift;
1390   my ($index, $alias) = @_;
1391
1392   my $es = $self->_es;
1393   $self->brik_help_run_undef_arg('open', $es) or return;
1394   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1395   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1396
1397   my %args = (
1398      index => $index,
1399      name => $alias,
1400   );
1401
1402   my $r;
1403   eval {
1404      $r = $es->indices->put_alias(%args);
1405   };
1406   if ($@) {
1407      chomp($@);
1408      return $self->log->error("put_alias: put_alias failed: [$@]");
1409   }
1410
1411   return $r;
1412}
1413
1414#
1415# Search::Elasticsearch::Client::5_0::Direct::Indices
1416#
1417sub delete_alias {
1418   my $self = shift;
1419   my ($index, $alias) = @_;
1420
1421   my $es = $self->_es;
1422   $self->brik_help_run_undef_arg('open', $es) or return;
1423   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1424   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1425
1426   my %args = (
1427      index => $index,
1428      name => $alias,
1429   );
1430
1431   my $r;
1432   eval {
1433      $r = $es->indices->delete_alias(%args);
1434   };
1435   if ($@) {
1436      chomp($@);
1437      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1438   }
1439
1440   return $r;
1441}
1442
1443sub update_alias {
1444   my $self = shift;
1445   my ($new_index, $alias) = @_;
1446
1447   my $es = $self->_es;
1448   $self->brik_help_run_undef_arg('open', $es) or return;
1449   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1450   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1451
1452   # Search for previous index with that alias, if any.
1453   my $prev_index;
1454   my $aliases = $self->get_aliases or return;
1455   while (my ($k, $v) = each %$aliases) {
1456      for my $this (keys %$v) {
1457         if ($this eq $alias) {
1458            $prev_index = $k;
1459            last;
1460         }
1461      }
1462      last if $prev_index;
1463   }
1464
1465   # Delete previous alias if it exists.
1466   if (defined($prev_index)) {
1467      $self->delete_alias($prev_index, $alias) or return;
1468   }
1469
1470   return $self->put_alias($new_index, $alias);
1471}
1472
1473sub is_mapping_exists {
1474   my $self = shift;
1475   my ($index, $mapping) = @_;
1476
1477   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1478   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1479
1480   if (! $self->is_index_exists($index)) {
1481      return 0;
1482   }
1483
1484   my $all = $self->get_mappings($index) or return;
1485   for my $this_index (keys %$all) {
1486      my $mappings = $all->{$this_index}{mappings};
1487      for my $this_mapping (keys %$mappings) {
1488         if ($this_mapping eq $mapping) {
1489            return 1;
1490         }
1491      }
1492   }
1493
1494   return 0;
1495}
1496
1497#
1498# Search::Elasticsearch::Client::2_0::Direct::Indices
1499#
1500sub get_mappings {
1501   my $self = shift;
1502   my ($index, $type) = @_;
1503
1504   my $es = $self->_es;
1505   $self->brik_help_run_undef_arg('open', $es) or return;
1506   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1507   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1508
1509   my %args = (
1510      index => $index,
1511      type => $type,
1512   );
1513
1514   my $r;
1515   eval {
1516      $r = $es->indices->get_mapping(%args);
1517   };
1518   if ($@) {
1519      chomp($@);
1520      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1521         "[$@]");
1522   }
1523
1524   return $r;
1525}
1526
1527#
1528# Search::Elasticsearch::Client::2_0::Direct::Indices
1529#
1530sub create_index {
1531   my $self = shift;
1532   my ($index, $shards_count) = @_;
1533
1534   my $es = $self->_es;
1535   $self->brik_help_run_undef_arg('open', $es) or return;
1536   $self->brik_help_run_undef_arg('create_index', $index) or return;
1537         
1538   my $r;
1539   eval {
1540      $r = $es->indices->create(
1541         index => $index,
1542      );
1543   };
1544   if ($@) {
1545      chomp($@);
1546      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1547   }
1548   
1549   return $r;
1550}
1551
1552#
1553# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1554#
1555sub create_index_with_mappings {
1556   my $self = shift;
1557   my ($index, $mappings) = @_;
1558
1559   my $es = $self->_es;
1560   $self->brik_help_run_undef_arg('open', $es) or return;
1561   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1562   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1563   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1564
1565   my $r;
1566   eval {
1567      $r = $es->indices->create(
1568         index => $index,
1569         body => {
1570            mappings => $mappings,
1571         },
1572      );
1573   };
1574   if ($@) {
1575      chomp($@);
1576      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1577   }
1578
1579   return $r;
1580}
1581
1582# GET http://localhost:9200/
1583sub info {
1584   my $self = shift;
1585   my ($nodes) = @_;
1586
1587   $nodes ||= $self->nodes;
1588   $self->brik_help_run_undef_arg('info', $nodes) or return;
1589   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1590   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1591
1592   my $first = $nodes->[0];
1593
1594   $self->get($first) or return;
1595
1596   return $self->content;
1597}
1598
1599sub version {
1600   my $self = shift;
1601   my ($nodes) = @_;
1602
1603   $nodes ||= $self->nodes;
1604   $self->brik_help_run_undef_arg('version', $nodes) or return;
1605   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1606   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1607
1608   my $first = $nodes->[0];
1609
1610   $self->get($first) or return;
1611   my $content = $self->content or return;
1612
1613   return $content->{version}{number};
1614}
1615
1616#
1617# Search::Elasticsearch::Client::2_0::Direct::Indices
1618#
1619sub get_templates {
1620   my $self = shift;
1621
1622   my $es = $self->_es;
1623   $self->brik_help_run_undef_arg('open', $es) or return;
1624
1625   my $r;
1626   eval {
1627      $r = $es->indices->get_template;
1628   };
1629   if ($@) {
1630      chomp($@);
1631      return $self->log->error("get_templates: failed: [$@]");
1632   }
1633
1634   return $r;
1635}
1636
1637sub list_templates {
1638   my $self = shift;
1639
1640   my $content = $self->get_templates or return;
1641
1642   return [ sort { $a cmp $b } keys %$content ];
1643}
1644
1645#
1646# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1647#
1648sub get_template {
1649   my $self = shift;
1650   my ($template) = @_;
1651
1652   my $es = $self->_es;
1653   $self->brik_help_run_undef_arg('open', $es) or return;
1654   $self->brik_help_run_undef_arg('get_template', $template) or return;
1655
1656   my $r;
1657   eval {
1658      $r = $es->indices->get_template(
1659         name => $template,
1660      );
1661   };
1662   if ($@) {
1663      chomp($@);
1664      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1665   }
1666
1667   return $r;
1668}
1669
1670#
1671# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1672#
1673sub put_template {
1674   my $self = shift;
1675   my ($name, $template) = @_;
1676
1677   my $es = $self->_es;
1678   $self->brik_help_run_undef_arg('open', $es) or return;
1679   $self->brik_help_run_undef_arg('put_template', $name) or return;
1680   $self->brik_help_run_undef_arg('put_template', $template) or return;
1681   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1682
1683   my $r;
1684   eval {
1685      $r = $es->indices->put_template(
1686         name => $name,
1687         body => $template,
1688      );
1689   };
1690   if ($@) {
1691      chomp($@);
1692      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1693   }
1694
1695   return $r;
1696}
1697
1698sub put_template_from_json_file {
1699   my $self = shift;
1700   my ($json_file) = @_;
1701
1702   my $es = $self->_es;
1703   $self->brik_help_run_undef_arg('open', $es) or return;
1704   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1705   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1706
1707   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1708   my $data = $fj->read($json_file) or return;
1709
1710   if (! exists($data->{template})) {
1711      return $self->log->error("put_template_from_json_file: no template name found");
1712   }
1713
1714   my $name = $data->{template};
1715
1716   return $self->put_template($name, $data);
1717}
1718
1719sub update_template_from_json_file {
1720   my $self = shift;
1721   my ($json_file) = @_;
1722
1723   my $es = $self->_es;
1724   $self->brik_help_run_undef_arg('open', $es) or return;
1725   $self->brik_help_run_undef_arg('update_template_from_json_file', $json_file) or return;
1726   $self->brik_help_run_file_not_found('update_template_from_json_file', $json_file) or return;
1727
1728   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1729   my $data = $fj->read($json_file) or return;
1730
1731   if (! exists($data->{template})) {
1732      return $self->log->error("put_template_from_json_file: no template name found");
1733   }
1734
1735   my $name = $data->{template};
1736
1737   $self->delete_template($name);  # We ignore errors, template may not exist.
1738
1739   return $self->put_template($name, $data);
1740}
1741
1742#
1743# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1744# Search::Elasticsearch::Client::2_0::Direct::Indices
1745#
1746sub get_settings {
1747   my $self = shift;
1748   my ($indices, $names) = @_;
1749
1750   my $es = $self->_es;
1751   $self->brik_help_run_undef_arg('open', $es) or return;
1752
1753   my %args = ();
1754   if (defined($indices)) {
1755      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1756      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1757         or return;
1758      $args{index} = $indices;
1759   }
1760   if (defined($names)) {
1761      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1762      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1763         or return;
1764      $args{name} = $names;
1765   }
1766
1767   my $r;
1768   eval {
1769      $r = $es->indices->get_settings(%args);
1770   };
1771   if ($@) {
1772      chomp($@);
1773      return $self->log->error("get_settings: failed: [$@]");
1774   }
1775
1776   return $r;
1777}
1778
1779#
1780# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1781# Search::Elasticsearch::Client::2_0::Direct::Indices
1782#
1783# Example:
1784#
1785# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1786#
1787# XXX: should be renamed to put_index_settings
1788#
1789sub put_settings {
1790   my $self = shift;
1791   my ($settings, $indices) = @_;
1792
1793   my $es = $self->_es;
1794   $self->brik_help_run_undef_arg('open', $es) or return;
1795   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1796   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1797
1798   my %args = (
1799      body => $settings,
1800   );
1801   if (defined($indices)) {
1802      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1803      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1804         or return;
1805      $args{index} = $indices;
1806   }
1807
1808   my $r;
1809   eval {
1810      $r = $es->indices->put_settings(%args);
1811   };
1812   if ($@) {
1813      chomp($@);
1814      return $self->log->error("put_settings: failed: [$@]");
1815   }
1816
1817   return $r;
1818}
1819
1820sub set_index_number_of_replicas {
1821   my $self = shift;
1822   my ($indices, $number) = @_;
1823
1824   my $es = $self->_es;
1825   $self->brik_help_run_undef_arg('open', $es) or return;
1826   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1827   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1828      or return;
1829
1830   my $settings = { number_of_replicas => $number };
1831
1832   return $self->put_settings($settings, $indices);
1833}
1834
1835sub set_index_refresh_interval {
1836   my $self = shift;
1837   my ($indices, $number) = @_;
1838
1839   my $es = $self->_es;
1840   $self->brik_help_run_undef_arg('open', $es) or return;
1841   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1842   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1843      or return;
1844
1845   # If there is a meaningful value not postfixed with a unity,
1846   # we default to add a `s' for a number of seconds.
1847   if ($number =~ /^\d+$/ && $number > 0) {
1848      $number .= 's';
1849   }
1850
1851   my $settings = { refresh_interval => $number };
1852
1853   return $self->put_settings($settings, $indices);
1854}
1855
1856sub get_index_number_of_replicas {
1857   my $self = shift;
1858   my ($indices) = @_;
1859
1860   my $es = $self->_es;
1861   $self->brik_help_run_undef_arg('open', $es) or return;
1862   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1863   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1864      or return;
1865
1866   my $settings = $self->get_settings($indices);
1867
1868   my %indices = ();
1869   for (keys %$settings) {
1870      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1871   }
1872
1873   return \%indices;
1874}
1875
1876sub get_index_refresh_interval {
1877   my $self = shift;
1878   my ($indices, $number) = @_;
1879
1880   my $es = $self->_es;
1881   $self->brik_help_run_undef_arg('open', $es) or return;
1882   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1883   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1884      or return;
1885
1886   my $settings = $self->get_settings($indices);
1887
1888   my %indices = ();
1889   for (keys %$settings) {
1890      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1891   }
1892
1893   return \%indices;
1894}
1895
1896sub get_index_number_of_shards {
1897   my $self = shift;
1898   my ($indices, $number) = @_;
1899
1900   my $es = $self->_es;
1901   $self->brik_help_run_undef_arg('open', $es) or return;
1902   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1903   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1904      or return;
1905
1906   my $settings = $self->get_settings($indices);
1907
1908   my %indices = ();
1909   for (keys %$settings) {
1910      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1911   }
1912
1913   return \%indices;
1914}
1915
1916#
1917# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1918#
1919sub delete_template {
1920   my $self = shift;
1921   my ($name) = @_;
1922
1923   my $es = $self->_es;
1924   $self->brik_help_run_undef_arg('open', $es) or return;
1925   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1926
1927   my $r;
1928   eval {
1929      $r = $es->indices->delete_template(
1930         name => $name,
1931      );
1932   };
1933   if ($@) {
1934      chomp($@);
1935      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1936   }
1937
1938   return $r;
1939}
1940
1941#
1942# Return a boolean to state for index existence
1943#
1944sub is_index_exists {
1945   my $self = shift;
1946   my ($index) = @_;
1947
1948   my $es = $self->_es;
1949   $self->brik_help_run_undef_arg('open', $es) or return;
1950   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1951
1952   my $r;
1953   eval {
1954      $r = $es->indices->exists(
1955         index => $index,
1956      );
1957   };
1958   if ($@) {
1959      chomp($@);
1960      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1961   }
1962
1963   return $r ? 1 : 0;
1964}
1965
1966#
1967# Return a boolean to state for index with type existence
1968#
1969sub is_type_exists {
1970   my $self = shift;
1971   my ($index, $type) = @_;
1972
1973   my $es = $self->_es;
1974   $self->brik_help_run_undef_arg('open', $es) or return;
1975   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1976   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1977
1978   my $r;
1979   eval {
1980      $r = $es->indices->exists_type(
1981         index => $index,
1982         type => $type,
1983      );
1984   };
1985   if ($@) {
1986      chomp($@);
1987      return $self->log->error("is_type_exists: failed for index [$index] and ".
1988         "type [$type]: [$@]");
1989   }
1990
1991   return $r ? 1 : 0;
1992}
1993
1994#
1995# Return a boolean to state for document existence
1996#
1997sub is_document_exists {
1998   my $self = shift;
1999   my ($index, $type, $document) = @_;
2000
2001   my $es = $self->_es;
2002   $self->brik_help_run_undef_arg('open', $es) or return;
2003   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2004   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2005   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2006   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2007
2008   my $r;
2009   eval {
2010      $r = $es->exists(
2011         index => $index,
2012         type => $type,
2013         %$document,
2014      );
2015   };
2016   if ($@) {
2017      chomp($@);
2018      return $self->log->error("is_document_exists: failed for index [$index] and ".
2019         "type [$type]: [$@]");
2020   }
2021
2022   return $r ? 1 : 0;
2023}
2024
2025sub parse_error_string {
2026   my $self = shift;
2027   my ($string) = @_;
2028
2029   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2030
2031   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2032
2033   my ($class, $node, $code, $message, $dump) = $string =~
2034      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2035
2036   if (defined($dump) && length($dump)) {
2037      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2038      $dump = $sd->decode($dump);
2039   }
2040
2041   # Sanity check
2042   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
2043   &&  defined($dump) && ref($dump) eq 'HASH') {
2044      return {
2045         class => $class,
2046         node => $node,
2047         code => $code,
2048         message => $message,
2049         dump => $dump,
2050      };
2051   }
2052
2053   # Were not able to decode, we return as-is.
2054   return {
2055      message => $string,
2056   };
2057}
2058
2059#
2060# Refresh an index to receive latest additions
2061#
2062# Search::Elasticsearch::Client::5_0::Direct::Indices
2063# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2064#
2065sub refresh_index {
2066   my $self = shift;
2067   my ($index) = @_;
2068
2069   my $es = $self->_es;
2070   $self->brik_help_run_undef_arg('open', $es) or return;
2071   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2072
2073   my $try = $self->try;
2074
2075RETRY:
2076
2077   my $r;
2078   eval {
2079      $r = $es->indices->refresh(
2080         index => $index,
2081      );
2082   };
2083   if ($@) {
2084      if (--$try == 0) {
2085         chomp($@);
2086         my $p = $self->parse_error_string($@);
2087         if (defined($p) && exists($p->{class})) {
2088            my $class = $p->{class};
2089            my $code = $p->{code};
2090            my $node = $p->{node};
2091            return $self->log->error("refresh_index: failed for index [$index] ".
2092               "after [$try] tries with error [$class] code [$code] for node [$node]");
2093         }
2094         else {
2095            return $self->log->error("refresh_index: failed for index [$index] ".
2096               "after [$try]: [$@]");
2097         }
2098      }
2099      sleep 60;
2100      goto RETRY;
2101   }
2102
2103   return $r;
2104}
2105
2106sub export_as_csv {
2107   my $self = shift;
2108   my ($index, $size, $cb) = @_;
2109
2110   $size ||= 10_000;
2111   my $es = $self->_es;
2112   $self->brik_help_run_undef_arg('open', $es) or return;
2113   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2114   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2115
2116   my $max = $self->max;
2117   my $datadir = $self->datadir;
2118
2119   $self->log->debug("export_as_csv: selecting scroll Command...");
2120
2121   my $scroll;
2122   my $version = $self->version or return;
2123   if ($version lt "5.0.0") {
2124      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2125   }
2126   else {
2127      $scroll = $self->open_scroll($index, $size) or return;
2128   }
2129
2130   $self->log->debug("export_as_csv: selecting scroll Command...OK.");
2131
2132   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2133
2134   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2135   $fc->separator(',');
2136   $fc->escape('\\');
2137   $fc->append(1);
2138   $fc->first_line_is_header(0);
2139   $fc->write_header(1);
2140   $fc->use_quoting(1);
2141   if (defined($self->csv_header)) {
2142      my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
2143      $fc->header($sorted);
2144   }
2145   if (defined($self->csv_encoded_fields)) {
2146      $fc->encoded_fields($self->csv_encoded_fields);
2147   }
2148   if (defined($self->csv_object_fields)) {
2149      $fc->object_fields($self->csv_object_fields);
2150   }
2151
2152   my $csv_header = $fc->header;
2153
2154   my $total = $self->total_scroll;
2155   $self->log->info("export_as_csv: total [$total] for index [$index]");
2156
2157   my %types = ();
2158   my $read = 0;
2159   my $skipped = 0;
2160   my $exported = 0;
2161   my $start = time();
2162   my $done = $datadir."/$index.exported";
2163   my $start_time = time();
2164   my %chunk = ();
2165   while (my $next = $self->next_scroll(10000)) {
2166      for my $this (@$next) {
2167         $read++;
2168
2169         if (defined($cb)) {
2170            $this = $cb->($this);
2171            if (! defined($this)) {
2172               $self->log->error("export_as_csv: callback failed for index [$index] ".
2173                  "at read [$read], skipping single entry");
2174               $skipped++;
2175               next;
2176            }
2177         }
2178
2179         my $id = $this->{_id};
2180         my $doc = $this->{_source};
2181         my $type = $this->{_type} || 'doc';  # Prepare for when types will be removed from ES
2182         if (! exists($types{$type})) {
2183            # If not given, we guess the CSV fields to use.
2184            if (! defined($csv_header)) {
2185               my $fields = $self->list_index_fields($index, $type) or return;
2186               #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
2187               $types{$type}{header} = [ '_id', @$fields ];
2188            }
2189            else {
2190               $types{$type}{header} = [ '_id', @$csv_header ];
2191            }
2192
2193            $types{$type}{output} = $datadir."/$index:$type.csv";
2194
2195            # Verify it has not been exported yet
2196            if (-f $done) {
2197               return $self->log->error("export_as_csv: export already done for index ".
2198                  "[$index]");
2199            }
2200
2201            $self->log->info("export_as_csv: exporting to file [".$types{$type}{output}.
2202               "] for type [$type], using chunk size of [$size]");
2203         }
2204
2205         my $h = { _id => $id };
2206
2207         for my $k (keys %$doc) {
2208            $h->{$k} = $doc->{$k};
2209         }
2210
2211         $fc->header($types{$type}{header});
2212
2213         push @{$chunk{$type}}, $h;
2214         if (@{$chunk{$type}} > 999) {
2215            my $r = $fc->write($chunk{$type}, $types{$type}{output});
2216            if (!defined($r)) {
2217               $self->log->warning("export_as_csv: unable to process entry, skipping");
2218               $skipped++;
2219               next;
2220            }
2221            $chunk{$type} = [];
2222         }
2223
2224         # Log a status sometimes.
2225         if (! (++$exported % 100_000)) {
2226            my $now = time();
2227            my $perc = sprintf("%.02f", $exported / $total * 100);
2228            $self->log->info("export_as_csv: fetched [$exported/$total] ($perc%) ".
2229               "elements in ".($now - $start)." second(s) from index [$index]");
2230            $start = time();
2231         }
2232
2233         # Limit export to specified maximum
2234         if ($max > 0 && $exported >= $max) {
2235            $self->log->info("export_as_csv: max export reached [$exported] for index ".
2236               "[$index], stopping");
2237            last;
2238         }
2239      }
2240   }
2241
2242   # Process remaining data waiting to be written and build output file list
2243   my %files = ();
2244   for my $type (keys %types) {
2245      if (@{$chunk{$type}} > 0) {
2246         $fc->write($chunk{$type}, $types{$type}{output});
2247         $files{$types{$type}{output}}++;
2248      }
2249   }
2250
2251   $self->close_scroll;
2252
2253   my $stop_time = time();
2254   my $duration = $stop_time - $start_time;
2255   my $eps = $exported;
2256   if ($duration > 0) {
2257      $eps = $exported / $duration;
2258   }
2259
2260   my $result = {
2261      read => $read,
2262      exported => $exported,
2263      skipped => $read - $exported,
2264      total_count => $total,
2265      complete => ($exported == $total) ? 1 : 0,
2266      duration => $duration,
2267      eps => $eps, 
2268      files => [ sort { $a cmp $b } keys %files ],
2269   };
2270
2271   # Say the file has been processed, and put resulting stats.
2272   $fd->write($result, $done) or return;
2273
2274   $self->log->info("export_as_csv: done.");
2275
2276   return $result;
2277}
2278
2279#
2280# Optimization instructions:
2281# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
2282#
2283sub import_from_csv {
2284   my $self = shift;
2285   my ($input_csv, $index, $type, $hash, $cb) = @_;
2286
2287   my $es = $self->_es;
2288   $self->brik_help_run_undef_arg('open', $es) or return;
2289   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
2290   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
2291
2292   # If index and/or types are not defined, we try to get them from input filename
2293   if (! defined($index) || ! defined($type)) {
2294      # Example: index-DATE:type.csv
2295      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2296         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2297         $index ||= $this_index;
2298         $type ||= $this_type;
2299      }
2300   }
2301
2302   # Verify it has not been indexed yet
2303   my $done = "$input_csv.imported";
2304   if (-f $done) {
2305      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2306      return 0;
2307   }
2308
2309   # And default to Attributes if guess failed.
2310   $index ||= $self->index;
2311   $type ||= $self->type;
2312   $self->brik_help_set_undef_arg('index', $index) or return;
2313   $self->brik_help_set_undef_arg('type', $type) or return;
2314
2315   if ($index eq '*') {
2316      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2317   }
2318   if ($type eq '*') {
2319      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2320   }
2321
2322   $self->log->debug("input [$input_csv]");
2323   $self->log->debug("index [$index]");
2324   $self->log->debug("type [$type]");
2325
2326   my $count_before = 0;
2327   if ($self->is_index_exists($index)) {
2328      $count_before = $self->count($index, $type);
2329      if (! defined($count_before)) {
2330         return;
2331      }
2332      $self->log->info("import_from_csv: current index [$index] count is ".
2333         "[$count_before]");
2334   }
2335
2336   my $max = $self->max;
2337
2338   $self->open_bulk_mode($index, $type) or return;
2339
2340   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2341      "with type [$type]");
2342
2343   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2344
2345   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2346   $fc->separator(',');
2347   $fc->escape('\\');
2348   $fc->first_line_is_header(1);
2349   $fc->encoded_fields($self->csv_encoded_fields);
2350   $fc->object_fields($self->csv_object_fields);
2351
2352   my $refresh_interval;
2353   my $number_of_replicas;
2354   my $start = time();
2355   my $speed_settings = {};
2356   my $imported = 0;
2357   my $first = 1;
2358   my $read = 0;
2359   my $skipped_chunks = 0;
2360   my $start_time = time();
2361   while (my $this = $fc->read_next($input_csv)) {
2362      $read++;
2363
2364      my $h = {};
2365      my $id = $this->{_id};
2366      delete $this->{_id};
2367      for my $k (keys %$this) {
2368         my $value = $this->{$k};
2369         # We keep only fields when they have a value.
2370         # No need to index data that is empty.
2371         if (defined($value) && length($value)) {
2372            $h->{$k} = $value;
2373         }
2374      }
2375
2376      if (defined($cb)) {
2377         $h = $cb->($h);
2378         if (! defined($h)) {
2379            $self->log->error("import_from_csv: callback failed for index [$index] ".
2380               "at read [$read], skipping single entry");
2381            $skipped_chunks++;
2382            next;
2383         }
2384      }
2385
2386      my $r = $self->index_bulk($h, $index, $type, $hash, $id);
2387      if (! defined($r)) {
2388         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2389            "at read [$read], skipping chunk");
2390         $skipped_chunks++;
2391         next;
2392      }
2393
2394      # Gather index settings, and set values for speed.
2395      # We don't do it earlier, cause we need index to be created,
2396      # and it should have been done from index_bulk Command.
2397      if ($first && $self->is_index_exists($index)) {
2398         # Save current values so we can restore them at the end of Command.
2399         # We ignore errors here, this is non-blocking for indexing.
2400         $refresh_interval = $self->get_index_refresh_interval($index);
2401         $refresh_interval = $refresh_interval->{$index};
2402         $number_of_replicas = $self->get_index_number_of_replicas($index);
2403         $number_of_replicas = $number_of_replicas->{$index};
2404         if ($self->use_indexing_optimizations) {
2405            $self->set_index_number_of_replicas($index, 0);
2406         }
2407         $self->set_index_refresh_interval($index, -1);
2408         $first = 0;
2409      }
2410
2411      # Log a status sometimes.
2412      if (! (++$imported % 100_000)) {
2413         my $now = time();
2414         $self->log->info("import_from_csv: imported [$imported] entries in ".
2415            ($now - $start)." second(s) to index [$index]");
2416         $start = time();
2417      }
2418
2419      # Limit import to specified maximum
2420      if ($max > 0 && $imported >= $max) {
2421         $self->log->info("import_from_csv: max import reached [$imported] for ".
2422            "index [$index], stopping");
2423         last;
2424      }
2425   }
2426
2427   $self->bulk_flush;
2428
2429   my $stop_time = time();
2430   my $duration = $stop_time - $start_time;
2431   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2432
2433   $self->refresh_index($index);
2434
2435   my $count_current = $self->count($index, $type) or return;
2436   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2437
2438   my $skipped = 0;
2439   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2440   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2441      $imported = $read;
2442   }
2443   else {
2444      $skipped = $read - ($count_current - $count_before);
2445   }
2446
2447   my $result = {
2448      read => $read,
2449      imported => $imported,
2450      skipped => $skipped,
2451      previous_count => $count_before,
2452      current_count => $count_current,
2453      complete => $complete,
2454      duration => $duration,
2455      eps => $eps,
2456   };
2457
2458   # Say the file has been processed, and put resulting stats.
2459   $fd->write($result, $done) or return;
2460
2461   # Restore previous settings, if any
2462   if (defined($refresh_interval)) {
2463      $self->set_index_refresh_interval($index, $refresh_interval);
2464   }
2465   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2466      $self->set_index_number_of_replicas($index, $number_of_replicas);
2467   }
2468
2469   return $result;
2470}
2471
2472#
2473# http://localhost:9200/_nodes/stats/process?pretty
2474#
2475# Search::Elasticsearch::Client::2_0::Direct::Nodes
2476#
2477sub get_stats_process {
2478   my $self = shift;
2479
2480   my $es = $self->_es;
2481   $self->brik_help_run_undef_arg('open', $es) or return;
2482
2483   my $r;
2484   eval {
2485      $r = $es->nodes->stats(
2486         metric => [ qw(process) ],
2487      );
2488   };
2489   if ($@) {
2490      chomp($@);
2491      return $self->log->error("get_stats_process: failed: [$@]");
2492   }
2493
2494   return $r;
2495}
2496
2497#
2498# curl http://localhost:9200/_nodes/process?pretty
2499#
2500# Search::Elasticsearch::Client::2_0::Direct::Nodes
2501#
2502sub get_process {
2503   my $self = shift;
2504
2505   my $es = $self->_es;
2506   $self->brik_help_run_undef_arg('open', $es) or return;
2507
2508   my $r;
2509   eval {
2510      $r = $es->nodes->info(
2511         metric => [ qw(process) ],
2512      );
2513   };
2514   if ($@) {
2515      chomp($@);
2516      return $self->log->error("get_process: failed: [$@]");
2517   }
2518
2519   return $r;
2520}
2521
2522#
2523# Search::Elasticsearch::Client::2_0::Direct::Cluster
2524#
2525sub get_cluster_state {
2526   my $self = shift;
2527
2528   my $es = $self->_es;
2529   $self->brik_help_run_undef_arg('open', $es) or return;
2530
2531   my $r;
2532   eval {
2533      $r = $es->cluster->state;
2534   };
2535   if ($@) {
2536      chomp($@);
2537      return $self->log->error("get_cluster_state: failed: [$@]");
2538   }
2539
2540   return $r;
2541}
2542
2543#
2544# Search::Elasticsearch::Client::2_0::Direct::Cluster
2545#
2546sub get_cluster_health {
2547   my $self = shift;
2548
2549   my $es = $self->_es;
2550   $self->brik_help_run_undef_arg('open', $es) or return;
2551
2552   my $r;
2553   eval {
2554      $r = $es->cluster->health;
2555   };
2556   if ($@) {
2557      chomp($@);
2558      return $self->log->error("get_cluster_health: failed: [$@]");
2559   }
2560
2561   return $r;
2562}
2563
2564#
2565# Search::Elasticsearch::Client::2_0::Direct::Cluster
2566#
2567sub get_cluster_settings {
2568   my $self = shift;
2569
2570   my $es = $self->_es;
2571   $self->brik_help_run_undef_arg('open', $es) or return;
2572
2573   my $r;
2574   eval {
2575      $r = $es->cluster->get_settings;
2576   };
2577   if ($@) {
2578      chomp($@);
2579      return $self->log->error("get_cluster_settings: failed: [$@]");
2580   }
2581
2582   return $r;
2583}
2584
2585#
2586# Search::Elasticsearch::Client::2_0::Direct::Cluster
2587#
2588sub put_cluster_settings {
2589   my $self = shift;
2590   my ($settings) = @_;
2591
2592   my $es = $self->_es;
2593   $self->brik_help_run_undef_arg('open', $es) or return;
2594   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2595   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2596
2597   my %args = (
2598      body => $settings,
2599   );
2600
2601   my $r;
2602   eval {
2603      $r = $es->cluster->put_settings(%args);
2604   };
2605   if ($@) {
2606      chomp($@);
2607      return $self->log->error("put_cluster_settings: failed: [$@]");
2608   }
2609
2610   return $r;
2611}
2612
2613sub count_green_indices {
2614   my $self = shift;
2615
2616   my $get = $self->show_indices or return;
2617
2618   my $count = 0;
2619   for (@$get) {
2620      if (/^\s*green\s+/) {
2621         $count++;
2622      }
2623   }
2624
2625   return $count;
2626}
2627
2628sub count_yellow_indices {
2629   my $self = shift;
2630
2631   my $get = $self->show_indices or return;
2632
2633   my $count = 0;
2634   for (@$get) {
2635      if (/^\s*yellow\s+/) {
2636         $count++;
2637      }
2638   }
2639
2640   return $count;
2641}
2642
2643sub count_red_indices {
2644   my $self = shift;
2645
2646   my $get = $self->show_indices or return;
2647
2648   my $count = 0;
2649   for (@$get) {
2650      if (/^\s*red\s+/) {
2651         $count++;
2652      }
2653   }
2654
2655   return $count;
2656}
2657
2658sub count_indices {
2659   my $self = shift;
2660
2661   my $get = $self->show_indices or return;
2662
2663   return scalar @$get;
2664}
2665
2666sub list_indices_status {
2667   my $self = shift;
2668
2669   my $get = $self->show_indices or return;
2670
2671   my $count_red = 0;
2672   my $count_yellow = 0;
2673   my $count_green = 0;
2674   for (@$get) {
2675      if (/^\s*red\s+/) {
2676         $count_red++;
2677      }
2678      elsif (/^\s*yellow\s+/) {
2679         $count_yellow++;
2680      }
2681      elsif (/^\s*green\s+/) {
2682         $count_green++;
2683      }
2684   }
2685
2686   return {
2687      red => $count_red,
2688      yellow => $count_yellow,
2689      green => $count_green,
2690   };
2691}
2692
2693sub count_shards {
2694   my $self = shift;
2695
2696   my $indices = $self->get_indices or return;
2697
2698   my $count = 0;
2699   for (@$indices) {
2700      $count += $_->{shards};
2701   }
2702
2703   return $count;
2704}
2705
2706sub count_size {
2707   my $self = shift;
2708
2709   my $indices = $self->get_indices or return;
2710
2711   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2712   $fn->kibi_suffix("kb");
2713   $fn->mebi_suffix("mb");
2714   $fn->gibi_suffix("gb");
2715   $fn->kilo_suffix("KB");
2716   $fn->mega_suffix("MB");
2717   $fn->giga_suffix("GB");
2718
2719   my $size = 0;
2720   for (@$indices) {
2721      $size += $fn->to_number($_->{size});
2722   }
2723
2724   return $fn->from_number($size);
2725}
2726
2727sub count_total_size {
2728   my $self = shift;
2729
2730   my $indices = $self->get_indices or return;
2731
2732   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2733   $fn->kibi_suffix("kb");
2734   $fn->mebi_suffix("mb");
2735   $fn->gibi_suffix("gb");
2736   $fn->kilo_suffix("KB");
2737   $fn->mega_suffix("MB");
2738   $fn->giga_suffix("GB");
2739
2740   my $size = 0;
2741   for (@$indices) {
2742      $size += $fn->to_number($_->{total_size});
2743   }
2744
2745   return $fn->from_number($size);
2746}
2747
2748sub count_count {
2749   my $self = shift;
2750
2751   my $indices = $self->get_indices or return;
2752
2753   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2754   $fn->kilo_suffix('k');
2755   $fn->mega_suffix('m');
2756   $fn->giga_suffix('M');
2757
2758   my $count = 0;
2759   for (@$indices) {
2760      $count += $_->{count};
2761   }
2762
2763   return $fn->from_number($count);
2764}
2765
2766sub list_green_indices {
2767   my $self = shift;
2768
2769   my $get = $self->get_indices or return;
2770
2771   my @indices = ();
2772   for (@$get) {
2773      if ($_->{color} eq 'green') {
2774         push @indices, $_->{index};
2775      }
2776   }
2777
2778   return \@indices;
2779}
2780
2781sub list_yellow_indices {
2782   my $self = shift;
2783
2784   my $get = $self->get_indices or return;
2785
2786   my @indices = ();
2787   for (@$get) {
2788      if ($_->{color} eq 'yellow') {
2789         push @indices, $_->{index};
2790      }
2791   }
2792
2793   return \@indices;
2794}
2795
2796sub list_red_indices {
2797   my $self = shift;
2798
2799   my $get = $self->get_indices or return;
2800
2801   my @indices = ();
2802   for (@$get) {
2803      if ($_->{color} eq 'red') {
2804         push @indices, $_->{index};
2805      }
2806   }
2807
2808   return \@indices;
2809}
2810
2811#
2812# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2813#
2814sub list_datatypes {
2815   my $self = shift;
2816
2817   return {
2818      core => [ qw(string long integer short byte double float data boolean binary) ],
2819   };
2820}
2821
2822#
2823# Return total hits for last www_search
2824#
2825sub get_hits_total {
2826   my $self = shift;
2827   my ($run) = @_;
2828
2829   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
2830
2831   if (ref($run) eq 'HASH') {
2832      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2833         return $run->{hits}{total};
2834      }
2835   }
2836
2837   return $self->log->error("get_hits_total: last Command not compatible");
2838}
2839
2840sub disable_shard_allocation {
2841   my $self = shift;
2842
2843   my $settings = {
2844      persistent => {
2845         'cluster.routing.allocation.enable' => 'none',
2846      }
2847   };
2848
2849   return $self->put_cluster_settings($settings);
2850}
2851
2852sub enable_shard_allocation {
2853   my $self = shift;
2854
2855   my $settings = {
2856      persistent => { 
2857         'cluster.routing.allocation.enable' => 'all',
2858      }
2859   };
2860
2861   return $self->put_cluster_settings($settings);
2862}
2863
2864sub flush_synced {
2865   my $self = shift;
2866
2867   my $es = $self->_es;
2868   $self->brik_help_run_undef_arg('open', $es) or return;
2869
2870   my $r;
2871   eval {
2872      $r = $es->indices->flush_synced;
2873   };
2874   if ($@) {
2875      chomp($@);
2876      return $self->log->error("flush_synced: failed: [$@]");
2877   }
2878
2879   return $r;
2880}
2881
2882#
2883# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2884#
2885# run client::elasticsearch create_snapshot_repository myrepo
2886#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2887#
2888# You have to set path.repo in elasticsearch.yml like:
2889# path.repo: ["/home/gomor/es-backups"]
2890#
2891# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2892#
2893sub create_snapshot_repository {
2894   my $self = shift;
2895   my ($body, $repository_name) = @_;
2896
2897   my $es = $self->_es;
2898   $self->brik_help_run_undef_arg('open', $es) or return;
2899   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2900
2901   $repository_name ||= 'repository';
2902
2903   my %args = (
2904      repository => $repository_name,
2905      body => $body,
2906   );
2907
2908   my $r;
2909   eval {
2910      $r = $es->snapshot->create_repository(%args);
2911   };
2912   if ($@) {
2913      chomp($@);
2914      return $self->log->error("create_snapshot_repository: failed: [$@]");
2915   }
2916
2917   return $r;
2918}
2919
2920sub create_shared_fs_snapshot_repository {
2921   my $self = shift;
2922   my ($location, $repository_name) = @_;
2923
2924   $repository_name ||= 'repository';
2925   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2926
2927   if ($location !~ m{^/}) {
2928      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2929         "a full directory path, this one is invalid [$location]");
2930   }
2931
2932   my $body = {
2933      type => 'fs',
2934      settings => {
2935         compress => 'true',
2936         location => $location,
2937      },
2938   };
2939
2940   return $self->create_snapshot_repository($body, $repository_name);
2941}
2942
2943#
2944# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2945#
2946sub get_snapshot_repositories {
2947   my $self = shift;
2948
2949   my $es = $self->_es;
2950   $self->brik_help_run_undef_arg('open', $es) or return;
2951
2952   my $r;
2953   eval {
2954      $r = $es->snapshot->get_repository;
2955   };
2956   if ($@) {
2957      chomp($@);
2958      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2959   }
2960
2961   return $r;
2962}
2963
2964#
2965# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2966#
2967sub get_snapshot_status {
2968   my $self = shift;
2969
2970   my $es = $self->_es;
2971   $self->brik_help_run_undef_arg('open', $es) or return;
2972
2973   my $r;
2974   eval {
2975      $r = $es->snapshot->status;
2976   };
2977   if ($@) {
2978      chomp($@);
2979      return $self->log->error("get_snapshot_status: failed: [$@]");
2980   }
2981
2982   return $r;
2983}
2984
2985#
2986# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2987#
2988sub create_snapshot {
2989   my $self = shift;
2990   my ($snapshot_name, $repository_name, $body) = @_;
2991
2992   my $es = $self->_es;
2993   $self->brik_help_run_undef_arg('open', $es) or return;
2994
2995   $snapshot_name ||= 'snapshot';
2996   $repository_name ||= 'repository';
2997
2998   my %args = (
2999      repository => $repository_name,
3000      snapshot => $snapshot_name,
3001   );
3002   if (defined($body)) {
3003      $args{body} = $body;
3004   }
3005
3006   my $r;
3007   eval {
3008      $r = $es->snapshot->create(%args);
3009   };
3010   if ($@) {
3011      chomp($@);
3012      return $self->log->error("create_snapshot: failed: [$@]");
3013   }
3014
3015   return $r;
3016}
3017
3018sub create_snapshot_for_indices {
3019   my $self = shift;
3020   my ($indices, $snapshot_name, $repository_name) = @_;
3021
3022   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
3023
3024   $snapshot_name ||= 'snapshot';
3025   $repository_name ||= 'repository';
3026
3027   my $body = {
3028      indices => $indices,
3029   };
3030
3031   return $self->create_snapshot($snapshot_name, $repository_name, $body);
3032}
3033
3034sub is_snapshot_finished {
3035   my $self = shift;
3036
3037   my $status = $self->get_snapshot_status or return;
3038
3039   if (@{$status->{snapshots}} == 0) {
3040      return 1;
3041   }
3042
3043   return 0;
3044}
3045
3046sub get_snapshot_state {
3047   my $self = shift;
3048
3049   if ($self->is_snapshot_finished) {
3050      return $self->log->info("get_snapshot_state: is already finished");
3051   }
3052
3053   my $status = $self->get_snapshot_status or return;
3054
3055   my @indices_done = ();
3056   my @indices_not_done = ();
3057
3058   my $list = $status->{snapshots};
3059   for my $snapshot (@$list) {
3060      my $indices = $snapshot->{indices};
3061      for my $index (@$indices) {
3062         my $done = $index->{shards_stats}{done};
3063         if ($done) {
3064            push @indices_done, $index;
3065         }
3066         else {
3067            push @indices_not_done, $index;
3068         }
3069      }
3070   }
3071
3072   return { done => \@indices_done, not_done => \@indices_not_done };
3073}
3074
3075sub verify_snapshot_repository {
3076}
3077
3078sub delete_snapshot_repository {
3079   my $self = shift;
3080   my ($repository_name) = @_;
3081
3082   my $es = $self->_es;
3083   $self->brik_help_run_undef_arg('open', $es) or return;
3084   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
3085
3086   my $r;
3087   eval {
3088      $r = $es->snapshot->delete_repository(
3089         repository => $repository_name,
3090      );
3091   };
3092   if ($@) {
3093      chomp($@);
3094      return $self->log->error("delete_snapshot_repository: failed: [$@]");
3095   }
3096
3097   return $r;
3098}
3099
3100sub get_snapshot {
3101   my $self = shift;
3102   my ($snapshot_name, $repository_name) = @_;
3103
3104   my $es = $self->_es;
3105   $self->brik_help_run_undef_arg('open', $es) or return;
3106
3107   $snapshot_name ||= 'snapshot';
3108   $repository_name ||= 'repository';
3109
3110   my $r;
3111   eval {
3112      $r = $es->snapshot->get(
3113         repository => $repository_name,
3114         snapshot => $snapshot_name,
3115      );
3116   };
3117   if ($@) {
3118      chomp($@);
3119      return $self->log->error("get_snapshot: failed: [$@]");
3120   }
3121
3122   return $r;
3123}
3124
3125#
3126# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3127#
3128sub delete_snapshot {
3129   my $self = shift;
3130   my ($snapshot_name, $repository_name) = @_;
3131
3132   my $es = $self->_es;
3133   $self->brik_help_run_undef_arg('open', $es) or return;
3134   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
3135   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
3136
3137   my $timeout = $self->rtimeout;
3138
3139   my $r;
3140   eval {
3141      $r = $es->snapshot->delete(
3142         repository => $repository_name,
3143         snapshot => $snapshot_name,
3144         master_timeout => "${timeout}s",
3145      );
3146   };
3147   if ($@) {
3148      chomp($@);
3149      return $self->log->error("delete_snapshot: failed: [$@]");
3150   }
3151
3152   return $r;
3153}
3154
3155#
3156# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3157#
3158sub restore_snapshot {
3159   my $self = shift;
3160   my ($snapshot_name, $repository_name, $body) = @_;
3161
3162   my $es = $self->_es;
3163   $snapshot_name ||= 'snapshot';
3164   $repository_name ||= 'repository';
3165   $self->brik_help_run_undef_arg('open', $es) or return;
3166   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
3167   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
3168
3169   my %args = (
3170      repository => $repository_name,
3171      snapshot => $snapshot_name,
3172   );
3173   if (defined($body)) {
3174      $args{body} = $body;
3175   }
3176
3177   my $r;
3178   eval {
3179      $r = $es->snapshot->restore(%args);
3180   };
3181   if ($@) {
3182      chomp($@);
3183      return $self->log->error("restore_snapshot: failed: [$@]");
3184   }
3185
3186   return $r;
3187}
3188
3189sub restore_snapshot_for_indices {
3190   my $self = shift;
3191   my ($indices, $snapshot_name, $repository_name) = @_;
3192
3193   $snapshot_name ||= 'snapshot';
3194   $repository_name ||= 'repository';
3195   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
3196   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
3197   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
3198
3199   my $body = {
3200      indices => $indices,
3201   };
3202
3203   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
3204}
3205
3206# shard occupation
3207#
3208# curl -XGET "http://127.0.0.1:9200/_cat/shards?v
3209# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
3210#
3211# disk occuption:
3212# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
3213#
3214#
3215# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
3216#
3217
3218# Check memory lock
3219
3220# curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
3221# {
3222#  "nodes" : {
3223#    "3XXX" : {
3224#      "process" : {
3225#        "mlockall" : true
3226#      }
3227#    }
3228#  }
3229# }
3230
32311;
3232
3233__END__
3234
3235=head1 NAME
3236
3237Metabrik::Client::Elasticsearch - client::elasticsearch Brik
3238
3239=head1 SYNOPSIS
3240
3241   host:~> my $q = { term => { ip => "192.168.57.19" } }
3242   host:~> run client::elasticsearch open
3243   host:~> run client::elasticsearch query $q data-*
3244
3245=head1 DESCRIPTION
3246
3247Template to write a new Metabrik Brik.
3248
3249=head1 COPYRIGHT AND LICENSE
3250
3251Copyright (c) 2014-2018, Patrice E<lt>GomoRE<gt> Auffret
3252
3253You may distribute this module under the terms of The BSD 3-Clause License.
3254See LICENSE file in the source distribution archive.
3255
3256=head1 AUTHOR
3257
3258Patrice E<lt>GomoRE<gt> Auffret
3259
3260=cut
Note: See TracBrowser for help on using the repository browser.