source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 949:3282a484f381, checked in by GomoR <gomor@…>, 2 weeks ago
  • PERF: client::elasticsearch: export_as_csv Command to using chunk for file writing
  • UPDATE: client::elasticsearch: scroll queries to take a query Argument
  • UPDATE: client::elasticsearch: next_scroll take return an ARRAY or a single value
  • new: client::elasticsearch: is_mapping_exists Command
File size: 76.4 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         nodes => [ qw(node_list) ],
20         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
21         date => [ qw(date) ],
22         index => [ qw(index) ],
23         type => [ qw(type) ],
24         from => [ qw(number) ],
25         size => [ qw(count) ],
26         max => [ qw(count) ],
27         max_flush_count => [ qw(count) ],
28         max_flush_size => [ qw(count) ],
29         rtimeout => [ qw(seconds) ],
30         sniff_rtimeout => [ qw(seconds) ],
31         try => [ qw(count) ],
32         use_bulk_autoflush => [ qw(0|1) ],
33         use_indexing_optimizations => [ qw(0|1) ],
34         csv_encoded_fields => [ qw(fields) ],
35         csv_object_fields => [ qw(fields) ],
36         _es => [ qw(INTERNAL) ],
37         _bulk => [ qw(INTERNAL) ],
38         _scroll => [ qw(INTERNAL) ],
39      },
40      attributes_default => {
41         nodes => [ qw(http://localhost:9200) ],
42         cxn_pool => 'Sniff',
43         from => 0,
44         size => 10,
45         max => 0,
46         index => '*',
47         type => '*',
48         rtimeout => 60,
49         sniff_rtimeout => 3,
50         try => 3,
51         max_flush_count => 1_000,
52         max_flush_size => 1_000_000,
53         use_bulk_autoflush => 1,
54         use_indexing_optimizations => 0,
55      },
56      commands => {
57         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
58         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
59         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
60         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
61         close_scroll => [ ],
62         total_scroll => [ ],
63         next_scroll => [ qw(count|OPTIONAL) ],
64         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
65         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
66         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
67         bulk_flush => [ ],
68         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
69         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
70         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
71         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
72         delete_index => [ qw(index|indices_list) ],
73         update_alias => [ qw(new_index alias) ],
74         delete_document => [ qw(index type id) ],
75         delete_by_query => [ qw($query_hash index type) ],
76         show_indices => [ qw(string_filter|OPTIONAL) ],
77         show_nodes => [ ],
78         show_health => [ ],
79         show_recovery => [ ],
80         list_indices => [ ],
81         get_indices => [ ],
82         get_index => [ qw(index|indices_list) ],
83         list_index_types => [ qw(index) ],
84         list_index_fields => [ qw(index) ],
85         list_indices_version => [ qw(index|indices_list) ],
86         open_index => [ qw(index|indices_list) ],
87         close_index => [ qw(index|indices_list) ],
88         get_aliases => [ qw(index) ],
89         put_alias => [ qw(index alias) ],
90         delete_alias => [ qw(index alias) ],
91         is_mapping_exists => [ qw(index mapping) ],
92         get_mappings => [ qw(index type|OPTIONAL) ],
93         create_index => [ qw(index) ],
94         create_index_with_mappings => [ qw(index mappings) ],
95         info => [ qw(nodes_list|OPTIONAL) ],
96         version => [ qw(nodes_list|OPTIONAL) ],
97         get_templates => [ ],
98         list_templates => [ ],
99         get_template => [ qw(name) ],
100         put_template => [ qw(name template) ],
101         put_template_from_json_file => [ qw(file) ],
102         update_template_from_json_file => [ qw(file) ],
103         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
104         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
105         set_index_number_of_replicas => [ qw(index|indices_list number) ],
106         set_index_refresh_interval => [ qw(index|indices_list number) ],
107         get_index_number_of_replicas => [ qw(index|indices) ],
108         get_index_refresh_interval => [ qw(index|indices_list) ],
109         get_index_number_of_shards => [ qw(index|indices_list) ],
110         delete_template => [ qw(name) ],
111         is_index_exists => [ qw(index) ],
112         is_type_exists => [ qw(index type) ],
113         is_document_exists => [ qw(index type document) ],
114         parse_error_string => [ qw(string) ],
115         refresh_index => [ qw(index) ],
116         export_as_csv => [ qw(index size|OPTIONAL) ],
117         import_from_csv => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
118         get_stats_process => [ ],
119         get_process => [ ],
120         get_cluster_state => [ ],
121         get_cluster_health => [ ],
122         get_cluster_settings => [ ],
123         put_cluster_settings => [ qw(settings) ],
124         count_green_indices => [ ],
125         count_yellow_indices => [ ],
126         count_red_indices => [ ],
127         list_green_indices => [ ],
128         list_yellow_indices => [ ],
129         list_red_indices => [ ],
130         count_indices => [ ],
131         list_indices_status => [ ],
132         count_shards => [ ],
133         count_size => [ ],
134         count_total_size => [ ],
135         count_count => [ ],
136         list_datatypes => [ ],
137         get_hits_total => [ ],
138         disable_shard_allocation => [ ],
139         enable_shard_allocation => [ ],
140         flush_synced => [ ],
141         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
142         create_shared_fs_snapshot_repository => [ qw(location
143            repository_name|OPTIONAL) ],
144         get_snapshot_repositories => [ ],
145         get_snapshot_status => [ ],
146         delete_snapshot_repository => [ qw(repository_name) ],
147         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
148            body|OPTIONAL) ],
149         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
150            repository_name|OPTIONAL) ],
151         is_snapshot_finished => [ ],
152         get_snapshot_state => [ ],
153         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
154         delete_snapshot => [ qw(snapshot_name repository_name) ],
155         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
156         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
157      },
158      require_modules => {
159         'Metabrik::String::Json' => [ ],
160         'Metabrik::File::Csv' => [ ],
161         'Metabrik::File::Json' => [ ],
162         'Metabrik::File::Dump' => [ ],
163         'Metabrik::Format::Number' => [ ],
164         'Search::Elasticsearch' => [ ],
165      },
166   };
167}
168
169sub brik_preinit {
170   my $self = shift;
171
172   eval("use Search::Elasticsearch;");
173   if ($Search::Elasticsearch::VERSION < 5) {
174      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
175         "with: run perl::module install Search::Elasticsearch");
176   }
177
178   return $self->SUPER::brik_preinit;
179}
180
181sub open {
182   my $self = shift;
183   my ($nodes, $cxn_pool) = @_;
184
185   $nodes ||= $self->nodes;
186   $cxn_pool ||= $self->cxn_pool;
187   $self->brik_help_run_undef_arg('open', $nodes) or return;
188   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
189   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
190   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
191
192   for my $node (@$nodes) {
193      if ($node !~ m{https?://}) {
194         return $self->log->error("open: invalid node[$node], must start with http(s)");
195      }
196   }
197
198   my $timeout = $self->rtimeout;
199
200   my $nodes_str = join('|', @$nodes);
201   $self->log->verbose("open: using nodes [$nodes_str]");
202
203   #
204   # Timeout description here:
205   #
206   # Search::Elasticsearch::Role::Cxn
207   #
208
209   my $es = Search::Elasticsearch->new(
210      nodes => $nodes,
211      cxn_pool => $cxn_pool,
212      timeout => $timeout,
213      max_retries => $self->try,
214      retry_on_timeout => 1,
215      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
216      request_timeout => 60,  # seconds, default 30
217      ping_timeout => 5,  # seconds, default 2
218      dead_timeout => 120,  # seconds, detault 60
219      max_dead_timeout => 3600,  # seconds, default 3600
220      sniff_request_timeout => 15, # seconds, default 2
221   );
222   if (! defined($es)) {
223      return $self->log->error("open: failed");
224   }
225
226   $self->_es($es);
227
228   return $nodes;
229}
230
231#
232# Search::Elasticsearch::Client::5_0::Bulk
233#
234sub open_bulk_mode {
235   my $self = shift;
236   my ($index, $type) = @_;
237
238   $index ||= $self->index;
239   $type ||= $self->type;
240   my $es = $self->_es;
241   $self->brik_help_run_undef_arg('open', $es) or return;
242   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
243   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
244
245   my %args = (
246      index => $index,
247      type => $type,
248   );
249
250   if ($self->use_bulk_autoflush) {
251      my $max_count = $self->max_flush_count || 1_000;
252      my $max_size = $self->max_flush_size || 1_000_000;
253
254      $args{max_count} = $max_count;
255      $args{max_size} = $max_size;
256      $args{max_time} = 0;
257
258      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
259         "max_flush_size [$max_size]");
260   }
261   else {
262      $args{max_count} = 0;
263      $args{max_size} = 0;
264      $args{max_time} = 0;
265      $args{on_error} = undef;
266      #$args{on_success} = sub {
267         #my ($action, $response, $i) = @_;
268      #};
269
270      $self->log->info("open_bulk_mode: opening without automatic flushing");
271   }
272
273   my $bulk = $es->bulk_helper(%args);
274   if (! defined($bulk)) {
275      return $self->log->error("open_bulk_mode: failed");
276   }
277
278   $self->_bulk($bulk);
279
280   return $self->nodes;
281}
282
283sub open_scroll_scan_mode {
284   my $self = shift;
285   my ($index, $size) = @_;
286
287   my $version = $self->version or return;
288   if ($version ge "5.0.0") {
289      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
290         "$version, try open_scroll Command instead");
291   }
292
293   $index ||= $self->index;
294   $size ||= $self->size;
295   my $es = $self->_es;
296   $self->brik_help_run_undef_arg('open', $es) or return;
297   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
298   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
299
300   my $scroll = $es->scroll_helper(
301      index => $index,
302      search_type => 'scan',
303      size => $size,
304   );
305   if (! defined($scroll)) {
306      return $self->log->error("open_scroll_scan_mode: failed");
307   }
308
309   $self->_scroll($scroll);
310
311   return $self->nodes;
312}
313
314#
315# Search::Elasticsearch::Client::5_0::Scroll
316#
317sub open_scroll {
318   my $self = shift;
319   my ($index, $size, $type, $query) = @_;
320
321   my $version = $self->version or return;
322   if ($version lt "5.0.0") {
323      return $self->log->error("open_scroll: Command not supported for ES version ".
324         "$version, try open_scroll_scan_mode Command instead");
325   }
326
327   $query ||= { query => { match_all => {} } };
328   $index ||= $self->index;
329   $type ||= $self->type;
330   $size ||= $self->size;
331   my $es = $self->_es;
332   $self->brik_help_run_undef_arg('open', $es) or return;
333   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
334   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
335
336   my $timeout = $self->rtimeout;
337
338   my %args = (
339      scroll => "${timeout}s",
340      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
341                          # it in query string.
342      index => $index,
343      size => $size,
344      body => $query,
345   );
346   if ($type ne '*') {
347      $args{type} = $type;
348   }
349
350   #
351   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
352   #
353   my $scroll = $es->scroll_helper(%args);
354   if (! defined($scroll)) {
355      return $self->log->error("open_scroll: failed");
356   }
357
358   $self->_scroll($scroll);
359
360   $self->log->info("open_scroll: opened with size [$size] and timeout [${timeout}s]");
361
362   return $self->nodes;
363}
364
365#
366# Search::Elasticsearch::Client::5_0::Scroll
367#
368sub close_scroll {
369   my $self = shift;
370
371   my $scroll = $self->_scroll;
372   if (! defined($scroll)) {
373      return 1;
374   }
375
376   $scroll->finish;
377   $self->_scroll(undef);
378
379   return 1;
380}
381
382sub total_scroll {
383   my $self = shift;
384
385   my $scroll = $self->_scroll;
386   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
387
388   my $total;
389   eval {
390      $total = $scroll->total;
391   };
392   if ($@) {
393      chomp($@);
394      return $self->log->error("total_scroll: failed with: [$@]");
395   }
396
397   return $total;
398}
399
400sub next_scroll {
401   my $self = shift;
402   my ($count) = @_;
403
404   $count ||= 1;
405
406   my $scroll = $self->_scroll;
407   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
408
409   my $next;
410   eval {
411      if ($count > 1) {
412         my @docs = $scroll->next($count);
413         if (@docs > 0) {
414            $next = \@docs;
415         }
416      }
417      else {
418         $next = $scroll->next;
419      }
420   };
421   if ($@) {
422      chomp($@);
423      return $self->log->error("next_scroll: failed with: [$@]");
424   }
425
426   return $next;
427}
428
429#
430# Search::Elasticsearch::Client::5_0::Direct
431#
432sub index_document {
433   my $self = shift;
434   my ($doc, $index, $type, $hash, $id) = @_;
435
436   $index ||= $self->index;
437   $type ||= $self->type;
438   my $es = $self->_es;
439   $self->brik_help_run_undef_arg('open', $es) or return;
440   $self->brik_help_run_undef_arg('index_document', $doc) or return;
441   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
442   $self->brik_help_set_undef_arg('index', $index) or return;
443   $self->brik_help_set_undef_arg('type', $type) or return;
444
445   my %args = (
446      index => $index,
447      type => $type,
448      body => $doc,
449   );
450   if (defined($id)) {
451      $args{id} = $id;
452   }
453
454   if (defined($hash)) {
455      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH') or return;
456      %args = ( %args, %$hash );
457   }
458
459   my $r;
460   eval {
461      $r = $es->index(%args);
462   };
463   if ($@) {
464      chomp($@);
465      return $self->log->error("index_document: index failed for index [$index]: [$@]");
466   }
467
468   return $r;
469}
470
471#
472# Search::Elasticsearch::Client::5_0::Direct
473#
474sub update_document {
475   my $self = shift;
476   my ($doc, $id, $index, $type, $hash) = @_;
477
478   $index ||= $self->index;
479   $type ||= $self->type;
480   my $es = $self->_es;
481   $self->brik_help_run_undef_arg('open', $es) or return;
482   $self->brik_help_run_undef_arg('update_document', $doc) or return;
483   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
484   $self->brik_help_run_undef_arg('update_document', $id) or return;
485   $self->brik_help_set_undef_arg('index', $index) or return;
486   $self->brik_help_set_undef_arg('type', $type) or return;
487
488   my %args = (
489      id => $id,
490      index => $index,
491      type => $type,
492      body => { doc => $doc },
493   );
494
495   if (defined($hash)) {
496      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH') or return;
497      %args = ( %args, %$hash );
498   }
499
500   my $r;
501   eval {
502      $r = $es->update(%args);
503   };
504   if ($@) {
505      chomp($@);
506      return $self->log->error("update_document: index failed for index [$index]: [$@]");
507   }
508
509   return $r;
510}
511
512#
513# Search::Elasticsearch::Client::5_0::Bulk
514#
515sub index_bulk {
516   my $self = shift;
517   my ($doc, $index, $type, $hash, $id) = @_;
518
519   my $bulk = $self->_bulk;
520   $index ||= $self->index;
521   $type ||= $self->type;
522   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
523   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
524   $self->brik_help_set_undef_arg('index', $index) or return;
525   $self->brik_help_set_undef_arg('type', $type) or return;
526
527   my %args = (
528      source => $doc,
529   );
530   if (defined($id)) {
531      $args{id} = $id;
532   }
533
534   if (defined($hash)) {
535      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
536      %args = ( %args, %$hash );
537   }
538
539   my $r;
540   eval {
541      $r = $bulk->add_action(index => \%args);
542   };
543   if ($@) {
544      chomp($@);
545      my $p = $self->parse_error_string($@);
546      if (defined($p) && exists($p->{class})) {
547         my $class = $p->{class};
548         my $code = $p->{code};
549         my $node = $p->{node};
550         return $self->log->error("index_bulk: failed for index [$index] with error ".
551            "[$class] code [$code] for node [$node]");
552      }
553      else {
554         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
555      }
556   }
557
558   return $r;
559}
560
561sub bulk_flush {
562   my $self = shift;
563
564   my $bulk = $self->_bulk;
565   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
566
567   my $try = $self->try;
568
569RETRY:
570
571   my $r;
572   eval {
573      $r = $bulk->flush;
574   };
575   if ($@) {
576      chomp($@);
577      if (--$try == 0) {
578         my $p = $self->parse_error_string($@);
579         if (defined($p) && exists($p->{class})) {
580            my $class = $p->{class};
581            my $code = $p->{code};
582            my $node = $p->{node};
583            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
584               "[$class] code [$code] for node [$node]");
585         }
586         else {
587            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
588         }
589      }
590      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
591               "[$@]");
592      sleep 10;
593      goto RETRY;
594   }
595
596   return $r;
597}
598
599#
600# Search::Elasticsearch::Client::2_0::Direct
601# Search::Elasticsearch::Client::5_0::Direct
602#
603sub count {
604   my $self = shift;
605   my ($index, $type) = @_;
606
607   $index ||= $self->index;
608   $type ||= $self->type;
609   my $es = $self->_es;
610   $self->brik_help_run_undef_arg('open', $es) or return;
611
612   my %args = ();
613   if (defined($index) && $index ne '*') {
614      $args{index} = $index;
615   }
616   if (defined($type) && $type ne '*') {
617      $args{type} = $type;
618   }
619
620   #$args{body} = {
621      #query => {
622         #match => { title => 'Elasticsearch clients' },
623      #},
624   #}
625
626   my $r;
627   my $version = $self->version or return;
628   if ($version ge "5.0.0") {
629      eval {
630         $r = $es->count(%args);
631      };
632   }
633   else {
634      eval {
635         $r = $es->search(
636            index => $index,
637            type => $type,
638            search_type => 'count',
639            body => {
640               query => {
641                  match_all => {},
642               },
643            },
644         );
645      };
646   }
647   if ($@) {
648      chomp($@);
649      return $self->log->error("count: count failed for index [$index]: [$@]");
650   }
651
652   if ($version ge "5.0.0") {
653      if (exists($r->{count})) {
654         return $r->{count};
655      }
656   }
657   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
658      return $r->{hits}{total};
659   }
660
661   return $self->log->error("count: nothing found");
662}
663
664#
665# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
666# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
667#
668# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
669#
670sub query {
671   my $self = shift;
672   my ($query, $index, $type, $hash) = @_;
673
674   $index ||= $self->index;
675   $type ||= $self->type;
676   my $es = $self->_es;
677   $self->brik_help_run_undef_arg('open', $es) or return;
678   $self->brik_help_run_undef_arg('query', $query) or return;
679   $self->brik_help_set_undef_arg('index', $index) or return;
680   $self->brik_help_set_undef_arg('type', $type) or return;
681   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
682
683   my $timeout = $self->rtimeout;
684
685   my %args = (
686      index => $index,
687      body => $query,
688   );
689
690   if (defined($hash)) {
691      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
692      %args = ( %args, %$hash );
693   }
694
695   if ($type ne '*') {
696      $args{type} = $type;
697   }
698
699   my $r;
700   eval {
701      $r = $es->search(%args);
702   };
703   if ($@) {
704      chomp($@);
705      return $self->log->error("query: failed for index [$index]: [$@]");
706   }
707
708   return $r;
709}
710
711sub get_from_id {
712   my $self = shift;
713   my ($id, $index, $type) = @_;
714
715   $index ||= $self->index;
716   $type ||= $self->type;
717   my $es = $self->_es;
718   $self->brik_help_run_undef_arg('open', $es) or return;
719   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
720   $self->brik_help_set_undef_arg('index', $index) or return;
721   $self->brik_help_set_undef_arg('type', $type) or return;
722
723   my $r;
724   eval {
725      $r = $es->get(
726         index => $index,
727         type => $type,
728         id => $id,
729      );
730   };
731   if ($@) {
732      chomp($@);
733      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
734   }
735
736   return $r;
737}
738
739#
740# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
741#
742sub www_search {
743   my $self = shift;
744   my ($query, $index, $type) = @_;
745
746   $index ||= $self->index;
747   $type ||= $self->type;
748   $self->brik_help_run_undef_arg('www_search', $query) or return;
749   $self->brik_help_set_undef_arg('index', $index) or return;
750   $self->brik_help_set_undef_arg('type', $type) or return;
751
752   my $from = $self->from;
753   my $size = $self->size;
754
755   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
756
757   my $nodes = $self->nodes;
758   for my $node (@$nodes) {
759      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
760      my $url = "$node/$index";
761      if ($type ne '*') {
762         $url .= "/$type";
763      }
764      $url .= "/_search/?from=$from&size=$size&q=".$query;
765
766      my $get = $self->SUPER::get($url) or next;
767      my $body = $get->{content};
768
769      my $decoded = $sj->decode($body) or next;
770
771      return $decoded;
772   }
773
774   return;
775}
776
777#
778# Search::Elasticsearch::Client::2_0::Direct::Indices
779#
780sub delete_index {
781   my $self = shift;
782   my ($index) = @_;
783
784   my $es = $self->_es;
785   $self->brik_help_run_undef_arg('open', $es) or return;
786   $self->brik_help_run_undef_arg('delete_index', $index) or return;
787   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
788
789   my %args = (
790      index => $index,
791   );
792
793   my $r;
794   eval {
795      $r = $es->indices->delete(%args);
796   };
797   if ($@) {
798      chomp($@);
799      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
800   }
801
802   return $r;
803}
804
805#
806# Search::Elasticsearch::Client::2_0::Direct::Indices
807#
808sub delete_document {
809   my $self = shift;
810   my ($index, $type, $id, $hash) = @_;
811
812   my $es = $self->_es;
813   $self->brik_help_run_undef_arg('open', $es) or return;
814   $self->brik_help_run_undef_arg('delete_document', $index) or return;
815   $self->brik_help_run_undef_arg('delete_document', $type) or return;
816   $self->brik_help_run_undef_arg('delete_document', $id) or return;
817
818   my %args = (
819      index => $index,
820      type => $type,
821      id => $id,
822   );
823
824   if (defined($hash)) {
825      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH') or return;
826      %args = ( %args, %$hash );
827   }
828
829   my $r;
830   eval {
831      $r = $es->delete(%args);
832   };
833   if ($@) {
834      chomp($@);
835      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
836   }
837
838   return $r;
839}
840
841#
842# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
843#
844# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
845#
846sub delete_by_query {
847   my $self = shift;
848   my ($query, $index, $type) = @_;
849
850   my $es = $self->_es;
851   $self->brik_help_run_undef_arg('open', $es) or return;
852   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
853   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
854   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
855   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
856
857   my $timeout = $self->rtimeout;
858
859   my %args = (
860      index => $index,
861      type => $type,
862      body => $query,
863   );
864
865   my $r;
866   eval {
867      $r = $es->delete_by_query(%args);
868   };
869   if ($@) {
870      chomp($@);
871      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
872   }
873
874   # This may fail, we ignore it.
875   $self->refresh_index($index);
876
877   return $r;
878}
879
880#
881# Search::Elasticsearch::Client::2_0::Direct::Cat
882#
883sub show_indices {
884   my $self = shift;
885   my ($string) = @_;
886
887   my $es = $self->_es;
888   $self->brik_help_run_undef_arg('open', $es) or return;
889
890   my $r;
891   eval {
892      $r = $es->cat->indices;
893   };
894   if ($@) {
895      chomp($@);
896      return $self->log->error("show_indices: failed: [$@]");
897   }
898
899   my @lines = split(/\n/, $r);
900
901   if (@lines == 0) {
902      $self->log->warning("show_indices: nothing returned, no index?");
903   }
904
905   my @filtered = ();
906   if (defined($string)) {
907      for (@lines) {
908         if (m{$string}) {
909            push @filtered, $_;
910         }
911      }
912      @lines = @filtered;
913   }
914
915   return \@lines;
916}
917
918#
919# Search::Elasticsearch::Client::2_0::Direct::Cat
920#
921sub show_nodes {
922   my $self = shift;
923
924   my $es = $self->_es;
925   $self->brik_help_run_undef_arg('open', $es) or return;
926
927   my $r;
928   eval {
929      $r = $es->cat->nodes;
930   };
931   if ($@) {
932      chomp($@);
933      return $self->log->error("show_nodes: failed: [$@]");
934   }
935
936   my @lines = split(/\n/, $r);
937
938   if (@lines == 0) {
939      $self->log->warning("show_nodes: nothing returned, no nodes?");
940   }
941
942   return \@lines;
943}
944
945#
946# Search::Elasticsearch::Client::2_0::Direct::Cat
947#
948sub show_health {
949   my $self = shift;
950
951   my $es = $self->_es;
952   $self->brik_help_run_undef_arg('open', $es) or return;
953
954   my $r;
955   eval {
956      $r = $es->cat->health;
957   };
958   if ($@) {
959      chomp($@);
960      return $self->log->error("show_health: failed: [$@]");
961   }
962
963   my @lines = split(/\n/, $r);
964
965   if (@lines == 0) {
966      $self->log->warning("show_health: nothing returned, no recovery?");
967   }
968
969   return \@lines;
970}
971
972#
973# Search::Elasticsearch::Client::2_0::Direct::Cat
974#
975sub show_recovery {
976   my $self = shift;
977
978   my $es = $self->_es;
979   $self->brik_help_run_undef_arg('open', $es) or return;
980
981   my $r;
982   eval {
983      $r = $es->cat->recovery;
984   };
985   if ($@) {
986      chomp($@);
987      return $self->log->error("show_recovery: failed: [$@]");
988   }
989
990   my @lines = split(/\n/, $r);
991
992   if (@lines == 0) {
993      $self->log->warning("show_recovery: nothing returned, no index?");
994   }
995
996   return \@lines;
997}
998
999sub list_indices {
1000   my $self = shift;
1001
1002   my $get = $self->get_indices or return;
1003
1004   my @indices = ();
1005   for (@$get) {
1006      push @indices, $_->{index};
1007   }
1008
1009   return [ sort { $a cmp $b } @indices ];
1010}
1011
1012sub get_indices {
1013   my $self = shift;
1014
1015   my $lines = $self->show_indices or return;
1016   if (@$lines == 0) {
1017      $self->log->warning("get_indices: no index found");
1018      return [];
1019   }
1020
1021   #
1022   # Format depends on ElasticSearch version. We try to detect the format.
1023   #
1024   # 5.0.0:
1025   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
1026   #
1027   my @indices = ();
1028   for (@$lines) {
1029      my @t = split(/\s+/);
1030      if (@t == 10) {  # Version 5.0.0
1031         my $color = $t[0];
1032         my $state = $t[1];
1033         my $index = $t[2];
1034         my $id = $t[3];
1035         my $shards = $t[4];
1036         my $replicas = $t[5];
1037         my $count = $t[6];
1038         my $count2 = $t[7];
1039         my $total_size = $t[8];
1040         my $size = $t[9];
1041         push @indices, {
1042            color => $color,
1043            state => $state,
1044            index => $index,
1045            id => $id,
1046            shards => $shards,
1047            replicas => $replicas,
1048            count => $count,
1049            total_size => $total_size,
1050            size => $size,
1051         };
1052      }
1053      elsif (@t == 9) {
1054         my $index = $t[2];
1055         push @indices, {
1056            index => $index,
1057         };
1058      }
1059      elsif (@t == 8) {
1060         my $index = $t[1];
1061         push @indices, {
1062            index => $index,
1063         };
1064      }
1065   }
1066
1067   return \@indices;
1068}
1069
1070#
1071# Search::Elasticsearch::Client::5_0::Direct::Indices
1072#
1073sub get_index {
1074   my $self = shift;
1075   my ($index) = @_;
1076 
1077   my $es = $self->_es;
1078   $self->brik_help_run_undef_arg('open', $es) or return;
1079   $self->brik_help_run_undef_arg('get_index', $index) or return;
1080   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1081
1082   my %args = (
1083      index => $index,
1084   );
1085
1086   my $r;
1087   eval {
1088      $r = $es->indices->get(%args);
1089   };
1090   if ($@) {
1091      chomp($@);
1092      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1093   }
1094
1095   return $r;
1096}
1097
1098sub list_index_types {
1099   my $self = shift;
1100   my ($index) = @_;
1101
1102   my $es = $self->_es;
1103   $self->brik_help_run_undef_arg('open', $es) or return;
1104   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1105   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1106
1107   my $r = $self->get_mappings($index) or return;
1108   if (keys %$r > 1) {
1109      return $self->log->error("list_index_types: multiple indices found, choose one");
1110   }
1111
1112   my @types = ();
1113   for my $this_index (keys %$r) {
1114      my $mappings = $r->{$this_index}{mappings};
1115      push @types, keys %$mappings;
1116   }
1117
1118   my %uniq = map { $_ => 1 } @types;
1119
1120   return [ sort { $a cmp $b } keys %uniq ];
1121}
1122
1123#
1124# By default, if you provide only one index and no type,
1125# all types will be merged (including _default_)
1126# If you specify one type (other than _default_), _default_ will be merged to it.
1127#
1128sub list_index_fields {
1129   my $self = shift;
1130   my ($index, $type) = @_;
1131
1132   my $es = $self->_es;
1133   $self->brik_help_run_undef_arg('open', $es) or return;
1134   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1135   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1136
1137   my $r;
1138   if (defined($type)) {
1139      $r = $self->get_mappings($index, $type) or return;
1140      if (keys %$r > 1) {
1141         return $self->log->error("list_index_fields: multiple indices found, ".
1142            "choose one");
1143      }
1144      # _default_ mapping may not exists.
1145      if ($self->is_mapping_exists($index, '_default_')) {
1146         my $r2 = $self->get_mappings($index, '_default_');
1147         # Merge
1148         for my $this_index (keys %$r2) {
1149            my $default = $r2->{$this_index}{mappings}{'_default_'};
1150            $r->{$this_index}{mappings}{_default_} = $default;
1151         }
1152      }
1153   }
1154   else {
1155      $r = $self->get_mappings($index) or return;
1156      if (keys %$r > 1) {
1157         return $self->log->error("list_index_fields: multiple indices found, ".
1158            "choose one");
1159      }
1160   }
1161
1162   my @fields = ();
1163   for my $this_index (keys %$r) {
1164      my $mappings = $r->{$this_index}{mappings};
1165      for my $this_type (keys %$mappings) {
1166         my $properties = $mappings->{$this_type}{properties};
1167         push @fields, keys %$properties;
1168      }
1169   }
1170
1171   my %uniq = map { $_ => 1 } @fields;
1172
1173   return [ sort { $a cmp $b } keys %uniq ];
1174}
1175
1176sub list_indices_version {
1177   my $self = shift;
1178   my ($index) = @_;
1179
1180   my $es = $self->_es;
1181   $self->brik_help_run_undef_arg('open', $es) or return;
1182   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1183   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1184      or return;
1185
1186   my $r = $self->get_index($index) or return;
1187
1188   my @list = ();
1189   for my $this (keys %$r) {
1190      my $name = $this;
1191      my $version = $r->{$this}{settings}{index}{version}{created};
1192      push @list, {
1193         index => $name,
1194         version => $version,
1195      };
1196   }
1197
1198   return \@list;
1199}
1200
1201sub open_index {
1202   my $self = shift;
1203   my ($index) = @_;
1204
1205   my $es = $self->_es;
1206   $self->brik_help_run_undef_arg('open', $es) or return;
1207   $self->brik_help_run_undef_arg('open_index', $index) or return;
1208   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1209
1210   my $r;
1211   eval {
1212      $r = $es->indices->open(
1213         index => $index,
1214      );
1215   };
1216   if ($@) {
1217      chomp($@);
1218      return $self->log->error("open_index: failed: [$@]");
1219   }
1220
1221   return $r;
1222}
1223
1224sub close_index {
1225   my $self = shift;
1226   my ($index) = @_;
1227
1228   my $es = $self->_es;
1229   $self->brik_help_run_undef_arg('open', $es) or return;
1230   $self->brik_help_run_undef_arg('close_index', $index) or return;
1231   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1232
1233   my $r;
1234   eval {
1235      $r = $es->indices->close(
1236         index => $index,
1237      );
1238   };
1239   if ($@) {
1240      chomp($@);
1241      return $self->log->error("close_index: failed: [$@]");
1242   }
1243
1244   return $r;
1245}
1246
1247#
1248# Search::Elasticsearch::Client::5_0::Direct::Indices
1249#
1250sub get_aliases {
1251   my $self = shift;
1252   my ($index) = @_;
1253
1254   $index ||= $self->index;
1255   my $es = $self->_es;
1256   $self->brik_help_run_undef_arg('open', $es) or return;
1257
1258   my %args = (
1259      index => $index,
1260   );
1261
1262   my $r;
1263   eval {
1264      $r = $es->indices->get(%args);
1265   };
1266   if ($@) {
1267      chomp($@);
1268      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1269   }
1270
1271   my %aliases = ();
1272   for my $this (keys %$r) {
1273      $aliases{$this} = $r->{$this}{aliases};
1274   }
1275
1276   return \%aliases;
1277}
1278
1279#
1280# Search::Elasticsearch::Client::5_0::Direct::Indices
1281#
1282sub put_alias {
1283   my $self = shift;
1284   my ($index, $alias) = @_;
1285
1286   my $es = $self->_es;
1287   $self->brik_help_run_undef_arg('open', $es) or return;
1288   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1289   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1290
1291   my %args = (
1292      index => $index,
1293      name => $alias,
1294   );
1295
1296   my $r;
1297   eval {
1298      $r = $es->indices->put_alias(%args);
1299   };
1300   if ($@) {
1301      chomp($@);
1302      return $self->log->error("put_alias: put_alias failed: [$@]");
1303   }
1304
1305   return $r;
1306}
1307
1308#
1309# Search::Elasticsearch::Client::5_0::Direct::Indices
1310#
1311sub delete_alias {
1312   my $self = shift;
1313   my ($index, $alias) = @_;
1314
1315   my $es = $self->_es;
1316   $self->brik_help_run_undef_arg('open', $es) or return;
1317   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1318   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1319
1320   my %args = (
1321      index => $index,
1322      name => $alias,
1323   );
1324
1325   my $r;
1326   eval {
1327      $r = $es->indices->delete_alias(%args);
1328   };
1329   if ($@) {
1330      chomp($@);
1331      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1332   }
1333
1334   return $r;
1335}
1336
1337sub update_alias {
1338   my $self = shift;
1339   my ($new_index, $alias) = @_;
1340
1341   my $es = $self->_es;
1342   $self->brik_help_run_undef_arg('open', $es) or return;
1343   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1344   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1345
1346   # Search for previous index with that alias, if any.
1347   my $prev_index;
1348   my $aliases = $self->get_aliases or return;
1349   while (my ($k, $v) = each %$aliases) {
1350      for my $this (keys %$v) {
1351         if ($this eq $alias) {
1352            $prev_index = $k;
1353            last;
1354         }
1355      }
1356      last if $prev_index;
1357   }
1358
1359   # Delete previous alias if it exists.
1360   if (defined($prev_index)) {
1361      $self->delete_alias($prev_index, $alias) or return;
1362   }
1363
1364   return $self->put_alias($new_index, $alias);
1365}
1366
1367sub is_mapping_exists {
1368   my $self = shift;
1369   my ($index, $mapping) = @_;
1370
1371   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1372   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1373
1374   if (! $self->is_index_exists($index)) {
1375      return 0;
1376   }
1377
1378   my $all = $self->get_mappings($index) or return;
1379   for my $this_index (keys %$all) {
1380      my $mappings = $all->{$this_index}{mappings};
1381      for my $this_mapping (keys %$mappings) {
1382         if ($this_mapping eq $mapping) {
1383            return 1;
1384         }
1385      }
1386   }
1387
1388   return 0;
1389}
1390
1391#
1392# Search::Elasticsearch::Client::2_0::Direct::Indices
1393#
1394sub get_mappings {
1395   my $self = shift;
1396   my ($index, $type) = @_;
1397
1398   my $es = $self->_es;
1399   $self->brik_help_run_undef_arg('open', $es) or return;
1400   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1401   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1402
1403   my %args = (
1404      index => $index,
1405      type => $type,
1406   );
1407
1408   my $r;
1409   eval {
1410      $r = $es->indices->get_mapping(%args);
1411   };
1412   if ($@) {
1413      chomp($@);
1414      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1415         "[$@]");
1416   }
1417
1418   return $r;
1419}
1420
1421#
1422# Search::Elasticsearch::Client::2_0::Direct::Indices
1423#
1424sub create_index {
1425   my $self = shift;
1426   my ($index, $shards_count) = @_;
1427
1428   my $es = $self->_es;
1429   $self->brik_help_run_undef_arg('open', $es) or return;
1430   $self->brik_help_run_undef_arg('create_index', $index) or return;
1431         
1432   my $r;
1433   eval {
1434      $r = $es->indices->create(
1435         index => $index,
1436      );
1437   };
1438   if ($@) {
1439      chomp($@);
1440      return $self->log->error("create_index: create failed for index [$index]: [$@]");
1441   }
1442   
1443   return $r;
1444}
1445
1446#
1447# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1448#
1449sub create_index_with_mappings {
1450   my $self = shift;
1451   my ($index, $mappings) = @_;
1452
1453   my $es = $self->_es;
1454   $self->brik_help_run_undef_arg('open', $es) or return;
1455   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1456   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1457   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH') or return;
1458
1459   my $r;
1460   eval {
1461      $r = $es->indices->create(
1462         index => $index,
1463         body => {
1464            mappings => $mappings,
1465         },
1466      );
1467   };
1468   if ($@) {
1469      chomp($@);
1470      return $self->log->error("create_index_with_mappings: create failed for index [$index]: [$@]");
1471   }
1472
1473   return $r;
1474}
1475
1476# GET http://localhost:9200/
1477sub info {
1478   my $self = shift;
1479   my ($nodes) = @_;
1480
1481   $nodes ||= $self->nodes;
1482   $self->brik_help_run_undef_arg('info', $nodes) or return;
1483   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1484   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1485
1486   my $first = $nodes->[0];
1487
1488   $self->get($first) or return;
1489
1490   return $self->content;
1491}
1492
1493sub version {
1494   my $self = shift;
1495   my ($nodes) = @_;
1496
1497   $nodes ||= $self->nodes;
1498   $self->brik_help_run_undef_arg('version', $nodes) or return;
1499   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1500   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1501
1502   my $first = $nodes->[0];
1503
1504   $self->get($first) or return;
1505   my $content = $self->content or return;
1506
1507   return $content->{version}{number};
1508}
1509
1510#
1511# Search::Elasticsearch::Client::2_0::Direct::Indices
1512#
1513sub get_templates {
1514   my $self = shift;
1515
1516   my $es = $self->_es;
1517   $self->brik_help_run_undef_arg('open', $es) or return;
1518
1519   my $r;
1520   eval {
1521      $r = $es->indices->get_template;
1522   };
1523   if ($@) {
1524      chomp($@);
1525      return $self->log->error("get_templates: failed: [$@]");
1526   }
1527
1528   return $r;
1529}
1530
1531sub list_templates {
1532   my $self = shift;
1533
1534   my $content = $self->get_templates or return;
1535
1536   return [ sort { $a cmp $b } keys %$content ];
1537}
1538
1539#
1540# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1541#
1542sub get_template {
1543   my $self = shift;
1544   my ($template) = @_;
1545
1546   my $es = $self->_es;
1547   $self->brik_help_run_undef_arg('open', $es) or return;
1548   $self->brik_help_run_undef_arg('get_template', $template) or return;
1549
1550   my $r;
1551   eval {
1552      $r = $es->indices->get_template(
1553         name => $template,
1554      );
1555   };
1556   if ($@) {
1557      chomp($@);
1558      return $self->log->error("get_template: template failed for name [$template]: [$@]");
1559   }
1560
1561   return $r;
1562}
1563
1564#
1565# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1566#
1567sub put_template {
1568   my $self = shift;
1569   my ($name, $template) = @_;
1570
1571   my $es = $self->_es;
1572   $self->brik_help_run_undef_arg('open', $es) or return;
1573   $self->brik_help_run_undef_arg('put_template', $name) or return;
1574   $self->brik_help_run_undef_arg('put_template', $template) or return;
1575   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH') or return;
1576
1577   my $r;
1578   eval {
1579      $r = $es->indices->put_template(
1580         name => $name,
1581         body => $template,
1582      );
1583   };
1584   if ($@) {
1585      chomp($@);
1586      return $self->log->error("put_template: template failed for name [$name]: [$@]");
1587   }
1588
1589   return $r;
1590}
1591
1592sub put_template_from_json_file {
1593   my $self = shift;
1594   my ($json_file) = @_;
1595
1596   my $es = $self->_es;
1597   $self->brik_help_run_undef_arg('open', $es) or return;
1598   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
1599   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file) or return;
1600
1601   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1602   my $data = $fj->read($json_file) or return;
1603
1604   if (! exists($data->{template})) {
1605      return $self->log->error("put_template_from_json_file: no template name found");
1606   }
1607
1608   my $name = $data->{template};
1609
1610   return $self->put_template($name, $data);
1611}
1612
1613sub update_template_from_json_file {
1614   my $self = shift;
1615   my ($json_file) = @_;
1616
1617   my $es = $self->_es;
1618   $self->brik_help_run_undef_arg('open', $es) or return;
1619   $self->brik_help_run_undef_arg('update_template_from_json_file', $json_file) or return;
1620   $self->brik_help_run_file_not_found('update_template_from_json_file', $json_file) or return;
1621
1622   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
1623   my $data = $fj->read($json_file) or return;
1624
1625   if (! exists($data->{template})) {
1626      return $self->log->error("put_template_from_json_file: no template name found");
1627   }
1628
1629   my $name = $data->{template};
1630
1631   $self->delete_template($name);  # We ignore errors, template may not exist.
1632
1633   return $self->put_template($name, $data);
1634}
1635
1636#
1637# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1638# Search::Elasticsearch::Client::2_0::Direct::Indices
1639#
1640sub get_settings {
1641   my $self = shift;
1642   my ($indices, $names) = @_;
1643
1644   my $es = $self->_es;
1645   $self->brik_help_run_undef_arg('open', $es) or return;
1646
1647   my %args = ();
1648   if (defined($indices)) {
1649      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
1650      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
1651         or return;
1652      $args{index} = $indices;
1653   }
1654   if (defined($names)) {
1655      $self->brik_help_run_file_not_found('get_settings', $names) or return;
1656      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
1657         or return;
1658      $args{name} = $names;
1659   }
1660
1661   my $r;
1662   eval {
1663      $r = $es->indices->get_settings(%args);
1664   };
1665   if ($@) {
1666      chomp($@);
1667      return $self->log->error("get_settings: failed: [$@]");
1668   }
1669
1670   return $r;
1671}
1672
1673#
1674# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
1675# Search::Elasticsearch::Client::2_0::Direct::Indices
1676#
1677# Example:
1678#
1679# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
1680#
1681# XXX: should be renamed to put_index_settings
1682#
1683sub put_settings {
1684   my $self = shift;
1685   my ($settings, $indices) = @_;
1686
1687   my $es = $self->_es;
1688   $self->brik_help_run_undef_arg('open', $es) or return;
1689   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
1690   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
1691
1692   my %args = (
1693      body => $settings,
1694   );
1695   if (defined($indices)) {
1696      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
1697      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
1698         or return;
1699      $args{index} = $indices;
1700   }
1701
1702   my $r;
1703   eval {
1704      $r = $es->indices->put_settings(%args);
1705   };
1706   if ($@) {
1707      chomp($@);
1708      return $self->log->error("put_settings: failed: [$@]");
1709   }
1710
1711   return $r;
1712}
1713
1714sub set_index_number_of_replicas {
1715   my $self = shift;
1716   my ($indices, $number) = @_;
1717
1718   my $es = $self->_es;
1719   $self->brik_help_run_undef_arg('open', $es) or return;
1720   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
1721   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1722      or return;
1723
1724   my $settings = { number_of_replicas => $number };
1725
1726   return $self->put_settings($settings, $indices);
1727}
1728
1729sub set_index_refresh_interval {
1730   my $self = shift;
1731   my ($indices, $number) = @_;
1732
1733   my $es = $self->_es;
1734   $self->brik_help_run_undef_arg('open', $es) or return;
1735   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
1736   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1737      or return;
1738
1739   # If there is a meaningful value not postfixed with a unity,
1740   # we default to add a `s' for a number of seconds.
1741   if ($number =~ /^\d+$/ && $number > 0) {
1742      $number .= 's';
1743   }
1744
1745   my $settings = { refresh_interval => $number };
1746
1747   return $self->put_settings($settings, $indices);
1748}
1749
1750sub get_index_number_of_replicas {
1751   my $self = shift;
1752   my ($indices) = @_;
1753
1754   my $es = $self->_es;
1755   $self->brik_help_run_undef_arg('open', $es) or return;
1756   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
1757   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
1758      or return;
1759
1760   my $settings = $self->get_settings($indices);
1761
1762   my %indices = ();
1763   for (keys %$settings) {
1764      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
1765   }
1766
1767   return \%indices;
1768}
1769
1770sub get_index_refresh_interval {
1771   my $self = shift;
1772   my ($indices, $number) = @_;
1773
1774   my $es = $self->_es;
1775   $self->brik_help_run_undef_arg('open', $es) or return;
1776   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
1777   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
1778      or return;
1779
1780   my $settings = $self->get_settings($indices);
1781
1782   my %indices = ();
1783   for (keys %$settings) {
1784      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
1785   }
1786
1787   return \%indices;
1788}
1789
1790sub get_index_number_of_shards {
1791   my $self = shift;
1792   my ($indices, $number) = @_;
1793
1794   my $es = $self->_es;
1795   $self->brik_help_run_undef_arg('open', $es) or return;
1796   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
1797   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
1798      or return;
1799
1800   my $settings = $self->get_settings($indices);
1801
1802   my %indices = ();
1803   for (keys %$settings) {
1804      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
1805   }
1806
1807   return \%indices;
1808}
1809
1810#
1811# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
1812#
1813sub delete_template {
1814   my $self = shift;
1815   my ($name) = @_;
1816
1817   my $es = $self->_es;
1818   $self->brik_help_run_undef_arg('open', $es) or return;
1819   $self->brik_help_run_undef_arg('delete_template', $name) or return;
1820
1821   my $r;
1822   eval {
1823      $r = $es->indices->delete_template(
1824         name => $name,
1825      );
1826   };
1827   if ($@) {
1828      chomp($@);
1829      return $self->log->error("delete_template: failed for name [$name]: [$@]");
1830   }
1831
1832   return $r;
1833}
1834
1835#
1836# Return a boolean to state for index existence
1837#
1838sub is_index_exists {
1839   my $self = shift;
1840   my ($index) = @_;
1841
1842   my $es = $self->_es;
1843   $self->brik_help_run_undef_arg('open', $es) or return;
1844   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
1845
1846   my $r;
1847   eval {
1848      $r = $es->indices->exists(
1849         index => $index,
1850      );
1851   };
1852   if ($@) {
1853      chomp($@);
1854      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
1855   }
1856
1857   return $r ? 1 : 0;
1858}
1859
1860#
1861# Return a boolean to state for index with type existence
1862#
1863sub is_type_exists {
1864   my $self = shift;
1865   my ($index, $type) = @_;
1866
1867   my $es = $self->_es;
1868   $self->brik_help_run_undef_arg('open', $es) or return;
1869   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
1870   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
1871
1872   my $r;
1873   eval {
1874      $r = $es->indices->exists_type(
1875         index => $index,
1876         type => $type,
1877      );
1878   };
1879   if ($@) {
1880      chomp($@);
1881      return $self->log->error("is_type_exists: failed for index [$index] and ".
1882         "type [$type]: [$@]");
1883   }
1884
1885   return $r ? 1 : 0;
1886}
1887
1888#
1889# Return a boolean to state for document existence
1890#
1891sub is_document_exists {
1892   my $self = shift;
1893   my ($index, $type, $document) = @_;
1894
1895   my $es = $self->_es;
1896   $self->brik_help_run_undef_arg('open', $es) or return;
1897   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
1898   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
1899   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
1900   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
1901
1902   my $r;
1903   eval {
1904      $r = $es->exists(
1905         index => $index,
1906         type => $type,
1907         %$document,
1908      );
1909   };
1910   if ($@) {
1911      chomp($@);
1912      return $self->log->error("is_document_exists: failed for index [$index] and ".
1913         "type [$type]: [$@]");
1914   }
1915
1916   return $r ? 1 : 0;
1917}
1918
1919sub parse_error_string {
1920   my $self = shift;
1921   my ($string) = @_;
1922
1923   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
1924
1925   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
1926
1927   my ($class, $node, $code, $message, $dump) = $string =~
1928      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
1929
1930   if (defined($dump) && length($dump)) {
1931      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
1932      $dump = $sd->decode($dump);
1933   }
1934
1935   # Sanity check
1936   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
1937   &&  defined($dump) && ref($dump) eq 'HASH') {
1938      return {
1939         class => $class,
1940         node => $node,
1941         code => $code,
1942         message => $message,
1943         dump => $dump,
1944      };
1945   }
1946
1947   # Were not able to decode, we return as-is.
1948   return {
1949      message => $string,
1950   };
1951}
1952
1953#
1954# Refresh an index to receive latest additions
1955#
1956# Search::Elasticsearch::Client::5_0::Direct::Indices
1957# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
1958#
1959sub refresh_index {
1960   my $self = shift;
1961   my ($index) = @_;
1962
1963   my $es = $self->_es;
1964   $self->brik_help_run_undef_arg('open', $es) or return;
1965   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
1966
1967   my $try = $self->try;
1968
1969RETRY:
1970
1971   my $r;
1972   eval {
1973      $r = $es->indices->refresh(
1974         index => $index,
1975      );
1976   };
1977   if ($@) {
1978      if (--$try == 0) {
1979         chomp($@);
1980         my $p = $self->parse_error_string($@);
1981         if (defined($p) && exists($p->{class})) {
1982            my $class = $p->{class};
1983            my $code = $p->{code};
1984            my $node = $p->{node};
1985            return $self->log->error("refresh_index: failed for index [$index] ".
1986               "after [$try] tries with error [$class] code [$code] for node [$node]");
1987         }
1988         else {
1989            return $self->log->error("refresh_index: failed for index [$index] ".
1990               "after [$try]: [$@]");
1991         }
1992      }
1993      sleep 60;
1994      goto RETRY;
1995   }
1996
1997   return $r;
1998}
1999
2000sub export_as_csv {
2001   my $self = shift;
2002   my ($index, $size) = @_;
2003
2004   $size ||= 10_000;
2005   my $es = $self->_es;
2006   $self->brik_help_run_undef_arg('open', $es) or return;
2007   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2008   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2009
2010   my $max = $self->max;
2011
2012   my $scroll;
2013   my $version = $self->version or return;
2014   if ($version lt "5.0.0") {
2015      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2016   }
2017   else {
2018      $scroll = $self->open_scroll($index, $size) or return;
2019   }
2020
2021   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2022
2023   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2024   $fc->separator(',');
2025   $fc->escape('\\');
2026   $fc->append(1);
2027   $fc->first_line_is_header(0);
2028   $fc->write_header(1);
2029   $fc->use_quoting(1);
2030   $fc->encoded_fields($self->csv_encoded_fields);
2031   $fc->object_fields($self->csv_object_fields);
2032
2033   my $total = $self->total_scroll;
2034   $self->log->info("export_as_csv: total [$total] for index [$index]");
2035
2036   my $h = {};
2037   my %types = ();
2038   my $read = 0;
2039   my $skipped = 0;
2040   my $exported = 0;
2041   my $start = time();
2042   my $done = 'output.exported';
2043   my $start_time = time();
2044   my %chunk = ();
2045   while (my $next = $self->next_scroll(10000)) {
2046      for my $this (@$next) {
2047         $read++;
2048         my $id = $this->{_id};
2049         my $doc = $this->{_source};
2050         my $type = $this->{_type};
2051         if (! exists($types{$type})) {
2052            my $fields = $self->list_index_fields($index, $type) or return;
2053            #$types{$type}{header} = [ '_id', sort { $a cmp $b } keys %$doc ];
2054            $types{$type}{header} = [ '_id', @$fields ];
2055            $types{$type}{output} = "$index:$type.csv";
2056            $done = $types{$type}{output_exported} = "$index:$type.csv.exported";
2057
2058            # Verify it has not been exported yet
2059            if (-f $types{$type}{output_exported}) {
2060               return $self->log->error("export_as_csv: export already done for index ".
2061                  "[$index] with type [$type] and file [$index:$type.csv]");
2062            }
2063
2064            $self->log->info("export_as_csv: exporting to file [$index:$type.csv] ".
2065               "for new type [$type], using chunk size of [$size]");
2066         }
2067
2068         $h->{_id} = $id;
2069
2070         for my $k (keys %$doc) {
2071            $h->{$k} = $doc->{$k};
2072         }
2073
2074         $fc->header($types{$type}{header});
2075
2076         push @{$chunk{$type}}, $h;
2077         if (@{$chunk{$type}} > 999) {
2078            my $r = $fc->write($chunk{$type}, $types{$type}{output});
2079            if (!defined($r)) {
2080               $self->log->warning("export_as_csv: unable to process entry, skipping");
2081               $skipped++;
2082               next;
2083            }
2084            $chunk{$type} = [];
2085         }
2086
2087         # Log a status sometimes.
2088         if (! (++$exported % 100_000)) {
2089            my $now = time();
2090            $self->log->info("export_as_csv: fetched [$exported/$total] elements in ".
2091               ($now - $start)." second(s) from index [$index]");
2092            $start = time();
2093         }
2094
2095         # Limit export to specified maximum
2096         if ($max > 0 && $exported >= $max) {
2097            $self->log->info("export_as_csv: max export reached [$exported] for index ".
2098               "[$index], stopping");
2099            last;
2100         }
2101      }
2102   }
2103
2104   # Process remaining data waiting to be written
2105   for my $type (keys %types) {
2106      if (@{$chunk{$type}} > 0) {
2107         $fc->write($chunk{$type}, $types{$type}{output});
2108      }
2109   }
2110
2111   $self->close_scroll;
2112
2113   my $stop_time = time();
2114   my $duration = $stop_time - $start_time;
2115   my $eps = $exported;
2116   if ($duration > 0) {
2117      $eps = $exported / $duration;
2118   }
2119
2120   my $result = {
2121      read => $read,
2122      exported => $exported,
2123      skipped => $read - $exported,
2124      total_count => $total,
2125      complete => ($exported == $total) ? 1 : 0,
2126      duration => $duration,
2127      eps => $eps, 
2128   };
2129
2130   # Say the file has been processed, and put resulting stats.
2131   $fd->write($result, $done) or return;
2132
2133   return $result;
2134}
2135
2136#
2137# Optimization instructions:
2138# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
2139#
2140sub import_from_csv {
2141   my $self = shift;
2142   my ($input_csv, $index, $type, $hash) = @_;
2143
2144   my $es = $self->_es;
2145   $self->brik_help_run_undef_arg('open', $es) or return;
2146   $self->brik_help_run_undef_arg('import_from_csv', $input_csv) or return;
2147   $self->brik_help_run_file_not_found('import_from_csv', $input_csv) or return;
2148
2149   # If index and/or types are not defined, we try to get them from input filename
2150   if (! defined($index) || ! defined($type)) {
2151      # Example: index-DATE:type.csv
2152      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
2153         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
2154         $index ||= $this_index;
2155         $type ||= $this_type;
2156      }
2157   }
2158
2159   # Verify it has not been indexed yet
2160   my $done = "$input_csv.imported";
2161   if (-f $done) {
2162      $self->log->info("import_from_csv: import already done for file [$input_csv]");
2163      return 0;
2164   }
2165
2166   # And default to Attributes if guess failed.
2167   $index ||= $self->index;
2168   $type ||= $self->type;
2169   $self->brik_help_set_undef_arg('index', $index) or return;
2170   $self->brik_help_set_undef_arg('type', $type) or return;
2171
2172   if ($index eq '*') {
2173      return $self->log->error("import_from_csv: cannot import to invalid index [$index]");
2174   }
2175   if ($type eq '*') {
2176      return $self->log->error("import_from_csv: cannot import to invalid type [$type]");
2177   }
2178
2179   $self->log->debug("input [$input_csv]");
2180   $self->log->debug("index [$index]");
2181   $self->log->debug("type [$type]");
2182
2183   my $count_before = 0;
2184   if ($self->is_index_exists($index)) {
2185      $count_before = $self->count($index, $type);
2186      if (! defined($count_before)) {
2187         return;
2188      }
2189      $self->log->info("import_from_csv: current index [$index] count is ".
2190         "[$count_before]");
2191   }
2192
2193   my $max = $self->max;
2194
2195   $self->open_bulk_mode($index, $type) or return;
2196
2197   $self->log->info("import_from_csv: importing file [$input_csv] to index [$index] ".
2198      "with type [$type]");
2199
2200   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2201
2202   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
2203   $fc->separator(',');
2204   $fc->escape('\\');
2205   $fc->first_line_is_header(1);
2206   $fc->encoded_fields($self->csv_encoded_fields);
2207   $fc->object_fields($self->csv_object_fields);
2208
2209   my $refresh_interval;
2210   my $number_of_replicas;
2211   my $start = time();
2212   my $speed_settings = {};
2213   my $imported = 0;
2214   my $first = 1;
2215   my $read = 0;
2216   my $skipped_chunks = 0;
2217   my $start_time = time();
2218   while (my $this = $fc->read_next($input_csv)) {
2219      $read++;
2220
2221      my $h = {};
2222      my $id = $this->{_id};
2223      delete $this->{_id};
2224      for my $k (keys %$this) {
2225         my $value = $this->{$k};
2226         # We keep only fields when they have a value.
2227         # No need to index data that is empty.
2228         if (defined($value) && length($value)) {
2229            $h->{$k} = $value;
2230         }
2231      }
2232
2233      my $r = $self->index_bulk($h, $index, $type, $hash, $id);
2234      if (! defined($r)) {
2235         $self->log->error("import_from_csv: bulk processing failed for index [$index] ".
2236            "at read [$read], skipping chunk");
2237         $skipped_chunks++;
2238         next;
2239      }
2240
2241      # Gather index settings, and set values for speed.
2242      # We don't do it earlier, cause we need index to be created,
2243      # and it should have been done from index_bulk Command.
2244      if ($first && $self->is_index_exists($index)) {
2245         # Save current values so we can restore them at the end of Command.
2246         # We ignore errors here, this is non-blocking for indexing.
2247         $refresh_interval = $self->get_index_refresh_interval($index);
2248         $refresh_interval = $refresh_interval->{$index};
2249         $number_of_replicas = $self->get_index_number_of_replicas($index);
2250         $number_of_replicas = $number_of_replicas->{$index};
2251         if ($self->use_indexing_optimizations) {
2252            $self->set_index_number_of_replicas($index, 0);
2253         }
2254         $self->set_index_refresh_interval($index, -1);
2255         $first = 0;
2256      }
2257
2258      # Log a status sometimes.
2259      if (! (++$imported % 100_000)) {
2260         my $now = time();
2261         $self->log->info("import_from_csv: imported [$imported] entries in ".
2262            ($now - $start)." second(s) to index [$index]");
2263         $start = time();
2264      }
2265
2266      # Limit import to specified maximum
2267      if ($max > 0 && $imported >= $max) {
2268         $self->log->info("import_from_csv: max import reached [$imported] for ".
2269            "index [$index], stopping");
2270         last;
2271      }
2272   }
2273
2274   $self->bulk_flush;
2275
2276   my $stop_time = time();
2277   my $duration = $stop_time - $start_time;
2278   my $eps = $imported / ($duration || 1);  # Avoid divide by zero error.
2279
2280   $self->refresh_index($index);
2281
2282   my $count_current = $self->count($index, $type) or return;
2283   $self->log->info("import_from_csv: after index [$index] count is [$count_current]");
2284
2285   my $skipped = 0;
2286   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
2287   if ($complete) {  # If complete, import has been retried, and everything is now ok.
2288      $imported = $read;
2289   }
2290   else {
2291      $skipped = $read - ($count_current - $count_before);
2292   }
2293
2294   my $result = {
2295      read => $read,
2296      imported => $imported,
2297      skipped => $skipped,
2298      previous_count => $count_before,
2299      current_count => $count_current,
2300      complete => $complete,
2301      duration => $duration,
2302      eps => $eps,
2303   };
2304
2305   # Say the file has been processed, and put resulting stats.
2306   $fd->write($result, $done) or return;
2307
2308   # Restore previous settings, if any
2309   if (defined($refresh_interval)) {
2310      $self->set_index_refresh_interval($index, $refresh_interval);
2311   }
2312   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
2313      $self->set_index_number_of_replicas($index, $number_of_replicas);
2314   }
2315
2316   return $result;
2317}
2318
2319#
2320# http://localhost:9200/_nodes/stats/process?pretty
2321#
2322# Search::Elasticsearch::Client::2_0::Direct::Nodes
2323#
2324sub get_stats_process {
2325   my $self = shift;
2326
2327   my $es = $self->_es;
2328   $self->brik_help_run_undef_arg('open', $es) or return;
2329
2330   my $r;
2331   eval {
2332      $r = $es->nodes->stats(
2333         metric => [ qw(process) ],
2334      );
2335   };
2336   if ($@) {
2337      chomp($@);
2338      return $self->log->error("get_stats_process: failed: [$@]");
2339   }
2340
2341   return $r;
2342}
2343
2344#
2345# curl http://localhost:9200/_nodes/process?pretty
2346#
2347# Search::Elasticsearch::Client::2_0::Direct::Nodes
2348#
2349sub get_process {
2350   my $self = shift;
2351
2352   my $es = $self->_es;
2353   $self->brik_help_run_undef_arg('open', $es) or return;
2354
2355   my $r;
2356   eval {
2357      $r = $es->nodes->info(
2358         metric => [ qw(process) ],
2359      );
2360   };
2361   if ($@) {
2362      chomp($@);
2363      return $self->log->error("get_process: failed: [$@]");
2364   }
2365
2366   return $r;
2367}
2368
2369#
2370# Search::Elasticsearch::Client::2_0::Direct::Cluster
2371#
2372sub get_cluster_state {
2373   my $self = shift;
2374
2375   my $es = $self->_es;
2376   $self->brik_help_run_undef_arg('open', $es) or return;
2377
2378   my $r;
2379   eval {
2380      $r = $es->cluster->state;
2381   };
2382   if ($@) {
2383      chomp($@);
2384      return $self->log->error("get_cluster_state: failed: [$@]");
2385   }
2386
2387   return $r;
2388}
2389
2390#
2391# Search::Elasticsearch::Client::2_0::Direct::Cluster
2392#
2393sub get_cluster_health {
2394   my $self = shift;
2395
2396   my $es = $self->_es;
2397   $self->brik_help_run_undef_arg('open', $es) or return;
2398
2399   my $r;
2400   eval {
2401      $r = $es->cluster->health;
2402   };
2403   if ($@) {
2404      chomp($@);
2405      return $self->log->error("get_cluster_health: failed: [$@]");
2406   }
2407
2408   return $r;
2409}
2410
2411#
2412# Search::Elasticsearch::Client::2_0::Direct::Cluster
2413#
2414sub get_cluster_settings {
2415   my $self = shift;
2416
2417   my $es = $self->_es;
2418   $self->brik_help_run_undef_arg('open', $es) or return;
2419
2420   my $r;
2421   eval {
2422      $r = $es->cluster->get_settings;
2423   };
2424   if ($@) {
2425      chomp($@);
2426      return $self->log->error("get_cluster_settings: failed: [$@]");
2427   }
2428
2429   return $r;
2430}
2431
2432#
2433# Search::Elasticsearch::Client::2_0::Direct::Cluster
2434#
2435sub put_cluster_settings {
2436   my $self = shift;
2437   my ($settings) = @_;
2438
2439   my $es = $self->_es;
2440   $self->brik_help_run_undef_arg('open', $es) or return;
2441   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
2442   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
2443
2444   my %args = (
2445      body => $settings,
2446   );
2447
2448   my $r;
2449   eval {
2450      $r = $es->cluster->put_settings(%args);
2451   };
2452   if ($@) {
2453      chomp($@);
2454      return $self->log->error("put_cluster_settings: failed: [$@]");
2455   }
2456
2457   return $r;
2458}
2459
2460sub count_green_indices {
2461   my $self = shift;
2462
2463   my $get = $self->show_indices or return;
2464
2465   my $count = 0;
2466   for (@$get) {
2467      if (/^\s*green\s+/) {
2468         $count++;
2469      }
2470   }
2471
2472   return $count;
2473}
2474
2475sub count_yellow_indices {
2476   my $self = shift;
2477
2478   my $get = $self->show_indices or return;
2479
2480   my $count = 0;
2481   for (@$get) {
2482      if (/^\s*yellow\s+/) {
2483         $count++;
2484      }
2485   }
2486
2487   return $count;
2488}
2489
2490sub count_red_indices {
2491   my $self = shift;
2492
2493   my $get = $self->show_indices or return;
2494
2495   my $count = 0;
2496   for (@$get) {
2497      if (/^\s*red\s+/) {
2498         $count++;
2499      }
2500   }
2501
2502   return $count;
2503}
2504
2505sub count_indices {
2506   my $self = shift;
2507
2508   my $get = $self->show_indices or return;
2509
2510   return scalar @$get;
2511}
2512
2513sub list_indices_status {
2514   my $self = shift;
2515
2516   my $get = $self->show_indices or return;
2517
2518   my $count_red = 0;
2519   my $count_yellow = 0;
2520   my $count_green = 0;
2521   for (@$get) {
2522      if (/^\s*red\s+/) {
2523         $count_red++;
2524      }
2525      elsif (/^\s*yellow\s+/) {
2526         $count_yellow++;
2527      }
2528      elsif (/^\s*green\s+/) {
2529         $count_green++;
2530      }
2531   }
2532
2533   return {
2534      red => $count_red,
2535      yellow => $count_yellow,
2536      green => $count_green,
2537   };
2538}
2539
2540sub count_shards {
2541   my $self = shift;
2542
2543   my $indices = $self->get_indices or return;
2544
2545   my $count = 0;
2546   for (@$indices) {
2547      $count += $_->{shards};
2548   }
2549
2550   return $count;
2551}
2552
2553sub count_size {
2554   my $self = shift;
2555
2556   my $indices = $self->get_indices or return;
2557
2558   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2559   $fn->kibi_suffix("kb");
2560   $fn->mebi_suffix("mb");
2561   $fn->gibi_suffix("gb");
2562   $fn->kilo_suffix("KB");
2563   $fn->mega_suffix("MB");
2564   $fn->giga_suffix("GB");
2565
2566   my $size = 0;
2567   for (@$indices) {
2568      $size += $fn->to_number($_->{size});
2569   }
2570
2571   return $fn->from_number($size);
2572}
2573
2574sub count_total_size {
2575   my $self = shift;
2576
2577   my $indices = $self->get_indices or return;
2578
2579   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2580   $fn->kibi_suffix("kb");
2581   $fn->mebi_suffix("mb");
2582   $fn->gibi_suffix("gb");
2583   $fn->kilo_suffix("KB");
2584   $fn->mega_suffix("MB");
2585   $fn->giga_suffix("GB");
2586
2587   my $size = 0;
2588   for (@$indices) {
2589      $size += $fn->to_number($_->{total_size});
2590   }
2591
2592   return $fn->from_number($size);
2593}
2594
2595sub count_count {
2596   my $self = shift;
2597
2598   my $indices = $self->get_indices or return;
2599
2600   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
2601   $fn->kilo_suffix('k');
2602   $fn->mega_suffix('m');
2603   $fn->giga_suffix('M');
2604
2605   my $count = 0;
2606   for (@$indices) {
2607      $count += $_->{count};
2608   }
2609
2610   return $fn->from_number($count);
2611}
2612
2613sub list_green_indices {
2614   my $self = shift;
2615
2616   my $get = $self->get_indices or return;
2617
2618   my @indices = ();
2619   for (@$get) {
2620      if ($_->{color} eq 'green') {
2621         push @indices, $_->{index};
2622      }
2623   }
2624
2625   return \@indices;
2626}
2627
2628sub list_yellow_indices {
2629   my $self = shift;
2630
2631   my $get = $self->get_indices or return;
2632
2633   my @indices = ();
2634   for (@$get) {
2635      if ($_->{color} eq 'yellow') {
2636         push @indices, $_->{index};
2637      }
2638   }
2639
2640   return \@indices;
2641}
2642
2643sub list_red_indices {
2644   my $self = shift;
2645
2646   my $get = $self->get_indices or return;
2647
2648   my @indices = ();
2649   for (@$get) {
2650      if ($_->{color} eq 'red') {
2651         push @indices, $_->{index};
2652      }
2653   }
2654
2655   return \@indices;
2656}
2657
2658#
2659# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
2660#
2661sub list_datatypes {
2662   my $self = shift;
2663
2664   return {
2665      core => [ qw(string long integer short byte double float data boolean binary) ],
2666   };
2667}
2668
2669#
2670# Return total hits for last www_search
2671#
2672sub get_hits_total {
2673   my $self = shift;
2674   my ($run) = @_;
2675
2676   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
2677
2678   if (ref($run) eq 'HASH') {
2679      if (exists($run->{hits}) && exists($run->{hits}{total})) {
2680         return $run->{hits}{total};
2681      }
2682   }
2683
2684   return $self->log->error("get_hits_total: last Command not compatible");
2685}
2686
2687sub disable_shard_allocation {
2688   my $self = shift;
2689
2690   my $settings = {
2691      persistent => {
2692         'cluster.routing.allocation.enable' => 'none',
2693      }
2694   };
2695
2696   return $self->put_cluster_settings($settings);
2697}
2698
2699sub enable_shard_allocation {
2700   my $self = shift;
2701
2702   my $settings = {
2703      persistent => { 
2704         'cluster.routing.allocation.enable' => 'all',
2705      }
2706   };
2707
2708   return $self->put_cluster_settings($settings);
2709}
2710
2711sub flush_synced {
2712   my $self = shift;
2713
2714   my $es = $self->_es;
2715   $self->brik_help_run_undef_arg('open', $es) or return;
2716
2717   my $r;
2718   eval {
2719      $r = $es->indices->flush_synced;
2720   };
2721   if ($@) {
2722      chomp($@);
2723      return $self->log->error("flush_synced: failed: [$@]");
2724   }
2725
2726   return $r;
2727}
2728
2729#
2730# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
2731#
2732# run client::elasticsearch create_snapshot_repository myrepo
2733#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
2734#
2735# You have to set path.repo in elasticsearch.yml like:
2736# path.repo: ["/home/gomor/es-backups"]
2737#
2738# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2739#
2740sub create_snapshot_repository {
2741   my $self = shift;
2742   my ($body, $repository_name) = @_;
2743
2744   my $es = $self->_es;
2745   $self->brik_help_run_undef_arg('open', $es) or return;
2746   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
2747
2748   $repository_name ||= 'repository';
2749
2750   my %args = (
2751      repository => $repository_name,
2752      body => $body,
2753   );
2754
2755   my $r;
2756   eval {
2757      $r = $es->snapshot->create_repository(%args);
2758   };
2759   if ($@) {
2760      chomp($@);
2761      return $self->log->error("create_snapshot_repository: failed: [$@]");
2762   }
2763
2764   return $r;
2765}
2766
2767sub create_shared_fs_snapshot_repository {
2768   my $self = shift;
2769   my ($location, $repository_name) = @_;
2770
2771   $repository_name ||= 'repository';
2772   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
2773
2774   if ($location !~ m{^/}) {
2775      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
2776         "a full directory path, this one is invalid [$location]");
2777   }
2778
2779   my $body = {
2780      type => 'fs',
2781      settings => {
2782         compress => 'true',
2783         location => $location,
2784      },
2785   };
2786
2787   return $self->create_snapshot_repository($body, $repository_name);
2788}
2789
2790#
2791# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2792#
2793sub get_snapshot_repositories {
2794   my $self = shift;
2795
2796   my $es = $self->_es;
2797   $self->brik_help_run_undef_arg('open', $es) or return;
2798
2799   my $r;
2800   eval {
2801      $r = $es->snapshot->get_repository;
2802   };
2803   if ($@) {
2804      chomp($@);
2805      return $self->log->error("get_snapshot_repositories: failed: [$@]");
2806   }
2807
2808   return $r;
2809}
2810
2811#
2812# Search::Elasticsearch::Client::2_0::Direct::Snapshot
2813#
2814sub get_snapshot_status {
2815   my $self = shift;
2816
2817   my $es = $self->_es;
2818   $self->brik_help_run_undef_arg('open', $es) or return;
2819
2820   my $r;
2821   eval {
2822      $r = $es->snapshot->status;
2823   };
2824   if ($@) {
2825      chomp($@);
2826      return $self->log->error("get_snapshot_status: failed: [$@]");
2827   }
2828
2829   return $r;
2830}
2831
2832#
2833# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2834#
2835sub create_snapshot {
2836   my $self = shift;
2837   my ($snapshot_name, $repository_name, $body) = @_;
2838
2839   my $es = $self->_es;
2840   $self->brik_help_run_undef_arg('open', $es) or return;
2841
2842   $snapshot_name ||= 'snapshot';
2843   $repository_name ||= 'repository';
2844
2845   my %args = (
2846      repository => $repository_name,
2847      snapshot => $snapshot_name,
2848   );
2849   if (defined($body)) {
2850      $args{body} = $body;
2851   }
2852
2853   my $r;
2854   eval {
2855      $r = $es->snapshot->create(%args);
2856   };
2857   if ($@) {
2858      chomp($@);
2859      return $self->log->error("create_snapshot: failed: [$@]");
2860   }
2861
2862   return $r;
2863}
2864
2865sub create_snapshot_for_indices {
2866   my $self = shift;
2867   my ($indices, $snapshot_name, $repository_name) = @_;
2868
2869   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
2870
2871   $snapshot_name ||= 'snapshot';
2872   $repository_name ||= 'repository';
2873
2874   my $body = {
2875      indices => $indices,
2876   };
2877
2878   return $self->create_snapshot($snapshot_name, $repository_name, $body);
2879}
2880
2881sub is_snapshot_finished {
2882   my $self = shift;
2883
2884   my $status = $self->get_snapshot_status or return;
2885
2886   if (@{$status->{snapshots}} == 0) {
2887      return 1;
2888   }
2889
2890   return 0;
2891}
2892
2893sub get_snapshot_state {
2894   my $self = shift;
2895
2896   if ($self->is_snapshot_finished) {
2897      return $self->log->info("get_snapshot_state: is already finished");
2898   }
2899
2900   my $status = $self->get_snapshot_status or return;
2901
2902   my @indices_done = ();
2903   my @indices_not_done = ();
2904
2905   my $list = $status->{snapshots};
2906   for my $snapshot (@$list) {
2907      my $indices = $snapshot->{indices};
2908      for my $index (@$indices) {
2909         my $done = $index->{shards_stats}{done};
2910         if ($done) {
2911            push @indices_done, $index;
2912         }
2913         else {
2914            push @indices_not_done, $index;
2915         }
2916      }
2917   }
2918
2919   return { done => \@indices_done, not_done => \@indices_not_done };
2920}
2921
2922sub verify_snapshot_repository {
2923}
2924
2925sub delete_snapshot_repository {
2926   my $self = shift;
2927   my ($repository_name) = @_;
2928
2929   my $es = $self->_es;
2930   $self->brik_help_run_undef_arg('open', $es) or return;
2931   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
2932
2933   my $r;
2934   eval {
2935      $r = $es->snapshot->delete_repository(
2936         repository => $repository_name,
2937      );
2938   };
2939   if ($@) {
2940      chomp($@);
2941      return $self->log->error("delete_snapshot_repository: failed: [$@]");
2942   }
2943
2944   return $r;
2945}
2946
2947sub get_snapshot {
2948   my $self = shift;
2949   my ($snapshot_name, $repository_name) = @_;
2950
2951   my $es = $self->_es;
2952   $self->brik_help_run_undef_arg('open', $es) or return;
2953
2954   $snapshot_name ||= 'snapshot';
2955   $repository_name ||= 'repository';
2956
2957   my $r;
2958   eval {
2959      $r = $es->snapshot->get(
2960         repository => $repository_name,
2961         snapshot => $snapshot_name,
2962      );
2963   };
2964   if ($@) {
2965      chomp($@);
2966      return $self->log->error("get_snapshot: failed: [$@]");
2967   }
2968
2969   return $r;
2970}
2971
2972#
2973# Search::Elasticsearch::Client::5_0::Direct::Snapshot
2974#
2975sub delete_snapshot {
2976   my $self = shift;
2977   my ($snapshot_name, $repository_name) = @_;
2978
2979   my $es = $self->_es;
2980   $self->brik_help_run_undef_arg('open', $es) or return;
2981   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
2982   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
2983
2984   my $timeout = $self->rtimeout;
2985
2986   my $r;
2987   eval {
2988      $r = $es->snapshot->delete(
2989         repository => $repository_name,
2990         snapshot => $snapshot_name,
2991         master_timeout => "${timeout}s",
2992      );
2993   };
2994   if ($@) {
2995      chomp($@);
2996      return $self->log->error("delete_snapshot: failed: [$@]");
2997   }
2998
2999   return $r;
3000}
3001
3002#
3003# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3004#
3005sub restore_snapshot {
3006   my $self = shift;
3007   my ($snapshot_name, $repository_name, $body) = @_;
3008
3009   my $es = $self->_es;
3010   $snapshot_name ||= 'snapshot';
3011   $repository_name ||= 'repository';
3012   $self->brik_help_run_undef_arg('open', $es) or return;
3013   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
3014   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
3015
3016   my %args = (
3017      repository => $repository_name,
3018      snapshot => $snapshot_name,
3019   );
3020   if (defined($body)) {
3021      $args{body} = $body;
3022   }
3023
3024   my $r;
3025   eval {
3026      $r = $es->snapshot->restore(%args);
3027   };
3028   if ($@) {
3029      chomp($@);
3030      return $self->log->error("restore_snapshot: failed: [$@]");
3031   }
3032
3033   return $r;
3034}
3035
3036sub restore_snapshot_for_indices {
3037   my $self = shift;
3038   my ($indices, $snapshot_name, $repository_name) = @_;
3039
3040   $snapshot_name ||= 'snapshot';
3041   $repository_name ||= 'repository';
3042   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
3043   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
3044   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
3045
3046   my $body = {
3047      indices => $indices,
3048   };
3049
3050   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
3051}
3052
3053# shard occupation
3054#
3055# curl -XGET "http://127.0.0.1:9200/_cat/shards?v
3056# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
3057#
3058# disk occuption:
3059# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
3060#
3061#
3062# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
3063#
3064
30651;
3066
3067__END__
3068
3069=head1 NAME
3070
3071Metabrik::Client::Elasticsearch - client::elasticsearch Brik
3072
3073=head1 SYNOPSIS
3074
3075   host:~> my $q = { term => { ip => "192.168.57.19" } }
3076   host:~> run client::elasticsearch open
3077   host:~> run client::elasticsearch query $q data-*
3078
3079=head1 DESCRIPTION
3080
3081Template to write a new Metabrik Brik.
3082
3083=head1 COPYRIGHT AND LICENSE
3084
3085Copyright (c) 2014-2017, Patrice E<lt>GomoRE<gt> Auffret
3086
3087You may distribute this module under the terms of The BSD 3-Clause License.
3088See LICENSE file in the source distribution archive.
3089
3090=head1 AUTHOR
3091
3092Patrice E<lt>GomoRE<gt> Auffret
3093
3094=cut
Note: See TracBrowser for help on using the repository browser.