source: repository/lib/Metabrik/Client/Elasticsearch.pm

Last change on this file was 992:6bd6acfc81d5, checked in by GomoR <gomor@…>, 2 weeks ago
  • update: copyright notice
  • new: support for Kali Linux operating system
  • remove: api::onyphe is no more included, it use handled in its own repository
File size: 108.8 KB
Line 
1#
2# $Id$
3#
4# client::elasticsearch Brik
5#
6package Metabrik::Client::Elasticsearch;
7use strict;
8use warnings;
9
10use base qw(Metabrik::Client::Rest);
11
12sub brik_properties {
13   return {
14      revision => '$Revision$',
15      tags => [ qw(unstable es es) ],
16      author => 'GomoR <GomoR[at]metabrik.org>',
17      license => 'http://opensource.org/licenses/BSD-3-Clause',
18      attributes => {
19         datadir => [ qw(datadir) ],
20         nodes => [ qw(node_list) ],
21         cxn_pool => [ qw(Sniff|Static|Static::NoPing) ],
22         date => [ qw(date) ],
23         index => [ qw(index) ],
24         type => [ qw(type) ],
25         from => [ qw(number) ],
26         size => [ qw(count) ],
27         max => [ qw(count) ],
28         max_flush_count => [ qw(count) ],
29         max_flush_size => [ qw(count) ],
30         rtimeout => [ qw(seconds) ],
31         sniff_rtimeout => [ qw(seconds) ],
32         try => [ qw(count) ],
33         use_bulk_autoflush => [ qw(0|1) ],
34         use_indexing_optimizations => [ qw(0|1) ],
35         use_ignore_id => [ qw(0|1) ],
36         csv_header => [ qw(fields) ],
37         csv_encoded_fields => [ qw(fields) ],
38         csv_object_fields => [ qw(fields) ],
39         encoding => [ qw(utf8|ascii) ],
40         _es => [ qw(INTERNAL) ],
41         _bulk => [ qw(INTERNAL) ],
42         _scroll => [ qw(INTERNAL) ],
43      },
44      attributes_default => {
45         nodes => [ qw(http://localhost:9200) ],
46         cxn_pool => 'Sniff',
47         from => 0,
48         size => 10,
49         max => 0,
50         index => '*',
51         type => '*',
52         rtimeout => 60,
53         sniff_rtimeout => 3,
54         try => 3,
55         max_flush_count => 1_000,
56         max_flush_size => 1_000_000,
57         use_bulk_autoflush => 1,
58         use_indexing_optimizations => 0,
59         use_ignore_id => 0,
60         encoding => 'utf8',
61      },
62      commands => {
63         open => [ qw(nodes_list|OPTIONAL cxn_pool|OPTIONAL) ],
64         open_bulk_mode => [ qw(index|OPTIONAL type|OPTIONAL) ],
65         open_scroll_scan_mode => [ qw(index|OPTIONAL size|OPTIONAL) ],
66         open_scroll => [ qw(index|OPTIONAL size|OPTIONAL type|OPTIONAL query|OPTIONAL) ],
67         close_scroll => [ ],
68         total_scroll => [ ],
69         next_scroll => [ qw(count|OPTIONAL) ],
70         reindex => [ qw(index_source index_destination type_destination|OPTIONAL) ],
71         get_reindex_tasks => [ ],
72         cancel_reindex_task => [ qw(id) ],
73         get_taskid => [ qw(id) ],
74         show_reindex_progress => [ ],
75         loop_show_reindex_progress => [ qw(seconds|OPTIONAL) ],
76         index_document => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
77         index_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
78         index_bulk_from_list => [ qw(document_list index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
79         clean_deleted_from_index => [ qw(index) ],
80         update_document => [ qw(document id index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
81         update_document_bulk => [ qw(document index|OPTIONAL type|OPTIONAL hash|OPTIONAL id|OPTIONAL) ],
82         bulk_flush => [ qw(index|OPTIONAL) ],
83         query => [ qw($query_hash index|OPTIONAL type|OPTIONAL hash|OPTIONAL) ],
84         count => [ qw(index|OPTIONAL type|OPTIONAL) ],
85         get_from_id => [ qw(id index|OPTIONAL type|OPTIONAL) ],
86         www_search => [ qw(query index|OPTIONAL type|OPTIONAL) ],
87         delete_index => [ qw(index|indices_list) ],
88         update_alias => [ qw(new_index alias) ],
89         delete_document => [ qw(index type id) ],
90         delete_by_query => [ qw($query_hash index type proceed|OPTIONAL) ],
91         show_indices => [ qw(string_filter|OPTIONAL) ],
92         show_nodes => [ ],
93         show_health => [ ],
94         show_recovery => [ ],
95         show_allocation => [ ],
96         list_indices => [ qw(regex|OPTIONAL) ],
97         get_indices => [ qw(string_filter|OPTIONAL) ],
98         get_index => [ qw(index|indices_list) ],
99         get_index_stats => [ qw(index) ],
100         list_index_types => [ qw(index) ],
101         list_index_fields => [ qw(index) ],
102         list_indices_version => [ qw(index|indices_list) ],
103         open_index => [ qw(index|indices_list) ],
104         close_index => [ qw(index|indices_list) ],
105         get_aliases => [ qw(index) ],
106         put_alias => [ qw(index alias) ],
107         delete_alias => [ qw(index alias) ],
108         is_mapping_exists => [ qw(index mapping) ],
109         get_mappings => [ qw(index type|OPTIONAL) ],
110         create_index => [ qw(index shards|OPTIONAL) ],
111         create_index_with_mappings => [ qw(index mappings) ],
112         info => [ qw(nodes_list|OPTIONAL) ],
113         version => [ qw(nodes_list|OPTIONAL) ],
114         get_templates => [ ],
115         list_templates => [ ],
116         get_template => [ qw(name) ],
117         put_mapping => [ qw(index type mapping) ],
118         put_mapping_from_json_file => [ qw(index type file) ],
119         update_mapping_from_json_file => [ qw(file index type) ],
120         put_template => [ qw(name template) ],
121         put_template_from_json_file => [ qw(file) ],
122         update_template_from_json_file => [ qw(file) ],
123         get_settings => [ qw(index|indices_list|OPTIONAL name|names_list|OPTIONAL) ],
124         put_settings => [ qw(settings_hash index|indices_list|OPTIONAL) ],
125         set_index_readonly => [ qw(index|indices_list boolean|OPTIONAL) ],
126         reset_index_readonly => [ qw(index|indices_list|OPTIONAL) ],
127         list_index_readonly => [ ],
128         set_index_number_of_replicas => [ qw(index|indices_list number) ],
129         set_index_refresh_interval => [ qw(index|indices_list number) ],
130         get_index_settings => [ qw(index|indices_list) ],
131         get_index_readonly => [ qw(index|indices_list) ],
132         get_index_number_of_replicas => [ qw(index|indices) ],
133         get_index_refresh_interval => [ qw(index|indices_list) ],
134         get_index_number_of_shards => [ qw(index|indices_list) ],
135         delete_template => [ qw(name) ],
136         is_index_exists => [ qw(index) ],
137         is_type_exists => [ qw(index type) ],
138         is_document_exists => [ qw(index type document) ],
139         parse_error_string => [ qw(string) ],
140         refresh_index => [ qw(index) ],
141         export_as => [ qw(format index size|OPTIONAL callback|OPTIONAL) ],
142         export_as_csv => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
143         export_as_json => [ qw(index size|OPTIONAL callback|OPTIONAL) ],
144         import_from => [ qw(format input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
145         import_from_csv => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
146         import_from_json => [ qw(input index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
147         import_from_csv_worker => [ qw(input_csv index|OPTIONAL type|OPTIONAL hash|OPTIONAL callback|OPTIONAL) ],
148         get_stats_process => [ ],
149         get_process => [ ],
150         get_cluster_state => [ ],
151         get_cluster_health => [ ],
152         get_cluster_settings => [ ],
153         put_cluster_settings => [ qw(settings) ],
154         count_green_indices => [ ],
155         count_yellow_indices => [ ],
156         count_red_indices => [ ],
157         list_green_indices => [ ],
158         list_yellow_indices => [ ],
159         list_red_indices => [ ],
160         count_indices => [ ],
161         list_indices_status => [ ],
162         count_shards => [ ],
163         count_size => [ qw(string_filter|OPTIONAL) ],
164         count_total_size => [ qw(string_filter|OPTIONAL) ],
165         count_count => [ ],
166         list_datatypes => [ ],
167         get_hits_total => [ ],
168         disable_shard_allocation => [ ],
169         enable_shard_allocation => [ ],
170         flush_synced => [ ],
171         create_snapshot_repository => [ qw(body repository_name|OPTIONAL) ],
172         create_shared_fs_snapshot_repository => [ qw(location
173            repository_name|OPTIONAL) ],
174         get_snapshot_repositories => [ ],
175         get_snapshot_status => [ ],
176         delete_snapshot_repository => [ qw(repository_name) ],
177         create_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL
178            body|OPTIONAL) ],
179         create_snapshot_for_indices => [ qw(indices snapshot_name|OPTIONAL
180            repository_name|OPTIONAL) ],
181         is_snapshot_finished => [ ],
182         get_snapshot_state => [ ],
183         get_snapshot => [ qw(snapshot_name|OPTIONAL repository_name|OPTIONAL) ],
184         delete_snapshot => [ qw(snapshot_name repository_name) ],
185         restore_snapshot => [ qw(snapshot_name repository_name body|OPTIONAL) ],
186         restore_snapshot_for_indices => [ qw(indices snapshot_name repository_name) ],
187      },
188      require_modules => {
189         'Metabrik::String::Json' => [ ],
190         'Metabrik::File::Csv' => [ ],
191         'Metabrik::File::Json' => [ ],
192         'Metabrik::File::Dump' => [ ],
193         'Metabrik::Format::Number' => [ ],
194         'Metabrik::Worker::Parallel' => [ ],
195         'Search::Elasticsearch' => [ ],
196      },
197   };
198}
199
200sub brik_preinit {
201   my $self = shift;
202
203   eval("use Search::Elasticsearch;");
204   if ($Search::Elasticsearch::VERSION < 5) {
205      $self->log->error("brik_preinit: please upgrade Search::Elasticsearch module ".
206         "with: run perl::module install Search::Elasticsearch");
207   }
208
209   return $self->SUPER::brik_preinit;
210}
211
212sub open {
213   my $self = shift;
214   my ($nodes, $cxn_pool) = @_;
215
216   $nodes ||= $self->nodes;
217   $cxn_pool ||= $self->cxn_pool;
218   $self->brik_help_run_undef_arg('open', $nodes) or return;
219   $self->brik_help_run_undef_arg('open', $cxn_pool) or return;
220   $self->brik_help_run_invalid_arg('open', $nodes, 'ARRAY') or return;
221   $self->brik_help_run_empty_array_arg('open', $nodes) or return;
222
223   for my $node (@$nodes) {
224      if ($node !~ m{https?://}) {
225         return $self->log->error("open: invalid node[$node], must start with http(s)");
226      }
227   }
228
229   my $timeout = $self->rtimeout;
230
231   my $nodes_str = join('|', @$nodes);
232   $self->log->debug("open: using nodes [$nodes_str]");
233
234   #
235   # Timeout description here:
236   #
237   # Search::Elasticsearch::Role::Cxn
238   #
239
240   my $es = Search::Elasticsearch->new(
241      nodes => $nodes,
242      cxn_pool => $cxn_pool,
243      timeout => $timeout,
244      max_retries => $self->try,
245      retry_on_timeout => 1,
246      sniff_timeout => $self->sniff_rtimeout, # seconds, default 1
247      request_timeout => 60,  # seconds, default 30
248      ping_timeout => 5,  # seconds, default 2
249      dead_timeout => 120,  # seconds, detault 60
250      max_dead_timeout => 3600,  # seconds, default 3600
251      sniff_request_timeout => 15, # seconds, default 2
252      #trace_to => 'Stderr',  # For debug purposes
253   );
254   if (! defined($es)) {
255      return $self->log->error("open: failed");
256   }
257
258   $self->_es($es);
259
260   return $nodes;
261}
262
263#
264# Search::Elasticsearch::Client::5_0::Bulk
265#
266sub open_bulk_mode {
267   my $self = shift;
268   my ($index, $type) = @_;
269
270   $index ||= $self->index;
271   $type ||= $self->type;
272   my $es = $self->_es;
273   $self->brik_help_run_undef_arg('open', $es) or return;
274   $self->brik_help_run_undef_arg('open_bulk_mode', $index) or return;
275   $self->brik_help_run_undef_arg('open_bulk_mode', $type) or return;
276
277   my %args = (
278      index => $index,
279      type => $type,
280      on_error => sub {
281         #my ($action, $response, $i) = @_;
282
283         #print Data::Dumper::Dumper($action)."\n";
284         #print Data::Dumper::Dumper($response)."\n";
285         #print Data::Dumper::Dumper($i)."\n";
286         print Data::Dumper::Dumper(\@_)."\n";
287      },
288   );
289
290   if ($self->use_bulk_autoflush) {
291      my $max_count = $self->max_flush_count || 1_000;
292      my $max_size = $self->max_flush_size || 1_000_000;
293
294      $args{max_count} = $max_count;
295      $args{max_size} = $max_size;
296      $args{max_time} = 0;
297
298      $self->log->info("open_bulk_mode: opening with max_flush_count [$max_count] and ".
299         "max_flush_size [$max_size]");
300   }
301   else {
302      $args{max_count} = 0;
303      $args{max_size} = 0;
304      $args{max_time} = 0;
305      $args{on_error} = undef;
306      #$args{on_success} = sub {
307         #my ($action, $response, $i) = @_;
308      #};
309
310      $self->log->info("open_bulk_mode: opening without automatic flushing");
311   }
312
313   my $bulk;
314   eval {
315      $bulk = $es->bulk_helper(%args);
316   };
317   if ($@) {
318      chomp($@);
319      return $self->log->error("open_bulk_mode: failed: [$@]");
320   }
321
322   $self->_bulk($bulk);
323
324   return $self->nodes;
325}
326
327sub open_scroll_scan_mode {
328   my $self = shift;
329   my ($index, $size) = @_;
330
331   my $version = $self->version or return;
332   if ($version ge "5.0.0") {
333      return $self->log->error("open_scroll_scan_mode: Command not supported for ES version ".
334         "$version, try open_scroll Command instead");
335   }
336
337   $index ||= $self->index;
338   $size ||= $self->size;
339   my $es = $self->_es;
340   $self->brik_help_run_undef_arg('open', $es) or return;
341   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $index) or return;
342   $self->brik_help_run_undef_arg('open_scroll_scan_mode', $size) or return;
343
344   my $scroll;
345   eval {
346      $scroll = $es->scroll_helper(
347         index => $index,
348         search_type => 'scan',
349         size => $size,
350      );
351   };
352   if ($@) {
353      chomp($@);
354      return $self->log->error("open_scroll_scan_mode: failed: $@");
355   }
356
357   $self->_scroll($scroll);
358
359   return $self->nodes;
360}
361
362#
363# Search::Elasticsearch::Client::5_0::Scroll
364#
365sub open_scroll {
366   my $self = shift;
367   my ($index, $size, $type, $query) = @_;
368
369   my $version = $self->version or return;
370   if ($version lt "5.0.0") {
371      return $self->log->error("open_scroll: Command not supported for ES version ".
372         "$version, try open_scroll_scan_mode Command instead");
373   }
374
375   $query ||= { query => { match_all => {} } };
376   $index ||= $self->index;
377   $type ||= $self->type;
378   $size ||= $self->size;
379   my $es = $self->_es;
380   $self->brik_help_run_undef_arg('open', $es) or return;
381   $self->brik_help_run_undef_arg('open_scroll', $index) or return;
382   $self->brik_help_run_undef_arg('open_scroll', $size) or return;
383
384   my $timeout = $self->rtimeout;
385
386   my %args = (
387      scroll => "${timeout}s",
388      scroll_in_qs => 1,  # By default (0), pass scroll_id in request body. When 1, pass
389                          # it in query string.
390      index => $index,
391      size => $size,
392      body => $query,
393   );
394   if ($type ne '*') {
395      $args{type} = $type;
396   }
397
398   #
399   # https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
400   #
401   my $scroll;
402   eval {
403      $scroll = $es->scroll_helper(%args);
404   };
405   if ($@) {
406      chomp($@);
407      return $self->log->error("open_scroll: failed: $@");
408   }
409
410   $self->_scroll($scroll);
411
412   $self->log->verbose("open_scroll: opened with size [$size] and timeout [${timeout}s]");
413
414   return $self->nodes;
415}
416
417#
418# Search::Elasticsearch::Client::5_0::Scroll
419#
420sub close_scroll {
421   my $self = shift;
422
423   my $scroll = $self->_scroll;
424   if (! defined($scroll)) {
425      return 1;
426   }
427
428   $scroll->finish;
429   $self->_scroll(undef);
430
431   return 1;
432}
433
434sub total_scroll {
435   my $self = shift;
436
437   my $scroll = $self->_scroll;
438   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
439
440   my $total;
441   eval {
442      $total = $scroll->total;
443   };
444   if ($@) {
445      chomp($@);
446      return $self->log->error("total_scroll: failed with: [$@]");
447   }
448
449   return $total;
450}
451
452sub next_scroll {
453   my $self = shift;
454   my ($count) = @_;
455
456   $count ||= 1;
457
458   my $scroll = $self->_scroll;
459   $self->brik_help_run_undef_arg('open_scroll', $scroll) or return;
460
461   my $next;
462   eval {
463      if ($count > 1) {
464         my @docs = $scroll->next($count);
465         if (@docs > 0) {
466            $next = \@docs;
467         }
468      }
469      else {
470         $next = $scroll->next;
471      }
472   };
473   if ($@) {
474      chomp($@);
475      return $self->log->error("next_scroll: failed with: [$@]");
476   }
477
478   return $next;
479}
480
481#
482# Search::Elasticsearch::Client::5_0::Direct
483#
484sub index_document {
485   my $self = shift;
486   my ($doc, $index, $type, $hash, $id) = @_;
487
488   $index ||= $self->index;
489   $type ||= $self->type;
490   my $es = $self->_es;
491   $self->brik_help_run_undef_arg('open', $es) or return;
492   $self->brik_help_run_undef_arg('index_document', $doc) or return;
493   $self->brik_help_run_invalid_arg('index_document', $doc, 'HASH') or return;
494   $self->brik_help_set_undef_arg('index', $index) or return;
495   $self->brik_help_set_undef_arg('type', $type) or return;
496
497   my %args = (
498      index => $index,
499      type => $type,
500      body => $doc,
501   );
502   if (defined($id)) {
503      $args{id} = $id;
504   }
505
506   if (defined($hash)) {
507      $self->brik_help_run_invalid_arg('index_document', $hash, 'HASH')
508         or return;
509      my $this_hash = { %$hash };
510      if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
511         $this_hash->{routing} = $doc->{$hash->{routing}};
512      }
513      %args = ( %args, %$this_hash );
514   }
515
516   my $r;
517   eval {
518      $r = $es->index(%args);
519   };
520   if ($@) {
521      chomp($@);
522      return $self->log->error("index_document: index failed for ".
523         "index [$index]: [$@]");
524   }
525
526   return $r;
527}
528
529#
530# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
531#
532sub reindex {
533   my $self = shift;
534   my ($index, $new, $type) = @_;
535
536   my $es = $self->_es;
537   $self->brik_help_run_undef_arg('open', $es) or return;
538   $self->brik_help_run_undef_arg('reindex', $index) or return;
539   $self->brik_help_run_undef_arg('reindex', $new) or return;
540
541   my %args = (
542      body => {
543         conflicts => 'proceed',
544         source => { index => $index },
545         dest => { index => $new },
546      },
547      wait_for_completion => 'false',  # Immediately return the task.
548   );
549
550   # Change the type for destination doc
551   if (defined($type)) {
552      $args{body}{dest}{type} = $type;
553   }
554
555   my $r;
556   eval {
557      $r = $es->reindex(%args);
558   };
559   if ($@) {
560      chomp($@);
561      return $self->log->error("reindex: reindex failed for index [$index]: [$@]");
562   }
563
564   return $r;
565}
566
567#
568# List reindex tasks
569#
570# curl -X GET "localhost:9200/_tasks?detailed=true&actions=*reindex" | jq .
571#
572# Cancel reindex task
573#
574# curl -X POST "localhost:9200/_tasks/7VelPnOxQm21HtuJNFUAvQ:120914725/_cancel" | jq .
575#
576
577#
578# Search::Elasticsearch::Client::6_0::Direct::Tasks
579#
580sub get_reindex_tasks {
581   my $self = shift;
582
583   my $es = $self->_es;
584   $self->brik_help_run_undef_arg('open', $es) or return;
585
586   my $t = $es->tasks;
587
588   my $list = $t->list;
589   my $nodes = $list->{nodes};
590   if (! defined($nodes)) {
591      return $self->log->error("get_reindex_tasks: no nodes found");
592   }
593
594   my %tasks = ();
595   for my $node (keys %$nodes) {
596      for my $id (keys %{$nodes->{$node}}) {
597         my $tasks = $nodes->{$node}{tasks};
598         for my $task (keys %$tasks) {
599            my $action = $tasks->{$task}{action};
600            if ($action eq 'indices:data/write/reindex' && !exists($tasks{$task})) {
601               $tasks{$task} = $tasks->{$task};
602            }
603         }
604      }
605   }
606
607   return \%tasks;
608}
609
610sub cancel_reindex_task {
611   my $self = shift;
612   my ($id) = @_;
613
614   my $es = $self->_es;
615   $self->brik_help_run_undef_arg('open', $es) or return;
616   $self->brik_help_run_undef_arg('cancel_reindex_task', $id) or return;
617
618   my $t = $es->tasks;
619
620   return $t->cancel(task_id => $id);
621}
622
623sub get_taskid {
624   my $self = shift;
625   my ($id) = @_;
626
627   my $es = $self->_es;
628   $self->brik_help_run_undef_arg('open', $es) or return;
629   $self->brik_help_run_undef_arg('get_taskid', $id) or return;
630
631   my $t = $es->tasks;
632
633   return $t->get(task_id => $id);
634}
635
636sub show_reindex_progress {
637   my $self = shift;
638
639   my $es = $self->_es;
640   $self->brik_help_run_undef_arg('open', $es) or return;
641
642   my $tasks = $self->get_reindex_tasks or return;
643   if (! keys %$tasks) {
644      $self->log->info("show_reindex_progress: no reindex task in progress");
645      return 0;
646   }
647
648   for my $id (keys %$tasks) {
649      my $task = $self->get_taskid($id) or next;
650
651      my $status = $task->{task}{status};
652      my $desc = $task->{task}{description};
653      my $total = $status->{total};
654      my $created = $status->{created};
655      my $deleted = $status->{deleted};
656      my $updated = $status->{updated};
657
658      my $perc = ($created + $deleted + $updated) / $total * 100;
659
660      printf("> Task [%s]: %.02f%%\n", $desc, $perc);
661      print "created[$created] deleted[$deleted] updated[$updated] total[$total]\n";
662   }
663
664   return 1;
665}
666
667sub loop_show_reindex_progress {
668   my $self = shift;
669   my ($sec) = @_;
670
671   $sec ||= 60;
672   my $es = $self->_es;
673   $self->brik_help_run_undef_arg('open', $es) or return;
674
675   while (1) {
676      $self->show_reindex_progress or return;
677      sleep($sec);
678   }
679
680   return 1;
681}
682
683sub reindex_with_mapping_from_json_file {
684   my $self = shift;
685   my ($index, $new, $file) = @_;
686
687   my $es = $self->_es;
688   $self->brik_help_run_undef_arg('open', $es) or return;
689   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $index)
690      or return;
691   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $new) or return;
692   $self->brik_help_run_undef_arg('reindex_with_mapping_from_json_file', $file) or return;
693   $self->brik_help_run_file_not_found('reindex_with_mapping_from_json_file', $file)
694      or return;
695
696   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
697   my $json = $fj->read($file) or return;
698
699   return $self->reindex($index, $new, $json);
700}
701
702#
703# Search::Elasticsearch::Client::5_0::Direct
704#
705# To execute this Command using routing requires to use the correct field
706# value directly in $hash->{routing}. We cannot "guess" it from arguments,
707# this would be a little bit complicated to do in an efficient way.
708#
709sub update_document {
710   my $self = shift;
711   my ($doc, $id, $index, $type, $hash) = @_;
712
713   $index ||= $self->index;
714   $type ||= $self->type;
715   my $es = $self->_es;
716   $self->brik_help_run_undef_arg('open', $es) or return;
717   $self->brik_help_run_undef_arg('update_document', $doc) or return;
718   $self->brik_help_run_invalid_arg('update_document', $doc, 'HASH') or return;
719   $self->brik_help_run_undef_arg('update_document', $id) or return;
720   $self->brik_help_set_undef_arg('index', $index) or return;
721   $self->brik_help_set_undef_arg('type', $type) or return;
722
723   my %args = (
724      id => $id,
725      index => $index,
726      type => $type,
727      body => { doc => $doc },
728   );
729
730   if (defined($hash)) {
731      $self->brik_help_run_invalid_arg('update_document', $hash, 'HASH')
732         or return;
733      %args = ( %args, %$hash );
734   }
735
736   my $r;
737   eval {
738      $r = $es->update(%args);
739   };
740   if ($@) {
741      chomp($@);
742      return $self->log->error("update_document: index failed for index [$index]: [$@]");
743   }
744
745   return $r;
746}
747
748#
749# Search::Elasticsearch::Client::5_0::Bulk
750#
751sub index_bulk {
752   my $self = shift;
753   my ($doc, $index, $type, $hash, $id) = @_;
754
755   my $bulk = $self->_bulk;
756   $index ||= $self->index;
757   $type ||= $self->type;
758   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
759   $self->brik_help_run_undef_arg('index_bulk', $doc) or return;
760   $self->brik_help_set_undef_arg('index', $index) or return;
761   $self->brik_help_set_undef_arg('type', $type) or return;
762
763   my %args = (
764      source => $doc,
765   );
766   if (defined($id)) {
767      $args{id} = $id;
768   }
769
770   if (defined($hash)) {
771      $self->brik_help_run_invalid_arg('index_bulk', $hash, 'HASH') or return;
772      my $this_hash = { %$hash };
773      if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
774         $this_hash->{routing} = $doc->{$hash->{routing}};
775      }
776      %args = ( %args, %$this_hash );
777   }
778
779   my $r;
780   eval {
781      $r = $bulk->add_action(index => \%args);
782   };
783   if ($@) {
784      chomp($@);
785      my $p = $self->parse_error_string($@);
786      if (defined($p) && exists($p->{class})) {
787         my $class = $p->{class};
788         my $code = $p->{code};
789         my $node = $p->{node};
790         return $self->log->error("index_bulk: failed for index [$index] with error ".
791            "[$class] code [$code] for node [$node]");
792      }
793      else {
794         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
795      }
796   }
797
798   return $r;
799}
800
801#
802# Allows to index multiple docs at one time
803# $bulk->index({ source => $doc1 }, { source => $doc2 }, ...);
804#
805sub index_bulk_from_list {
806   my $self = shift;
807   my ($list, $index, $type, $hash) = @_;
808
809   my $bulk = $self->_bulk;
810   $index ||= $self->index;
811   $type ||= $self->type;
812   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
813   $self->brik_help_run_undef_arg('index_bulk_from_list', $list) or return;
814   $self->brik_help_run_invalid_arg('index_bulk_from_list', $list, 'ARRAY')
815      or return;
816   $self->brik_help_run_empty_array_arg('index_bulk_from_list', $list)
817      or return;
818   $self->brik_help_set_undef_arg('index', $index) or return;
819   $self->brik_help_set_undef_arg('type', $type) or return;
820
821   if (defined($hash)) {
822      $self->brik_help_run_invalid_arg('index_bulk_from_list', $hash, 'HASH')
823         or return;
824   }
825
826   my @args = ();
827   for my $doc (@$list) {
828      my %args = (
829         source => $doc,
830      );
831      if (defined($hash)) {
832         my $this_hash = { %$hash };
833         if (defined($hash->{routing}) && defined($doc->{$hash->{routing}})) {
834            $this_hash->{routing} = $doc->{$hash->{routing}};
835         }
836         %args = ( %args, %$this_hash );
837      }
838      push @args, \%args;
839   }
840
841   my $r;
842   eval {
843      $r = $bulk->index(@args);
844   };
845   if ($@) {
846      chomp($@);
847      my $p = $self->parse_error_string($@);
848      if (defined($p) && exists($p->{class})) {
849         my $class = $p->{class};
850         my $code = $p->{code};
851         my $node = $p->{node};
852         return $self->log->error("index_bulk: failed for index [$index] with error ".
853            "[$class] code [$code] for node [$node]");
854      }
855      else {
856         return $self->log->error("index_bulk: index failed for index [$index]: [$@]");
857      }
858   }
859
860   return $r;
861}
862
863#
864# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
865#
866sub clean_deleted_from_index {
867   my $self = shift;
868   my ($index) = @_;
869
870   $self->brik_help_run_undef_arg('clean_deleted_from_index', $index) or return;
871
872   my $es = $self->_es;
873   $self->brik_help_run_undef_arg('open', $es) or return;
874
875   my $indices = $self->_es->indices;
876
877   my $r;
878   eval {
879      $r = $indices->forcemerge(
880         index => $index,
881         only_expunge_deletes => 'true',
882      );
883   };
884   if ($@) {
885      chomp($@);
886      my $p = $self->parse_error_string($@);
887      if (defined($p) && exists($p->{class})) {
888         my $class = $p->{class};
889         my $code = $p->{code};
890         my $node = $p->{node};
891         return $self->log->error("clean_deleted_from_index: failed for index ".
892            "[$index] with error [$class] code [$code] for node [$node]");
893      }
894      else {
895         return $self->log->error("clean_deleted_from_index: index failed for ".
896            "index [$index]: [$@]");
897      }
898   }
899
900   return $r;
901}
902
903#
904# To execute this Command using routing requires to use the correct field
905# value directly in $hash->{routing}. We cannot "guess" it from arguments,
906# this would be a little bit complicated to do in an efficient way.
907#
908sub update_document_bulk {
909   my $self = shift;
910   my ($doc, $index, $type, $hash, $id) = @_;
911
912   my $bulk = $self->_bulk;
913   $index ||= $self->index;
914   $type ||= $self->type;
915   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
916   $self->brik_help_run_undef_arg('update_document_bulk', $doc) or return;
917   $self->brik_help_set_undef_arg('index', $index) or return;
918   $self->brik_help_set_undef_arg('type', $type) or return;
919
920   my %args = (
921      index => $index,
922      type => $type,
923      doc => $doc,
924   );
925   if (defined($id)) {
926      $args{id} = $id;
927   }
928
929   if (defined($hash)) {
930      $self->brik_help_run_invalid_arg('update_document_bulk', $hash, 'HASH')
931         or return;
932      %args = ( %args, %$hash );
933   }
934
935   my $r;
936   eval {
937      $r = $bulk->update(\%args);
938   };
939   if ($@) {
940      chomp($@);
941      my $p = $self->parse_error_string($@);
942      if (defined($p) && exists($p->{class})) {
943         my $class = $p->{class};
944         my $code = $p->{code};
945         my $node = $p->{node};
946         return $self->log->error("update_document_bulk: failed for index [$index] ".
947            "with error [$class] code [$code] for node [$node]");
948      }
949      else {
950         return $self->log->error("update_document_bulk: index failed for ".
951            "index [$index]: [$@]");
952      }
953   }
954
955   return $r;
956}
957
958#
959# We may have to call refresh_index after a bulk_flush, so we give an additional
960# optional Argument for given index.
961#
962sub bulk_flush {
963   my $self = shift;
964   my ($index) = @_;
965
966   my $bulk = $self->_bulk;
967   $self->brik_help_run_undef_arg('open_bulk_mode', $bulk) or return;
968
969   my $try = $self->try;
970
971RETRY:
972
973   my $r;
974   eval {
975      $r = $bulk->flush;
976   };
977   if ($@) {
978      chomp($@);
979      if (--$try == 0) {
980         my $p = $self->parse_error_string($@);
981         if (defined($p) && exists($p->{class})) {
982            my $class = $p->{class};
983            my $code = $p->{code};
984            my $node = $p->{node};
985            return $self->log->error("bulk_flush: failed after [$try] tries with error ".
986               "[$class] code [$code] for node [$node]");
987         }
988         else {
989            return $self->log->error("bulk_flush: failed after [$try]: [$@]");
990         }
991      }
992      $self->log->warning("bulk_flush: sleeping 10 seconds before retry cause error ".
993               "[$@]");
994      sleep 10;
995      goto RETRY;
996   }
997
998   if (defined($index)) {
999      $self->refresh_index($index);
1000   }
1001
1002   return $r;
1003}
1004
1005#
1006# Search::Elasticsearch::Client::2_0::Direct
1007# Search::Elasticsearch::Client::5_0::Direct
1008#
1009sub count {
1010   my $self = shift;
1011   my ($index, $type) = @_;
1012
1013   $index ||= $self->index;
1014   $type ||= $self->type;
1015   my $es = $self->_es;
1016   $self->brik_help_run_undef_arg('open', $es) or return;
1017
1018   my %args = ();
1019   if (defined($index) && $index ne '*') {
1020      $args{index} = $index;
1021   }
1022   if (defined($type) && $type ne '*') {
1023      $args{type} = $type;
1024   }
1025
1026   #$args{body} = {
1027      #query => {
1028         #match => { title => 'Elasticsearch clients' },
1029      #},
1030   #}
1031
1032   my $r;
1033   my $version = $self->version or return;
1034   if ($version ge "5.0.0") {
1035      eval {
1036         $r = $es->count(%args);
1037      };
1038   }
1039   else {
1040      eval {
1041         $r = $es->search(
1042            index => $index,
1043            type => $type,
1044            search_type => 'count',
1045            body => {
1046               query => {
1047                  match_all => {},
1048               },
1049            },
1050         );
1051      };
1052   }
1053   if ($@) {
1054      chomp($@);
1055      return $self->log->error("count: count failed for index [$index]: [$@]");
1056   }
1057
1058   if ($version ge "5.0.0") {
1059      if (exists($r->{count})) {
1060         return $r->{count};
1061      }
1062   }
1063   elsif (exists($r->{hits}) && exists($r->{hits}{total})) {
1064      return $r->{hits}{total};
1065   }
1066
1067   return $self->log->error("count: nothing found");
1068}
1069
1070#
1071# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
1072# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html
1073#
1074# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1075#
1076# To perform a query using routing requires to use the correct field
1077# value directly in $hash->{routing}. We cannot "guess" it from $q,
1078# this would be a little bit complicated to do in an efficient way.
1079#
1080sub query {
1081   my $self = shift;
1082   my ($query, $index, $type, $hash) = @_;
1083
1084   $index ||= $self->index;
1085   $type ||= $self->type;
1086   my $es = $self->_es;
1087   $self->brik_help_run_undef_arg('open', $es) or return;
1088   $self->brik_help_run_undef_arg('query', $query) or return;
1089   $self->brik_help_set_undef_arg('index', $index) or return;
1090   $self->brik_help_set_undef_arg('type', $type) or return;
1091   $self->brik_help_run_invalid_arg('query', $query, 'HASH') or return;
1092
1093   my $timeout = $self->rtimeout;
1094
1095   my %args = (
1096      index => $index,
1097      body => $query,
1098   );
1099
1100   if (defined($hash)) {
1101      $self->brik_help_run_invalid_arg('query', $hash, 'HASH') or return;
1102      %args = ( %args, %$hash );
1103   }
1104
1105   if ($type ne '*') {
1106      $args{type} = $type;
1107   }
1108
1109   my $r;
1110   eval {
1111      $r = $es->search(%args);
1112   };
1113   if ($@) {
1114      chomp($@);
1115      return $self->log->error("query: failed for index [$index]: [$@]");
1116   }
1117
1118   return $r;
1119}
1120
1121sub get_from_id {
1122   my $self = shift;
1123   my ($id, $index, $type) = @_;
1124
1125   $index ||= $self->index;
1126   $type ||= $self->type;
1127   my $es = $self->_es;
1128   $self->brik_help_run_undef_arg('open', $es) or return;
1129   $self->brik_help_run_undef_arg('get_from_id', $id) or return;
1130   $self->brik_help_set_undef_arg('index', $index) or return;
1131   $self->brik_help_set_undef_arg('type', $type) or return;
1132
1133   my $r;
1134   eval {
1135      $r = $es->get(
1136         index => $index,
1137         type => $type,
1138         id => $id,
1139      );
1140   };
1141   if ($@) {
1142      chomp($@);
1143      return $self->log->error("get_from_id: get failed for index [$index]: [$@]");
1144   }
1145
1146   return $r;
1147}
1148
1149#
1150# https://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
1151#
1152sub www_search {
1153   my $self = shift;
1154   my ($query, $index, $type) = @_;
1155
1156   $index ||= $self->index;
1157   $type ||= $self->type;
1158   $self->brik_help_run_undef_arg('www_search', $query) or return;
1159   $self->brik_help_set_undef_arg('index', $index) or return;
1160   $self->brik_help_set_undef_arg('type', $type) or return;
1161
1162   my $from = $self->from;
1163   my $size = $self->size;
1164
1165   my $sj = Metabrik::String::Json->new_from_brik_init($self) or return;
1166
1167   my $nodes = $self->nodes;
1168   for my $node (@$nodes) {
1169      # http://localhost:9200/INDEX/TYPE/_search/?size=SIZE&q=QUERY
1170      my $url = "$node/$index";
1171      if ($type ne '*') {
1172         $url .= "/$type";
1173      }
1174      $url .= "/_search/?from=$from&size=$size&q=".$query;
1175
1176      my $get = $self->SUPER::get($url) or next;
1177      my $body = $get->{content};
1178
1179      my $decoded = $sj->decode($body) or next;
1180
1181      return $decoded;
1182   }
1183
1184   return;
1185}
1186
1187#
1188# Search::Elasticsearch::Client::2_0::Direct::Indices
1189#
1190sub delete_index {
1191   my $self = shift;
1192   my ($index) = @_;
1193
1194   my $es = $self->_es;
1195   $self->brik_help_run_undef_arg('open', $es) or return;
1196   $self->brik_help_run_undef_arg('delete_index', $index) or return;
1197   $self->brik_help_run_invalid_arg('delete_index', $index, 'ARRAY', 'SCALAR') or return;
1198
1199   my %args = (
1200      index => $index,
1201   );
1202
1203   my $r;
1204   eval {
1205      $r = $es->indices->delete(%args);
1206   };
1207   if ($@) {
1208      chomp($@);
1209      return $self->log->error("delete_index: delete failed for index [$index]: [$@]");
1210   }
1211
1212   return $r;
1213}
1214
1215#
1216# Search::Elasticsearch::Client::2_0::Direct::Indices
1217#
1218# To execute this Command using routing requires to use the correct field
1219# value directly in $hash->{routing}. We cannot "guess" it from arguments,
1220# this would be a little bit complicated to do in an efficient way.
1221#
1222sub delete_document {
1223   my $self = shift;
1224   my ($index, $type, $id, $hash) = @_;
1225
1226   my $es = $self->_es;
1227   $self->brik_help_run_undef_arg('open', $es) or return;
1228   $self->brik_help_run_undef_arg('delete_document', $index) or return;
1229   $self->brik_help_run_undef_arg('delete_document', $type) or return;
1230   $self->brik_help_run_undef_arg('delete_document', $id) or return;
1231
1232   my %args = (
1233      index => $index,
1234      type => $type,
1235      id => $id,
1236   );
1237
1238   if (defined($hash)) {
1239      $self->brik_help_run_invalid_arg('delete_document', $hash, 'HASH')
1240         or return;
1241      %args = ( %args, %$hash );
1242   }
1243
1244   my $r;
1245   eval {
1246      $r = $es->delete(%args);
1247   };
1248   if ($@) {
1249      chomp($@);
1250      return $self->log->error("delete_document: delete failed for index [$index]: [$@]");
1251   }
1252
1253   return $r;
1254}
1255
1256#
1257# https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html
1258#
1259# Example: my $q = { query => { term => { ip => "192.168.57.19" } } }
1260#
1261sub delete_by_query {
1262   my $self = shift;
1263   my ($query, $index, $type, $proceed) = @_;
1264
1265   my $es = $self->_es;
1266   $self->brik_help_run_undef_arg('open', $es) or return;
1267   $self->brik_help_run_undef_arg('delete_by_query', $query) or return;
1268   $self->brik_help_run_undef_arg('delete_by_query', $index) or return;
1269   $self->brik_help_run_undef_arg('delete_by_query', $type) or return;
1270   $self->brik_help_run_invalid_arg('delete_by_query', $query, 'HASH') or return;
1271
1272   my $timeout = $self->rtimeout;
1273
1274   my %args = (
1275      index => $index,
1276      type => $type,
1277      body => $query,
1278   );
1279
1280   if (defined($proceed) && $proceed) {
1281      $args{conflicts} = 'proceed';
1282   }
1283
1284   my $r;
1285   eval {
1286      $r = $es->delete_by_query(%args);
1287   };
1288   if ($@) {
1289      chomp($@);
1290      return $self->log->error("delete_by_query: failed for index [$index]: [$@]");
1291   }
1292
1293   # This may fail, we ignore it.
1294   $self->refresh_index($index);
1295
1296   return $r;
1297}
1298
1299#
1300# Search::Elasticsearch::Client::2_0::Direct::Cat
1301#
1302# https://www.elastic.co/guide/en/elasticsearch/reference/current/cat-indices.html
1303#
1304sub show_indices {
1305   my $self = shift;
1306   my ($string) = @_;
1307
1308   my $es = $self->_es;
1309   $self->brik_help_run_undef_arg('open', $es) or return;
1310
1311   my $r;
1312   eval {
1313      $r = $es->cat->indices;
1314   };
1315   if ($@) {
1316      chomp($@);
1317      return $self->log->error("show_indices: failed: [$@]");
1318   }
1319
1320   my @lines = split(/\n/, $r);
1321
1322   if (@lines == 0) {
1323      $self->log->warning("show_indices: nothing returned, no index?");
1324   }
1325
1326   my @filtered = ();
1327   if (defined($string)) {
1328      for (@lines) {
1329         if (m{$string}) {
1330            push @filtered, $_;
1331         }
1332      }
1333      @lines = @filtered;
1334   }
1335
1336   return \@lines;
1337}
1338
1339#
1340# Search::Elasticsearch::Client::2_0::Direct::Cat
1341#
1342sub show_nodes {
1343   my $self = shift;
1344
1345   my $es = $self->_es;
1346   $self->brik_help_run_undef_arg('open', $es) or return;
1347
1348   my $r;
1349   eval {
1350      $r = $es->cat->nodes;
1351   };
1352   if ($@) {
1353      chomp($@);
1354      return $self->log->error("show_nodes: failed: [$@]");
1355   }
1356
1357   my @lines = split(/\n/, $r);
1358
1359   if (@lines == 0) {
1360      $self->log->warning("show_nodes: nothing returned, no nodes?");
1361   }
1362
1363   return \@lines;
1364}
1365
1366#
1367# Search::Elasticsearch::Client::2_0::Direct::Cat
1368#
1369sub show_health {
1370   my $self = shift;
1371
1372   my $es = $self->_es;
1373   $self->brik_help_run_undef_arg('open', $es) or return;
1374
1375   my $r;
1376   eval {
1377      $r = $es->cat->health;
1378   };
1379   if ($@) {
1380      chomp($@);
1381      return $self->log->error("show_health: failed: [$@]");
1382   }
1383
1384   my @lines = split(/\n/, $r);
1385
1386   if (@lines == 0) {
1387      $self->log->warning("show_health: nothing returned, no recovery?");
1388   }
1389
1390   return \@lines;
1391}
1392
1393#
1394# Search::Elasticsearch::Client::2_0::Direct::Cat
1395#
1396sub show_recovery {
1397   my $self = shift;
1398
1399   my $es = $self->_es;
1400   $self->brik_help_run_undef_arg('open', $es) or return;
1401
1402   my $r;
1403   eval {
1404      $r = $es->cat->recovery;
1405   };
1406   if ($@) {
1407      chomp($@);
1408      return $self->log->error("show_recovery: failed: [$@]");
1409   }
1410
1411   my @lines = split(/\n/, $r);
1412
1413   if (@lines == 0) {
1414      $self->log->warning("show_recovery: nothing returned, no index?");
1415   }
1416
1417   return \@lines;
1418}
1419
1420#
1421# curl -s 'localhost:9200/_cat/allocation?v'
1422#
1423sub show_allocation {
1424   my $self = shift;
1425
1426   my $es = $self->_es;
1427   $self->brik_help_run_undef_arg('open', $es) or return;
1428
1429   my $r;
1430   eval {
1431      $r = $es->cat->allocation;
1432   };
1433   if ($@) {
1434      chomp($@);
1435      return $self->log->error("show_allocation: failed: [$@]");
1436   }
1437
1438   my @lines = split(/\n/, $r);
1439
1440   if (@lines == 0) {
1441      $self->log->warning("show_allocation: nothing returned, no index?");
1442   }
1443
1444   return \@lines;
1445}
1446
1447sub list_indices {
1448   my $self = shift;
1449   my ($regex) = @_;
1450
1451   my $get = $self->get_indices or return;
1452
1453   my @indices = ();
1454   for (@$get) {
1455      if (defined($regex)) {
1456         if ($_->{index} =~ m{$regex}) {
1457            push @indices, $_->{index};
1458         }
1459      }
1460      else {
1461         push @indices, $_->{index};
1462      }
1463   }
1464
1465   return [ sort { $a cmp $b } @indices ];
1466}
1467
1468sub get_indices {
1469   my $self = shift;
1470   my ($string) = @_;
1471
1472   my $lines = $self->show_indices($string) or return;
1473   if (@$lines == 0) {
1474      $self->log->warning("get_indices: no index found");
1475      return [];
1476   }
1477
1478   #
1479   # Format depends on ElasticSearch version. We try to detect the format.
1480   #
1481   # 5.0.0:
1482   # "yellow open www-2016-08-14 BmNE9RaBRSCKqB5Oe8yZcw 5 1  146 0 251.8kb 251.8kb"
1483   #
1484   my @indices = ();
1485   for (@$lines) {
1486      my @t = split(/\s+/);
1487      if (@t == 10) {  # Version 5.0.0
1488         my $color = $t[0];
1489         my $state = $t[1];
1490         my $index = $t[2];
1491         my $id = $t[3];
1492         my $shards = $t[4];
1493         my $replicas = $t[5];
1494         my $count = $t[6];
1495         my $count2 = $t[7];
1496         my $total_size = $t[8];
1497         my $size = $t[9];
1498         push @indices, {
1499            color => $color,
1500            state => $state,
1501            index => $index,
1502            id => $id,
1503            shards => $shards,
1504            replicas => $replicas,
1505            count => $count,
1506            total_size => $total_size,
1507            size => $size,
1508         };
1509      }
1510      elsif (@t == 9) {
1511         my $index = $t[2];
1512         push @indices, {
1513            index => $index,
1514         };
1515      }
1516      elsif (@t == 8) {
1517         my $index = $t[1];
1518         push @indices, {
1519            index => $index,
1520         };
1521      }
1522   }
1523
1524   return \@indices;
1525}
1526
1527#
1528# Search::Elasticsearch::Client::5_0::Direct::Indices
1529#
1530sub get_index {
1531   my $self = shift;
1532   my ($index) = @_;
1533 
1534   my $es = $self->_es;
1535   $self->brik_help_run_undef_arg('open', $es) or return;
1536   $self->brik_help_run_undef_arg('get_index', $index) or return;
1537   $self->brik_help_run_invalid_arg('get_index', $index, 'ARRAY', 'SCALAR') or return;
1538
1539   my %args = (
1540      index => $index,
1541   );
1542
1543   my $r;
1544   eval {
1545      $r = $es->indices->get(%args);
1546   };
1547   if ($@) {
1548      chomp($@);
1549      return $self->log->error("get_index: get failed for index [$index]: [$@]");
1550   }
1551
1552   return $r;
1553}
1554
1555sub get_index_stats {
1556   my $self = shift;
1557   my ($index) = @_;
1558
1559   my $es = $self->_es;
1560   $self->brik_help_run_undef_arg('open', $es) or return;
1561   $self->brik_help_run_undef_arg('get_index', $index) or return;
1562
1563   my %args = (
1564      index => $index,
1565   );
1566
1567   my $r;
1568   eval {
1569      $r = $es->indices->stats(%args);
1570   };
1571   if ($@) {
1572      chomp($@);
1573      return $self->log->error("get_index_stats: get failed for index [$index]: ".
1574         "[$@]");
1575   }
1576
1577   return $r->{indices}{$index};
1578}
1579
1580sub list_index_types {
1581   my $self = shift;
1582   my ($index) = @_;
1583
1584   my $es = $self->_es;
1585   $self->brik_help_run_undef_arg('open', $es) or return;
1586   $self->brik_help_run_undef_arg('list_index_types', $index) or return;
1587   $self->brik_help_run_invalid_arg('list_index_types', $index, 'SCALAR') or return;
1588
1589   my $r = $self->get_mappings($index) or return;
1590   if (keys %$r > 1) {
1591      return $self->log->error("list_index_types: multiple indices found, choose one");
1592   }
1593
1594   my @types = ();
1595   for my $this_index (keys %$r) {
1596      my $mappings = $r->{$this_index}{mappings};
1597      push @types, keys %$mappings;
1598   }
1599
1600   my %uniq = map { $_ => 1 } @types;
1601
1602   return [ sort { $a cmp $b } keys %uniq ];
1603}
1604
1605#
1606# By default, if you provide only one index and no type,
1607# all types will be merged (including _default_)
1608# If you specify one type (other than _default_), _default_ will be merged to it.
1609#
1610sub list_index_fields {
1611   my $self = shift;
1612   my ($index, $type) = @_;
1613
1614   my $es = $self->_es;
1615   $self->brik_help_run_undef_arg('open', $es) or return;
1616   $self->brik_help_run_undef_arg('list_index_fields', $index) or return;
1617   $self->brik_help_run_invalid_arg('list_index_fields', $index, 'SCALAR') or return;
1618
1619   my $r;
1620   if (defined($type)) {
1621      $r = $self->get_mappings($index, $type) or return;
1622      if (keys %$r > 1) {
1623         return $self->log->error("list_index_fields: multiple indices found, ".
1624            "choose one");
1625      }
1626      # _default_ mapping may not exists.
1627      if ($self->is_mapping_exists($index, '_default_')) {
1628         my $r2 = $self->get_mappings($index, '_default_');
1629         # Merge
1630         for my $this_index (keys %$r2) {
1631            my $default = $r2->{$this_index}{mappings}{'_default_'};
1632            $r->{$this_index}{mappings}{_default_} = $default;
1633         }
1634      }
1635   }
1636   else {
1637      $r = $self->get_mappings($index) or return;
1638      if (keys %$r > 1) {
1639         return $self->log->error("list_index_fields: multiple indices found, ".
1640            "choose one");
1641      }
1642   }
1643
1644   my @fields = ();
1645   for my $this_index (keys %$r) {
1646      my $mappings = $r->{$this_index}{mappings};
1647      for my $this_type (keys %$mappings) {
1648         my $properties = $mappings->{$this_type}{properties};
1649         push @fields, keys %$properties;
1650      }
1651   }
1652
1653   my %uniq = map { $_ => 1 } @fields;
1654
1655   return [ sort { $a cmp $b } keys %uniq ];
1656}
1657
1658sub list_indices_version {
1659   my $self = shift;
1660   my ($index) = @_;
1661
1662   my $es = $self->_es;
1663   $self->brik_help_run_undef_arg('open', $es) or return;
1664   $self->brik_help_run_undef_arg('list_indices_version', $index) or return;
1665   $self->brik_help_run_invalid_arg('list_indices_version', $index, 'ARRAY', 'SCALAR')
1666      or return;
1667
1668   my $r = $self->get_index($index) or return;
1669
1670   my @list = ();
1671   for my $this (keys %$r) {
1672      my $name = $this;
1673      my $version = $r->{$this}{settings}{index}{version}{created};
1674      push @list, {
1675         index => $name,
1676         version => $version,
1677      };
1678   }
1679
1680   return \@list;
1681}
1682
1683sub open_index {
1684   my $self = shift;
1685   my ($index) = @_;
1686
1687   my $es = $self->_es;
1688   $self->brik_help_run_undef_arg('open', $es) or return;
1689   $self->brik_help_run_undef_arg('open_index', $index) or return;
1690   $self->brik_help_run_invalid_arg('open_index', $index, 'ARRAY', 'SCALAR') or return;
1691
1692   my $r;
1693   eval {
1694      $r = $es->indices->open(
1695         index => $index,
1696      );
1697   };
1698   if ($@) {
1699      chomp($@);
1700      return $self->log->error("open_index: failed: [$@]");
1701   }
1702
1703   return $r;
1704}
1705
1706sub close_index {
1707   my $self = shift;
1708   my ($index) = @_;
1709
1710   my $es = $self->_es;
1711   $self->brik_help_run_undef_arg('open', $es) or return;
1712   $self->brik_help_run_undef_arg('close_index', $index) or return;
1713   $self->brik_help_run_invalid_arg('close_index', $index, 'ARRAY', 'SCALAR') or return;
1714
1715   my $r;
1716   eval {
1717      $r = $es->indices->close(
1718         index => $index,
1719      );
1720   };
1721   if ($@) {
1722      chomp($@);
1723      return $self->log->error("close_index: failed: [$@]");
1724   }
1725
1726   return $r;
1727}
1728
1729#
1730# Search::Elasticsearch::Client::5_0::Direct::Indices
1731#
1732sub get_aliases {
1733   my $self = shift;
1734   my ($index) = @_;
1735
1736   $index ||= $self->index;
1737   my $es = $self->_es;
1738   $self->brik_help_run_undef_arg('open', $es) or return;
1739
1740   my %args = (
1741      index => $index,
1742   );
1743
1744   my $r;
1745   eval {
1746      $r = $es->indices->get(%args);
1747   };
1748   if ($@) {
1749      chomp($@);
1750      return $self->log->error("get_aliases: get_aliases failed: [$@]");
1751   }
1752
1753   my %aliases = ();
1754   for my $this (keys %$r) {
1755      $aliases{$this} = $r->{$this}{aliases};
1756   }
1757
1758   return \%aliases;
1759}
1760
1761#
1762# Search::Elasticsearch::Client::5_0::Direct::Indices
1763#
1764sub put_alias {
1765   my $self = shift;
1766   my ($index, $alias) = @_;
1767
1768   my $es = $self->_es;
1769   $self->brik_help_run_undef_arg('open', $es) or return;
1770   $self->brik_help_run_undef_arg('put_alias', $index) or return;
1771   $self->brik_help_run_undef_arg('put_alias', $alias) or return;
1772
1773   my %args = (
1774      index => $index,
1775      name => $alias,
1776   );
1777
1778   my $r;
1779   eval {
1780      $r = $es->indices->put_alias(%args);
1781   };
1782   if ($@) {
1783      chomp($@);
1784      return $self->log->error("put_alias: put_alias failed: [$@]");
1785   }
1786
1787   return $r;
1788}
1789
1790#
1791# Search::Elasticsearch::Client::5_0::Direct::Indices
1792#
1793sub delete_alias {
1794   my $self = shift;
1795   my ($index, $alias) = @_;
1796
1797   my $es = $self->_es;
1798   $self->brik_help_run_undef_arg('open', $es) or return;
1799   $self->brik_help_run_undef_arg('delete_alias', $index) or return;
1800   $self->brik_help_run_undef_arg('delete_alias', $alias) or return;
1801
1802   my %args = (
1803      index => $index,
1804      name => $alias,
1805   );
1806
1807   my $r;
1808   eval {
1809      $r = $es->indices->delete_alias(%args);
1810   };
1811   if ($@) {
1812      chomp($@);
1813      return $self->log->error("delete_alias: delete_alias failed: [$@]");
1814   }
1815
1816   return $r;
1817}
1818
1819sub update_alias {
1820   my $self = shift;
1821   my ($new_index, $alias) = @_;
1822
1823   my $es = $self->_es;
1824   $self->brik_help_run_undef_arg('open', $es) or return;
1825   $self->brik_help_run_undef_arg('update_alias', $new_index) or return;
1826   $self->brik_help_run_undef_arg('update_alias', $alias) or return;
1827
1828   # Search for previous index with that alias, if any.
1829   my $prev_index;
1830   my $aliases = $self->get_aliases or return;
1831   while (my ($k, $v) = each %$aliases) {
1832      for my $this (keys %$v) {
1833         if ($this eq $alias) {
1834            $prev_index = $k;
1835            last;
1836         }
1837      }
1838      last if $prev_index;
1839   }
1840
1841   # Delete previous alias if it exists.
1842   if (defined($prev_index)) {
1843      $self->delete_alias($prev_index, $alias) or return;
1844   }
1845
1846   return $self->put_alias($new_index, $alias);
1847}
1848
1849sub is_mapping_exists {
1850   my $self = shift;
1851   my ($index, $mapping) = @_;
1852
1853   $self->brik_help_run_undef_arg('is_mapping_exists', $index) or return;
1854   $self->brik_help_run_undef_arg('is_mapping_exists', $mapping) or return;
1855
1856   if (! $self->is_index_exists($index)) {
1857      return 0;
1858   }
1859
1860   my $all = $self->get_mappings($index) or return;
1861   for my $this_index (keys %$all) {
1862      my $mappings = $all->{$this_index}{mappings};
1863      for my $this_mapping (keys %$mappings) {
1864         if ($this_mapping eq $mapping) {
1865            return 1;
1866         }
1867      }
1868   }
1869
1870   return 0;
1871}
1872
1873#
1874# Search::Elasticsearch::Client::2_0::Direct::Indices
1875#
1876sub get_mappings {
1877   my $self = shift;
1878   my ($index, $type) = @_;
1879
1880   my $es = $self->_es;
1881   $self->brik_help_run_undef_arg('open', $es) or return;
1882   $self->brik_help_run_undef_arg('get_mappings', $index) or return;
1883   $self->brik_help_run_invalid_arg('get_mappings', $index, 'ARRAY', 'SCALAR') or return;
1884
1885   my %args = (
1886      index => $index,
1887      type => $type,
1888   );
1889
1890   my $r;
1891   eval {
1892      $r = $es->indices->get_mapping(%args);
1893   };
1894   if ($@) {
1895      chomp($@);
1896      return $self->log->error("get_mappings: get_mapping failed for index [$index]: ".
1897         "[$@]");
1898   }
1899
1900   return $r;
1901}
1902
1903#
1904# Search::Elasticsearch::Client::2_0::Direct::Indices
1905#
1906sub create_index {
1907   my $self = shift;
1908   my ($index, $shards) = @_;
1909
1910   my $es = $self->_es;
1911   $self->brik_help_run_undef_arg('open', $es) or return;
1912   $self->brik_help_run_undef_arg('create_index', $index) or return;
1913
1914   my %args = (
1915      index => $index,
1916   );
1917
1918   if (defined($shards)) {
1919      $args{body}{settings}{index}{number_of_shards} = $shards;
1920   }
1921
1922   my $r;
1923   eval {
1924      $r = $es->indices->create(%args);
1925   };
1926   if ($@) {
1927      chomp($@);
1928      return $self->log->error("create_index: create failed ".
1929         "for index [$index]: [$@]");
1930   }
1931   
1932   return $r;
1933}
1934
1935#
1936# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-put-mapping.html
1937#
1938sub create_index_with_mappings {
1939   my $self = shift;
1940   my ($index, $mappings) = @_;
1941
1942   my $es = $self->_es;
1943   $self->brik_help_run_undef_arg('open', $es) or return;
1944   $self->brik_help_run_undef_arg('create_index_with_mappings', $index) or return;
1945   $self->brik_help_run_undef_arg('create_index_with_mappings', $mappings) or return;
1946   $self->brik_help_run_invalid_arg('create_index_with_mappings', $mappings, 'HASH')
1947      or return;
1948
1949   my $r;
1950   eval {
1951      $r = $es->indices->create(
1952         index => $index,
1953         body => {
1954            mappings => $mappings,
1955         },
1956      );
1957   };
1958   if ($@) {
1959      chomp($@);
1960      return $self->log->error("create_index_with_mappings: create failed for ".
1961         "index [$index]: [$@]");
1962   }
1963
1964   return $r;
1965}
1966
1967# GET http://localhost:9200/
1968sub info {
1969   my $self = shift;
1970   my ($nodes) = @_;
1971
1972   $nodes ||= $self->nodes;
1973   $self->brik_help_run_undef_arg('info', $nodes) or return;
1974   $self->brik_help_run_invalid_arg('info', $nodes, 'ARRAY') or return;
1975   $self->brik_help_run_empty_array_arg('info', $nodes) or return;
1976
1977   my $first = $nodes->[0];
1978
1979   $self->get($first) or return;
1980
1981   return $self->content;
1982}
1983
1984sub version {
1985   my $self = shift;
1986   my ($nodes) = @_;
1987
1988   $nodes ||= $self->nodes;
1989   $self->brik_help_run_undef_arg('version', $nodes) or return;
1990   $self->brik_help_run_invalid_arg('version', $nodes, 'ARRAY') or return;
1991   $self->brik_help_run_empty_array_arg('version', $nodes) or return;
1992
1993   my $first = $nodes->[0];
1994
1995   $self->get($first) or return;
1996   my $content = $self->content or return;
1997
1998   return $content->{version}{number};
1999}
2000
2001#
2002# Search::Elasticsearch::Client::2_0::Direct::Indices
2003#
2004sub get_templates {
2005   my $self = shift;
2006
2007   my $es = $self->_es;
2008   $self->brik_help_run_undef_arg('open', $es) or return;
2009
2010   my $r;
2011   eval {
2012      $r = $es->indices->get_template;
2013   };
2014   if ($@) {
2015      chomp($@);
2016      return $self->log->error("get_templates: failed: [$@]");
2017   }
2018
2019   return $r;
2020}
2021
2022sub list_templates {
2023   my $self = shift;
2024
2025   my $content = $self->get_templates or return;
2026
2027   return [ sort { $a cmp $b } keys %$content ];
2028}
2029
2030#
2031# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2032#
2033sub get_template {
2034   my $self = shift;
2035   my ($template) = @_;
2036
2037   my $es = $self->_es;
2038   $self->brik_help_run_undef_arg('open', $es) or return;
2039   $self->brik_help_run_undef_arg('get_template', $template) or return;
2040
2041   my $r;
2042   eval {
2043      $r = $es->indices->get_template(
2044         name => $template,
2045      );
2046   };
2047   if ($@) {
2048      chomp($@);
2049      return $self->log->error("get_template: template failed for name [$template]: [$@]");
2050   }
2051
2052   return $r;
2053}
2054
2055sub put_mapping {
2056   my $self = shift;
2057   my ($index, $type, $mapping) = @_;
2058
2059   my $es = $self->_es;
2060   $self->brik_help_run_undef_arg('open', $es) or return;
2061   $self->brik_help_run_undef_arg('put_mapping', $index) or return;
2062   $self->brik_help_run_undef_arg('put_mapping', $type) or return;
2063   $self->brik_help_run_undef_arg('put_mapping', $mapping) or return;
2064   $self->brik_help_run_invalid_arg('put_mapping', $mapping, 'HASH')
2065      or return;
2066
2067   my $r;
2068   eval {
2069      $r = $es->indices->put_mapping(
2070         index => $index,
2071         type => $type,
2072         body => $mapping,
2073      );
2074   };
2075   if ($@) {
2076      chomp($@);
2077      return $self->log->error("put_mapping: mapping failed ".
2078         "for index [$index]: [$@]");
2079   }
2080
2081   return $r;
2082}
2083
2084#
2085# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2086#
2087sub put_template {
2088   my $self = shift;
2089   my ($name, $template) = @_;
2090
2091   my $es = $self->_es;
2092   $self->brik_help_run_undef_arg('open', $es) or return;
2093   $self->brik_help_run_undef_arg('put_template', $name) or return;
2094   $self->brik_help_run_undef_arg('put_template', $template) or return;
2095   $self->brik_help_run_invalid_arg('put_template', $template, 'HASH')
2096      or return;
2097
2098   my $r;
2099   eval {
2100      $r = $es->indices->put_template(
2101         name => $name,
2102         body => $template,
2103      );
2104   };
2105   if ($@) {
2106      chomp($@);
2107      return $self->log->error("put_template: template failed ".
2108         "for name [$name]: [$@]");
2109   }
2110
2111   return $r;
2112}
2113
2114sub put_mapping_from_json_file {
2115   my $self = shift;
2116   my ($index, $type, $json_file) = @_;
2117
2118   my $es = $self->_es;
2119   $self->brik_help_run_undef_arg('open', $es) or return;
2120   $self->brik_help_run_undef_arg('put_mapping_from_json_file', $index)
2121      or return;
2122   $self->brik_help_run_undef_arg('put_mapping_from_json_file', $type)
2123      or return;
2124   $self->brik_help_run_undef_arg('put_mapping_from_json_file', $json_file)
2125      or return;
2126   $self->brik_help_run_file_not_found('put_mapping_from_json_file',
2127      $json_file) or return;
2128
2129   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2130   my $data = $fj->read($json_file) or return;
2131
2132   if (! exists($data->{mappings})) {
2133      return $self->log->error("put_mapping_from_json_file: no mapping ".
2134         "data found");
2135   }
2136
2137   return $self->put_mapping($index, $type, $data->{mappings});
2138}
2139
2140sub update_mapping_from_json_file {
2141   my $self = shift;
2142   my ($json_file, $index, $type) = @_;
2143
2144   my $es = $self->_es;
2145   $self->brik_help_run_undef_arg('open', $es) or return;
2146   $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2147      $json_file) or return;
2148   $self->brik_help_run_file_not_found('update_mapping_from_json_file',
2149      $json_file) or return;
2150   $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2151      $type) or return;
2152   $self->brik_help_run_undef_arg('update_mapping_from_json_file',
2153      $index) or return;
2154
2155   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2156   my $data = $fj->read($json_file) or return;
2157
2158   if (! exists($data->{mappings})) {
2159      return $self->log->error("update_mapping_from_json_file: ".
2160         "no data found");
2161   }
2162
2163   my $mappings = $data->{mappings};
2164
2165   return $self->put_mapping($index, $type, $mappings);
2166}
2167
2168sub put_template_from_json_file {
2169   my $self = shift;
2170   my ($json_file) = @_;
2171
2172   my $es = $self->_es;
2173   $self->brik_help_run_undef_arg('open', $es) or return;
2174   $self->brik_help_run_undef_arg('put_template_from_json_file', $json_file) or return;
2175   $self->brik_help_run_file_not_found('put_template_from_json_file', $json_file)
2176      or return;
2177
2178   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2179   my $data = $fj->read($json_file) or return;
2180
2181   if (! exists($data->{template}) && ! exists($data->{index_patterns})) {
2182      return $self->log->error("put_template_from_json_file: no template name found");
2183   }
2184
2185   my $name = $data->{template} || $data->{index_patterns};
2186
2187   return $self->put_template($name, $data);
2188}
2189
2190sub update_template_from_json_file {
2191   my $self = shift;
2192   my ($json_file) = @_;
2193
2194   my $es = $self->_es;
2195   $self->brik_help_run_undef_arg('open', $es) or return;
2196   $self->brik_help_run_undef_arg('update_template_from_json_file',
2197      $json_file) or return;
2198   $self->brik_help_run_file_not_found('update_template_from_json_file',
2199      $json_file) or return;
2200
2201   my $fj = Metabrik::File::Json->new_from_brik_init($self) or return;
2202   my $data = $fj->read($json_file) or return;
2203
2204   if (! exists($data->{template}) && ! exists($data->{index_patterns})) {
2205      return $self->log->error("put_template_from_json_file: ".
2206         "no template name found");
2207   }
2208
2209   my $name = $data->{template} || $data->{index_patterns};
2210
2211   # We ignore errors, template may not exist.
2212   $self->delete_template($name);
2213
2214   return $self->put_template($name, $data);
2215}
2216
2217#
2218# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2219# Search::Elasticsearch::Client::2_0::Direct::Indices
2220#
2221sub get_settings {
2222   my $self = shift;
2223   my ($indices, $names) = @_;
2224
2225   my $es = $self->_es;
2226   $self->brik_help_run_undef_arg('open', $es) or return;
2227
2228   my %args = ();
2229   if (defined($indices)) {
2230      $self->brik_help_run_undef_arg('get_settings', $indices) or return;
2231      my $ref = $self->brik_help_run_invalid_arg('get_settings', $indices, 'ARRAY', 'SCALAR')
2232         or return;
2233      $args{index} = $indices;
2234   }
2235   if (defined($names)) {
2236      $self->brik_help_run_file_not_found('get_settings', $names) or return;
2237      my $ref = $self->brik_help_run_invalid_arg('get_settings', $names, 'ARRAY', 'SCALAR')
2238         or return;
2239      $args{name} = $names;
2240   }
2241
2242   my $r;
2243   eval {
2244      $r = $es->indices->get_settings(%args);
2245   };
2246   if ($@) {
2247      chomp($@);
2248      return $self->log->error("get_settings: failed: [$@]");
2249   }
2250
2251   return $r;
2252}
2253
2254#
2255# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-settings.html
2256# Search::Elasticsearch::Client::2_0::Direct::Indices
2257#
2258# Example:
2259#
2260# run client::elasticsearch put_settings "{ index => { refresh_interval => -1 } }"
2261#
2262# XXX: should be renamed to put_index_settings
2263#
2264sub put_settings {
2265   my $self = shift;
2266   my ($settings, $indices) = @_;
2267
2268   my $es = $self->_es;
2269   $self->brik_help_run_undef_arg('open', $es) or return;
2270   $self->brik_help_run_undef_arg('put_settings', $settings) or return;
2271   $self->brik_help_run_invalid_arg('put_settings', $settings, 'HASH') or return;
2272
2273   my %args = (
2274      body => $settings,
2275   );
2276   if (defined($indices)) {
2277      $self->brik_help_run_undef_arg('put_settings', $indices) or return;
2278      my $ref = $self->brik_help_run_invalid_arg('put_settings', $indices, 'ARRAY', 'SCALAR')
2279         or return;
2280      $args{index} = $indices;
2281   }
2282
2283   my $r;
2284   eval {
2285      $r = $es->indices->put_settings(%args);
2286   };
2287   if ($@) {
2288      chomp($@);
2289      return $self->log->error("put_settings: failed: [$@]");
2290   }
2291
2292   return $r;
2293}
2294
2295sub set_index_readonly {
2296   my $self = shift;
2297   my ($indices, $bool) = @_;
2298
2299   my $es = $self->_es;
2300   $self->brik_help_run_undef_arg('open', $es) or return;
2301   $self->brik_help_run_undef_arg('set_index_readonly', $indices) or return;
2302   $self->brik_help_run_invalid_arg('set_index_readonly', $indices, 'ARRAY', 'SCALAR')
2303      or return;
2304
2305   if (! defined($bool)) {
2306      $bool = 'true';
2307   }
2308   else {
2309      $bool = $bool ? 'true' : 'false';
2310   }
2311
2312   my $settings = {
2313      'blocks.read_only' => $bool,
2314      'blocks.read_only_allow_delete' => 'true',
2315   };
2316
2317   return $self->put_settings($settings, $indices);
2318}
2319
2320#
2321# curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'
2322# PUT synscan-2018-05/_settings
2323# {
2324#  "index": {
2325#    "blocks":{
2326#      "read_only":"false",
2327#      "read_only_allow_delete":"true"
2328#    }
2329#  }
2330#}
2331#
2332#
2333# If it fails with the following error:
2334#
2335# [2018-09-12T13:38:40,012][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 403 ({"type"=>"cluster_block_exception", "reason"=>"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"})
2336#
2337# Use Kibana dev console and copy/paste both requests:
2338#
2339# PUT _all/_settings
2340# {
2341#    "index": {
2342#       "blocks": {
2343#          "read_only_allow_delete": "false"
2344#       }
2345#    }
2346# }
2347#
2348sub reset_index_readonly {
2349   my $self = shift;
2350   my ($indices) = @_;
2351
2352   $indices ||= '*';
2353   my $es = $self->_es;
2354   $self->brik_help_run_undef_arg('open', $es) or return;
2355   $self->brik_help_run_invalid_arg('reset_index_readonly', $indices,
2356      'ARRAY', 'SCALAR') or return;
2357
2358   my $settings = {
2359      blocks => {
2360         read_only_allow_delete => 'false',
2361      },
2362   };
2363
2364   # Settings on '*' indices should be enough to reset for everyone.
2365   my $r = $self->put_settings($settings, $indices);
2366   #$self->log->info(Data::Dumper::Dumper($r));
2367
2368   return 1;
2369}
2370
2371sub list_index_readonly {
2372   my $self = shift;
2373
2374   my $es = $self->_es;
2375   $self->brik_help_run_undef_arg('open', $es) or return;
2376
2377   my $list = $self->list_indices or return;
2378
2379   my @indices = ();
2380   for my $this (@$list) {
2381      my $ro = $self->get_index_readonly($this) or next;
2382      if (defined($ro->{index}{provided_name})) {
2383         push @indices, $ro->{index}{provided_name};
2384      }
2385   }
2386
2387   return \@indices;
2388}
2389
2390sub set_index_number_of_replicas {
2391   my $self = shift;
2392   my ($indices, $number) = @_;
2393
2394   my $es = $self->_es;
2395   $self->brik_help_run_undef_arg('open', $es) or return;
2396   $self->brik_help_run_undef_arg('set_index_number_of_replicas', $indices) or return;
2397   $self->brik_help_run_invalid_arg('set_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2398      or return;
2399
2400   my $settings = { number_of_replicas => $number };
2401
2402   return $self->put_settings($settings, $indices);
2403}
2404
2405sub set_index_refresh_interval {
2406   my $self = shift;
2407   my ($indices, $number) = @_;
2408
2409   my $es = $self->_es;
2410   $self->brik_help_run_undef_arg('open', $es) or return;
2411   $self->brik_help_run_undef_arg('set_index_refresh_interval', $indices) or return;
2412   $self->brik_help_run_invalid_arg('set_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2413      or return;
2414
2415   # If there is a meaningful value not postfixed with a unity,
2416   # we default to add a `s' for a number of seconds.
2417   if ($number =~ /^\d+$/ && $number > 0) {
2418      $number .= 's';
2419   }
2420
2421   my $settings = { refresh_interval => $number };
2422
2423   return $self->put_settings($settings, $indices);
2424}
2425
2426sub get_index_settings {
2427   my $self = shift;
2428   my ($indices) = @_;
2429
2430   my $es = $self->_es;
2431   $self->brik_help_run_undef_arg('open', $es) or return;
2432   $self->brik_help_run_undef_arg('get_index_settings', $indices) or return;
2433   $self->brik_help_run_invalid_arg('get_index_settings', $indices, 'ARRAY', 'SCALAR')
2434      or return;
2435
2436   my $settings = $self->get_settings($indices);
2437
2438   my %indices = ();
2439   for (keys %$settings) {
2440      $indices{$_} = $settings->{$_}{settings};
2441   }
2442
2443   return \%indices;
2444}
2445
2446sub get_index_readonly {
2447   my $self = shift;
2448   my ($indices) = @_;
2449
2450   my $es = $self->_es;
2451   $self->brik_help_run_undef_arg('open', $es) or return;
2452   $self->brik_help_run_undef_arg('get_index_readonly', $indices) or return;
2453   $self->brik_help_run_invalid_arg('get_index_readonly', $indices, 'ARRAY', 'SCALAR')
2454      or return;
2455
2456   my $settings = $self->get_settings($indices);
2457
2458   my %indices = ();
2459   for (keys %$settings) {
2460      #$indices{$_} = $settings->{$_}{settings}{index}{'blocks_write'};
2461      $indices{$_} = $settings->{$_}{settings};
2462   }
2463
2464   return \%indices;
2465}
2466
2467sub get_index_number_of_replicas {
2468   my $self = shift;
2469   my ($indices) = @_;
2470
2471   my $es = $self->_es;
2472   $self->brik_help_run_undef_arg('open', $es) or return;
2473   $self->brik_help_run_undef_arg('get_index_number_of_replicas', $indices) or return;
2474   $self->brik_help_run_invalid_arg('get_index_number_of_replicas', $indices, 'ARRAY', 'SCALAR')
2475      or return;
2476
2477   my $settings = $self->get_settings($indices);
2478
2479   my %indices = ();
2480   for (keys %$settings) {
2481      $indices{$_} = $settings->{$_}{settings}{index}{number_of_replicas};
2482   }
2483
2484   return \%indices;
2485}
2486
2487sub get_index_refresh_interval {
2488   my $self = shift;
2489   my ($indices, $number) = @_;
2490
2491   my $es = $self->_es;
2492   $self->brik_help_run_undef_arg('open', $es) or return;
2493   $self->brik_help_run_undef_arg('get_index_refresh_interval', $indices) or return;
2494   $self->brik_help_run_invalid_arg('get_index_refresh_interval', $indices, 'ARRAY', 'SCALAR')
2495      or return;
2496
2497   my $settings = $self->get_settings($indices);
2498
2499   my %indices = ();
2500   for (keys %$settings) {
2501      $indices{$_} = $settings->{$_}{settings}{index}{refresh_interval};
2502   }
2503
2504   return \%indices;
2505}
2506
2507sub get_index_number_of_shards {
2508   my $self = shift;
2509   my ($indices, $number) = @_;
2510
2511   my $es = $self->_es;
2512   $self->brik_help_run_undef_arg('open', $es) or return;
2513   $self->brik_help_run_undef_arg('get_index_number_of_shards', $indices) or return;
2514   $self->brik_help_run_invalid_arg('get_index_number_of_shards', $indices, 'ARRAY', 'SCALAR')
2515      or return;
2516
2517   my $settings = $self->get_settings($indices);
2518
2519   my %indices = ();
2520   for (keys %$settings) {
2521      $indices{$_} = $settings->{$_}{settings}{index}{number_of_shards};
2522   }
2523
2524   return \%indices;
2525}
2526
2527#
2528# http://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
2529#
2530sub delete_template {
2531   my $self = shift;
2532   my ($name) = @_;
2533
2534   my $es = $self->_es;
2535   $self->brik_help_run_undef_arg('open', $es) or return;
2536   $self->brik_help_run_undef_arg('delete_template', $name) or return;
2537
2538   my $r;
2539   eval {
2540      $r = $es->indices->delete_template(
2541         name => $name,
2542      );
2543   };
2544   if ($@) {
2545      chomp($@);
2546      return $self->log->error("delete_template: failed for name [$name]: [$@]");
2547   }
2548
2549   return $r;
2550}
2551
2552#
2553# Return a boolean to state for index existence
2554#
2555sub is_index_exists {
2556   my $self = shift;
2557   my ($index) = @_;
2558
2559   my $es = $self->_es;
2560   $self->brik_help_run_undef_arg('open', $es) or return;
2561   $self->brik_help_run_undef_arg('is_index_exists', $index) or return;
2562
2563   my $r;
2564   eval {
2565      $r = $es->indices->exists(
2566         index => $index,
2567      );
2568   };
2569   if ($@) {
2570      chomp($@);
2571      return $self->log->error("is_index_exists: failed for index [$index]: [$@]");
2572   }
2573
2574   return $r ? 1 : 0;
2575}
2576
2577#
2578# Return a boolean to state for index with type existence
2579#
2580sub is_type_exists {
2581   my $self = shift;
2582   my ($index, $type) = @_;
2583
2584   my $es = $self->_es;
2585   $self->brik_help_run_undef_arg('open', $es) or return;
2586   $self->brik_help_run_undef_arg('is_type_exists', $index) or return;
2587   $self->brik_help_run_undef_arg('is_type_exists', $type) or return;
2588
2589   my $r;
2590   eval {
2591      $r = $es->indices->exists_type(
2592         index => $index,
2593         type => $type,
2594      );
2595   };
2596   if ($@) {
2597      chomp($@);
2598      return $self->log->error("is_type_exists: failed for index [$index] and ".
2599         "type [$type]: [$@]");
2600   }
2601
2602   return $r ? 1 : 0;
2603}
2604
2605#
2606# Return a boolean to state for document existence
2607#
2608sub is_document_exists {
2609   my $self = shift;
2610   my ($index, $type, $document) = @_;
2611
2612   my $es = $self->_es;
2613   $self->brik_help_run_undef_arg('open', $es) or return;
2614   $self->brik_help_run_undef_arg('is_document_exists', $index) or return;
2615   $self->brik_help_run_undef_arg('is_document_exists', $type) or return;
2616   $self->brik_help_run_undef_arg('is_document_exists', $document) or return;
2617   $self->brik_help_run_invalid_arg('is_document_exists', $document, 'HASH') or return;
2618
2619   my $r;
2620   eval {
2621      $r = $es->exists(
2622         index => $index,
2623         type => $type,
2624         %$document,
2625      );
2626   };
2627   if ($@) {
2628      chomp($@);
2629      return $self->log->error("is_document_exists: failed for index [$index] and ".
2630         "type [$type]: [$@]");
2631   }
2632
2633   return $r ? 1 : 0;
2634}
2635
2636sub parse_error_string {
2637   my $self = shift;
2638   my ($string) = @_;
2639
2640   $self->brik_help_run_undef_arg('parse_error_string', $string) or return;
2641
2642   # [Timeout] ** [http://X.Y.Z.1:9200]-[599] Timed out while waiting for socket to become ready for reading, called from sub Search::Elasticsearch::Role::Client::Direct::__ANON__ at /usr/local/lib/perl5/site_perl/Metabrik/Client/Elasticsearch.pm line 1466. With vars: {'status_code' => 599,'request' => {'body' => undef,'qs' => {},'ignore' => [],'serialize' => 'std','path' => '/index-thing/_refresh','method' => 'POST'}}
2643
2644   my ($class, $node, $code, $message, $dump) = $string =~
2645      m{^\[([^]]+)\] \*\* \[([^]]+)\]\-\[(\d+)\] (.+)\. With vars: (.+)$};
2646
2647   if (defined($dump) && length($dump)) {
2648      my $sd = Metabrik::String::Dump->new_from_brik_init($self) or return;
2649      $dump = $sd->decode($dump);
2650   }
2651
2652   # Sanity check
2653   if (defined($node) && $node =~ m{^http} && $code =~ m{^\d+$}
2654   &&  defined($dump) && ref($dump) eq 'HASH') {
2655      return {
2656         class => $class,
2657         node => $node,
2658         code => $code,
2659         message => $message,
2660         dump => $dump,
2661      };
2662   }
2663
2664   # Were not able to decode, we return as-is.
2665   return {
2666      message => $string,
2667   };
2668}
2669
2670#
2671# Refresh an index to receive latest additions
2672#
2673# Search::Elasticsearch::Client::5_0::Direct::Indices
2674# https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html
2675#
2676sub refresh_index {
2677   my $self = shift;
2678   my ($index) = @_;
2679
2680   my $es = $self->_es;
2681   $self->brik_help_run_undef_arg('open', $es) or return;
2682   $self->brik_help_run_undef_arg('refresh_index', $index) or return;
2683
2684   my $try = $self->try;
2685
2686RETRY:
2687
2688   my $r;
2689   eval {
2690      $r = $es->indices->refresh(
2691         index => $index,
2692      );
2693   };
2694   if ($@) {
2695      if (--$try == 0) {
2696         chomp($@);
2697         my $p = $self->parse_error_string($@);
2698         if (defined($p) && exists($p->{class})) {
2699            my $class = $p->{class};
2700            my $code = $p->{code};
2701            my $node = $p->{node};
2702            return $self->log->error("refresh_index: failed for index [$index] ".
2703               "after [$try] tries with error [$class] code [$code] for node [$node]");
2704         }
2705         else {
2706            return $self->log->error("refresh_index: failed for index [$index] ".
2707               "after [$try]: [$@]");
2708         }
2709      }
2710      sleep 60;
2711      goto RETRY;
2712   }
2713
2714   return $r;
2715}
2716
2717sub export_as {
2718   my $self = shift;
2719   my ($format, $index, $size, $cb) = @_;
2720
2721   $size ||= 10_000;
2722   my $es = $self->_es;
2723   $self->brik_help_run_undef_arg('open', $es) or return;
2724   $self->brik_help_run_undef_arg('export_as', $format) or return;
2725   $self->brik_help_run_undef_arg('export_as', $index) or return;
2726   $self->brik_help_run_undef_arg('export_as', $size) or return;
2727
2728   if ($format ne 'csv' && $format ne 'json') {
2729      return $self->log->error("export_as: unsupported export format ".
2730         "[$format]");
2731   }
2732
2733   my $max = $self->max;
2734   my $datadir = $self->datadir;
2735
2736   $self->log->debug("export_as: selecting scroll Command...");
2737
2738   my $scroll;
2739   my $version = $self->version or return;
2740   if ($version lt "5.0.0") {
2741      $scroll = $self->open_scroll_scan_mode($index, $size) or return;
2742   }
2743   else {
2744      $scroll = $self->open_scroll($index, $size) or return;
2745   }
2746
2747   $self->log->debug("export_as: selecting scroll Command...OK.");
2748
2749   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
2750
2751   my $out;
2752   my $csv_header;
2753   if ($format eq 'csv') {
2754      $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
2755      $out->encoding($self->encoding);
2756      $out->separator(',');
2757      $out->escape('\\');
2758      $out->append(1);
2759      $out->first_line_is_header(0);
2760      $out->write_header(1);
2761      $out->use_quoting(1);
2762      if (defined($self->csv_header)) {
2763         my $sorted = [ sort { $a cmp $b } @{$self->csv_header} ];
2764         $out->header($sorted);
2765      }
2766      if (defined($self->csv_encoded_fields)) {
2767         $out->encoded_fields($self->csv_encoded_fields);
2768      }
2769      if (defined($self->csv_object_fields)) {
2770         $out->object_fields($self->csv_object_fields);
2771      }
2772
2773      $csv_header = $out->header;
2774   }
2775   elsif ($format eq 'json') {
2776      $out = Metabrik::File::Json->new_from_brik_init($self) or return;
2777      $out->encoding($self->encoding);
2778   }
2779
2780   my $total = $self->total_scroll;
2781   $self->log->info("export_as: total [$total] for index [$index]");
2782
2783   my %types = ();
2784   my $read = 0;
2785   my $skipped = 0;
2786   my $exported = 0;
2787   my $start = time();
2788   my $done = $datadir."/$index.exported";
2789   my $start_time = time();
2790   my %chunk = ();
2791   while (my $next = $self->next_scroll(10000)) {
2792      for my $this (@$next) {
2793         $read++;
2794
2795         if (defined($cb)) {
2796            $this = $cb->($this);
2797            if (! defined($this)) {
2798               $self->log->error("export_as: callback failed for index ".
2799                  "[$index] at read [$read], skipping single entry");
2800               $skipped++;
2801               next;
2802            }
2803         }
2804
2805         my $id = $this->{_id};
2806         my $doc = $this->{_source};
2807         # Prepare for when types will be removed from ES
2808         my $type = $this->{_type} || 'doc';
2809         if (! exists($types{$type})) {
2810            if ($format eq 'csv') {
2811               # If not given, we guess the CSV fields to use.
2812               if (! defined($csv_header)) {
2813                  my $fields = $self->list_index_fields($index, $type)
2814                     or return;
2815                  $types{$type}{header} = [ '_id', @$fields ];
2816               }
2817               else {
2818                  $types{$type}{header} = [ '_id', @$csv_header ];
2819               }
2820
2821               $types{$type}{output} = $datadir."/$index:$type.csv";
2822            }
2823            elsif ($format eq 'json') {
2824               $types{$type}{output} = $datadir."/$index:$type.json";
2825            }
2826
2827            # Verify it has not been exported yet
2828            if (-f $done) {
2829               return $self->log->error("export_as: export already done ".
2830                  "for index [$index]");
2831            }
2832
2833            $self->log->info("export_as: exporting to file [".
2834               $types{$type}{output}."] for type [$type], using ".
2835               "chunk size of [$size]");
2836         }
2837
2838         my $h = { _id => $id };
2839
2840         for my $k (keys %$doc) {
2841            $h->{$k} = $doc->{$k};
2842         }
2843
2844         if ($format eq 'csv') {
2845            $out->header($types{$type}{header});
2846         }
2847
2848         push @{$chunk{$type}}, $h;
2849         if (@{$chunk{$type}} > 999) {
2850            my $r = $out->write($chunk{$type}, $types{$type}{output});
2851            if (!defined($r)) {
2852               $self->log->warning("export_as: unable to process entry, ".
2853                  "skipping");
2854               $skipped++;
2855               next;
2856            }
2857            $chunk{$type} = [];
2858         }
2859
2860         # Log a status sometimes.
2861         if (! (++$exported % 100_000)) {
2862            my $now = time();
2863            my $perc = sprintf("%.02f", $exported / $total * 100);
2864            $self->log->info("export_as: fetched [$exported/$total] ".
2865               "($perc%) elements in ".($now - $start)." second(s) ".
2866               "from index [$index]");
2867            $start = time();
2868         }
2869
2870         # Limit export to specified maximum
2871         if ($max > 0 && $exported >= $max) {
2872            $self->log->info("export_as: max export reached [$exported] ".
2873               "for index [$index], stopping");
2874            last;
2875         }
2876      }
2877   }
2878
2879   # Process remaining data waiting to be written and build output file list
2880   my %files = ();
2881   for my $type (keys %types) {
2882      if (@{$chunk{$type}} > 0) {
2883         $out->write($chunk{$type}, $types{$type}{output});
2884         $files{$types{$type}{output}}++;
2885      }
2886   }
2887
2888   $self->close_scroll;
2889
2890   my $stop_time = time();
2891   my $duration = $stop_time - $start_time;
2892   my $eps = $exported;
2893   if ($duration > 0) {
2894      $eps = $exported / $duration;
2895   }
2896
2897   my $result = {
2898      read => $read,
2899      exported => $exported,
2900      skipped => $read - $exported,
2901      total_count => $total,
2902      complete => ($exported == $total) ? 1 : 0,
2903      duration => $duration,
2904      eps => $eps, 
2905      files => [ sort { $a cmp $b } keys %files ],
2906   };
2907
2908   # Say the file has been processed, and put resulting stats.
2909   $fd->write($result, $done) or return;
2910
2911   $self->log->info("export_as: done.");
2912
2913   return $result;
2914}
2915
2916sub export_as_csv {
2917   my $self = shift;
2918   my ($index, $size, $cb) = @_;
2919
2920   $size ||= 10_000;
2921   my $es = $self->_es;
2922   $self->brik_help_run_undef_arg('open', $es) or return;
2923   $self->brik_help_run_undef_arg('export_as_csv', $index) or return;
2924   $self->brik_help_run_undef_arg('export_as_csv', $size) or return;
2925
2926   return $self->export_as('csv', $index, $size, $cb);
2927}
2928
2929sub export_as_json {
2930   my $self = shift;
2931   my ($index, $size, $cb) = @_;
2932
2933   $size ||= 10_000;
2934   my $es = $self->_es;
2935   $self->brik_help_run_undef_arg('open', $es) or return;
2936   $self->brik_help_run_undef_arg('export_as_json', $index) or return;
2937   $self->brik_help_run_undef_arg('export_as_json', $size) or return;
2938
2939   return $self->export_as('json', $index, $size, $cb);
2940}
2941
2942#
2943# Optimization instructions:
2944# https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html
2945#
2946sub import_from {
2947   my $self = shift;
2948   my ($format, $input, $index, $type, $hash, $cb) = @_;
2949
2950   my $es = $self->_es;
2951   $self->brik_help_run_undef_arg('open', $es) or return;
2952   $self->brik_help_run_undef_arg('import_from', $format) or return;
2953   $self->brik_help_run_undef_arg('import_from', $input) or return;
2954   $self->brik_help_run_file_not_found('import_from', $input) or return;
2955
2956   if ($format ne 'csv' && $format ne 'json') {
2957      return $self->log->error("import_from: unsupported export format ".
2958         "[$format]");
2959   }
2960
2961   # If index and/or types are not defined, we try to get them from
2962   # input filename
2963   if (! defined($index) || ! defined($type)) {
2964      # Example: index-DATE:type.csv
2965      if ($input =~ m{^(.+):(.+)\.(?:csv|json)(?:.*)?$}) {
2966         my ($this_index, $this_type) = $input =~
2967            m{^(.+):(.+)\.(?:csv|json)(?:.*)?$};
2968         $index ||= $this_index;
2969         $type ||= $this_type;
2970      }
2971   }
2972
2973   # Verify it has not been indexed yet
2974   my $done = "$input.imported";
2975   if (-f $done) {
2976      $self->log->info("import_from: import already done for file ".
2977         "[$input]");
2978      return 0;
2979   }
2980
2981   # And default to Attributes if guess failed.
2982   $index ||= $self->index;
2983   $type ||= $self->type;
2984   $self->brik_help_set_undef_arg('index', $index) or return;
2985   $self->brik_help_set_undef_arg('type', $type) or return;
2986
2987   if ($index eq '*') {
2988      return $self->log->error("import_from: cannot import to invalid ".
2989         "index [$index]");
2990   }
2991   if ($type eq '*') {
2992      return $self->log->error("import_from: cannot import to invalid ".
2993         "type [$type]");
2994   }
2995
2996   $self->log->debug("input [$input]");
2997   $self->log->debug("index [$index]");
2998   $self->log->debug("type [$type]");
2999
3000   my $count_before = 0;
3001   if ($self->is_index_exists($index)) {
3002      $count_before = $self->count($index, $type);
3003      if (! defined($count_before)) {
3004         return;
3005      }
3006      $self->log->info("import_from: current index [$index] count is ".
3007         "[$count_before]");
3008   }
3009
3010   my $max = $self->max;
3011
3012   $self->open_bulk_mode($index, $type) or return;
3013
3014   $self->log->info("import_from: importing file [$input] to index ".
3015      "[$index] with type [$type]");
3016
3017   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3018
3019   my $out;
3020   if ($format eq 'csv') {
3021      $out = Metabrik::File::Csv->new_from_brik_init($self) or return;
3022      $out->encoding($self->encoding);
3023      $out->separator(',');
3024      $out->escape('\\');
3025      $out->first_line_is_header(1);
3026      $out->encoded_fields($self->csv_encoded_fields);
3027      $out->object_fields($self->csv_object_fields);
3028   }
3029   elsif ($format eq 'json') {
3030      $out = Metabrik::File::Json->new_from_brik_init($self) or return;
3031      $out->encoding($self->encoding);
3032   }
3033
3034   my $refresh_interval;
3035   my $number_of_replicas;
3036   my $start = time();
3037   my $speed_settings = {};
3038   my $imported = 0;
3039   my $first = 1;
3040   my $read = 0;
3041   my $skipped_chunks = 0;
3042   my $start_time = time();
3043   while (my $this = $out->read_next($input)) {
3044      $read++;
3045
3046      my $h = {};
3047      my $id = $self->use_ignore_id ? undef : $this->{_id};
3048      delete $this->{_id};
3049      for my $k (keys %$this) {
3050         my $value = $this->{$k};
3051         # We keep only fields when they have a value.
3052         # No need to index data that is empty.
3053         if (defined($value) && length($value)) {
3054            $h->{$k} = $value;
3055         }
3056      }
3057
3058      if (defined($cb)) {
3059         $h = $cb->($h);
3060         if (! defined($h)) {
3061            $self->log->error("import_from: callback failed for ".
3062               "index [$index] at read [$read], skipping single entry");
3063            $skipped_chunks++;
3064            next;
3065         }
3066      }
3067
3068      # Set routing based on the provided field name, if any.
3069      my $this_hash;
3070      if (defined($hash) && defined($hash->{routing})
3071      &&  defined($h->{$hash->{routing}})) {
3072         $this_hash = { %$hash };  # Make a copy to avoid overwriting
3073                                   # user provided value.
3074         $this_hash->{routing} = $h->{$hash->{routing}};
3075      }
3076
3077      #$self->log->info(Data::Dumper::Dumper($h));
3078
3079      my $r;
3080      eval {
3081         $r = $self->index_bulk($h, $index, $type, $this_hash, $id);
3082      };
3083      if ($@) {
3084         chomp($@);
3085         $self->log->warning("import_from: error [$@]");
3086      }
3087      if (! defined($r)) {
3088         $self->log->error("import_from: bulk processing failed for ".
3089            "index [$index] at read [$read], skipping chunk");
3090         $skipped_chunks++;
3091         next;
3092      }
3093
3094      # Gather index settings, and set values for speed.
3095      # We don't do it earlier, cause we need index to be created,
3096      # and it should have been done from index_bulk Command.
3097      if ($first && $self->is_index_exists($index)) {
3098         # Save current values so we can restore them at the end of Command.
3099         # We ignore errors here, this is non-blocking for indexing.
3100         $refresh_interval = $self->get_index_refresh_interval($index);
3101         $refresh_interval = $refresh_interval->{$index};
3102         $number_of_replicas = $self->get_index_number_of_replicas($index);
3103         $number_of_replicas = $number_of_replicas->{$index};
3104         if ($self->use_indexing_optimizations) {
3105            $self->set_index_number_of_replicas($index, 0);
3106         }
3107         $self->set_index_refresh_interval($index, -1);
3108         $first = 0;
3109      }
3110
3111      # Log a status sometimes.
3112      if (! (++$imported % 100_000)) {
3113         my $now = time();
3114         $self->log->info("import_from: imported [$imported] entries in ".
3115            ($now - $start)." second(s) to index [$index]");
3116         $start = time();
3117      }
3118
3119      # Limit import to specified maximum
3120      if ($max > 0 && $imported >= $max) {
3121         $self->log->info("import_from: max import reached [$imported] for ".
3122            "index [$index], stopping");
3123         last;
3124      }
3125   }
3126
3127   $self->bulk_flush;
3128
3129   my $stop_time = time();
3130   my $duration = $stop_time - $start_time;
3131   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3132
3133   $self->refresh_index($index);
3134
3135   my $count_current = $self->count($index, $type) or return;
3136   $self->log->info("import_from: after index [$index] count is [$count_current] ".
3137      "at EPS [$eps]");
3138
3139   my $skipped = 0;
3140   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3141   if ($complete) {  # If complete, import has been retried, and everything is now ok.
3142      $imported = $read;
3143   }
3144   else {
3145      $skipped = $read - ($count_current - $count_before);
3146   }
3147
3148   my $result = {
3149      read => $read,
3150      imported => $imported,
3151      skipped => $skipped,
3152      previous_count => $count_before,
3153      current_count => $count_current,
3154      complete => $complete,
3155      duration => $duration,
3156      eps => $eps,
3157   };
3158
3159   # Say the file has been processed, and put resulting stats.
3160   $fd->write($result, $done) or return;
3161
3162   # Restore previous settings, if any
3163   if (defined($refresh_interval)) {
3164      $self->set_index_refresh_interval($index, $refresh_interval);
3165   }
3166   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3167      $self->set_index_number_of_replicas($index, $number_of_replicas);
3168   }
3169
3170   return $result;
3171}
3172
3173sub import_from_csv {
3174   my $self = shift;
3175   my ($input, $index, $type, $hash, $cb) = @_;
3176
3177   my $es = $self->_es;
3178   $self->brik_help_run_undef_arg('open', $es) or return;
3179   $self->brik_help_run_undef_arg('import_from_csv', $input) or return;
3180   $self->brik_help_run_file_not_found('import_from_csv', $input)
3181      or return;
3182
3183   return $self->import_from('csv', $input, $index, $type, $hash, $cb);
3184}
3185
3186sub import_from_json {
3187   my $self = shift;
3188   my ($input, $index, $type, $hash, $cb) = @_;
3189
3190   my $es = $self->_es;
3191   $self->brik_help_run_undef_arg('open', $es) or return;
3192   $self->brik_help_run_undef_arg('import_from_json', $input) or return;
3193   $self->brik_help_run_file_not_found('import_from_json', $input)
3194      or return;
3195
3196   return $self->import_from('json', $input, $index, $type, $hash, $cb);
3197}
3198
3199#
3200# Same as import_from_csv Command but in worker mode for speed.
3201#
3202sub import_from_csv_worker {
3203   my $self = shift;
3204   my ($input_csv, $index, $type, $hash, $cb) = @_;
3205
3206   my $es = $self->_es;
3207   $self->brik_help_run_undef_arg('open', $es) or return;
3208   $self->brik_help_run_undef_arg('import_from_csv_worker', $input_csv)
3209      or return;
3210   $self->brik_help_run_file_not_found('import_from_csv_worker', $input_csv)
3211      or return;
3212
3213   # If index and/or types are not defined, we try to get them from input filename
3214   if (! defined($index) || ! defined($type)) {
3215      # Example: index-DATE:type.csv
3216      if ($input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$}) {
3217         my ($this_index, $this_type) = $input_csv =~ m{^(.+):(.+)\.csv(?:.*)?$};
3218         $index ||= $this_index;
3219         $type ||= $this_type;
3220      }
3221   }
3222
3223   # Verify it has not been indexed yet
3224   my $done = "$input_csv.imported";
3225   if (-f $done) {
3226      $self->log->info("import_from_csv_worker: import already done for ".
3227         "file [$input_csv]");
3228      return 0;
3229   }
3230
3231   # And default to Attributes if guess failed.
3232   $index ||= $self->index;
3233   $type ||= $self->type;
3234   $self->brik_help_set_undef_arg('index', $index) or return;
3235   $self->brik_help_set_undef_arg('type', $type) or return;
3236
3237   if ($index eq '*') {
3238      return $self->log->error("import_from_csv_worker: cannot import to invalid ".
3239         "index [$index]");
3240   }
3241   if ($type eq '*') {
3242      return $self->log->error("import_from_csv_worker: cannot import to invalid ".
3243         "type [$type]");
3244   }
3245
3246   $self->log->debug("input [$input_csv]");
3247   $self->log->debug("index [$index]");
3248   $self->log->debug("type [$type]");
3249
3250   my $count_before = 0;
3251   if ($self->is_index_exists($index)) {
3252      $count_before = $self->count($index, $type);
3253      if (! defined($count_before)) {
3254         return;
3255      }
3256      $self->log->info("import_from_csv_worker: current index [$index] count is ".
3257         "[$count_before]");
3258   }
3259
3260   my $max = $self->max;
3261
3262   $self->open_bulk_mode($index, $type) or return;
3263
3264   #my $batch = undef;
3265   my $batch = 10_000;
3266
3267   $self->log->info("import_from_csv_worker: importing file [$input_csv] to ".
3268      "index [$index] with type [$type] and batch [$batch]");
3269
3270   my $fd = Metabrik::File::Dump->new_from_brik_init($self) or return;
3271
3272   my $fc = Metabrik::File::Csv->new_from_brik_init($self) or return;
3273   $fc->separator(',');
3274   $fc->escape('\\');
3275   $fc->first_line_is_header(1);
3276   $fc->encoded_fields($self->csv_encoded_fields);
3277   $fc->object_fields($self->csv_object_fields);
3278
3279   my $wp = Metabrik::Worker::Parallel->new_from_brik_init($self) or return;
3280   $wp->pool_size(2);
3281
3282   $wp->create_manager or return;
3283
3284   my $refresh_interval;
3285   my $number_of_replicas;
3286   my $start = time();
3287   my $speed_settings = {};
3288   my $imported = 0;
3289   my $first = 1;
3290   my $read = 0;
3291   my $skipped_chunks = 0;
3292   my $start_time = time();
3293   while (my $list = $fc->read_next($input_csv, $batch)) {
3294
3295      $wp->start(sub {
3296         my @list = ();
3297         for my $this (@$list) {
3298            $read++;
3299
3300            my $h = {};
3301            my $id = $this->{_id};
3302            delete $this->{_id};
3303            for my $k (keys %$this) {
3304               my $value = $this->{$k};
3305               # We keep only fields when they have a value.
3306               # No need to index data that is empty.
3307               if (defined($value) && length($value)) {
3308                  $h->{$k} = $value;
3309               }
3310            }
3311
3312            if (defined($cb)) {
3313               $h = $cb->($h);
3314               if (! defined($h)) {
3315                  $self->log->error("import_from_csv_worker: callback failed for ".
3316                     "index [$index] at read [$read], skipping single entry");
3317                  $skipped_chunks++;
3318                  next;
3319               }
3320            }
3321
3322            push @list, $h;
3323         }
3324
3325         my $r;
3326         eval {
3327            $r = $self->index_bulk_from_list(\@list, $index, $type, $hash);
3328         };
3329         if ($@) {
3330            chomp($@);
3331            $self->log->warning("import_from_csv_worker: error [$@]");
3332         }
3333         if (! defined($r)) {
3334            $self->log->error("import_from_csv_worker: bulk processing failed for ".
3335               "index [$index] at read [$read], skipping chunk");
3336            $skipped_chunks++;
3337            next;
3338         }
3339
3340         # Log a status sometimes.
3341         if (! ($imported % 10_000)) {
3342            my $now = time();
3343            my $diff = sprintf("%.02f", $now - $start);
3344            my $eps = sprintf("%.02f", $imported / $diff);
3345            $self->log->info("import_from_csv_worker: imported [$imported] entries ".
3346               "in [$diff] second(s) to index [$index] at EPS [$eps]");
3347            $start = time();
3348         }
3349
3350         exit(0);
3351      });
3352
3353      # Gather index settings, and set values for speed.
3354      # We don't do it earlier, cause we need index to be created,
3355      # and it should have been done from index_bulk Command.
3356      if ($first && $self->is_index_exists($index)) {
3357         # Save current values so we can restore them at the end of Command.
3358         # We ignore errors here, this is non-blocking for indexing.
3359         $refresh_interval = $self->get_index_refresh_interval($index);
3360         $refresh_interval = $refresh_interval->{$index};
3361         $number_of_replicas = $self->get_index_number_of_replicas($index);
3362         $number_of_replicas = $number_of_replicas->{$index};
3363         if ($self->use_indexing_optimizations) {
3364            $self->set_index_number_of_replicas($index, 0);
3365         }
3366         $self->set_index_refresh_interval($index, -1);
3367         $first = 0;
3368      }
3369
3370      # Log a status sometimes.
3371      #$imported += @$list;
3372      #if (! ($imported % 10_000)) {
3373         #my $now = time();
3374         #my $diff = sprintf("%.02f", $now - $start);
3375         #my $eps = sprintf("%.02f", 10_000 / $diff);
3376         #$self->log->info("import_from_csv_worker: imported [$imported] entries ".
3377            #"in [$diff] second(s) to index [$index] at EPS [$eps]");
3378         #$start = time();
3379      #}
3380
3381      # Limit import to specified maximum
3382      if ($max > 0 && $imported >= $max) {
3383         $self->log->info("import_from_csv_worker: max import reached [$imported] for ".
3384            "index [$index], stopping");
3385         last;
3386      }
3387
3388      last if (@$list < $batch);
3389
3390      $imported += @$list;
3391   }
3392
3393   $wp->stop;
3394
3395   $self->bulk_flush;
3396
3397   my $stop_time = time();
3398   my $duration = $stop_time - $start_time;
3399   my $eps = sprintf("%.02f", $imported / ($duration || 1)); # Avoid divide by zero error.
3400
3401   $self->refresh_index($index);
3402
3403   my $count_current = $self->count($index, $type) or return;
3404   $self->log->info("import_from_csv_worker: after index [$index] count ".
3405      "is [$count_current] at EPS [$eps]");
3406
3407   my $skipped = 0;
3408   my $complete = (($count_current - $count_before) == $read) ? 1 : 0;
3409   if ($complete) {  # If complete, import has been retried, and everything is now ok.
3410      $imported = $read;
3411   }
3412   else {
3413      $skipped = $read - ($count_current - $count_before);
3414   }
3415
3416   my $result = {
3417      read => $read,
3418      imported => $imported,
3419      skipped => $skipped,
3420      previous_count => $count_before,
3421      current_count => $count_current,
3422      complete => $complete,
3423      duration => $duration,
3424      eps => $eps,
3425   };
3426
3427   # Say the file has been processed, and put resulting stats.
3428   $fd->write($result, $done) or return;
3429
3430   # Restore previous settings, if any
3431   if (defined($refresh_interval)) {
3432      $self->set_index_refresh_interval($index, $refresh_interval);
3433   }
3434   if (defined($number_of_replicas) && $self->use_indexing_optimizations) {
3435      $self->set_index_number_of_replicas($index, $number_of_replicas);
3436   }
3437
3438   return $result;
3439}
3440
3441#
3442# http://localhost:9200/_nodes/stats/process?pretty
3443#
3444# Search::Elasticsearch::Client::2_0::Direct::Nodes
3445#
3446sub get_stats_process {
3447   my $self = shift;
3448
3449   my $es = $self->_es;
3450   $self->brik_help_run_undef_arg('open', $es) or return;
3451
3452   my $r;
3453   eval {
3454      $r = $es->nodes->stats(
3455         metric => [ qw(process) ],
3456      );
3457   };
3458   if ($@) {
3459      chomp($@);
3460      return $self->log->error("get_stats_process: failed: [$@]");
3461   }
3462
3463   return $r;
3464}
3465
3466#
3467# curl http://localhost:9200/_nodes/process?pretty
3468#
3469# Search::Elasticsearch::Client::2_0::Direct::Nodes
3470#
3471sub get_process {
3472   my $self = shift;
3473
3474   my $es = $self->_es;
3475   $self->brik_help_run_undef_arg('open', $es) or return;
3476
3477   my $r;
3478   eval {
3479      $r = $es->nodes->info(
3480         metric => [ qw(process) ],
3481      );
3482   };
3483   if ($@) {
3484      chomp($@);
3485      return $self->log->error("get_process: failed: [$@]");
3486   }
3487
3488   return $r;
3489}
3490
3491#
3492# Search::Elasticsearch::Client::2_0::Direct::Cluster
3493#
3494sub get_cluster_state {
3495   my $self = shift;
3496
3497   my $es = $self->_es;
3498   $self->brik_help_run_undef_arg('open', $es) or return;
3499
3500   my $r;
3501   eval {
3502      $r = $es->cluster->state;
3503   };
3504   if ($@) {
3505      chomp($@);
3506      return $self->log->error("get_cluster_state: failed: [$@]");
3507   }
3508
3509   return $r;
3510}
3511
3512#
3513# Search::Elasticsearch::Client::2_0::Direct::Cluster
3514#
3515sub get_cluster_health {
3516   my $self = shift;
3517
3518   my $es = $self->_es;
3519   $self->brik_help_run_undef_arg('open', $es) or return;
3520
3521   my $r;
3522   eval {
3523      $r = $es->cluster->health;
3524   };
3525   if ($@) {
3526      chomp($@);
3527      return $self->log->error("get_cluster_health: failed: [$@]");
3528   }
3529
3530   return $r;
3531}
3532
3533#
3534# Search::Elasticsearch::Client::2_0::Direct::Cluster
3535#
3536sub get_cluster_settings {
3537   my $self = shift;
3538
3539   my $es = $self->_es;
3540   $self->brik_help_run_undef_arg('open', $es) or return;
3541
3542   my $r;
3543   eval {
3544      $r = $es->cluster->get_settings;
3545   };
3546   if ($@) {
3547      chomp($@);
3548      return $self->log->error("get_cluster_settings: failed: [$@]");
3549   }
3550
3551   return $r;
3552}
3553
3554#
3555# Search::Elasticsearch::Client::2_0::Direct::Cluster
3556#
3557sub put_cluster_settings {
3558   my $self = shift;
3559   my ($settings) = @_;
3560
3561   my $es = $self->_es;
3562   $self->brik_help_run_undef_arg('open', $es) or return;
3563   $self->brik_help_run_undef_arg('put_cluster_settings', $settings) or return;
3564   $self->brik_help_run_invalid_arg('put_cluster_settings', $settings, 'HASH') or return;
3565
3566   my %args = (
3567      body => $settings,
3568   );
3569
3570   my $r;
3571   eval {
3572      $r = $es->cluster->put_settings(%args);
3573   };
3574   if ($@) {
3575      chomp($@);
3576      return $self->log->error("put_cluster_settings: failed: [$@]");
3577   }
3578
3579   return $r;
3580}
3581
3582sub count_green_indices {
3583   my $self = shift;
3584
3585   my $get = $self->show_indices or return;
3586
3587   my $count = 0;
3588   for (@$get) {
3589      if (/^\s*green\s+/) {
3590         $count++;
3591      }
3592   }
3593
3594   return $count;
3595}
3596
3597sub count_yellow_indices {
3598   my $self = shift;
3599
3600   my $get = $self->show_indices or return;
3601
3602   my $count = 0;
3603   for (@$get) {
3604      if (/^\s*yellow\s+/) {
3605         $count++;
3606      }
3607   }
3608
3609   return $count;
3610}
3611
3612sub count_red_indices {
3613   my $self = shift;
3614
3615   my $get = $self->show_indices or return;
3616
3617   my $count = 0;
3618   for (@$get) {
3619      if (/^\s*red\s+/) {
3620         $count++;
3621      }
3622   }
3623
3624   return $count;
3625}
3626
3627sub count_indices {
3628   my $self = shift;
3629
3630   my $get = $self->show_indices or return;
3631
3632   return scalar @$get;
3633}
3634
3635sub list_indices_status {
3636   my $self = shift;
3637
3638   my $get = $self->show_indices or return;
3639
3640   my $count_red = 0;
3641   my $count_yellow = 0;
3642   my $count_green = 0;
3643   for (@$get) {
3644      if (/^\s*red\s+/) {
3645         $count_red++;
3646      }
3647      elsif (/^\s*yellow\s+/) {
3648         $count_yellow++;
3649      }
3650      elsif (/^\s*green\s+/) {
3651         $count_green++;
3652      }
3653   }
3654
3655   return {
3656      red => $count_red,
3657      yellow => $count_yellow,
3658      green => $count_green,
3659   };
3660}
3661
3662sub count_shards {
3663   my $self = shift;
3664
3665   my $indices = $self->get_indices or return;
3666
3667   my $count = 0;
3668   for (@$indices) {
3669      $count += $_->{shards};
3670   }
3671
3672   return $count;
3673}
3674
3675sub count_size {
3676   my $self = shift;
3677   my ($string) = @_;
3678
3679   my $indices = $self->get_indices($string) or return;
3680
3681   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3682   $fn->kibi_suffix("kb");
3683   $fn->mebi_suffix("mb");
3684   $fn->gibi_suffix("gb");
3685   $fn->kilo_suffix("KB");
3686   $fn->mega_suffix("MB");
3687   $fn->giga_suffix("GB");
3688
3689   my $size = 0;
3690   for (@$indices) {
3691      $size += $fn->to_number($_->{size});
3692   }
3693
3694   return $fn->from_number($size);
3695}
3696
3697sub count_total_size {
3698   my $self = shift;
3699   my ($string) = @_;
3700
3701   my $indices = $self->get_indices($string) or return;
3702
3703   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3704   $fn->kibi_suffix("kb");
3705   $fn->mebi_suffix("mb");
3706   $fn->gibi_suffix("gb");
3707   $fn->kilo_suffix("KB");
3708   $fn->mega_suffix("MB");
3709   $fn->giga_suffix("GB");
3710
3711   my $size = 0;
3712   for (@$indices) {
3713      $size += $fn->to_number($_->{total_size});
3714   }
3715
3716   return $fn->from_number($size);
3717}
3718
3719sub count_count {
3720   my $self = shift;
3721
3722   my $indices = $self->get_indices or return;
3723
3724   my $fn = Metabrik::Format::Number->new_from_brik_init($self) or return;
3725   $fn->kilo_suffix('k');
3726   $fn->mega_suffix('m');
3727   $fn->giga_suffix('M');
3728
3729   my $count = 0;
3730   for (@$indices) {
3731      $count += $_->{count};
3732   }
3733
3734   return $fn->from_number($count);
3735}
3736
3737sub list_green_indices {
3738   my $self = shift;
3739
3740   my $get = $self->get_indices or return;
3741
3742   my @indices = ();
3743   for (@$get) {
3744      if ($_->{color} eq 'green') {
3745         push @indices, $_->{index};
3746      }
3747   }
3748
3749   return \@indices;
3750}
3751
3752sub list_yellow_indices {
3753   my $self = shift;
3754
3755   my $get = $self->get_indices or return;
3756
3757   my @indices = ();
3758   for (@$get) {
3759      if ($_->{color} eq 'yellow') {
3760         push @indices, $_->{index};
3761      }
3762   }
3763
3764   return \@indices;
3765}
3766
3767sub list_red_indices {
3768   my $self = shift;
3769
3770   my $get = $self->get_indices or return;
3771
3772   my @indices = ();
3773   for (@$get) {
3774      if ($_->{color} eq 'red') {
3775         push @indices, $_->{index};
3776      }
3777   }
3778
3779   return \@indices;
3780}
3781
3782#
3783# https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
3784#
3785sub list_datatypes {
3786   my $self = shift;
3787
3788   return {
3789      core => [ qw(string long integer short byte double float data boolean binary) ],
3790   };
3791}
3792
3793#
3794# Return total hits for last www_search
3795#
3796sub get_hits_total {
3797   my $self = shift;
3798   my ($run) = @_;
3799
3800   $self->brik_help_run_undef_arg('get_hits_total', $run) or return;
3801
3802   if (ref($run) eq 'HASH') {
3803      if (exists($run->{hits}) && exists($run->{hits}{total})) {
3804         return $run->{hits}{total};
3805      }
3806   }
3807
3808   return $self->log->error("get_hits_total: last Command not compatible");
3809}
3810
3811sub disable_shard_allocation {
3812   my $self = shift;
3813
3814   my $settings = {
3815      persistent => {
3816         'cluster.routing.allocation.enable' => 'none',
3817      }
3818   };
3819
3820   return $self->put_cluster_settings($settings);
3821}
3822
3823sub enable_shard_allocation {
3824   my $self = shift;
3825
3826   my $settings = {
3827      persistent => { 
3828         'cluster.routing.allocation.enable' => 'all',
3829      }
3830   };
3831
3832   return $self->put_cluster_settings($settings);
3833}
3834
3835sub flush_synced {
3836   my $self = shift;
3837
3838   my $es = $self->_es;
3839   $self->brik_help_run_undef_arg('open', $es) or return;
3840
3841   my $r;
3842   eval {
3843      $r = $es->indices->flush_synced;
3844   };
3845   if ($@) {
3846      chomp($@);
3847      return $self->log->error("flush_synced: failed: [$@]");
3848   }
3849
3850   return $r;
3851}
3852
3853#
3854# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
3855#
3856# run client::elasticsearch create_snapshot_repository myrepo
3857#      "{ type => 'fs', settings => { compress => 'true', location => '/path/' } }"
3858#
3859# You have to set path.repo in elasticsearch.yml like:
3860# path.repo: ["/home/gomor/es-backups"]
3861#
3862# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3863#
3864sub create_snapshot_repository {
3865   my $self = shift;
3866   my ($body, $repository_name) = @_;
3867
3868   my $es = $self->_es;
3869   $self->brik_help_run_undef_arg('open', $es) or return;
3870   $self->brik_help_run_undef_arg('create_snapshot_repository', $body) or return;
3871
3872   $repository_name ||= 'repository';
3873
3874   my %args = (
3875      repository => $repository_name,
3876      body => $body,
3877   );
3878
3879   my $r;
3880   eval {
3881      $r = $es->snapshot->create_repository(%args);
3882   };
3883   if ($@) {
3884      chomp($@);
3885      return $self->log->error("create_snapshot_repository: failed: [$@]");
3886   }
3887
3888   return $r;
3889}
3890
3891sub create_shared_fs_snapshot_repository {
3892   my $self = shift;
3893   my ($location, $repository_name) = @_;
3894
3895   $repository_name ||= 'repository';
3896   $self->brik_help_run_undef_arg('create_shared_fs_snapshot_repository', $location) or return;
3897
3898   if ($location !~ m{^/}) {
3899      return $self->log->error("create_shared_fs_snapshot_repository: you have to give ".
3900         "a full directory path, this one is invalid [$location]");
3901   }
3902
3903   my $body = {
3904      type => 'fs',
3905      settings => {
3906         compress => 'true',
3907         location => $location,
3908      },
3909   };
3910
3911   return $self->create_snapshot_repository($body, $repository_name);
3912}
3913
3914#
3915# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3916#
3917sub get_snapshot_repositories {
3918   my $self = shift;
3919
3920   my $es = $self->_es;
3921   $self->brik_help_run_undef_arg('open', $es) or return;
3922
3923   my $r;
3924   eval {
3925      $r = $es->snapshot->get_repository;
3926   };
3927   if ($@) {
3928      chomp($@);
3929      return $self->log->error("get_snapshot_repositories: failed: [$@]");
3930   }
3931
3932   return $r;
3933}
3934
3935#
3936# Search::Elasticsearch::Client::2_0::Direct::Snapshot
3937#
3938sub get_snapshot_status {
3939   my $self = shift;
3940
3941   my $es = $self->_es;
3942   $self->brik_help_run_undef_arg('open', $es) or return;
3943
3944   my $r;
3945   eval {
3946      $r = $es->snapshot->status;
3947   };
3948   if ($@) {
3949      chomp($@);
3950      return $self->log->error("get_snapshot_status: failed: [$@]");
3951   }
3952
3953   return $r;
3954}
3955
3956#
3957# Search::Elasticsearch::Client::5_0::Direct::Snapshot
3958#
3959sub create_snapshot {
3960   my $self = shift;
3961   my ($snapshot_name, $repository_name, $body) = @_;
3962
3963   my $es = $self->_es;
3964   $self->brik_help_run_undef_arg('open', $es) or return;
3965
3966   $snapshot_name ||= 'snapshot';
3967   $repository_name ||= 'repository';
3968
3969   my %args = (
3970      repository => $repository_name,
3971      snapshot => $snapshot_name,
3972   );
3973   if (defined($body)) {
3974      $args{body} = $body;
3975   }
3976
3977   my $r;
3978   eval {
3979      $r = $es->snapshot->create(%args);
3980   };
3981   if ($@) {
3982      chomp($@);
3983      return $self->log->error("create_snapshot: failed: [$@]");
3984   }
3985
3986   return $r;
3987}
3988
3989sub create_snapshot_for_indices {
3990   my $self = shift;
3991   my ($indices, $snapshot_name, $repository_name) = @_;
3992
3993   $self->brik_help_run_undef_arg('create_snapshot_for_indices', $indices) or return;
3994
3995   $snapshot_name ||= 'snapshot';
3996   $repository_name ||= 'repository';
3997
3998   my $body = {
3999      indices => $indices,
4000   };
4001
4002   return $self->create_snapshot($snapshot_name, $repository_name, $body);
4003}
4004
4005sub is_snapshot_finished {
4006   my $self = shift;
4007
4008   my $status = $self->get_snapshot_status or return;
4009
4010   if (@{$status->{snapshots}} == 0) {
4011      return 1;
4012   }
4013
4014   return 0;
4015}
4016
4017sub get_snapshot_state {
4018   my $self = shift;
4019
4020   if ($self->is_snapshot_finished) {
4021      return $self->log->info("get_snapshot_state: is already finished");
4022   }
4023
4024   my $status = $self->get_snapshot_status or return;
4025
4026   my @indices_done = ();
4027   my @indices_not_done = ();
4028
4029   my $list = $status->{snapshots};
4030   for my $snapshot (@$list) {
4031      my $indices = $snapshot->{indices};
4032      for my $index (@$indices) {
4033         my $done = $index->{shards_stats}{done};
4034         if ($done) {
4035            push @indices_done, $index;
4036         }
4037         else {
4038            push @indices_not_done, $index;
4039         }
4040      }
4041   }
4042
4043   return { done => \@indices_done, not_done => \@indices_not_done };
4044}
4045
4046sub verify_snapshot_repository {
4047}
4048
4049sub delete_snapshot_repository {
4050   my $self = shift;
4051   my ($repository_name) = @_;
4052
4053   my $es = $self->_es;
4054   $self->brik_help_run_undef_arg('open', $es) or return;
4055   $self->brik_help_run_undef_arg('delete_snapshot_repository', $repository_name) or return;
4056
4057   my $r;
4058   eval {
4059      $r = $es->snapshot->delete_repository(
4060         repository => $repository_name,
4061      );
4062   };
4063   if ($@) {
4064      chomp($@);
4065      return $self->log->error("delete_snapshot_repository: failed: [$@]");
4066   }
4067
4068   return $r;
4069}
4070
4071sub get_snapshot {
4072   my $self = shift;
4073   my ($snapshot_name, $repository_name) = @_;
4074
4075   my $es = $self->_es;
4076   $self->brik_help_run_undef_arg('open', $es) or return;
4077
4078   $snapshot_name ||= 'snapshot';
4079   $repository_name ||= 'repository';
4080
4081   my $r;
4082   eval {
4083      $r = $es->snapshot->get(
4084         repository => $repository_name,
4085         snapshot => $snapshot_name,
4086      );
4087   };
4088   if ($@) {
4089      chomp($@);
4090      return $self->log->error("get_snapshot: failed: [$@]");
4091   }
4092
4093   return $r;
4094}
4095
4096#
4097# Search::Elasticsearch::Client::5_0::Direct::Snapshot
4098#
4099sub delete_snapshot {
4100   my $self = shift;
4101   my ($snapshot_name, $repository_name) = @_;
4102
4103   my $es = $self->_es;
4104   $self->brik_help_run_undef_arg('open', $es) or return;
4105   $self->brik_help_run_undef_arg('delete_snapshot', $snapshot_name) or return;
4106   $self->brik_help_run_undef_arg('delete_snapshot', $repository_name) or return;
4107
4108   my $timeout = $self->rtimeout;
4109
4110   my $r;
4111   eval {
4112      $r = $es->snapshot->delete(
4113         repository => $repository_name,
4114         snapshot => $snapshot_name,
4115         master_timeout => "${timeout}s",
4116      );
4117   };
4118   if ($@) {
4119      chomp($@);
4120      return $self->log->error("delete_snapshot: failed: [$@]");
4121   }
4122
4123   return $r;
4124}
4125
4126#
4127# https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html
4128#
4129sub restore_snapshot {
4130   my $self = shift;
4131   my ($snapshot_name, $repository_name, $body) = @_;
4132
4133   my $es = $self->_es;
4134   $snapshot_name ||= 'snapshot';
4135   $repository_name ||= 'repository';
4136   $self->brik_help_run_undef_arg('open', $es) or return;
4137   $self->brik_help_run_undef_arg('restore_snapshot', $snapshot_name) or return;
4138   $self->brik_help_run_undef_arg('restore_snapshot', $repository_name) or return;
4139
4140   my %args = (
4141      repository => $repository_name,
4142      snapshot => $snapshot_name,
4143   );
4144   if (defined($body)) {
4145      $args{body} = $body;
4146   }
4147
4148   my $r;
4149   eval {
4150      $r = $es->snapshot->restore(%args);
4151   };
4152   if ($@) {
4153      chomp($@);
4154      return $self->log->error("restore_snapshot: failed: [$@]");
4155   }
4156
4157   return $r;
4158}
4159
4160sub restore_snapshot_for_indices {
4161   my $self = shift;
4162   my ($indices, $snapshot_name, $repository_name) = @_;
4163
4164   $snapshot_name ||= 'snapshot';
4165   $repository_name ||= 'repository';
4166   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $indices) or return;
4167   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $snapshot_name) or return;
4168   $self->brik_help_run_undef_arg('restore_snapshot_for_indices', $repository_name) or return;
4169
4170   my $body = {
4171      indices => $indices,
4172   };
4173
4174   return $self->restore_snapshot($snapshot_name, $repository_name, $body);
4175}
4176
4177# shard occupation
4178#
4179# curl -XGET "http://127.0.0.1:9200/_cat/shards?v
4180# Or https://www.elastic.co/guide/en/elasticsearch/reference/1.6/cluster-nodes-stats.html
4181#
4182# disk occuption:
4183# curl -XGET http://127.0.0.1:9200/_cat/nodes?h=ip,h,diskAvail,diskTotal
4184#
4185#
4186# Who is master: curl -XGET http://127.0.0.1:9200/_cat/master?v
4187#
4188
4189# Check memory lock
4190
4191# curl -XGET 'localhost:9200/_nodes?filter_path=**.mlockall&pretty'
4192# {
4193#  "nodes" : {
4194#    "3XXX" : {
4195#      "process" : {
4196#        "mlockall" : true
4197#      }
4198#    }
4199#  }
4200# }
4201
42021;
4203
4204__END__
4205
4206=head1 NAME
4207
4208Metabrik::Client::Elasticsearch - client::elasticsearch Brik
4209
4210=head1 SYNOPSIS
4211
4212   host:~> my $q = { term => { ip => "192.168.57.19" } }
4213   host:~> run client::elasticsearch open
4214   host:~> run client::elasticsearch query $q data-*
4215
4216=head1 DESCRIPTION
4217
4218Template to write a new Metabrik Brik.
4219
4220=head1 COPYRIGHT AND LICENSE
4221
4222Copyright (c) 2014-2019, Patrice E<lt>GomoRE<gt> Auffret
4223
4224You may distribute this module under the terms of The BSD 3-Clause License.
4225See LICENSE file in the source distribution archive.
4226
4227=head1 AUTHOR
4228
4229Patrice E<lt>GomoRE<gt> Auffret
4230
4231=cut
Note: See TracBrowser for help on using the repository browser.