source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 918:aa1dbe000fdc, checked in by GomoR <gomor@…>, 2 months ago
  • bugfix: client::elasticsearch: correctly display usage for query Command on invalid call
  • bugfix: client::elasticsearch::query: use size Attribute when querying
  • bugfix: lookup::iplocation: shorten error message when IPv4 has no location found in from_ip Command
File size: 71.8 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         nodes => [ qw(node_list) ],
20         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
21         date => [ qw(date) ],
22         index => [ qw(index) ],
23         type => [ qw(type) ],
24         from => [ qw(number) ],
25         size => [ qw(count) ],
26         max => [ qw(count) ],
27         max_flush_count => [ qw(count) ],
28         max_flush_size => [ qw(count) ],
29         rtimeout => [ qw(seconds) ],
30         sniff_rtimeout => [ qw(seconds) ],
31         try => [ qw(count) ],
32         use_bulk_autoflush => [ qw(0|1) ],
33         use_indexing_optimizations => [ qw(0|1) ],
34         csv_encoded_fields => [ qw(fields) ],
35         csv_object_fields => [ qw(fields) ],
36         _es => [ qw(INTERNAL) ],
37         _bulk => [ qw(INTERNAL) ],
38         _scroll => [ qw(INTERNAL) ],
39      },
40      attributes_default => {
41         nodes => [ qw(http://localhost:9200) ],
42         cxn_pool => 'Sniff',
43         from => 0,
44         size => 10,
45         max => 0,
46         index => '*',
47         type => '*',
48         rtimeout => 60,
49         sniff_rtimeout => 3,
50         try => 3,
51         max_flush_count => 1_000,
52         max_flush_size => 1_000_000,
53         use_bulk_autoflush => 1,
54         use_indexing_optimizations => 0,
55      },
56      commands => {
57         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
58         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
59         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
60         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL) ],
61         close_scroll => [ ],
62         total_scroll => [ ],
63         next_scroll => [ ],
64         index_document => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
65         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL id|OPTIONAL) ],
66         bulk_flush => [ ],
67         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL) ],
68         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
69         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
70         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
71         delete_index => [ qw(index|indices_list) ],
72         update_alias => [ qw(new_index alias) ],
73         delete_document => [ qw(index type id) ],
74         delete_by_query => [ qw($query_hash index type) ],
75         show_indices => [ qw(string_filter|OPTIONAL) ],
76         show_nodes => [ ],
77         show_health => [ ],
78         show_recovery => [ ],
79         list_indices => [ ],
80         get_indices => [ ],
81         get_index => [ qw(index|indices_list) ],
82         list_index_types => [ qw(index) ],
83         list_index_fields => [ qw(index) ],
84         list_indices_version => [ qw(index|indices_list) ],
85         open_index => [ qw(index|indices_list) ],
86         close_index => [ qw(index|indices_list) ],
87         get_aliases => [ qw(index) ],
88         put_alias => [ qw(index alias) ],
89         delete_alias => [ qw(index alias) ],
90         get_mappings => [ qw(index type|OPTIONAL) ],
91         create_index => [ qw(index) ],
92         create_index_with_mappings => [ qw(index mappings) ],
93         info => [ qw(nodes_list|OPTIONAL) ],
94         version => [ qw(nodes_list|OPTIONAL) ],
95         get_templates => [ ],
96         list_templates => [ ],
97         get_template => [ qw(name) ],
98         put_template => [ qw(name template) ],
99         put_template_from_json_file => [ qw(file) ],
100         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
101         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
102         set_index_number_of_replicas => [ qw(index|indices_list number) ],
103         set_index_refresh_interval => [ qw(index|indices_list number) ],
104         get_index_number_of_replicas => [ qw(index|indices) ],
105         get_index_refresh_interval => [ qw(index|indices_list) ],
106         get_index_number_of_shards => [ qw(index|indices_list) ],
107         delete_template => [ qw(name) ],
108         is_index_exists => [ qw(index) ],
109         is_type_exists => [ qw(index type) ],
110         is_document_exists => [ qw(index type document) ],
111         parse_error_string => [ qw(string) ],
112         refresh_index => [ qw(index) ],
113         export_as_csv => [ qw(index size|OPTIONAL) ],
114         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL size|OPTIONAL) ],
115         get_stats_process => [ ],
116         get_process => [ ],
117         get_cluster_state => [ ],
118         get_cluster_health => [ ],
119         get_cluster_settings => [ ],
120         put_cluster_settings => [ qw(settings) ],
121         count_green_indices => [ ],
122         count_yellow_indices => [ ],
123         count_red_indices => [ ],
124         list_green_indices => [ ],
125         list_yellow_indices => [ ],
126         list_red_indices => [ ],
127         count_indices => [ ],
128         list_indices_status => [ ],
129         count_shards => [ ],
130         count_size => [ ],
131         count_total_size => [ ],
132         count_count => [ ],
133         list_datatypes => [ ],
134         get_hits_total => [ ],
135         disable_shard_allocation => [ ],
136         enable_shard_allocation => [ ],
137         flush_synced => [ ],
138         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
139         create_shared_fs_snapshot_repository => [ qw(location
140            repository_name|OPTIONAL) ],
141         get_snapshot_repositories => [ ],
142         get_snapshot_status => [ ],
143         delete_snapshot_repository => [ qw(repository_name) ],
144         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
145            body|OPTIONAL) ],
146         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
147            repository_name|OPTIONAL) ],
148         is_snapshot_finished => [ ],
149         get_snapshot_state => [ ],
150         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
151         delete_snapshot => [ qw(snapshot_name repository_name) ],
152         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
153         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
154      },
155      require_modules => {
156         'Metabrik::String::Json' => [ ],
157         'Metabrik::File::Csv' => [ ],
158         'Metabrik::File::Json' => [ ],
159         'Metabrik::File::Dump' => [ ],
160         'Metabrik::Format::Number' => [ ],
161         'Search::Elasticsearch' => [ ],
162      },
163   };
164}
165
166sub brik_preinit {
167   my $self = shift;
168
169   eval("use Search::Elasticsearch;");
170   if ($Search::Elasticsearch::VERSION < 5) {
171      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
172         "with: run perl::module install Search::Elasticsearch");
173   }
174
175   return $self->SUPER::brik_preinit;
176}
177
178sub open {
179   my $self = shift;
180   my ($nodes, $cxn_pool) = @_;
181
182   $nodes ||= $self->nodes;
183   $cxn_pool ||= $self->cxn_pool;
184   $self->brik_help_run_undef_arg('open', $nodes) or return;
185   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
186   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
187   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
188
189   for my $node (@$nodes) {
190      if ($node !~ m{https?://}) {
191         return $self->log->error("open: invalid node[$node], must start with http(s)");
192      }
193   }
194
195   my $timeout = $self->rtimeout;
196
197   my $nodes_str = join('|', @$nodes);
198   $self->log->verbose("open: using nodes [$nodes_str]");
199
200   #
201   # Timeout description here:
202   #
203   # Search::Elasticsearch::Role::Cxn
204   #
205
206   my $es = Search::Elasticsearch->new(
207      nodes => $nodes,
208      cxn_pool => $cxn_pool,
209      timeout => $timeout,
210      max_retries => $self->try,
211      retry_on_timeout => 1,
212      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
213      request_timeout => 60,  # seconds, default 30
214      ping_timeout => 5,  # seconds, default 2
215      dead_timeout => 120,  # seconds, detault 60
216      max_dead_timeout => 3600,  # seconds, default 3600
217      sniff_request_timeout => 15, # seconds, default 2
218   );
219   if (! defined($es)) {
220      return $self->log->error("open: failed");
221   }
222
223   $self->_es($es);
224
225   return $nodes;
226}
227
228#
229# Search::Elasticsearch::Client::5_0::Bulk
230#
231sub open_bulk_mode {
232   my $self = shift;
233   my ($index, $type) = @_;
234
235   $index ||= $self->index;
236   $type ||= $self->type;
237   my $es = $self->_es;
238   $self->brik_help_run_undef_arg('open', $es) or return;
239   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
240   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
241
242   my %args = (
243      index => $index,
244      type => $type,
245   );
246
247   if ($self->use_bulk_autoflush) {
248      my $max_count = $self->max_flush_count || 1_000;
249      my $max_size = $self->max_flush_size || 1_000_000;
250
251      $args{max_count} = $max_count;
252      $args{max_size} = $max_size;
253      $args{max_time} = 0;
254
255      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
256         "max_flush_size [$max_size]");
257   }
258   else {
259      $args{max_count} = 0;
260      $args{max_size} = 0;
261      $args{max_time} = 0;
262      $args{on_error} = undef;
263      #$args{on_success} = sub {
264         #my ($action, $response, $i) = @_;
265      #};
266
267      $self->log->info("open_bulk_mode: opening without automatic flushing");
268   }
269
270   my $bulk = $es->bulk_helper(%args);
271   if (! defined($bulk)) {
272      return $self->log->error("open_bulk_mode: failed");
273   }
274
275   $self->_bulk($bulk);
276
277   return $self->nodes;
278}
279
280sub open_scroll_scan_mode {
281   my $self = shift;
282   my ($index, $size) = @_;
283
284   my $version = $self->version or return;
285   if ($version ge "5.0.0") {
286      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
287         "$version, try open_scroll Command instead");
288   }
289
290   $index ||= $self->index;
291   $size ||= $self->size;
292   my $es = $self->_es;
293   $self->brik_help_run_undef_arg('open', $es) or return;
294   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
295   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
296
297   my $scroll = $es->scroll_helper(
298      index => $index,
299      search_type => 'scan',
300      size => $size,
301   );
302   if (! defined($scroll)) {
303      return $self->log->error("open_scroll_scan_mode: failed");
304   }
305
306   $self->_scroll($scroll);
307
308   return $self->nodes;
309}
310
311#
312# Search::Elasticsearch::Client::5_0::Scroll
313#
314sub open_scroll {
315   my $self = shift;
316   my ($index, $size) = @_;
317
318   my $version = $self->version or return;
319   if ($version lt "5.0.0") {
320      return $self->log->error("open_scroll: Command not supported for ES version ".
321         "$version, try open_scroll_scan_mode Command instead");
322   }
323
324   $index ||= $self->index;
325   $size ||= $self->size;
326   my $es = $self->_es;
327   $self->brik_help_run_undef_arg('open', $es) or return;
328   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
329   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
330
331   my $timeout = $self->rtimeout;
332
333   #
334   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
335   #
336   my $scroll = $es->scroll_helper(
337      scroll => "${timeout}s",
338      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
339                          # it in query string.
340      index => $index,
341      size => $size,
342      body => {
343         sort => [ qw(_doc) ],
344      },
345   );
346   if (! defined($scroll)) {
347      return $self->log->error("open_scroll: failed");
348   }
349
350   $self->_scroll($scroll);
351
352   $self->log->info("open_scroll: opened with size [$size] and timeout [${timeout}s]");
353
354   return $self->nodes;
355}
356
357#
358# Search::Elasticsearch::Client::5_0::Scroll
359#
360sub close_scroll {
361   my $self = shift;
362
363   my $scroll = $self->_scroll;
364   if (! defined($scroll)) {
365      return 1;
366   }
367
368   $scroll->finish;
369   $self->_scroll(undef);
370
371   return 1;
372}
373
374sub total_scroll {
375   my $self = shift;
376
377   my $scroll = $self->_scroll;
378   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
379
380   my $total;
381   eval {
382      $total = $scroll->total;
383   };
384   if ($@) {
385      chomp($@);
386      return $self->log->error("total_scroll: failed with: [$@]");
387   }
388
389   return $total;
390}
391
392sub next_scroll {
393   my $self = shift;
394
395   my $scroll = $self->_scroll;
396   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
397
398   my $next;
399   eval {
400      $next = $scroll->next;
401   };
402   if ($@) {
403      chomp($@);
404      return $self->log->error("next_scroll: failed with: [$@]");
405   }
406
407   return $next;
408}
409
410sub index_document {
411   my $self = shift;
412   my ($doc, $index, $type, $id) = @_;
413
414   $index ||= $self->index;
415   $type ||= $self->type;
416   my $es = $self->_es;
417   $self->brik_help_run_undef_arg('open', $es) or return;
418   $self->brik_help_run_undef_arg('index_document', $doc) or return;
419   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
420   $self->brik_help_set_undef_arg('index', $index) or return;
421   $self->brik_help_set_undef_arg('type', $type) or return;
422
423   my %args = (
424      index => $index,
425      type => $type,
426      body => $doc,
427   );
428   if (defined($id)) {
429      $args{id} = $id;
430   }
431
432   my $r;
433   eval {
434      $r = $es->index(%args);
435   };
436   if ($@) {
437      chomp($@);
438      return $self->log->error("index_document: index failed for index [$index]: [$@]");
439   }
440
441   return $r;
442}
443
444#
445# Search::Elasticsearch::Client::5_0::Bulk
446#
447sub index_bulk {
448   my $self = shift;
449   my ($doc, $index, $type, $id) = @_;
450
451   my $bulk = $self->_bulk;
452   $index ||= $self->index;
453   $type ||= $self->type;
454   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
455   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
456   $self->brik_help_set_undef_arg('index', $index) or return;
457   $self->brik_help_set_undef_arg('type', $type) or return;
458
459   my %args = (
460      source => $doc,
461   );
462   if (defined($id)) {
463      $args{id} = $id;
464   }
465
466   my $r;
467   eval {
468      #$r = $bulk->index(\%args);
469      $r = $bulk->add_action(index => \%args);
470   };
471   if ($@) {
472      chomp($@);
473      my $p = $self->parse_error_string($@);
474      if (defined($p) && exists($p->{class})) {
475         my $class = $p->{class};
476         my $code = $p->{code};
477         my $node = $p->{node};
478         return $self->log->error("index_bulk: failed for index [$index] with error ".
479            "[$class] code [$code] for node [$node]");
480      }
481      else {
482         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
483      }
484   }
485
486   return $r;
487}
488
489sub bulk_flush {
490   my $self = shift;
491
492   my $bulk = $self->_bulk;
493   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
494
495   my $try = $self->try;
496
497RETRY:
498
499   my $r;
500   eval {
501      $r = $bulk->flush;
502   };
503   if ($@) {
504      if (--$try == 0) {
505         chomp($@);
506         my $p = $self->parse_error_string($@);
507         if (defined($p) && exists($p->{class})) {
508            my $class = $p->{class};
509            my $code = $p->{code};
510            my $node = $p->{node};
511            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
512               "[$class] code [$code] for node [$node]");
513         }
514         else {
515            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
516         }
517      }
518      sleep 60;
519      goto RETRY;
520   }
521
522   return $r;
523}
524
525#
526# Search::Elasticsearch::Client::2_0::Direct
527# Search::Elasticsearch::Client::5_0::Direct
528#
529sub count {
530   my $self = shift;
531   my ($index, $type) = @_;
532
533   $index ||= $self->index;
534   $type ||= $self->type;
535   my $es = $self->_es;
536   $self->brik_help_run_undef_arg('open', $es) or return;
537
538   my %args = ();
539   if (defined($index) && $index ne '*') {
540      $args{index} = $index;
541   }
542   if (defined($type) && $type ne '*') {
543      $args{type} = $type;
544   }
545
546   #$args{body} = {
547      #query => {
548         #match => { title => 'Elasticsearch clients' },
549      #},
550   #}
551
552   my $r;
553   my $version = $self->version or return;
554   if ($version ge "5.0.0") {
555      eval {
556         $r = $es->count(%args);
557      };
558   }
559   else {
560      eval {
561         $r = $es->search(
562            index => $index,
563            type => $type,
564            search_type => 'count',
565            body => {
566               query => {
567                  match_all => {},
568               },
569            },
570         );
571      };
572   }
573   if ($@) {
574      chomp($@);
575      return $self->log->error("count: count failed for index [$index]: [$@]");
576   }
577
578   if ($version ge "5.0.0") {
579      if (exists($r->{count})) {
580         return $r->{count};
581      }
582   }
583   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
584      return $r->{hits}{total};
585   }
586
587   return $self->log->error("count: nothing found");
588}
589
590#
591# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
592# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
593#
594# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
595#
596sub query {
597   my $self = shift;
598   my ($query, $index, $type) = @_;
599
600   $index ||= $self->index;
601   $type ||= $self->type;
602   my $es = $self->_es;
603   $self->brik_help_run_undef_arg('open', $es) or return;
604   $self->brik_help_run_undef_arg('query', $query) or return;
605   $self->brik_help_set_undef_arg('index', $index) or return;
606   $self->brik_help_set_undef_arg('type', $type) or return;
607   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
608
609   my $timeout = $self->rtimeout;
610
611   my %args = (
612      index => $index,
613      body => $query,
614   );
615
616   if ($type ne '*') {
617      $args{type} = $type;
618   }
619
620   my $r;
621   eval {
622      $r = $es->search(%args);
623   };
624   if ($@) {
625      chomp($@);
626      return $self->log->error("query: failed for index [$index]: [$@]");
627   }
628
629   return $r;
630}
631
632sub get_from_id {
633   my $self = shift;
634   my ($id, $index, $type) = @_;
635
636   $index ||= $self->index;
637   $type ||= $self->type;
638   my $es = $self->_es;
639   $self->brik_help_run_undef_arg('open', $es) or return;
640   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
641   $self->brik_help_set_undef_arg('index', $index) or return;
642   $self->brik_help_set_undef_arg('type', $type) or return;
643
644   my $r;
645   eval {
646      $r = $es->get(
647         index => $index,
648         type => $type,
649         id => $id,
650      );
651   };
652   if ($@) {
653      chomp($@);
654      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
655   }
656
657   return $r;
658}
659
660#
661# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
662#
663sub www_search {
664   my $self = shift;
665   my ($query, $index, $type) = @_;
666
667   $index ||= $self->index;
668   $type ||= $self->type;
669   $self->brik_help_run_undef_arg('www_search', $query) or return;
670   $self->brik_help_set_undef_arg('index', $index) or return;
671   $self->brik_help_set_undef_arg('type', $type) or return;
672
673   my $from = $self->from;
674   my $size = $self->size;
675
676   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
677
678   my $nodes = $self->nodes;
679   for my $node (@$nodes) {
680      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
681      my $url = "$node/$index";
682      if ($type ne '*') {
683         $url .= "/$type";
684      }
685      $url .= "/_search/?from=$from&size=$size&q=".$query;
686
687      my $get = $self->SUPER::get($url) or next;
688      my $body = $get->{content};
689
690      my $decoded = $sj->decode($body) or next;
691
692      return $decoded;
693   }
694
695   return;
696}
697
698#
699# Search::Elasticsearch::Client::2_0::Direct::Indices
700#
701sub delete_index {
702   my $self = shift;
703   my ($index) = @_;
704
705   my $es = $self->_es;
706   $self->brik_help_run_undef_arg('open', $es) or return;
707   $self->brik_help_run_undef_arg('delete_index', $index) or return;
708   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
709
710   my %args = (
711      index => $index,
712   );
713
714   my $r;
715   eval {
716      $r = $es->indices->delete(%args);
717   };
718   if ($@) {
719      chomp($@);
720      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
721   }
722
723   return $r;
724}
725
726#
727# Search::Elasticsearch::Client::2_0::Direct::Indices
728#
729sub delete_document {
730   my $self = shift;
731   my ($index, $type, $id) = @_;
732
733   my $es = $self->_es;
734   $self->brik_help_run_undef_arg('open', $es) or return;
735   $self->brik_help_run_undef_arg('delete_document', $index) or return;
736   $self->brik_help_run_undef_arg('delete_document', $type) or return;
737   $self->brik_help_run_undef_arg('delete_document', $id) or return;
738
739   my %args = (
740      index => $index,
741      type => $type,
742      id => $id,
743   );
744
745   my $r;
746   eval {
747      $r = $es->delete(%args);
748   };
749   if ($@) {
750      chomp($@);
751      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
752   }
753
754   return $r;
755}
756
757#
758# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
759#
760# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
761#
762sub delete_by_query {
763   my $self = shift;
764   my ($query, $index, $type) = @_;
765
766   my $es = $self->_es;
767   $self->brik_help_run_undef_arg('open', $es) or return;
768   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
769   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
770   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
771   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
772
773   my $timeout = $self->rtimeout;
774
775   my %args = (
776      index => $index,
777      type => $type,
778      body => $query,
779   );
780
781   my $r;
782   eval {
783      $r = $es->delete_by_query(%args);
784   };
785   if ($@) {
786      chomp($@);
787      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
788   }
789
790   # This may fail, we ignore it.
791   $self->refresh_index($index);
792
793   return $r;
794}
795
796#
797# Search::Elasticsearch::Client::2_0::Direct::Cat
798#
799sub show_indices {
800   my $self = shift;
801   my ($string) = @_;
802
803   my $es = $self->_es;
804   $self->brik_help_run_undef_arg('open', $es) or return;
805
806   my $r;
807   eval {
808      $r = $es->cat->indices;
809   };
810   if ($@) {
811      chomp($@);
812      return $self->log->error("show_indices: failed: [$@]");
813   }
814
815   my @lines = split(/\n/, $r);
816
817   if (@lines == 0) {
818      $self->log->warning("show_indices: nothing returned, no index?");
819   }
820
821   my @filtered = ();
822   if (defined($string)) {
823      for (@lines) {
824         if (m{$string}) {
825            push @filtered, $_;
826         }
827      }
828      @lines = @filtered;
829   }
830
831   return \@lines;
832}
833
834#
835# Search::Elasticsearch::Client::2_0::Direct::Cat
836#
837sub show_nodes {
838   my $self = shift;
839
840   my $es = $self->_es;
841   $self->brik_help_run_undef_arg('open', $es) or return;
842
843   my $r;
844   eval {
845      $r = $es->cat->nodes;
846   };
847   if ($@) {
848      chomp($@);
849      return $self->log->error("show_nodes: failed: [$@]");
850   }
851
852   my @lines = split(/\n/, $r);
853
854   if (@lines == 0) {
855      $self->log->warning("show_nodes: nothing returned, no nodes?");
856   }
857
858   return \@lines;
859}
860
861#
862# Search::Elasticsearch::Client::2_0::Direct::Cat
863#
864sub show_health {
865   my $self = shift;
866
867   my $es = $self->_es;
868   $self->brik_help_run_undef_arg('open', $es) or return;
869
870   my $r;
871   eval {
872      $r = $es->cat->health;
873   };
874   if ($@) {
875      chomp($@);
876      return $self->log->error("show_health: failed: [$@]");
877   }
878
879   my @lines = split(/\n/, $r);
880
881   if (@lines == 0) {
882      $self->log->warning("show_health: nothing returned, no recovery?");
883   }
884
885   return \@lines;
886}
887
888#
889# Search::Elasticsearch::Client::2_0::Direct::Cat
890#
891sub show_recovery {
892   my $self = shift;
893
894   my $es = $self->_es;
895   $self->brik_help_run_undef_arg('open', $es) or return;
896
897   my $r;
898   eval {
899      $r = $es->cat->recovery;
900   };
901   if ($@) {
902      chomp($@);
903      return $self->log->error("show_recovery: failed: [$@]");
904   }
905
906   my @lines = split(/\n/, $r);
907
908   if (@lines == 0) {
909      $self->log->warning("show_recovery: nothing returned, no index?");
910   }
911
912   return \@lines;
913}
914
915sub list_indices {
916   my $self = shift;
917
918   my $get = $self->get_indices or return;
919
920   my @indices = ();
921   for (@$get) {
922      push @indices, $_->{index};
923   }
924
925   return [ sort { $a cmp $b } @indices ];
926}
927
928sub get_indices {
929   my $self = shift;
930
931   my $lines = $self->show_indices or return;
932   if (@$lines == 0) {
933      $self->log->warning("get_indices: no index found");
934      return [];
935   }
936
937   #
938   # Format depends on ElasticSearch version. We try to detect the format.
939   #
940   # 5.0.0:
941   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
942   #
943   my @indices = ();
944   for (@$lines) {
945      my @t = split(/\s+/);
946      if (@t == 10) {  # Version 5.0.0
947         my $color = $t[0];
948         my $state = $t[1];
949         my $index = $t[2];
950         my $id = $t[3];
951         my $shards = $t[4];
952         my $replicas = $t[5];
953         my $count = $t[6];
954         my $count2 = $t[7];
955         my $total_size = $t[8];
956         my $size = $t[9];
957         push @indices, {
958            color => $color,
959            state => $state,
960            index => $index,
961            id => $id,
962            shards => $shards,
963            replicas => $replicas,
964            count => $count,
965            total_size => $total_size,
966            size => $size,
967         };
968      }
969      elsif (@t == 9) {
970         my $index = $t[2];
971         push @indices, {
972            index => $index,
973         };
974      }
975      elsif (@t == 8) {
976         my $index = $t[1];
977         push @indices, {
978            index => $index,
979         };
980      }
981   }
982
983   return \@indices;
984}
985
986#
987# Search::Elasticsearch::Client::5_0::Direct::Indices
988#
989sub get_index {
990   my $self = shift;
991   my ($index) = @_;
992 
993   my $es = $self->_es;
994   $self->brik_help_run_undef_arg('open', $es) or return;
995   $self->brik_help_run_undef_arg('get_index', $index) or return;
996   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
997
998   my %args = (
999      index => $index,
1000   );
1001
1002   my $r;
1003   eval {
1004      $r = $es->indices->get(%args);
1005   };
1006   if ($@) {
1007      chomp($@);
1008      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1009   }
1010
1011   return $r;
1012}
1013
1014sub list_index_types {
1015   my $self = shift;
1016   my ($index) = @_;
1017
1018   my $es = $self->_es;
1019   $self->brik_help_run_undef_arg('open', $es) or return;
1020   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1021   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1022
1023   my $r = $self->get_mappings($index) or return;
1024   if (keys %$r > 1) {
1025      return $self->log->error("list_index_types: multiple indices found, choose one");
1026   }
1027
1028   my @types = ();
1029   for my $this_index (keys %$r) {
1030      my $mappings = $r->{$this_index}{mappings};
1031      push @types, keys %$mappings;
1032   }
1033
1034   my %uniq = map { $_ => 1 } @types;
1035
1036   return [ sort { $a cmp $b } keys %uniq ];
1037}
1038
1039#
1040# By default, if you provide only one index and no type,
1041# all types will be merged (including _default_)
1042# If you specify one type (other than _default_), _default_ will be merged to it.
1043#
1044sub list_index_fields {
1045   my $self = shift;
1046   my ($index, $type) = @_;
1047
1048   my $es = $self->_es;
1049   $self->brik_help_run_undef_arg('open', $es) or return;
1050   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1051   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1052
1053   my $r;
1054   if (defined($type)) {
1055      $r = $self->get_mappings($index, $type) or return;
1056      if (keys %$r > 1) {
1057         return $self->log->error("list_index_fields: multiple indices found, ".
1058            "choose one");
1059      }
1060      my $r2 = $self->get_mappings($index, '_default_') or return;
1061      # Merge
1062      for my $this_index (keys %$r2) {
1063         my $default = $r2->{$this_index}{mappings}{'_default_'};
1064         $r->{$this_index}{mappings}{_default_} = $default;
1065      }
1066   }
1067   else {
1068      $r = $self->get_mappings($index) or return;
1069      if (keys %$r > 1) {
1070         return $self->log->error("list_index_fields: multiple indices found, ".
1071            "choose one");
1072      }
1073   }
1074
1075   my @fields = ();
1076   for my $this_index (keys %$r) {
1077      my $mappings = $r->{$this_index}{mappings};
1078      for my $this_type (keys %$mappings) {
1079         my $properties = $mappings->{$this_type}{properties};
1080         push @fields, keys %$properties;
1081      }
1082   }
1083
1084   my %uniq = map { $_ => 1 } @fields;
1085
1086   return [ sort { $a cmp $b } keys %uniq ];
1087}
1088
1089sub list_indices_version {
1090   my $self = shift;
1091   my ($index) = @_;
1092
1093   my $es = $self->_es;
1094   $self->brik_help_run_undef_arg('open', $es) or return;
1095   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1096   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1097      or return;
1098
1099   my $r = $self->get_index($index) or return;
1100
1101   my @list = ();
1102   for my $this (keys %$r) {
1103      my $name = $this;
1104      my $version = $r->{$this}{settings}{index}{version}{created};
1105      push @list, {
1106         index => $name,
1107         version => $version,
1108      };
1109   }
1110
1111   return \@list;
1112}
1113
1114sub open_index {
1115   my $self = shift;
1116   my ($index) = @_;
1117
1118   my $es = $self->_es;
1119   $self->brik_help_run_undef_arg('open', $es) or return;
1120   $self->brik_help_run_undef_arg('open_index', $index) or return;
1121   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1122
1123   my $r;
1124   eval {
1125      $r = $es->indices->open(
1126         index => $index,
1127      );
1128   };
1129   if ($@) {
1130      chomp($@);
1131      return $self->log->error("open_index: failed: [$@]");
1132   }
1133
1134   return $r;
1135}
1136
1137sub close_index {
1138   my $self = shift;
1139   my ($index) = @_;
1140
1141   my $es = $self->_es;
1142   $self->brik_help_run_undef_arg('open', $es) or return;
1143   $self->brik_help_run_undef_arg('close_index', $index) or return;
1144   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1145
1146   my $r;
1147   eval {
1148      $r = $es->indices->close(
1149         index => $index,
1150      );
1151   };
1152   if ($@) {
1153      chomp($@);
1154      return $self->log->error("close_index: failed: [$@]");
1155   }
1156
1157   return $r;
1158}
1159
1160#
1161# Search::Elasticsearch::Client::5_0::Direct::Indices
1162#
1163sub get_aliases {
1164   my $self = shift;
1165   my ($index) = @_;
1166
1167   $index ||= $self->index;
1168   my $es = $self->_es;
1169   $self->brik_help_run_undef_arg('open', $es) or return;
1170
1171   my %args = (
1172      index => $index,
1173   );
1174
1175   my $r;
1176   eval {
1177      $r = $es->indices->get(%args);
1178   };
1179   if ($@) {
1180      chomp($@);
1181      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1182   }
1183
1184   my %aliases = ();
1185   for my $this (keys %$r) {
1186      $aliases{$this} = $r->{$this}{aliases};
1187   }
1188
1189   return \%aliases;
1190}
1191
1192#
1193# Search::Elasticsearch::Client::5_0::Direct::Indices
1194#
1195sub put_alias {
1196   my $self = shift;
1197   my ($index, $alias) = @_;
1198
1199   my $es = $self->_es;
1200   $self->brik_help_run_undef_arg('open', $es) or return;
1201   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1202   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1203
1204   my %args = (
1205      index => $index,
1206      name => $alias,
1207   );
1208
1209   my $r;
1210   eval {
1211      $r = $es->indices->put_alias(%args);
1212   };
1213   if ($@) {
1214      chomp($@);
1215      return $self->log->error("put_alias: put_alias failed: [$@]");
1216   }
1217
1218   return $r;
1219}
1220
1221#
1222# Search::Elasticsearch::Client::5_0::Direct::Indices
1223#
1224sub delete_alias {
1225   my $self = shift;
1226   my ($index, $alias) = @_;
1227
1228   my $es = $self->_es;
1229   $self->brik_help_run_undef_arg('open', $es) or return;
1230   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1231   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1232
1233   my %args = (
1234      index => $index,
1235      name => $alias,
1236   );
1237
1238   my $r;
1239   eval {
1240      $r = $es->indices->delete_alias(%args);
1241   };
1242   if ($@) {
1243      chomp($@);
1244      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1245   }
1246
1247   return $r;
1248}
1249
1250sub update_alias {
1251   my $self = shift;
1252   my ($new_index, $alias) = @_;
1253
1254   my $es = $self->_es;
1255   $self->brik_help_run_undef_arg('open', $es) or return;
1256   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1257   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1258
1259   # Search for previous index with that alias, if any.
1260   my $prev_index;
1261   my $aliases = $self->get_aliases or return;
1262   while (my ($k, $v) = each %$aliases) {
1263      for my $this (keys %$v) {
1264         if ($this eq $alias) {
1265            $prev_index = $k;
1266            last;
1267         }
1268      }
1269      last if $prev_index;
1270   }
1271
1272   # Delete previous alias if it exists.
1273   if (defined($prev_index)) {
1274      $self->delete_alias($prev_index, $alias) or return;
1275   }
1276
1277   return $self->put_alias($new_index, $alias);
1278}
1279
1280#
1281# Search::Elasticsearch::Client::2_0::Direct::Indices
1282#
1283sub get_mappings {
1284   my $self = shift;
1285   my ($index, $type) = @_;
1286
1287   my $es = $self->_es;
1288   $self->brik_help_run_undef_arg('open', $es) or return;
1289   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1290   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1291
1292   my %args = (
1293      index => $index,
1294      type => $type,
1295   );
1296
1297   my $r;
1298   eval {
1299      $r = $es->indices->get_mapping(%args);
1300   };
1301   if ($@) {
1302      chomp($@);
1303      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1304         "[$@]");
1305   }
1306
1307   return $r;
1308}
1309
1310#
1311# Search::Elasticsearch::Client::2_0::Direct::Indices
1312#
1313sub create_index {
1314   my $self = shift;
1315   my ($index, $shards_count) = @_;
1316
1317   my $es = $self->_es;
1318   $self->brik_help_run_undef_arg('open', $es) or return;
1319   $self->brik_help_run_undef_arg('create_index', $index) or return;
1320         
1321   my $r;
1322   eval {
1323      $r = $es->indices->create(
1324         index => $index,
1325      );
1326   };
1327   if ($@) {
1328      chomp($@);
1329      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1330   }
1331   
1332   return $r;
1333}
1334
1335#
1336# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1337#
1338sub create_index_with_mappings {
1339   my $self = shift;
1340   my ($index, $mappings) = @_;
1341
1342   my $es = $self->_es;
1343   $self->brik_help_run_undef_arg('open', $es) or return;
1344   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1345   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1346   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1347
1348   my $r;
1349   eval {
1350      $r = $es->indices->create(
1351         index => $index,
1352         body => {
1353            mappings => $mappings,
1354         },
1355      );
1356   };
1357   if ($@) {
1358      chomp($@);
1359      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1360   }
1361
1362   return $r;
1363}
1364
1365# GET http://localhost:9200/
1366sub info {
1367   my $self = shift;
1368   my ($nodes) = @_;
1369
1370   $nodes ||= $self->nodes;
1371   $self->brik_help_run_undef_arg('info', $nodes) or return;
1372   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1373   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1374
1375   my $first = $nodes->[0];
1376
1377   $self->get($first) or return;
1378
1379   return $self->content;
1380}
1381
1382sub version {
1383   my $self = shift;
1384   my ($nodes) = @_;
1385
1386   $nodes ||= $self->nodes;
1387   $self->brik_help_run_undef_arg('version', $nodes) or return;
1388   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1389   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1390
1391   my $first = $nodes->[0];
1392
1393   $self->get($first) or return;
1394   my $content = $self->content or return;
1395
1396   return $content->{version}{number};
1397}
1398
1399#
1400# Search::Elasticsearch::Client::2_0::Direct::Indices
1401#
1402sub get_templates {
1403   my $self = shift;
1404
1405   my $es = $self->_es;
1406   $self->brik_help_run_undef_arg('open', $es) or return;
1407
1408   my $r;
1409   eval {
1410      $r = $es->indices->get_template;
1411   };
1412   if ($@) {
1413      chomp($@);
1414      return $self->log->error("get_templates: failed: [$@]");
1415   }
1416
1417   return $r;
1418}
1419
1420sub list_templates {
1421   my $self = shift;
1422
1423   my $content = $self->get_templates or return;
1424
1425   return [ sort { $a cmp $b } keys %$content ];
1426}
1427
1428#
1429# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1430#
1431sub get_template {
1432   my $self = shift;
1433   my ($template) = @_;
1434
1435   my $es = $self->_es;
1436   $self->brik_help_run_undef_arg('open', $es) or return;
1437   $self->brik_help_run_undef_arg('get_template', $template) or return;
1438
1439   my $r;
1440   eval {
1441      $r = $es->indices->get_template(
1442         name => $template,
1443      );
1444   };
1445   if ($@) {
1446      chomp($@);
1447      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1448   }
1449
1450   return $r;
1451}
1452
1453#
1454# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1455#
1456sub put_template {
1457   my $self = shift;
1458   my ($name, $template) = @_;
1459
1460   my $es = $self->_es;
1461   $self->brik_help_run_undef_arg('open', $es) or return;
1462   $self->brik_help_run_undef_arg('put_template', $name) or return;
1463   $self->brik_help_run_undef_arg('put_template', $template) or return;
1464   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1465
1466   my $r;
1467   eval {
1468      $r = $es->indices->put_template(
1469         name => $name,
1470         body => $template,
1471      );
1472   };
1473   if ($@) {
1474      chomp($@);
1475      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1476   }
1477
1478   return $r;
1479}
1480
1481sub put_template_from_json_file {
1482   my $self = shift;
1483   my ($json_file) = @_;
1484
1485   my $es = $self->_es;
1486   $self->brik_help_run_undef_arg('open', $es) or return;
1487   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1488   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1489
1490   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1491   my $data = $fj->read($json_file) or return;
1492
1493   if (! exists($data->{template})) {
1494      return $self->log->error("put_template_from_json_file: no template name found");
1495   }
1496
1497   my $name = $data->{template};
1498
1499   return $self->put_template($name, $data);
1500}
1501
1502#
1503# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1504# Search::Elasticsearch::Client::2_0::Direct::Indices
1505#
1506sub get_settings {
1507   my $self = shift;
1508   my ($indices, $names) = @_;
1509
1510   my $es = $self->_es;
1511   $self->brik_help_run_undef_arg('open', $es) or return;
1512
1513   my %args = ();
1514   if (defined($indices)) {
1515      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1516      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1517         or return;
1518      $args{index} = $indices;
1519   }
1520   if (defined($names)) {
1521      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1522      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1523         or return;
1524      $args{name} = $names;
1525   }
1526
1527   my $r;
1528   eval {
1529      $r = $es->indices->get_settings(%args);
1530   };
1531   if ($@) {
1532      chomp($@);
1533      return $self->log->error("get_settings: failed: [$@]");
1534   }
1535
1536   return $r;
1537}
1538
1539#
1540# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1541# Search::Elasticsearch::Client::2_0::Direct::Indices
1542#
1543# Example:
1544#
1545# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1546#
1547# XXX: should be renamed to put_index_settings
1548#
1549sub put_settings {
1550   my $self = shift;
1551   my ($settings, $indices) = @_;
1552
1553   my $es = $self->_es;
1554   $self->brik_help_run_undef_arg('open', $es) or return;
1555   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1556   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1557
1558   my %args = (
1559      body => $settings,
1560   );
1561   if (defined($indices)) {
1562      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1563      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1564         or return;
1565      $args{index} = $indices;
1566   }
1567
1568   my $r;
1569   eval {
1570      $r = $es->indices->put_settings(%args);
1571   };
1572   if ($@) {
1573      chomp($@);
1574      return $self->log->error("put_settings: failed: [$@]");
1575   }
1576
1577   return $r;
1578}
1579
1580sub set_index_number_of_replicas {
1581   my $self = shift;
1582   my ($indices, $number) = @_;
1583
1584   my $es = $self->_es;
1585   $self->brik_help_run_undef_arg('open', $es) or return;
1586   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1587   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1588      or return;
1589
1590   my $settings = { number_of_replicas => $number };
1591
1592   return $self->put_settings($settings, $indices);
1593}
1594
1595sub set_index_refresh_interval {
1596   my $self = shift;
1597   my ($indices, $number) = @_;
1598
1599   my $es = $self->_es;
1600   $self->brik_help_run_undef_arg('open', $es) or return;
1601   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1602   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1603      or return;
1604
1605   # If there is a meaningful value not postfixed with a unity,
1606   # we default to add a `s' for a number of seconds.
1607   if ($number =~ /^\d+$/ && $number > 0) {
1608      $number .= 's';
1609   }
1610
1611   my $settings = { refresh_interval => $number };
1612
1613   return $self->put_settings($settings, $indices);
1614}
1615
1616sub get_index_number_of_replicas {
1617   my $self = shift;
1618   my ($indices) = @_;
1619
1620   my $es = $self->_es;
1621   $self->brik_help_run_undef_arg('open', $es) or return;
1622   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1623   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1624      or return;
1625
1626   my $settings = $self->get_settings($indices);
1627
1628   my %indices = ();
1629   for (keys %$settings) {
1630      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1631   }
1632
1633   return \%indices;
1634}
1635
1636sub get_index_refresh_interval {
1637   my $self = shift;
1638   my ($indices, $number) = @_;
1639
1640   my $es = $self->_es;
1641   $self->brik_help_run_undef_arg('open', $es) or return;
1642   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1643   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1644      or return;
1645
1646   my $settings = $self->get_settings($indices);
1647
1648   my %indices = ();
1649   for (keys %$settings) {
1650      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1651   }
1652
1653   return \%indices;
1654}
1655
1656sub get_index_number_of_shards {
1657   my $self = shift;
1658   my ($indices, $number) = @_;
1659
1660   my $es = $self->_es;
1661   $self->brik_help_run_undef_arg('open', $es) or return;
1662   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1663   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1664      or return;
1665
1666   my $settings = $self->get_settings($indices);
1667
1668   my %indices = ();
1669   for (keys %$settings) {
1670      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1671   }
1672
1673   return \%indices;
1674}
1675
1676#
1677# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1678#
1679sub delete_template {
1680   my $self = shift;
1681   my ($name) = @_;
1682
1683   my $es = $self->_es;
1684   $self->brik_help_run_undef_arg('open', $es) or return;
1685   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1686
1687   my $r;
1688   eval {
1689      $r = $es->indices->delete_template(
1690         name => $name,
1691      );
1692   };
1693   if ($@) {
1694      chomp($@);
1695      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1696   }
1697
1698   return $r;
1699}
1700
1701#
1702# Return a boolean to state for index existence
1703#
1704sub is_index_exists {
1705   my $self = shift;
1706   my ($index) = @_;
1707
1708   my $es = $self->_es;
1709   $self->brik_help_run_undef_arg('open', $es) or return;
1710   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1711
1712   my $r;
1713   eval {
1714      $r = $es->indices->exists(
1715         index => $index,
1716      );
1717   };
1718   if ($@) {
1719      chomp($@);
1720      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1721   }
1722
1723   return $r ? 1 : 0;
1724}
1725
1726#
1727# Return a boolean to state for index with type existence
1728#
1729sub is_type_exists {
1730   my $self = shift;
1731   my ($index, $type) = @_;
1732
1733   my $es = $self->_es;
1734   $self->brik_help_run_undef_arg('open', $es) or return;
1735   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1736   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1737
1738   my $r;
1739   eval {
1740      $r = $es->indices->exists_type(
1741         index => $index,
1742         type => $type,
1743      );
1744   };
1745   if ($@) {
1746      chomp($@);
1747      return $self->log->error("is_type_exists: failed for index [$index] and ".
1748         "type [$type]: [$@]");
1749   }
1750
1751   return $r ? 1 : 0;
1752}
1753
1754#
1755# Return a boolean to state for document existence
1756#
1757sub is_document_exists {
1758   my $self = shift;
1759   my ($index, $type, $document) = @_;
1760
1761   my $es = $self->_es;
1762   $self->brik_help_run_undef_arg('open', $es) or return;
1763   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
1764   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
1765   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
1766   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
1767
1768   my $r;
1769   eval {
1770      $r = $es->exists(
1771         index => $index,
1772         type => $type,
1773         %$document,
1774      );
1775   };
1776   if ($@) {
1777      chomp($@);
1778      return $self->log->error("is_document_exists: failed for index [$index] and ".
1779         "type [$type]: [$@]");
1780   }
1781
1782   return $r ? 1 : 0;
1783}
1784
1785sub parse_error_string {
1786   my $self = shift;
1787   my ($string) = @_;
1788
1789   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
1790
1791   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
1792
1793   my ($class, $node, $code, $message, $dump) = $string =~
1794      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
1795
1796   if (defined($dump) && length($dump)) {
1797      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
1798      $dump = $sd->decode($dump);
1799   }
1800
1801   # Sanity check
1802   if ($node =~ m{^http} && $code =~ m{^\d+$} && defined($dump) && ref($dump) eq 'HASH') {
1803      return {
1804         class => $class,
1805         node => $node,
1806         code => $code,
1807         message => $message,
1808         dump => $dump,
1809      };
1810   }
1811
1812   # Were not able to decode, we return as-is.
1813   return {
1814      message => $string,
1815   };
1816}
1817
1818#
1819# Refresh an index to receive latest additions
1820#
1821# Search::Elasticsearch::Client::5_0::Direct::Indices
1822# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
1823#
1824sub refresh_index {
1825   my $self = shift;
1826   my ($index) = @_;
1827
1828   my $es = $self->_es;
1829   $self->brik_help_run_undef_arg('open', $es) or return;
1830   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
1831
1832   my $try = $self->try;
1833
1834RETRY:
1835
1836   my $r;
1837   eval {
1838      $r = $es->indices->refresh(
1839         index => $index,
1840      );
1841   };
1842   if ($@) {
1843      if (--$try == 0) {
1844         chomp($@);
1845         my $p = $self->parse_error_string($@);
1846         if (defined($p) && exists($p->{class})) {
1847            my $class = $p->{class};
1848            my $code = $p->{code};
1849            my $node = $p->{node};
1850            return $self->log->error("refresh_index: failed for index [$index] ".
1851               "after [$try] tries with error [$class] code [$code] for node [$node]");
1852         }
1853         else {
1854            return $self->log->error("refresh_index: failed for index [$index] ".
1855               "after [$try]: [$@]");
1856         }
1857      }
1858      sleep 60;
1859      goto RETRY;
1860   }
1861
1862   return $r;
1863}
1864
1865sub export_as_csv {
1866   my $self = shift;
1867   my ($index, $size) = @_;
1868
1869   $size ||= 10_000;
1870   my $es = $self->_es;
1871   $self->brik_help_run_undef_arg('open', $es) or return;
1872   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
1873   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
1874
1875   my $max = $self->max;
1876
1877   my $scroll;
1878   my $version = $self->version or return;
1879   if ($version lt "5.0.0") {
1880      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
1881   }
1882   else {
1883      $scroll = $self->open_scroll($index, $size) or return;
1884   }
1885
1886   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
1887
1888   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
1889   $fc->separator(',');
1890   $fc->escape('\\');
1891   $fc->append(1);
1892   $fc->first_line_is_header(0);
1893   $fc->write_header(1);
1894   $fc->use_quoting(1);
1895   $fc->encoded_fields($self->csv_encoded_fields);
1896   $fc->object_fields($self->csv_object_fields);
1897
1898   my $total = $self->total_scroll;
1899   $self->log->info("export_as_csv: total [$total] for index [$index]");
1900
1901   my $h = {};
1902   my %types = ();
1903   my $read = 0;
1904   my $skipped = 0;
1905   my $exported = 0;
1906   my $start = time();
1907   my $done = 'output.exported';
1908   my $start_time = time();
1909   while (my $this = $self->next_scroll) {
1910      $read++;
1911      my $id = $this->{_id};
1912      my $doc = $this->{_source};
1913      my $type = $this->{_type};
1914      if (! exists($types{$type})) {
1915         my $fields = $self->list_index_fields($index, $type) or return;
1916         #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
1917         $types{$type}{header} = [ '_id', @$fields ];
1918         $types{$type}{output} = "$index:$type.csv";
1919         $done = $types{$type}{output_exported} = "$index:$type.csv.exported";
1920
1921         # Verify it has not been exported yet
1922         if (-f $types{$type}{output_exported}) {
1923            return $self->log->error("export_as_csv: export already done for index ".
1924               "[$index] with type [$type] and file [$index:$type.csv]");
1925         }
1926
1927         $self->log->info("export_as_csv: exporting to file [$index:$type.csv] ".
1928            "for new type [$type], using chunk size of [$size]");
1929      }
1930
1931      $h->{_id} = $id;
1932
1933      for my $k (keys %$doc) {
1934         $h->{$k} = $doc->{$k};
1935      }
1936
1937      $fc->header($types{$type}{header});
1938      my $r = $fc->write([ $h ], $types{$type}{output});
1939      if (!defined($r)) {
1940         $self->log->warning("export_as_csv: unable to process entry, skipping");
1941         $skipped++;
1942         next;
1943      }
1944
1945      # Log a status sometimes.
1946      if (! (++$exported % 100_000)) {
1947         my $now = time();
1948         $self->log->info("export_as_csv: fetched [$exported/$total] elements in ".
1949            ($now - $start)." second(s) from index [$index]");
1950         $start = time();
1951      }
1952
1953      # Limit export to specified maximum
1954      if ($max > 0 && $exported >= $max) {
1955         $self->log->info("export_as_csv: max export reached [$exported] for index ".
1956            "[$index], stopping");
1957         last;
1958      }
1959   }
1960
1961   $self->close_scroll;
1962
1963   my $stop_time = time();
1964   my $duration = $stop_time - $start_time;
1965   my $eps = $exported;
1966   if ($duration > 0) {
1967      $eps = $exported / $duration;
1968   }
1969
1970   my $result = {
1971      read => $read,
1972      exported => $exported,
1973      skipped => $read - $exported,
1974      total_count => $total,
1975      complete => ($exported == $total) ? 1 : 0,
1976      duration => $duration,
1977      eps => $eps, 
1978   };
1979
1980   # Say the file has been processed, and put resulting stats.
1981   $fd->write($result, $done) or return;
1982
1983   return $result;
1984}
1985
1986#
1987# Optimization instructions:
1988# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
1989#
1990sub import_from_csv {
1991   my $self = shift;
1992   my ($input_csv, $index, $type) = @_;
1993
1994   my $es = $self->_es;
1995   $self->brik_help_run_undef_arg('open', $es) or return;
1996   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
1997   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
1998
1999   # If index and/or types are not defined, we try to get them from input filename
2000   if (! defined($index) || ! defined($type)) {
2001      # Example: index-DATE:type.csv
2002      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2003         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2004         $index ||= $this_index;
2005         $type ||= $this_type;
2006      }
2007   }
2008
2009   # Verify it has not been indexed yet
2010   my $done = "$input_csv.imported";
2011   if (-f $done) {
2012      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2013      return 0;
2014   }
2015
2016   # And default to Attributes if guess failed.
2017   $index ||= $self->index;
2018   $type ||= $self->type;
2019   $self->brik_help_set_undef_arg('index', $index) or return;
2020   $self->brik_help_set_undef_arg('type', $type) or return;
2021
2022   if ($index eq '*') {
2023      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2024   }
2025   if ($type eq '*') {
2026      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2027   }
2028
2029   $self->log->debug("input [$input_csv]");
2030   $self->log->debug("index [$index]");
2031   $self->log->debug("type [$type]");
2032
2033   my $count_before = 0;
2034   if ($self->is_index_exists($index)) {
2035      $count_before = $self->count($index, $type);
2036      if (! defined($count_before)) {
2037         return;
2038      }
2039      $self->log->info("import_from_csv: current index [$index] count is ".
2040         "[$count_before]");
2041   }
2042
2043   my $max = $self->max;
2044
2045   $self->open_bulk_mode($index, $type) or return;
2046
2047   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2048      "with type [$type]");
2049
2050   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2051
2052   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2053   $fc->separator(',');
2054   $fc->escape('\\');
2055   $fc->first_line_is_header(1);
2056   $fc->encoded_fields($self->csv_encoded_fields);
2057   $fc->object_fields($self->csv_object_fields);
2058
2059   my $refresh_interval;
2060   my $number_of_replicas;
2061   my $start = time();
2062   my $speed_settings = {};
2063   my $imported = 0;
2064   my $first = 1;
2065   my $read = 0;
2066   my $skipped_chunks = 0;
2067   my $start_time = time();
2068   while (my $this = $fc->read_next($input_csv)) {
2069      $read++;
2070
2071      my $h = {};
2072      my $id = $this->{_id};
2073      delete $this->{_id};
2074      for my $k (keys %$this) {
2075         my $value = $this->{$k};
2076         # We keep only fields when they have a value.
2077         # No need to index data that is empty.
2078         if (defined($value) && length($value)) {
2079            $h->{$k} = $value;
2080         }
2081      }
2082
2083      my $r = $self->index_bulk($h, $index, $type, $id);
2084      if (! defined($r)) {
2085         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2086            "at read [$read], skipping chunk");
2087         $skipped_chunks++;
2088         next;
2089      }
2090
2091      # Gather index settings, and set values for speed.
2092      # We don't do it earlier, cause we need index to be created,
2093      # and it should have been done from index_bulk Command.
2094      if ($first && $self->is_index_exists($index)) {
2095         # Save current values so we can restore them at the end of Command.
2096         # We ignore errors here, this is non-blocking for indexing.
2097         $refresh_interval = $self->get_index_refresh_interval($index);
2098         $refresh_interval = $refresh_interval->{$index};
2099         $number_of_replicas = $self->get_index_number_of_replicas($index);
2100         $number_of_replicas = $number_of_replicas->{$index};
2101         if ($self->use_indexing_optimizations) {
2102            $self->set_index_number_of_replicas($index, 0);
2103         }
2104         $self->set_index_refresh_interval($index, -1);
2105         $first = 0;
2106      }
2107
2108      # Log a status sometimes.
2109      if (! (++$imported % 100_000)) {
2110         my $now = time();
2111         $self->log->info("import_from_csv: imported [$imported] entries in ".
2112            ($now - $start)." second(s) to index [$index]");
2113         $start = time();
2114      }
2115
2116      # Limit import to specified maximum
2117      if ($max > 0 && $imported >= $max) {
2118         $self->log->info("import_from_csv: max import reached [$imported] for ".
2119            "index [$index], stopping");
2120         last;
2121      }
2122   }
2123
2124   $self->bulk_flush;
2125
2126   my $stop_time = time();
2127   my $duration = $stop_time - $start_time;
2128   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2129
2130   $self->refresh_index($index);
2131
2132   my $count_current = $self->count($index, $type) or return;
2133   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2134
2135   my $skipped = 0;
2136   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2137   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2138      $imported = $read;
2139   }
2140   else {
2141      $skipped = $read - ($count_current - $count_before);
2142   }
2143
2144   my $result = {
2145      read => $read,
2146      imported => $imported,
2147      skipped => $skipped,
2148      previous_count => $count_before,
2149      current_count => $count_current,
2150      complete => $complete,
2151      duration => $duration,
2152      eps => $eps,
2153   };
2154
2155   # Say the file has been processed, and put resulting stats.
2156   $fd->write($result, $done) or return;
2157
2158   # Restore previous settings, if any
2159   if (defined($refresh_interval)) {
2160      $self->set_index_refresh_interval($index, $refresh_interval);
2161   }
2162   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2163      $self->set_index_number_of_replicas($index, $number_of_replicas);
2164   }
2165
2166   return $result;
2167}
2168
2169#
2170# http://localhost:9200/_nodes/stats/process?pretty
2171#
2172# Search::Elasticsearch::Client::2_0::Direct::Nodes
2173#
2174sub get_stats_process {
2175   my $self = shift;
2176
2177   my $es = $self->_es;
2178   $self->brik_help_run_undef_arg('open', $es) or return;
2179
2180   my $r;
2181   eval {
2182      $r = $es->nodes->stats(
2183         metric => [ qw(process) ],
2184      );
2185   };
2186   if ($@) {
2187      chomp($@);
2188      return $self->log->error("get_stats_process: failed: [$@]");
2189   }
2190
2191   return $r;
2192}
2193
2194#
2195# curl http://localhost:9200/_nodes/process?pretty
2196#
2197# Search::Elasticsearch::Client::2_0::Direct::Nodes
2198#
2199sub get_process {
2200   my $self = shift;
2201
2202   my $es = $self->_es;
2203   $self->brik_help_run_undef_arg('open', $es) or return;
2204
2205   my $r;
2206   eval {
2207      $r = $es->nodes->info(
2208         metric => [ qw(process) ],
2209      );
2210   };
2211   if ($@) {
2212      chomp($@);
2213      return $self->log->error("get_process: failed: [$@]");
2214   }
2215
2216   return $r;
2217}
2218
2219#
2220# Search::Elasticsearch::Client::2_0::Direct::Cluster
2221#
2222sub get_cluster_state {
2223   my $self = shift;
2224
2225   my $es = $self->_es;
2226   $self->brik_help_run_undef_arg('open', $es) or return;
2227
2228   my $r;
2229   eval {
2230      $r = $es->cluster->state;
2231   };
2232   if ($@) {
2233      chomp($@);
2234      return $self->log->error("get_cluster_state: failed: [$@]");
2235   }
2236
2237   return $r;
2238}
2239
2240#
2241# Search::Elasticsearch::Client::2_0::Direct::Cluster
2242#
2243sub get_cluster_health {
2244   my $self = shift;
2245
2246   my $es = $self->_es;
2247   $self->brik_help_run_undef_arg('open', $es) or return;
2248
2249   my $r;
2250   eval {
2251      $r = $es->cluster->health;
2252   };
2253   if ($@) {
2254      chomp($@);
2255      return $self->log->error("get_cluster_health: failed: [$@]");
2256   }
2257
2258   return $r;
2259}
2260
2261#
2262# Search::Elasticsearch::Client::2_0::Direct::Cluster
2263#
2264sub get_cluster_settings {
2265   my $self = shift;
2266
2267   my $es = $self->_es;
2268   $self->brik_help_run_undef_arg('open', $es) or return;
2269
2270   my $r;
2271   eval {
2272      $r = $es->cluster->get_settings;
2273   };
2274   if ($@) {
2275      chomp($@);
2276      return $self->log->error("get_cluster_settings: failed: [$@]");
2277   }
2278
2279   return $r;
2280}
2281
2282#
2283# Search::Elasticsearch::Client::2_0::Direct::Cluster
2284#
2285sub put_cluster_settings {
2286   my $self = shift;
2287   my ($settings) = @_;
2288
2289   my $es = $self->_es;
2290   $self->brik_help_run_undef_arg('open', $es) or return;
2291   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2292   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2293
2294   my %args = (
2295      body => $settings,
2296   );
2297
2298   my $r;
2299   eval {
2300      $r = $es->cluster->put_settings(%args);
2301   };
2302   if ($@) {
2303      chomp($@);
2304      return $self->log->error("put_cluster_settings: failed: [$@]");
2305   }
2306
2307   return $r;
2308}
2309
2310sub count_green_indices {
2311   my $self = shift;
2312
2313   my $get = $self->show_indices or return;
2314
2315   my $count = 0;
2316   for (@$get) {
2317      if (/^\s*green\s+/) {
2318         $count++;
2319      }
2320   }
2321
2322   return $count;
2323}
2324
2325sub count_yellow_indices {
2326   my $self = shift;
2327
2328   my $get = $self->show_indices or return;
2329
2330   my $count = 0;
2331   for (@$get) {
2332      if (/^\s*yellow\s+/) {
2333         $count++;
2334      }
2335   }
2336
2337   return $count;
2338}
2339
2340sub count_red_indices {
2341   my $self = shift;
2342
2343   my $get = $self->show_indices or return;
2344
2345   my $count = 0;
2346   for (@$get) {
2347      if (/^\s*red\s+/) {
2348         $count++;
2349      }
2350   }
2351
2352   return $count;
2353}
2354
2355sub count_indices {
2356   my $self = shift;
2357
2358   my $get = $self->show_indices or return;
2359
2360   return scalar @$get;
2361}
2362
2363sub list_indices_status {
2364   my $self = shift;
2365
2366   my $get = $self->show_indices or return;
2367
2368   my $count_red = 0;
2369   my $count_yellow = 0;
2370   my $count_green = 0;
2371   for (@$get) {
2372      if (/^\s*red\s+/) {
2373         $count_red++;
2374      }
2375      elsif (/^\s*yellow\s+/) {
2376         $count_yellow++;
2377      }
2378      elsif (/^\s*green\s+/) {
2379         $count_green++;
2380      }
2381   }
2382
2383   return {
2384      red => $count_red,
2385      yellow => $count_yellow,
2386      green => $count_green,
2387   };
2388}
2389
2390sub count_shards {
2391   my $self = shift;
2392
2393   my $indices = $self->get_indices or return;
2394
2395   my $count = 0;
2396   for (@$indices) {
2397      $count += $_->{shards};
2398   }
2399
2400   return $count;
2401}
2402
2403sub count_size {
2404   my $self = shift;
2405
2406   my $indices = $self->get_indices or return;
2407
2408   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2409   $fn->kibi_suffix("kb");
2410   $fn->mebi_suffix("mb");
2411   $fn->gibi_suffix("gb");
2412   $fn->kilo_suffix("KB");
2413   $fn->mega_suffix("MB");
2414   $fn->giga_suffix("GB");
2415
2416   my $size = 0;
2417   for (@$indices) {
2418      $size += $fn->to_number($_->{size});
2419   }
2420
2421   return $fn->from_number($size);
2422}
2423
2424sub count_total_size {
2425   my $self = shift;
2426
2427   my $indices = $self->get_indices or return;
2428
2429   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2430   $fn->kibi_suffix("kb");
2431   $fn->mebi_suffix("mb");
2432   $fn->gibi_suffix("gb");
2433   $fn->kilo_suffix("KB");
2434   $fn->mega_suffix("MB");
2435   $fn->giga_suffix("GB");
2436
2437   my $size = 0;
2438   for (@$indices) {
2439      $size += $fn->to_number($_->{total_size});
2440   }
2441
2442   return $fn->from_number($size);
2443}
2444
2445sub count_count {
2446   my $self = shift;
2447
2448   my $indices = $self->get_indices or return;
2449
2450   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2451   $fn->kilo_suffix('k');
2452   $fn->mega_suffix('m');
2453   $fn->giga_suffix('M');
2454
2455   my $count = 0;
2456   for (@$indices) {
2457      $count += $_->{count};
2458   }
2459
2460   return $fn->from_number($count);
2461}
2462
2463sub list_green_indices {
2464   my $self = shift;
2465
2466   my $get = $self->get_indices or return;
2467
2468   my @indices = ();
2469   for (@$get) {
2470      if ($_->{color} eq 'green') {
2471         push @indices, $_->{index};
2472      }
2473   }
2474
2475   return \@indices;
2476}
2477
2478sub list_yellow_indices {
2479   my $self = shift;
2480
2481   my $get = $self->get_indices or return;
2482
2483   my @indices = ();
2484   for (@$get) {
2485      if ($_->{color} eq 'yellow') {
2486         push @indices, $_->{index};
2487      }
2488   }
2489
2490   return \@indices;
2491}
2492
2493sub list_red_indices {
2494   my $self = shift;
2495
2496   my $get = $self->get_indices or return;
2497
2498   my @indices = ();
2499   for (@$get) {
2500      if ($_->{color} eq 'red') {
2501         push @indices, $_->{index};
2502      }
2503   }
2504
2505   return \@indices;
2506}
2507
2508#
2509# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2510#
2511sub list_datatypes {
2512   my $self = shift;
2513
2514   return {
2515      core => [ qw(string long integer short byte double float data boolean binary) ],
2516   };
2517}
2518
2519#
2520# Return total hits for last www_search
2521#
2522sub get_hits_total {
2523   my $self = shift;
2524
2525   # Retrieve data stored in the $RUN Variable from Context
2526   my $run = $self->context->do('$RUN');
2527   if (ref($run) eq 'HASH') {
2528      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2529         return $run->{hits}{total};
2530      }
2531   }
2532
2533   return $self->log->error("get_hits_total: last Command not compatible");
2534}
2535
2536sub disable_shard_allocation {
2537   my $self = shift;
2538
2539   my $settings = {
2540      persistent => {
2541         'cluster.routing.allocation.enable' => 'none',
2542      }
2543   };
2544
2545   return $self->put_cluster_settings($settings);
2546}
2547
2548sub enable_shard_allocation {
2549   my $self = shift;
2550
2551   my $settings = {
2552      persistent => { 
2553         'cluster.routing.allocation.enable' => 'all',
2554      }
2555   };
2556
2557   return $self->put_cluster_settings($settings);
2558}
2559
2560sub flush_synced {
2561   my $self = shift;
2562
2563   my $es = $self->_es;
2564   $self->brik_help_run_undef_arg('open', $es) or return;
2565
2566   my $r;
2567   eval {
2568      $r = $es->indices->flush_synced;
2569   };
2570   if ($@) {
2571      chomp($@);
2572      return $self->log->error("flush_synced: failed: [$@]");
2573   }
2574
2575   return $r;
2576}
2577
2578#
2579# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2580#
2581# run client::elasticsearch create_snapshot_repository myrepo
2582#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2583#
2584# You have to set path.repo in elasticsearch.yml like:
2585# path.repo: ["/home/gomor/es-backups"]
2586#
2587# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2588#
2589sub create_snapshot_repository {
2590   my $self = shift;
2591   my ($body, $repository_name) = @_;
2592
2593   my $es = $self->_es;
2594   $self->brik_help_run_undef_arg('open', $es) or return;
2595   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2596
2597   $repository_name ||= 'repository';
2598
2599   my %args = (
2600      repository => $repository_name,
2601      body => $body,
2602   );
2603
2604   my $r;
2605   eval {
2606      $r = $es->snapshot->create_repository(%args);
2607   };
2608   if ($@) {
2609      chomp($@);
2610      return $self->log->error("create_snapshot_repository: failed: [$@]");
2611   }
2612
2613   return $r;
2614}
2615
2616sub create_shared_fs_snapshot_repository {
2617   my $self = shift;
2618   my ($location, $repository_name) = @_;
2619
2620   $repository_name ||= 'repository';
2621   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2622
2623   if ($location !~ m{^/}) {
2624      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2625         "a full directory path, this one is invalid [$location]");
2626   }
2627
2628   my $body = {
2629      type => 'fs',
2630      settings => {
2631         compress => 'true',
2632         location => $location,
2633      },
2634   };
2635
2636   return $self->create_snapshot_repository($body, $repository_name);
2637}
2638
2639#
2640# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2641#
2642sub get_snapshot_repositories {
2643   my $self = shift;
2644
2645   my $es = $self->_es;
2646   $self->brik_help_run_undef_arg('open', $es) or return;
2647
2648   my $r;
2649   eval {
2650      $r = $es->snapshot->get_repository;
2651   };
2652   if ($@) {
2653      chomp($@);
2654      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2655   }
2656
2657   return $r;
2658}
2659
2660#
2661# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2662#
2663sub get_snapshot_status {
2664   my $self = shift;
2665
2666   my $es = $self->_es;
2667   $self->brik_help_run_undef_arg('open', $es) or return;
2668
2669   my $r;
2670   eval {
2671      $r = $es->snapshot->status;
2672   };
2673   if ($@) {
2674      chomp($@);
2675      return $self->log->error("get_snapshot_status: failed: [$@]");
2676   }
2677
2678   return $r;
2679}
2680
2681#
2682# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2683#
2684sub create_snapshot {
2685   my $self = shift;
2686   my ($snapshot_name, $repository_name, $body) = @_;
2687
2688   my $es = $self->_es;
2689   $self->brik_help_run_undef_arg('open', $es) or return;
2690
2691   $snapshot_name ||= 'snapshot';
2692   $repository_name ||= 'repository';
2693
2694   my %args = (
2695      repository => $repository_name,
2696      snapshot => $snapshot_name,
2697   );
2698   if (defined($body)) {
2699      $args{body} = $body;
2700   }
2701
2702   my $r;
2703   eval {
2704      $r = $es->snapshot->create(%args);
2705   };
2706   if ($@) {
2707      chomp($@);
2708      return $self->log->error("create_snapshot: failed: [$@]");
2709   }
2710
2711   return $r;
2712}
2713
2714sub create_snapshot_for_indices {
2715   my $self = shift;
2716   my ($indices, $snapshot_name, $repository_name) = @_;
2717
2718   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
2719
2720   $snapshot_name ||= 'snapshot';
2721   $repository_name ||= 'repository';
2722
2723   my $body = {
2724      indices => $indices,
2725   };
2726
2727   return $self->create_snapshot($snapshot_name, $repository_name, $body);
2728}
2729
2730sub is_snapshot_finished {
2731   my $self = shift;
2732
2733   my $status = $self->get_snapshot_status or return;
2734
2735   if (@{$status->{snapshots}} == 0) {
2736      return 1;
2737   }
2738
2739   return 0;
2740}
2741
2742sub get_snapshot_state {
2743   my $self = shift;
2744
2745   if ($self->is_snapshot_finished) {
2746      return $self->log->info("get_snapshot_state: is already finished");
2747   }
2748
2749   my $status = $self->get_snapshot_status or return;
2750
2751   my @indices_done = ();
2752   my @indices_not_done = ();
2753
2754   my $list = $status->{snapshots};
2755   for my $snapshot (@$list) {
2756      my $indices = $snapshot->{indices};
2757      for my $index (@$indices) {
2758         my $done = $index->{shards_stats}{done};
2759         if ($done) {
2760            push @indices_done, $index;
2761         }
2762         else {
2763            push @indices_not_done, $index;
2764         }
2765      }
2766   }
2767
2768   return { done => \@indices_done, not_done => \@indices_not_done };
2769}
2770
2771sub verify_snapshot_repository {
2772}
2773
2774sub delete_snapshot_repository {
2775   my $self = shift;
2776   my ($repository_name) = @_;
2777
2778   my $es = $self->_es;
2779   $self->brik_help_run_undef_arg('open', $es) or return;
2780   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
2781
2782   my $r;
2783   eval {
2784      $r = $es->snapshot->delete_repository(
2785         repository => $repository_name,
2786      );
2787   };
2788   if ($@) {
2789      chomp($@);
2790      return $self->log->error("delete_snapshot_repository: failed: [$@]");
2791   }
2792
2793   return $r;
2794}
2795
2796sub get_snapshot {
2797   my $self = shift;
2798   my ($snapshot_name, $repository_name) = @_;
2799
2800   my $es = $self->_es;
2801   $self->brik_help_run_undef_arg('open', $es) or return;
2802
2803   $snapshot_name ||= 'snapshot';
2804   $repository_name ||= 'repository';
2805
2806   my $r;
2807   eval {
2808      $r = $es->snapshot->get(
2809         repository => $repository_name,
2810         snapshot => $snapshot_name,
2811      );
2812   };
2813   if ($@) {
2814      chomp($@);
2815      return $self->log->error("get_snapshot: failed: [$@]");
2816   }
2817
2818   return $r;
2819}
2820
2821#
2822# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2823#
2824sub delete_snapshot {
2825   my $self = shift;
2826   my ($snapshot_name, $repository_name) = @_;
2827
2828   my $es = $self->_es;
2829   $self->brik_help_run_undef_arg('open', $es) or return;
2830   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
2831   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
2832
2833   my $timeout = $self->rtimeout;
2834
2835   my $r;
2836   eval {
2837      $r = $es->snapshot->delete(
2838         repository => $repository_name,
2839         snapshot => $snapshot_name,
2840         master_timeout => "${timeout}s",
2841      );
2842   };
2843   if ($@) {
2844      chomp($@);
2845      return $self->log->error("delete_snapshot: failed: [$@]");
2846   }
2847
2848   return $r;
2849}
2850
2851#
2852# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2853#
2854sub restore_snapshot {
2855   my $self = shift;
2856   my ($snapshot_name, $repository_name, $body) = @_;
2857
2858   my $es = $self->_es;
2859   $snapshot_name ||= 'snapshot';
2860   $repository_name ||= 'repository';
2861   $self->brik_help_run_undef_arg('open', $es) or return;
2862   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
2863   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
2864
2865   my %args = (
2866      repository => $repository_name,
2867      snapshot => $snapshot_name,
2868   );
2869   if (defined($body)) {
2870      $args{body} = $body;
2871   }
2872
2873   my $r;
2874   eval {
2875      $r = $es->snapshot->restore(%args);
2876   };
2877   if ($@) {
2878      chomp($@);
2879      return $self->log->error("restore_snapshot: failed: [$@]");
2880   }
2881
2882   return $r;
2883}
2884
2885sub restore_snapshot_for_indices {
2886   my $self = shift;
2887   my ($indices, $snapshot_name, $repository_name) = @_;
2888
2889   $snapshot_name ||= 'snapshot';
2890   $repository_name ||= 'repository';
2891   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
2892   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
2893   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
2894
2895   my $body = {
2896      indices => $indices,
2897   };
2898
2899   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
2900}
2901
29021;
2903
2904__END__
2905
2906=head1 NAME
2907
2908Metabrik::Client::Elasticsearch - client::elasticsearch Brik
2909
2910=head1 SYNOPSIS
2911
2912   host:~> my $q = { term => { ip => "192.168.57.19" } }
2913   host:~> run client::elasticsearch open
2914   host:~> run client::elasticsearch query $q data-*
2915
2916=head1 DESCRIPTION
2917
2918Template to write a new Metabrik Brik.
2919
2920=head1 COPYRIGHT AND LICENSE
2921
2922Copyright (c) 2014-2017, Patrice E<lt>GomoRE<gt> Auffret
2923
2924You may distribute this module under the terms of The BSD 3-Clause License.
2925See LICENSE file in the source distribution archive.
2926
2927=head1 AUTHOR
2928
2929Patrice E<lt>GomoRE<gt> Auffret
2930
2931=cut
Note: See TracBrowser for help on using the repository browser.